OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
StringDictionary.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <tbb/parallel_for.h>
20 #include <boost/filesystem/operations.hpp>
21 #include <boost/filesystem/path.hpp>
22 #include <boost/sort/spreadsort/string_sort.hpp>
23 #include <future>
24 #include <iostream>
25 #include <string_view>
26 #include <thread>
27 
28 // TODO(adb): fixup
29 #ifdef _WIN32
30 #include <fcntl.h>
31 #include <io.h>
32 #else
33 #include <sys/fcntl.h>
34 #endif
35 
36 #include "Logger/Logger.h"
37 #include "OSDependent/omnisci_fs.h"
38 #include "Shared/sqltypes.h"
39 #include "Shared/thread_count.h"
40 #include "StringDictionaryClient.h"
41 #include "Utils/Regexp.h"
42 #include "Utils/StringLike.h"
43 
44 #include "LeafHostInfo.h"
45 
47 
48 namespace {
49 
51 
52 int checked_open(const char* path, const bool recover) {
53  auto fd = omnisci::open(path, O_RDWR | O_CREAT | (recover ? O_APPEND : O_TRUNC), 0644);
54  if (fd > 0) {
55  return fd;
56  }
57  auto err = std::string("Dictionary path ") + std::string(path) +
58  std::string(" does not exist.");
59  LOG(ERROR) << err;
60  throw DictPayloadUnavailable(err);
61 }
62 
/**
 * Rounds `num` up to the next power of two, clamped to the 32-bit range.
 *
 * @param num Value to round.
 * @return The smallest power of two that is >= `num` when that power of two fits in
 *         32 bits; otherwise UINT32_MAX. In particular 0 and every value above 2^31
 *         yield UINT32_MAX.
 */
uint64_t round_up_p2(const uint64_t num) {
  // Classic bit smear: propagate the highest set bit of (num - 1) into every lower
  // position, then add one. num == 0 wraps to all-ones here and back to 0 below.
  uint64_t in = num - 1;
  for (unsigned shift = 1; shift <= 16; shift <<= 1) {
    in |= in >> shift;
  }
  ++in;
  // TODO MAT deal with case where filesize has been increased but reality is
  // we are constrained to 2^31.
  // In that situation this calculation will wrap to zero
  if (in == 0 || in > UINT32_MAX) {
    in = UINT32_MAX;
  }
  return in;
}
80 
81 string_dict_hash_t hash_string(const std::string_view& str) {
82  string_dict_hash_t str_hash = 1;
83  // rely on fact that unsigned overflow is defined and wraps
84  for (size_t i = 0; i < str.size(); ++i) {
85  str_hash = str_hash * 997 + str[i];
86  }
87  return str_hash;
88 }
89 } // namespace
90 
// Namespace-scope definitions (without initializers) of three static constexpr data
// members of StringDictionary. Their values are given where the members are declared,
// which is not part of this file.
92 constexpr int32_t StringDictionary::INVALID_STR_ID;
93 constexpr size_t StringDictionary::MAX_STRLEN;
94 constexpr size_t StringDictionary::MAX_STRCOUNT;
95 
// Constructs a local string dictionary.
//
// folder            - directory holding the "DictPayload" and "DictOffsets" files
//                     (used only when isTemp is false).
// isTemp            - when true no files are opened, mapped or recovered.
// recover           - when true existing files are kept (see checked_open) and the
//                     in-memory hash table is rebuilt from the stored strings.
// materializeHashes - when true the hash of every string is kept in hash_cache_.
// initial_capacity  - initial number of hash-table slots; checked to be a power of two.
//
// NOTE(review): this listing is missing original lines 128-129, 134, 137, 143 and 209,
// so the file-size setup, the offset mmap call and part of the final log expression
// are not visible here.
96 StringDictionary::StringDictionary(const std::string& folder,
97  const bool isTemp,
98  const bool recover,
99  const bool materializeHashes,
100  size_t initial_capacity)
101  : folder_(folder)
102  , str_count_(0)
103  , string_id_string_dict_hash_table_(initial_capacity, INVALID_STR_ID)
104  , hash_cache_(initial_capacity)
105  , isTemp_(isTemp)
106  , materialize_hashes_(materializeHashes)
107  , payload_fd_(-1)
108  , offset_fd_(-1)
109  , offset_map_(nullptr)
110  , payload_map_(nullptr)
111  , offset_file_size_(0)
112  , payload_file_size_(0)
113  , payload_file_off_(0)
114  , strings_cache_(nullptr) {
// A non-temporary dictionary without a folder gets no storage at all.
115  if (!isTemp && folder.empty()) {
116  return;
117  }
118 
119  // initial capacity must be a power of two for efficient bucket computation
120  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
121  if (!isTemp_) {
122  boost::filesystem::path storage_path(folder);
123  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
124  const auto payload_path =
125  (storage_path / boost::filesystem::path("DictPayload")).string();
126  payload_fd_ = checked_open(payload_path.c_str(), recover);
127  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
130  }
131  bool storage_is_empty = false;
132  if (payload_file_size_ == 0) {
133  storage_is_empty = true;
135  }
136  if (offset_file_size_ == 0) {
138  }
139  if (!isTemp_) { // we never mmap or recover temp dictionaries
140  payload_map_ =
141  reinterpret_cast<char*>(omnisci::checked_mmap(payload_fd_, payload_file_size_));
142  offset_map_ = reinterpret_cast<StringIdxEntry*>(
144  if (recover) {
145  const size_t bytes = omnisci::file_size(offset_fd_);
146  if (bytes % sizeof(StringIdxEntry) != 0) {
147  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
148  }
149  const uint64_t str_count =
150  storage_is_empty ? 0 : getNumStringsFromStorage(bytes / sizeof(StringIdxEntry));
151  collisions_ = 0;
152  // at this point we know the size of the StringDict we need to load
153  // so lets reallocate the vector to the correct size
154  const uint64_t max_entries =
155  std::max(round_up_p2(str_count * 2 + 1),
156  round_up_p2(std::max(initial_capacity, static_cast<size_t>(1))));
157  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
158  string_id_string_dict_hash_table_.swap(new_str_ids);
159  if (materialize_hashes_) {
// The hash cache holds one entry per string, i.e. half the number of table slots.
160  std::vector<string_dict_hash_t> new_hash_cache(max_entries / 2);
161  hash_cache_.swap(new_hash_cache);
162  }
163  // Bail early if we know we don't have strings to add (i.e. a new or empty
164  // dictionary)
165  if (str_count == 0) {
166  return;
167  }
168 
169  unsigned string_id = 0;
170  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
171 
172  uint32_t thread_inits = 0;
// NOTE(review): std::thread::hardware_concurrency() is allowed to return 0, in which
// case the division and the modulo by thread_count below are undefined.
173  const auto thread_count = std::thread::hardware_concurrency();
// Each task handles between 2000 and 200000 consecutive string ids.
174  const uint32_t items_per_thread = std::max<uint32_t>(
175  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
176  std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>
177  dictionary_futures;
178  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
// Each task reads its range of strings and returns (hash, length) pairs in id order.
179  dictionary_futures.emplace_back(std::async(
180  std::launch::async, [string_id, str_count, items_per_thread, this] {
181  std::vector<std::pair<string_dict_hash_t, unsigned int>> hashVec;
182  for (uint32_t curr_id = string_id;
183  curr_id < string_id + items_per_thread && curr_id < str_count;
184  curr_id++) {
185  const auto recovered = getStringFromStorage(curr_id);
186  if (recovered.canary) {
187  // hit the canary, recovery finished
188  break;
189  } else {
190  std::string_view temp(recovered.c_str_ptr, recovered.size);
191  hashVec.emplace_back(std::make_pair(hash_string(temp), temp.size()));
192  }
193  }
194  return hashVec;
195  }));
196  thread_inits++;
// Drain the pending tasks after every thread_count launches.
197  if (thread_inits % thread_count == 0) {
198  processDictionaryFutures(dictionary_futures);
199  }
200  }
201  // gather last few threads
202  if (dictionary_futures.size() != 0) {
203  processDictionaryFutures(dictionary_futures);
204  }
205  VLOG(1) << "Opened string dictionary " << folder << " # Strings: " << str_count_
206  << " Hash table size: " << string_id_string_dict_hash_table_.size()
207  << " Fill rate: "
208  << static_cast<double>(str_count_) * 100.0 /
210  << "% Collisions: " << collisions_;
211  }
212  }
213 }
214 
// Waits for every future in `dictionary_futures`, then registers each returned
// (hash, length) pair as the next string id: the payload offset advances by the
// length, the hash-table bucket is set to the id, the hash is cached when
// materialize_hashes_ is set, and str_count_ is incremented. The vector is cleared
// before returning.
// NOTE(review): the declarator line (original line 215) and the bucket computation
// (original line 223) are missing from this listing.
216  std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>&
217  dictionary_futures) {
218  for (auto& dictionary_future : dictionary_futures) {
219  dictionary_future.wait();
220  const auto hashVec = dictionary_future.get();
221  for (const auto& hash : hashVec) {
222  const uint32_t bucket =
224  payload_file_off_ += hash.second;
225  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
226  if (materialize_hashes_) {
227  hash_cache_[str_count_] = hash.first;
228  }
229  ++str_count_;
230  }
231  }
232  dictionary_futures.clear();
233 }
234 
242 size_t StringDictionary::getNumStringsFromStorage(const size_t storage_slots) const
243  noexcept {
244  if (storage_slots == 0) {
245  return 0;
246  }
247  // Must use signed integers since final binary search step can wrap to max size_t value
248  // if dictionary is empty
249  int64_t min_bound = 0;
250  int64_t max_bound = storage_slots - 1;
251  int64_t guess{0};
252  while (min_bound <= max_bound) {
253  guess = (max_bound + min_bound) / 2;
254  CHECK_GE(guess, 0);
255  if (getStringFromStorage(guess).canary) {
256  max_bound = guess - 1;
257  } else {
258  min_bound = guess + 1;
259  }
260  }
261  CHECK_GE(guess + (min_bound > guess ? 1 : 0), 0);
262  return guess + (min_bound > guess ? 1 : 0);
263 }
264 
// Member-initializer list of a constructor that builds two StringDictionaryClient
// objects for `host` / `dict_ref` (they differ in their final bool argument) and
// names the dictionary "DB_<dbId>_DICT_<dictId>". The body is empty.
// NOTE(review): the declarator line (original line 265) is missing from this listing.
266  : folder_("DB_" + std::to_string(dict_ref.dbId) + "_DICT_" +
267  std::to_string(dict_ref.dictId))
268  , strings_cache_(nullptr)
269  , client_(new StringDictionaryClient(host, dict_ref, true))
270  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
271 
// Releases CANARY_BUFFER and, for a dictionary without a client_, its payload and
// offset storage. In the temporary case both maps are released with free().
// NOTE(review): presumably the destructor; the declarator line (original line 272)
// and the statements of the non-temporary branch (original lines 279-281, 283, 285,
// 287) are missing from this listing.
273  free(CANARY_BUFFER);
// Nothing else to release when a client_ is set.
274  if (client_) {
275  return;
276  }
277  if (payload_map_) {
278  if (!isTemp_) {
282  CHECK_GE(payload_fd_, 0);
284  CHECK_GE(offset_fd_, 0);
286  } else {
288  free(payload_map_);
289  free(offset_map_);
290  }
291  }
292 }
293 
294 int32_t StringDictionary::getOrAdd(const std::string& str) noexcept {
295  if (client_) {
296  std::vector<int32_t> string_ids;
297  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
298  CHECK_EQ(size_t(1), string_ids.size());
299  return string_ids.front();
300  }
301  return getOrAddImpl(str);
302 }
303 
304 namespace {
305 
306 template <class T>
307 void throw_encoding_error(std::string_view str, std::string_view folder) {
308  std::ostringstream oss;
309  oss << "The text encoded column stored at " << folder << ", has exceeded its limit of "
310  << sizeof(T) * 8 << " bits (" << static_cast<size_t>(max_valid_int_value<T>() + 1)
311  << " unique values)."
312  << " There was an attempt to add the new string '" << str
313  << "'. Table will need to be recreated with larger String Dictionary Capacity";
314  LOG(ERROR) << oss.str();
315  throw std::runtime_error(oss.str());
316 }
317 
318 } // namespace
319 
// For every inner vector of `string_array_vec`, resizes the matching inner vector of
// `ids_array_vec` to the same length and fills it through getOrAddBulk().
// NOTE(review): the declarator line (original line 321) is missing from this listing.
// NOTE(review): `&ids[0]` is evaluated even when `strings` is empty, i.e. on an empty
// vector.
320 template <class String>
322  const std::vector<std::vector<String>>& string_array_vec,
323  std::vector<std::vector<int32_t>>& ids_array_vec) {
324  ids_array_vec.resize(string_array_vec.size());
325  for (size_t i = 0; i < string_array_vec.size(); i++) {
326  auto& strings = string_array_vec[i];
327  auto& ids = ids_array_vec[i];
328  ids.resize(strings.size());
329  getOrAddBulk(strings, &ids[0]);
330  }
331 }
332 
// NOTE(review): parameter lists of two declarations, one for std::string and one for
// std::string_view; presumably explicit instantiations of the template defined just
// above. Their declarator lines (original lines 333 and 337) are missing from this
// listing.
334  const std::vector<std::vector<std::string>>& string_array_vec,
335  std::vector<std::vector<int32_t>>& ids_array_vec);
336 
338  const std::vector<std::vector<std::string_view>>& string_array_vec,
339  std::vector<std::vector<int32_t>>& ids_array_vec);
340 
346 template <class String>
347 void StringDictionary::hashStrings(const std::vector<String>& string_vec,
348  std::vector<string_dict_hash_t>& hashes) const
349  noexcept {
350  CHECK_EQ(string_vec.size(), hashes.size());
351 
352  tbb::parallel_for(tbb::blocked_range<size_t>(0, string_vec.size()),
353  [&string_vec, &hashes](const tbb::blocked_range<size_t>& r) {
354  for (size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
355  if (string_vec[curr_id].empty()) {
356  continue;
357  }
358  hashes[curr_id] = hash_string(string_vec[curr_id]);
359  }
360  });
361 }
362 
// Translates `input_strings` into ids written to `output_string_ids` (one per input,
// same order), adding strings that are not in the dictionary yet. Empty strings get
// the null value of T. Throws via throw_encoding_error<T>() when a new id would not
// fit in T.
// NOTE(review): original lines 366, 389, 398, 405 and 421 are missing from this
// listing: the condition guarding the parallel path, the bucket-occupied test, the
// head of the capacity check, the resize call and the statement run after strings
// were added.
363 template <class T, class String>
364 void StringDictionary::getOrAddBulk(const std::vector<String>& input_strings,
365  T* output_string_ids) {
367  getOrAddBulkParallel(input_strings, output_string_ids);
368  return;
369  }
370  // Single-thread path.
371  if (client_no_timeout_) {
372  getOrAddBulkRemote(input_strings, output_string_ids);
373  return;
374  }
375  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
376 
377  const size_t initial_str_count = str_count_;
378  size_t idx = 0;
379  for (const auto& input_string : input_strings) {
// Empty strings are encoded as null and never stored.
380  if (input_string.empty()) {
381  output_string_ids[idx++] = inline_int_null_value<T>();
382  continue;
383  }
384  CHECK(input_string.size() <= MAX_STRLEN);
385 
386  const string_dict_hash_t input_string_hash = hash_string(input_string);
387  uint32_t hash_bucket =
388  computeBucket(input_string_hash, input_string, string_id_string_dict_hash_table_);
390  output_string_ids[idx++] = string_id_string_dict_hash_table_[hash_bucket];
391  continue;
392  }
393  // need to add record to dictionary
394  // check there is room
395  if (str_count_ > static_cast<size_t>(max_valid_int_value<T>())) {
396  throw_encoding_error<T>(input_string, folder_);
397  }
399  << "Maximum number (" << str_count_
400  << ") of Dictionary encoded Strings reached for this column, offset path "
401  "for column is "
402  << offsets_path_;
403  if (fillRateIsHigh(str_count_)) {
404  // resize when more than 50% is full
// The bucket is recomputed against the hash table after the resize.
406  hash_bucket = computeBucket(
407  input_string_hash, input_string, string_id_string_dict_hash_table_);
408  }
409  appendToStorage(input_string);
410 
411  if (materialize_hashes_) {
412  hash_cache_[str_count_] = input_string_hash;
413  }
// The new string's id is the current string count.
414  const int32_t string_id = static_cast<int32_t>(str_count_);
415  string_id_string_dict_hash_table_[hash_bucket] = string_id;
416  output_string_ids[idx++] = string_id;
417  ++str_count_;
418  }
419  const size_t num_strings_added = str_count_ - initial_str_count;
420  if (num_strings_added > 0) {
422  }
423 }
424 
// Bulk variant of getOrAddBulk() that hashes all inputs in parallel before taking the
// write lock, records the indices of new strings in `string_memory_ids`, and appends
// them to storage in one appendToStorageBulk() call at the end. str_count_ is only
// updated after that call; `shadow_str_count` tracks the count in the meantime.
// NOTE(review): original lines 456, 468, 476, 478 and 505 are missing from this
// listing: the resize call head, one bucket-lookup argument, the bucket-occupied test
// with its assigned value, and the statement run after strings were added.
425 template <class T, class String>
426 void StringDictionary::getOrAddBulkParallel(const std::vector<String>& input_strings,
427  T* output_string_ids) {
428  if (client_no_timeout_) {
429  getOrAddBulkRemote(input_strings, output_string_ids);
430  return;
431  }
432  // Compute hashes of the input strings up front, and in parallel,
433  // as the string hashing does not need to be behind the subsequent write_lock
434  std::vector<string_dict_hash_t> input_strings_hashes(input_strings.size());
435  hashStrings(input_strings, input_strings_hashes);
436 
437  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
438  size_t shadow_str_count =
439  str_count_; // Need to shadow str_count_ now with bulk add methods
// Number of strings stored when this call started.
440  const size_t storage_high_water_mark = shadow_str_count;
441  std::vector<size_t> string_memory_ids;
442  size_t sum_new_string_lengths = 0;
443  string_memory_ids.reserve(input_strings.size());
444  size_t input_string_idx{0};
445  for (const auto& input_string : input_strings) {
446  // Currently we make empty strings null
447  if (input_string.empty()) {
448  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
449  continue;
450  }
451  // TODO: Recover gracefully if an input string is too long
452  CHECK(input_string.size() <= MAX_STRLEN);
453 
454  if (fillRateIsHigh(shadow_str_count)) {
455  // resize when more than 50% is full
457  storage_high_water_mark,
458  input_strings,
459  string_memory_ids,
460  input_strings_hashes);
461  }
462  // Compute the hash for this input_string
463  const string_dict_hash_t input_string_hash = input_strings_hashes[input_string_idx];
464 
465  const uint32_t hash_bucket =
466  computeBucketFromStorageAndMemory(input_string_hash,
467  input_string,
469  storage_high_water_mark,
470  input_strings,
471  string_memory_ids);
472 
473  // If the hash bucket is not empty, that is our string id
474  // (computeBucketFromStorageAndMemory) already checked to ensure the input string and
475  // bucket string are equal)
477  output_string_ids[input_string_idx++] =
479  continue;
480  }
481  // Did not find string, so need to add record to dictionary
482  // First check there is room
483  if (shadow_str_count > static_cast<size_t>(max_valid_int_value<T>())) {
484  throw_encoding_error<T>(input_string, folder_);
485  }
486  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
487  << "Maximum number (" << shadow_str_count
488  << ") of Dictionary encoded Strings reached for this column, offset path "
489  "for column is "
490  << offsets_path_;
491 
// Remember which input holds the new string; it is written to storage after the loop.
492  string_memory_ids.push_back(input_string_idx);
493  sum_new_string_lengths += input_string.size();
494  string_id_string_dict_hash_table_[hash_bucket] =
495  static_cast<int32_t>(shadow_str_count);
496  if (materialize_hashes_) {
497  hash_cache_[shadow_str_count] = input_string_hash;
498  }
499  output_string_ids[input_string_idx++] = shadow_str_count++;
500  }
501  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
502  const size_t num_strings_added = shadow_str_count - str_count_;
503  str_count_ = shadow_str_count;
504  if (num_strings_added > 0) {
506  }
507 }
// Explicit instantiations of getOrAddBulk for std::string inputs with uint8_t,
// uint16_t and int32_t output ids.
508 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
509  uint8_t* encoded_vec);
510 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
511  uint16_t* encoded_vec);
512 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
513  int32_t* encoded_vec);
514 
// The same three output id types for std::string_view inputs.
515 template void StringDictionary::getOrAddBulk(
516  const std::vector<std::string_view>& string_vec,
517  uint8_t* encoded_vec);
518 template void StringDictionary::getOrAddBulk(
519  const std::vector<std::string_view>& string_vec,
520  uint16_t* encoded_vec);
521 template void StringDictionary::getOrAddBulk(
522  const std::vector<std::string_view>& string_vec,
523  int32_t* encoded_vec);
524 
// Resolves `string_vec` through client_no_timeout_ and narrows the returned int32_t
// ids into `encoded_vec` (type T). An id equal to the int32_t null value is written as
// the null value of T; an id larger than max_valid_int_value<T>() raises the encoding
// error for the corresponding input string.
// NOTE(review): original line 528 (first statement of the body) is missing from this
// listing.
525 template <class T, class String>
526 void StringDictionary::getOrAddBulkRemote(const std::vector<String>& string_vec,
527  T* encoded_vec) {
529  std::vector<int32_t> string_ids;
530  client_no_timeout_->get_or_add_bulk(string_ids, string_vec);
531  size_t out_idx{0};
532  for (size_t i = 0; i < string_ids.size(); ++i) {
533  const auto string_id = string_ids[i];
534  const bool invalid = string_id > max_valid_int_value<T>();
535  if (invalid || string_id == inline_int_null_value<int32_t>()) {
536  if (invalid) {
// throw_encoding_error always throws, so the null assignment below is reached only
// for a null id.
537  throw_encoding_error<T>(string_vec[i], folder_);
538  }
539  encoded_vec[out_idx++] = inline_int_null_value<T>();
540  continue;
541  }
542  encoded_vec[out_idx++] = string_id;
543  }
544 }
545 
// NOTE(review): parameter lists of six declarations (std::string and std::string_view
// inputs, each with uint8_t, uint16_t and int32_t outputs); presumably explicit
// instantiations of the template defined just above. Their declarator lines are
// missing from this listing.
547  const std::vector<std::string>& string_vec,
548  uint8_t* encoded_vec);
550  const std::vector<std::string>& string_vec,
551  uint16_t* encoded_vec);
553  const std::vector<std::string>& string_vec,
554  int32_t* encoded_vec);
555 
557  const std::vector<std::string_view>& string_vec,
558  uint8_t* encoded_vec);
560  const std::vector<std::string_view>& string_vec,
561  uint16_t* encoded_vec);
563  const std::vector<std::string_view>& string_vec,
564  int32_t* encoded_vec);
565 
566 int32_t StringDictionary::getIdOfString(const std::string& str) const {
567  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
568  if (client_) {
569  return client_->get(str);
570  }
571  return getUnlocked(str);
572 }
573 
574 int32_t StringDictionary::getUnlocked(const std::string& str) const noexcept {
575  const string_dict_hash_t hash = hash_string(str);
576  auto str_id = string_id_string_dict_hash_table_[computeBucket(
577  hash, str, string_id_string_dict_hash_table_)];
578  return str_id;
579 }
580 
581 std::string StringDictionary::getString(int32_t string_id) const {
582  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
583  if (client_) {
584  std::string ret;
585  client_->get_string(ret, string_id);
586  return ret;
587  }
588  return getStringUnlocked(string_id);
589 }
590 
// Returns the string for `string_id` via getStringChecked() without taking a lock,
// after checking that the id is below the current string count.
// NOTE(review): unlike getStringBytes(), no lower bound (0 <= string_id) is checked
// here; confirm getStringChecked() rejects negative ids.
591 std::string StringDictionary::getStringUnlocked(int32_t string_id) const noexcept {
592  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
593  return getStringChecked(string_id);
594 }
595 
// Returns a (pointer, length) pair for the string stored under `string_id`, obtained
// from getStringBytesChecked() under a shared lock. Only valid for a dictionary
// without a client_; `string_id` must lie in [0, str_count_).
596 std::pair<char*, size_t> StringDictionary::getStringBytes(int32_t string_id) const
597  noexcept {
598  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
599  CHECK(!client_);
600  CHECK_LE(0, string_id);
601  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
602  return getStringBytesChecked(string_id);
603 }
604 
// Under a shared lock, returns client_->storage_entry_count() when a client_ is set
// and str_count_ otherwise.
// NOTE(review): the declarator line (original line 605) is missing from this listing.
606  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
607  if (client_) {
608  return client_->storage_entry_count();
609  }
610  return str_count_;
611 }
612 
613 namespace {
614 
615 bool is_like(const std::string& str,
616  const std::string& pattern,
617  const bool icase,
618  const bool is_simple,
619  const char escape) {
620  return icase
621  ? (is_simple ? string_ilike_simple(
622  str.c_str(), str.size(), pattern.c_str(), pattern.size())
623  : string_ilike(str.c_str(),
624  str.size(),
625  pattern.c_str(),
626  pattern.size(),
627  escape))
628  : (is_simple ? string_like_simple(
629  str.c_str(), str.size(), pattern.c_str(), pattern.size())
630  : string_like(str.c_str(),
631  str.size(),
632  pattern.c_str(),
633  pattern.size(),
634  escape));
635 }
636 
637 } // namespace
638 
639 std::vector<int32_t> StringDictionary::getLike(const std::string& pattern,
640  const bool icase,
641  const bool is_simple,
642  const char escape,
643  const size_t generation) const {
644  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
645  if (client_) {
646  return client_->get_like(pattern, icase, is_simple, escape, generation);
647  }
648  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
649  const auto it = like_cache_.find(cache_key);
650  if (it != like_cache_.end()) {
651  return it->second;
652  }
653  std::vector<int32_t> result;
654  std::vector<std::thread> workers;
655  int worker_count = cpu_threads();
656  CHECK_GT(worker_count, 0);
657  std::vector<std::vector<int32_t>> worker_results(worker_count);
658  CHECK_LE(generation, str_count_);
659  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
660  workers.emplace_back([&worker_results,
661  &pattern,
662  generation,
663  icase,
664  is_simple,
665  escape,
666  worker_idx,
667  worker_count,
668  this]() {
669  for (size_t string_id = worker_idx; string_id < generation;
670  string_id += worker_count) {
671  const auto str = getStringUnlocked(string_id);
672  if (is_like(str, pattern, icase, is_simple, escape)) {
673  worker_results[worker_idx].push_back(string_id);
674  }
675  }
676  });
677  }
678  for (auto& worker : workers) {
679  worker.join();
680  }
681  for (const auto& worker_result : worker_results) {
682  result.insert(result.end(), worker_result.begin(), worker_result.end());
683  }
684  // place result into cache for reuse if similar query
685  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
686 
687  CHECK(it_ok.second);
688 
689  return result;
690 }
691 
692 std::vector<int32_t> StringDictionary::getEquals(std::string pattern,
693  std::string comp_operator,
694  size_t generation) {
695  std::vector<int32_t> result;
696  auto eq_id_itr = equal_cache_.find(pattern);
697  int32_t eq_id = MAX_STRLEN + 1;
698  int32_t cur_size = str_count_;
699  if (eq_id_itr != equal_cache_.end()) {
700  auto eq_id = eq_id_itr->second;
701  if (comp_operator == "=") {
702  result.push_back(eq_id);
703  } else {
704  for (int32_t idx = 0; idx <= cur_size; idx++) {
705  if (idx == eq_id) {
706  continue;
707  }
708  result.push_back(idx);
709  }
710  }
711  } else {
712  std::vector<std::thread> workers;
713  int worker_count = cpu_threads();
714  CHECK_GT(worker_count, 0);
715  std::vector<std::vector<int32_t>> worker_results(worker_count);
716  CHECK_LE(generation, str_count_);
717  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
718  workers.emplace_back(
719  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
720  for (size_t string_id = worker_idx; string_id < generation;
721  string_id += worker_count) {
722  const auto str = getStringUnlocked(string_id);
723  if (str == pattern) {
724  worker_results[worker_idx].push_back(string_id);
725  }
726  }
727  });
728  }
729  for (auto& worker : workers) {
730  worker.join();
731  }
732  for (const auto& worker_result : worker_results) {
733  result.insert(result.end(), worker_result.begin(), worker_result.end());
734  }
735  if (result.size() > 0) {
736  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
737  CHECK(it_ok.second);
738  eq_id = result[0];
739  }
740  if (comp_operator == "<>") {
741  for (int32_t idx = 0; idx <= cur_size; idx++) {
742  if (idx == eq_id) {
743  continue;
744  }
745  result.push_back(idx);
746  }
747  }
748  }
749  return result;
750 }
751 
// Returns the ids of all dictionary strings that satisfy `<string> comp_operator
// pattern`, where comp_operator is one of "<", "<=", ">", ">=", "=", "<>".
// With a client_ set the request is forwarded to it. Otherwise the position of
// `pattern` inside sorted_cache (ids ordered by their strings) is looked up, or
// computed with std::lower_bound and stored in compare_cache_, and the ids on the
// requested side of that position are returned.
// NOTE(review): original line 768 (the last statement of the
// `sorted_cache.size() < str_count_` branch) is missing from this listing.
752 std::vector<int32_t> StringDictionary::getCompare(const std::string& pattern,
753  const std::string& comp_operator,
754  const size_t generation) {
755  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
756  if (client_) {
757  return client_->get_compare(pattern, comp_operator, generation);
758  }
759  std::vector<int32_t> ret;
760  if (str_count_ == 0) {
761  return ret;
762  }
763  if (sorted_cache.size() < str_count_) {
// Equality operators are answered by getEquals() while the sorted cache is behind.
764  if (comp_operator == "=" || comp_operator == "<>") {
765  return getEquals(pattern, comp_operator, generation);
766  }
767 
769  }
770  auto cache_index = compare_cache_.get(pattern);
771 
772  if (!cache_index) {
773  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
774  const auto cache_itr = std::lower_bound(
775  sorted_cache.begin(),
776  sorted_cache.end(),
777  pattern,
778  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
779  auto a_str = this->getStringFromStorage(a);
780  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
781  });
782 
// index: position of the equal element (diff == 0) or of the element just before the
// insertion point (diff == 1).
783  if (cache_itr == sorted_cache.end()) {
784  cache_index->index = sorted_cache.size() - 1;
785  cache_index->diff = 1;
786  } else {
787  const auto cache_str = getStringFromStorage(*cache_itr);
788  if (!string_eq(
789  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
790  cache_index->index = cache_itr - sorted_cache.begin() - 1;
791  cache_index->diff = 1;
792  } else {
793  cache_index->index = cache_itr - sorted_cache.begin();
794  cache_index->diff = 0;
795  }
796  }
797 
798  compare_cache_.put(pattern, cache_index);
799  }
800 
801  // since we have a cache in form of vector of ints which is sorted according to
802  // corresponding strings in the dictionary all we need is the index of the element
803  // which equal to the pattern that we are trying to match or the index of “biggest”
804  // element smaller than the pattern, to perform all the comparison operators over
805  // string. The search function guarantees we have such index so now it is just the
806  // matter to include all the elements in the result vector.
807 
808  // For < operator if the index that we have points to the element which is equal to
809  // the pattern that we are searching for we simply get all the elements less than the
810  // index. If the element pointed by the index is not equal to the pattern we are
811  // comparing with we also need to include that index in result vector, except when the
812  // index points to 0 and the pattern is lesser than the smallest value in the string
813  // dictionary.
814 
815  if (comp_operator == "<") {
816  size_t idx = cache_index->index;
817  if (cache_index->diff) {
818  idx = cache_index->index + 1;
819  if (cache_index->index == 0 && cache_index->diff > 0) {
820  idx = cache_index->index;
821  }
822  }
823  for (size_t i = 0; i < idx; i++) {
824  ret.push_back(sorted_cache[i]);
825  }
826 
827  // For <= operator if the index that we have points to the element which is equal to
828  // the pattern that we are searching for we want to include the element pointed by
829  // the index in the result set. If the element pointed by the index is not equal to
830  // the pattern we are comparing with we just want to include all the ids with index
831  // less than the index that is cached, except when pattern that we are searching for
832  // is smaller than the smallest string in the dictionary.
833 
834  } else if (comp_operator == "<=") {
835  size_t idx = cache_index->index + 1;
// NOTE(review): this compares the cache_index pointer itself with 0, while every
// sibling branch tests `cache_index->index == 0`. cache_index is non-null at this
// point, so the condition looks like it can never be true - likely a typo.
836  if (cache_index == 0 && cache_index->diff > 0) {
837  idx = cache_index->index;
838  }
839  for (size_t i = 0; i < idx; i++) {
840  ret.push_back(sorted_cache[i]);
841  }
842 
843  // For > operator we want to get all the elements with index greater than the index
844  // that we have except, when the pattern we are searching for is lesser than the
845  // smallest string in the dictionary we also want to include the id of the index
846  // that we have.
847 
848  } else if (comp_operator == ">") {
849  size_t idx = cache_index->index + 1;
850  if (cache_index->index == 0 && cache_index->diff > 0) {
851  idx = cache_index->index;
852  }
853  for (size_t i = idx; i < sorted_cache.size(); i++) {
854  ret.push_back(sorted_cache[i]);
855  }
856 
857  // For >= operator when the indexed element that we have points to element which is
858  // equal to the pattern we are searching for we want to include that in the result
859  // vector. If the index that we have does not point to the string which is equal to
860  // the pattern we are searching we don’t want to include that id into the result
861  // vector except when the index is 0.
862 
863  } else if (comp_operator == ">=") {
864  size_t idx = cache_index->index;
865  if (cache_index->diff) {
866  idx = cache_index->index + 1;
867  if (cache_index->index == 0 && cache_index->diff > 0) {
868  idx = cache_index->index;
869  }
870  }
871  for (size_t i = idx; i < sorted_cache.size(); i++) {
872  ret.push_back(sorted_cache[i]);
873  }
874  } else if (comp_operator == "=") {
875  if (!cache_index->diff) {
876  ret.push_back(sorted_cache[cache_index->index]);
877  }
878 
879  // For <> operator it is simple matter of not including id of string which is equal
880  // to pattern we are searching for.
881  } else if (comp_operator == "<>") {
882  if (!cache_index->diff) {
883  size_t idx = cache_index->index;
884  for (size_t i = 0; i < idx; i++) {
885  ret.push_back(sorted_cache[i]);
886  }
887  ++idx;
888  for (size_t i = idx; i < sorted_cache.size(); i++) {
889  ret.push_back(sorted_cache[i]);
890  }
891  } else {
// NOTE(review): the loop body does not use `i`; it inserts the ENTIRE sorted_cache at
// the front of ret once per element, so ret ends up with size()^2 entries. A single
// insert (no loop) looks like what was intended.
892  for (size_t i = 0; i < sorted_cache.size(); i++) {
893  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
894  }
895  }
896 
897  } else {
// NOTE(review): this only constructs a temporary std::runtime_error and discards it;
// nothing is thrown, so an unsupported operator silently yields an empty result.
// Probably meant `throw std::runtime_error(...)`.
898  std::runtime_error("Unsupported string comparison operator");
899  }
900  return ret;
901 }
902 
903 namespace {
904 
905 bool is_regexp_like(const std::string& str,
906  const std::string& pattern,
907  const char escape) {
908  return regexp_like(str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape);
909 }
910 
911 } // namespace
912 
913 std::vector<int32_t> StringDictionary::getRegexpLike(const std::string& pattern,
914  const char escape,
915  const size_t generation) const {
916  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
917  if (client_) {
918  return client_->get_regexp_like(pattern, escape, generation);
919  }
920  const auto cache_key = std::make_pair(pattern, escape);
921  const auto it = regex_cache_.find(cache_key);
922  if (it != regex_cache_.end()) {
923  return it->second;
924  }
925  std::vector<int32_t> result;
926  std::vector<std::thread> workers;
927  int worker_count = cpu_threads();
928  CHECK_GT(worker_count, 0);
929  std::vector<std::vector<int32_t>> worker_results(worker_count);
930  CHECK_LE(generation, str_count_);
931  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
932  workers.emplace_back([&worker_results,
933  &pattern,
934  generation,
935  escape,
936  worker_idx,
937  worker_count,
938  this]() {
939  for (size_t string_id = worker_idx; string_id < generation;
940  string_id += worker_count) {
941  const auto str = getStringUnlocked(string_id);
942  if (is_regexp_like(str, pattern, escape)) {
943  worker_results[worker_idx].push_back(string_id);
944  }
945  }
946  });
947  }
948  for (auto& worker : workers) {
949  worker.join();
950  }
951  for (const auto& worker_result : worker_results) {
952  result.insert(result.end(), worker_result.begin(), worker_result.end());
953  }
954  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
955  CHECK(it_ok.second);
956 
957  return result;
958 }
959 
960 std::shared_ptr<const std::vector<std::string>> StringDictionary::copyStrings() const {
961  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
962  if (client_) {
963  // TODO(miyu): support remote string dictionary
964  throw std::runtime_error(
965  "copying dictionaries from remote server is not supported yet.");
966  }
967 
968  if (strings_cache_) {
969  return strings_cache_;
970  }
971 
972  strings_cache_ = std::make_shared<std::vector<std::string>>();
973  strings_cache_->reserve(str_count_);
974  const bool multithreaded = str_count_ > 10000;
975  const auto worker_count =
976  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
977  CHECK_GT(worker_count, 0UL);
978  std::vector<std::vector<std::string>> worker_results(worker_count);
979  auto copy = [this](std::vector<std::string>& str_list,
980  const size_t start_id,
981  const size_t end_id) {
982  CHECK_LE(start_id, end_id);
983  str_list.reserve(end_id - start_id);
984  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
985  str_list.push_back(getStringUnlocked(string_id));
986  }
987  };
988  if (multithreaded) {
989  std::vector<std::future<void>> workers;
990  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
991  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
992  worker_idx < worker_count && start < str_count_;
993  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
994  workers.push_back(std::async(
995  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
996  }
997  for (auto& worker : workers) {
998  worker.get();
999  }
1000  } else {
1001  CHECK_EQ(worker_results.size(), size_t(1));
1002  copy(worker_results[0], 0, str_count_);
1003  }
1004 
1005  for (const auto& worker_result : worker_results) {
1006  strings_cache_->insert(
1007  strings_cache_->end(), worker_result.begin(), worker_result.end());
1008  }
1009  return strings_cache_;
1010 }
1011 
1012 bool StringDictionary::fillRateIsHigh(const size_t num_strings) const noexcept {
1013  return string_id_string_dict_hash_table_.size() <= num_strings * 2;
1014 }
1015 
// Doubles the id hash table: builds a table twice the current size, initialised to
// INVALID_STR_ID, re-inserts every string id in [0, str_count_), then swaps it in.
// NOTE(review): the signature line of this definition is missing from this listing.
1017  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1018  INVALID_STR_ID);
1019 
1020  if (materialize_hashes_) {
// Hashes are cached per string id, so re-bucketing does not need to read the strings.
1021  for (size_t i = 0; i != str_count_; ++i) {
1022  const string_dict_hash_t hash = hash_cache_[i];
1023  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1024  new_str_ids[bucket] = i;
1025  }
// The hash cache is doubled alongside the table.
1026  hash_cache_.resize(hash_cache_.size() * 2);
1027  } else {
// No cached hashes: fetch every stored string and hash it again.
1028  for (size_t i = 0; i != str_count_; ++i) {
1029  const auto str = getStringChecked(i);
1030  const string_dict_hash_t hash = hash_string(str);
1031  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1032  new_str_ids[bucket] = i;
1033  }
1034  }
1035  string_id_string_dict_hash_table_.swap(new_str_ids);
1036 }
1037 
// Doubles the id hash table while a bulk insert is in flight: ids below
// storage_high_water_mark refer to strings already in storage, ids at or above it refer
// to pending strings listed in string_memory_ids (indices into input_strings).
// NOTE(review): the line carrying the function name is missing from this listing.
1038 template <class String>
1040  const size_t str_count, // str_count_ is only persisted strings, so need transient
1041  // shadow count
1042  const size_t storage_high_water_mark,
1043  const std::vector<String>& input_strings,
1044  const std::vector<size_t>& string_memory_ids,
1045  const std::vector<string_dict_hash_t>& input_strings_hashes) noexcept {
1046  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1047  INVALID_STR_ID);
1048  if (materialize_hashes_) {
// Cached hashes cover the first str_count ids, so neither storage nor input_strings
// is consulted in this branch.
1049  for (size_t i = 0; i != str_count; ++i) {
1050  const string_dict_hash_t hash = hash_cache_[i];
1051  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1052  new_str_ids[bucket] = i;
1053  }
1054  hash_cache_.resize(hash_cache_.size() * 2);
1055  } else {
// Stored strings are read back and re-hashed.
1056  for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1057  const auto storage_string = getStringChecked(storage_idx);
1058  const string_dict_hash_t hash = hash_string(storage_string);
1059  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1060  new_str_ids[bucket] = storage_idx;
1061  }
// Pending strings reuse the caller-supplied hashes; their ids continue after the
// stored ones (storage_high_water_mark + position in string_memory_ids).
1062  for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1063  const size_t string_memory_id = string_memory_ids[memory_idx];
1064  const uint32_t bucket = computeUniqueBucketWithHash(
1065  input_strings_hashes[string_memory_id], new_str_ids);
1066  new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1067  }
1068  }
1069  string_id_string_dict_hash_table_.swap(new_str_ids);
1070 }
1071 
1072 int32_t StringDictionary::getOrAddImpl(const std::string_view& str) noexcept {
1073  // @TODO(wei) treat empty string as NULL for now
1074  if (str.size() == 0) {
1075  return inline_int_null_value<int32_t>();
1076  }
1077  CHECK(str.size() <= MAX_STRLEN);
1078  const string_dict_hash_t hash = hash_string(str);
1079  {
1080  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
1081  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1082  if (string_id_string_dict_hash_table_[bucket] != INVALID_STR_ID) {
1083  return string_id_string_dict_hash_table_[bucket];
1084  }
1085  }
1086  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
1087  if (fillRateIsHigh(str_count_)) {
1088  // resize when more than 50% is full
1089  increaseHashTableCapacity();
1090  }
1091  // need to recalculate the bucket in case it changed before
1092  // we got the lock
1093  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1094  if (string_id_string_dict_hash_table_[bucket] == INVALID_STR_ID) {
1095  CHECK_LT(str_count_, MAX_STRCOUNT)
1096  << "Maximum number (" << str_count_
1097  << ") of Dictionary encoded Strings reached for this column, offset path "
1098  "for column is "
1099  << offsets_path_;
1100  appendToStorage(str);
1101  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
1102  if (materialize_hashes_) {
1103  hash_cache_[str_count_] = hash;
1104  }
1105  ++str_count_;
1106  invalidateInvertedIndex();
1107  }
1108  return string_id_string_dict_hash_table_[bucket];
1109 }
1110 
1111 std::string StringDictionary::getStringChecked(const int string_id) const noexcept {
1112  const auto str_canary = getStringFromStorage(string_id);
1113  CHECK(!str_canary.canary);
1114  return std::string(str_canary.c_str_ptr, str_canary.size);
1115 }
1116 
// Returns a (pointer, size) pair describing the payload of `string_id` without copying
// it; CHECK-fails when the storage entry is flagged as a canary.
// NOTE(review): the first line of this signature is missing from this listing.
1118  const int string_id) const noexcept {
1119  const auto str_canary = getStringFromStorage(string_id);
1120  CHECK(!str_canary.canary);
1121  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1122 }
1123 
// Probes the given hash table for `input_string` and returns the index of either the
// slot that already holds its id or the first empty slot met while probing.
// NOTE(review): the line carrying the function name is missing from this listing.
1124 template <class String>
1126  const string_dict_hash_t hash,
1127  const String& input_string,
1128  const std::vector<int32_t>& string_id_string_dict_hash_table) const noexcept {
1129  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
// Masking with (size - 1) presumably relies on the table size being a power of two
// -- TODO confirm.
1130  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1131  while (true) {
1132  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1133  if (candidate_string_id ==
1134  INVALID_STR_ID) { // In this case it means the slot is available for use
1135  break;
1136  }
// With cached hashes, the byte comparison is skipped unless the hashes already match.
1137  if ((materialize_hashes_ && hash == hash_cache_[candidate_string_id]) ||
1138  !materialize_hashes_) {
1139  const auto candidate_string = getStringFromStorageFast(candidate_string_id);
1140  if (input_string.size() == candidate_string.size() &&
1141  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1142  // found the string
1143  break;
1144  }
1145  }
1146  // wrap around
1147  if (++bucket == string_dict_hash_table_size) {
1148  bucket = 0;
1149  }
1150  }
1151  return bucket;
1152 }
1153 
// Same probing as computeBucket(), but a candidate id may refer either to a string
// already in storage or to a pending string that so far only exists in input_strings
// (addressed through string_memory_ids).
// NOTE(review): the line carrying the function name, and two lines inside the final
// comparison (between listing lines 1189 and 1192), are missing from this listing.
1154 template <class String>
1156  const string_dict_hash_t input_string_hash,
1157  const String& input_string,
1158  const std::vector<int32_t>& string_id_string_dict_hash_table,
1159  const size_t storage_high_water_mark,
1160  const std::vector<String>& input_strings,
1161  const std::vector<size_t>& string_memory_ids) const noexcept {
1162  uint32_t bucket = input_string_hash & (string_id_string_dict_hash_table.size() - 1);
1163  while (true) {
1164  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1165  if (candidate_string_id ==
1166  INVALID_STR_ID) { // In this case it means the slot is available for use
1167  break;
1168  }
1169  if (!materialize_hashes_ || (input_string_hash == hash_cache_[candidate_string_id])) {
// NOTE(review): because of `> 0`, id 0 always takes the storage branch, even when
// storage_high_water_mark is 0 -- confirm this is intended.
1170  if (candidate_string_id > 0 &&
1171  static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1172  // The candidate string is not in storage yet but in our string_memory_ids temp
1173  // buffer
1174  size_t memory_offset =
1175  static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1176  const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1177  if (input_string.size() == candidate_string.size() &&
1178  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1179  // found the string in the temp memory buffer
1180  break;
1181  }
1182  } else {
1183  // The candidate string is in storage, need to fetch it for comparison
1184  const auto candidate_storage_string =
1185  getStringFromStorageFast(candidate_string_id);
1186  if (input_string.size() == candidate_storage_string.size() &&
1187  !memcmp(input_string.data(),
1188  candidate_storage_string.data(),
1189  input_string.size())) {
1192  // found the string in storage
1193  break;
1194  }
1195  }
1196  }
// Advance to the next slot, wrapping to 0 at the end of the table.
1197  if (++bucket == string_id_string_dict_hash_table.size()) {
1198  bucket = 0;
1199  }
1200  }
1201  return bucket;
1202 }
1203 
// Returns the first empty slot for `hash` in the given table, probing forward one slot
// at a time with wrap-around. No string comparison is done: callers use this when the
// inserted id is known not to be in the table yet. Every occupied slot stepped over is
// counted in collisions_.
// NOTE(review): the first line of this signature is missing from this listing.
1205  const string_dict_hash_t hash,
1206  const std::vector<int32_t>& string_id_string_dict_hash_table) noexcept {
1207  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1208  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1209  while (true) {
1210  if (string_id_string_dict_hash_table[bucket] ==
1211  INVALID_STR_ID) { // In this case it means the slot is available for use
1212  break;
1213  }
1214  collisions_++;
1215  // wrap around
1216  if (++bucket == string_dict_hash_table_size) {
1217  bucket = 0;
1218  }
1219  }
1220  return bucket;
1221 }
1222 
// Ensures that `write_length` more payload bytes fit behind payload_file_off_, growing
// the payload area when they would run past payload_file_size_.
// NOTE(review): the first line of this signature and one line of the non-temp branch
// (listing line 1230) are missing from this listing.
1224  const size_t write_length) {
1225  if (payload_file_off_ + write_length > payload_file_size_) {
// Shortfall: bytes requested minus bytes still free.
1226  const size_t min_capacity_needed =
1227  write_length - (payload_file_size_ - payload_file_off_);
1228  if (!isTemp_) {
1229  CHECK_GE(payload_fd_, 0);
1231  addPayloadCapacity(min_capacity_needed);
1232  CHECK(payload_file_off_ + write_length <= payload_file_size_);
// Map the grown file again at its new size.
1233  payload_map_ =
1234  reinterpret_cast<char*>(omnisci::checked_mmap(payload_fd_, payload_file_size_));
1235  } else {
// For a temp dictionary addPayloadCapacity() updates payload_map_ itself.
1236  addPayloadCapacity(min_capacity_needed);
1237  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1238  }
1239  }
1240 }
1241 
// Ensures that `write_length` more bytes of offset entries fit behind the current end
// of the offset area (str_count_ entries), growing it when needed.
// NOTE(review): the first line of this signature and listing lines 1250 and 1254 are
// missing from this listing; as a result the cast that assigns offset_map_ below has
// lost its argument.
1243  const size_t write_length) {
1244  const size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
// NOTE(review): this check uses >= whereas the payload counterpart uses > -- confirm
// the difference is deliberate.
1245  if (offset_file_off + write_length >= offset_file_size_) {
1246  const size_t min_capacity_needed =
1247  write_length - (offset_file_size_ - offset_file_off);
1248  if (!isTemp_) {
1249  CHECK_GE(offset_fd_, 0);
1251  addOffsetCapacity(min_capacity_needed);
1252  CHECK(offset_file_off + write_length <= offset_file_size_);
1253  offset_map_ = reinterpret_cast<StringIdxEntry*>(
1255  } else {
// For a temp dictionary addOffsetCapacity() updates offset_map_ itself.
1256  addOffsetCapacity(min_capacity_needed);
1257  CHECK(offset_file_off + write_length <= offset_file_size_);
1258  }
1259  }
1260 }
1261 
1262 template <class String>
1263 void StringDictionary::appendToStorage(const String str) noexcept {
1264  // write the payload
1265  checkAndConditionallyIncreasePayloadCapacity(str.size());
1266  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1267 
1268  // write the offset and length
1269  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1270  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1271 
1272  checkAndConditionallyIncreaseOffsetCapacity(sizeof(str_meta));
1273  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1274 }
1275 
// Appends the strings input_strings[string_memory_ids[i]] to storage in one go.
// Capacity for all payload bytes (sum_new_strings_lengths) and all index entries is
// secured once up front; the loop then writes payload and index entry for each string.
// str_count_ is not modified here; entry i is written at slot str_count_ + i.
// NOTE(review): the line carrying the function name is missing from this listing.
1276 template <class String>
1278  const std::vector<String>& input_strings,
1279  const std::vector<size_t>& string_memory_ids,
1280  const size_t sum_new_strings_lengths) noexcept {
1281  const size_t num_strings = string_memory_ids.size();
1282 
1283  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1284  checkAndConditionallyIncreaseOffsetCapacity(sizeof(StringIdxEntry) * num_strings);
1285 
1286  for (size_t i = 0; i < num_strings; ++i) {
1287  const size_t string_idx = string_memory_ids[i];
1288  const String str = input_strings[string_idx];
1289  const size_t str_size(str.size());
1290  memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1291  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
1292  payload_file_off_ += str_size; // Need to increment after we've defined str_meta
1293  memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
1294  }
1295 }
1296 
1297 std::string_view StringDictionary::getStringFromStorageFast(const int string_id) const
1298  noexcept {
1299  const StringIdxEntry* str_meta = offset_map_ + string_id;
1300  return {payload_map_ + str_meta->off, str_meta->size};
1301 }
1302 
// Looks up the index entry of `string_id` and returns {payload pointer, size, canary}.
// An entry whose size field reads 0xffff is reported as a canary: {nullptr, 0, true}.
// NOTE(review): the first line of this signature is missing from this listing.
1304  const int string_id) const noexcept {
// File descriptors are only validated for non-temp dictionaries.
1305  if (!isTemp_) {
1306  CHECK_GE(payload_fd_, 0);
1307  CHECK_GE(offset_fd_, 0);
1308  }
1309  CHECK_GE(string_id, 0);
1310  const StringIdxEntry* str_meta = offset_map_ + string_id;
1311  if (str_meta->size == 0xffff) {
1312  // hit the canary
1313  return {nullptr, 0, true};
1314  }
1315  return {payload_map_ + str_meta->off, str_meta->size, false};
1316 }
1317 
1318 void StringDictionary::addPayloadCapacity(const size_t min_capacity_requested) noexcept {
1319  if (!isTemp_) {
1320  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1321  } else {
1322  payload_map_ = static_cast<char*>(
1323  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1324  }
1325 }
1326 
1327 void StringDictionary::addOffsetCapacity(const size_t min_capacity_requested) noexcept {
1328  if (!isTemp_) {
1329  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1330  } else {
1331  offset_map_ = static_cast<StringIdxEntry*>(
1332  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1333  }
1334 }
1335 
// Extends the file behind `fd` by appending 0xff-filled bytes and returns the number of
// bytes added.
// NOTE(review): the first line of this signature is missing from this listing.
1337  int fd,
1338  const size_t min_capacity_requested) noexcept {
// Growth is the larger of 1024 pages and the request rounded up to a page multiple
// (the "+ 1" always adds a page, even when the request is already page-aligned).
1339  const size_t canary_buff_size_to_add =
1340  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1341  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1342 
// Enlarge the shared 0xff-filled buffer when it is smaller than the amount to write.
1343  if (canary_buffer_size < canary_buff_size_to_add) {
1344  CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1345  canary_buffer_size = canary_buff_size_to_add;
1346  CHECK(CANARY_BUFFER);
1347  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1348  }
1349 
// Append at the end of the file; a short write is treated as fatal.
1350  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1351  const auto write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
1352  CHECK(write_return > 0 &&
1353  (static_cast<size_t>(write_return) == canary_buff_size_to_add));
1354  return canary_buff_size_to_add;
1355 }
1356 
// In-memory counterpart of addStorageCapacity(): reallocates the buffer at `addr` to
// mem_size plus the growth amount, fills the new tail with 0xff bytes, adds the growth
// to `mem_size` and returns the (possibly moved) buffer address.
// NOTE(review): the first line of this signature is missing from this listing.
1358  size_t& mem_size,
1359  const size_t min_capacity_requested) noexcept {
// Same sizing rule as addStorageCapacity(): at least 1024 pages, otherwise the request
// rounded up to a page multiple.
1360  const size_t canary_buff_size_to_add =
1361  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1362  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1363  if (canary_buffer_size < canary_buff_size_to_add) {
1364  CANARY_BUFFER =
1365  reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1366  canary_buffer_size = canary_buff_size_to_add;
1367  CHECK(CANARY_BUFFER);
1368  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1369  }
1370  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
1371  CHECK(new_addr);
// The newly added region starts right behind the previous end of the buffer.
1372  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
// memcpy returns its destination pointer, so this CHECK only asserts it is non-null.
1373  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
1374  mem_size += canary_buff_size_to_add;
1375  return new_addr;
1376 }
1377 
// Drops all cached query results (LIKE, regexp and equality caches plus the compare
// cache). Each non-empty map is swapped with a freshly constructed empty one.
// NOTE(review): the signature line of this definition is missing from this listing.
1379  if (!like_cache_.empty()) {
1380  decltype(like_cache_)().swap(like_cache_);
1381  }
1382  if (!regex_cache_.empty()) {
1383  decltype(regex_cache_)().swap(regex_cache_);
1384  }
1385  if (!equal_cache_.empty()) {
1386  decltype(equal_cache_)().swap(equal_cache_);
1387  }
1388  compare_cache_.invalidateInvertedIndex();
1389 }
1390 
1391 // TODO 5 Mar 2021: Currently nothing undoes the writes to the dictionary on a failed
1392 // load. The next write to the dictionary that does checkpoint will cause the
1393 // uncheckpointed data to be written to disk. The only option is a table truncate, and
1394 // that's assuming the dictionary is not replicated.
// Flushes the dictionary to durable storage and returns true on success.
// With a client the call is forwarded and any exception is turned into `false`.
// Otherwise both mappings are msync'ed synchronously and both files fsync'ed.
// NOTE(review): the signature line of this definition is missing from this listing.
1396  if (client_) {
1397  try {
1398  return client_->checkpoint();
1399  } catch (...) {
1400  return false;
1401  }
1402  }
// Checkpointing a temp dictionary is treated as a programming error.
1403  CHECK(!isTemp_);
1404  bool ret = true;
// `ret && ...` short-circuits: once one step fails, the remaining syncs are skipped.
1405  ret = ret &&
1406  (omnisci::msync((void*)offset_map_, offset_file_size_, /*async=*/false) == 0);
1407  ret = ret &&
1408  (omnisci::msync((void*)payload_map_, payload_file_size_, /*async=*/false) == 0);
1409  ret = ret && (omnisci::fsync(offset_fd_) == 0);
1410  ret = ret && (omnisci::fsync(payload_fd_) == 0);
1411  return ret;
1412 }
1413 
// Brings sorted_cache up to date: the ids not yet covered by it, i.e. those in
// [sorted_cache.size(), str_count_), are collected, sorted by string value and merged
// into the existing cache.
// NOTE(review): the signature line of this definition is missing from this listing.
1415  // This method is not thread-safe.
1416  const auto cur_cache_size = sorted_cache.size();
1417  std::vector<int32_t> temp_sorted_cache;
1418  for (size_t i = cur_cache_size; i < str_count_; i++) {
1419  temp_sorted_cache.push_back(i);
1420  }
1421  sortCache(temp_sorted_cache);
1422  mergeSortedCache(temp_sorted_cache);
1423 }
1424 
1425 void StringDictionary::sortCache(std::vector<int32_t>& cache) {
1426  // This method is not thread-safe.
1427 
1428  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1429  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1430 
1431  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1432  auto a_str = this->getStringFromStorage(a);
1433  auto b_str = this->getStringFromStorage(b);
1434  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1435  });
1436 }
1437 
1438 void StringDictionary::mergeSortedCache(std::vector<int32_t>& temp_sorted_cache) {
1439  // this method is not thread safe
1440  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1441  size_t t_idx = 0, s_idx = 0, idx = 0;
1442  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1443  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1444  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1445  const auto insert_from_temp_cache =
1446  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1447  if (insert_from_temp_cache) {
1448  updated_cache[idx] = temp_sorted_cache[t_idx++];
1449  } else {
1450  updated_cache[idx] = sorted_cache[s_idx++];
1451  }
1452  }
1453  while (t_idx < temp_sorted_cache.size()) {
1454  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1455  }
1456  while (s_idx < sorted_cache.size()) {
1457  updated_cache[idx++] = sorted_cache[s_idx++];
1458  }
1459  sorted_cache.swap(updated_cache);
1460 }
1461 
// Translates string ids of `source_dict` into ids of `dest_dict`: every source id is
// resolved to its string, the strings are added to (or looked up in) dest_dict in bulk,
// and the resulting ids are written to dest_ids in the same order as source_ids.
// Throws std::runtime_error for a negative id that is not found in transient_mapping.
// NOTE(review): the first line of this signature is missing from this listing.
1463  std::vector<int32_t>& dest_ids,
1464  StringDictionary* dest_dict,
1465  const std::vector<int32_t>& source_ids,
1466  const StringDictionary* source_dict,
// NOTE(review): the map is taken by value, so it is copied on every call.
1467  const std::map<int32_t, std::string> transient_mapping) {
1468  std::vector<std::string> strings;
1469 
1470  for (const int32_t source_id : source_ids) {
// INT32_MIN is translated to the empty string (presumably the NULL sentinel -- confirm).
1471  if (source_id == std::numeric_limits<int32_t>::min()) {
1472  strings.emplace_back("");
1473  } else if (source_id < 0) {
// Other negative ids are resolved through the caller-supplied transient mapping.
1474  if (auto string_itr = transient_mapping.find(source_id);
1475  string_itr != transient_mapping.end()) {
1476  strings.emplace_back(string_itr->second);
1477  } else {
1478  throw std::runtime_error("Unexpected negative source ID");
1479  }
1480  } else {
1481  strings.push_back(source_dict->getString(source_id));
1482  }
1483  }
1484 
1485  dest_ids.resize(strings.size());
// NOTE(review): when source_ids is empty, dest_ids is empty and &dest_ids[0] is not a
// valid element address -- consider dest_ids.data().
1486  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1487 }
1488 
// Row-wise variant of populate_string_ids(): dest_array_ids[r] receives the dest_dict
// ids for source_array_ids[r]. Rows are handed out through a shared atomic counter and
// processed by several asynchronous tasks when there are enough rows per thread.
// NOTE(review): the first line of this signature is missing from this listing.
1490  std::vector<std::vector<int32_t>>& dest_array_ids,
1491  StringDictionary* dest_dict,
1492  const std::vector<std::vector<int32_t>>& source_array_ids,
1493  const StringDictionary* source_dict) {
1494  dest_array_ids.resize(source_array_ids.size());
1495 
1496  std::atomic<size_t> row_idx{0};
// Each task repeatedly claims the next unprocessed row until none are left; the
// thread_id argument is not used inside the body.
1497  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1498  int thread_id) {
1499  for (;;) {
1500  auto row = row_idx.fetch_add(1);
1501 
1502  if (row >= dest_array_ids.size()) {
1503  return;
1504  }
1505  const auto& source_ids = source_array_ids[row];
1506  auto& dest_ids = dest_array_ids[row];
// Called with four arguments, i.e. without a transient mapping.
1507  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1508  }
1509  };
1510 
// NOTE(review): std::thread::hardware_concurrency() may return 0 when the value cannot
// be determined, which would make the division below a division by zero.
1511  const int num_worker_threads = std::thread::hardware_concurrency();
1512 
// Go parallel only when each thread would get more than 10 rows.
1513  if (source_array_ids.size() / num_worker_threads > 10) {
1514  std::vector<std::future<void>> worker_threads;
1515  for (int i = 0; i < num_worker_threads; ++i) {
1516  worker_threads.push_back(std::async(std::launch::async, processor, i));
1517  }
1518 
// Wait for all tasks first, then call get() so that a stored exception is rethrown.
1519  for (auto& child : worker_threads) {
1520  child.wait();
1521  }
1522  for (auto& child : worker_threads) {
1523  child.get();
1524  }
1525  } else {
1526  processor(0);
1527  }
1528 }
1529 
1530 void translate_string_ids(std::vector<int32_t>& dest_ids,
1531  const LeafHostInfo& dict_server_host,
1532  const DictRef dest_dict_ref,
1533  const std::vector<int32_t>& source_ids,
1534  const DictRef source_dict_ref,
1535  const int32_t dest_generation) {
1536  DictRef temp_dict_ref(-1, -1);
1537  StringDictionaryClient string_client(dict_server_host, temp_dict_ref, false);
1538  string_client.translate_string_ids(
1539  dest_ids, dest_dict_ref, source_ids, source_dict_ref, dest_generation);
1540 }
StringIdxEntry * offset_map_
void translate_string_ids(std::vector< int32_t > &dest_ids, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
#define CHECK_EQ(x, y)
Definition: Logger.h:217
void increaseHashTableCapacity() noexcept
int open(const char *path, int flags, int mode)
Definition: omnisci_fs.cpp:64
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38
string_dict_hash_t hash_string(const std::string_view &str)
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
std::vector< int32_t > getRegexpLike(const std::string &pattern, const char escape, const size_t generation) const
RUNTIME_EXPORT DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
uint64_t off
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
uint64_t size
size_t storageEntryCount() const
#define LOG(tag)
Definition: Logger.h:203
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
uint32_t computeBucketFromStorageAndMemory(const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
bool fillRateIsHigh(const size_t num_strings) const noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:222
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
Constants for Builtin SQL Types supported by OmniSci.
std::string offsets_path_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
#define CHECK_GT(x, y)
Definition: Logger.h:221
int32_t getIdOfString(const std::string &str) const
int32_t getOrAdd(const std::string &str) noexcept
int32_t getUnlocked(const std::string &str) const noexcept
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::string to_string(char const *&&v)
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:141
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
constexpr double a
Definition: Utm.h:38
StringDictionary(const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
static constexpr size_t MAX_STRCOUNT
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
void close(const int fd)
Definition: omnisci_fs.cpp:68
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
future< Result > async(Fn &&fn, Args &&...args)
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
std::vector< int32_t > getCompare(const std::string &pattern, const std::string &comp_operator, const size_t generation)
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
#define CHECK_NE(x, y)
Definition: Logger.h:218
const std::string folder_
mapd_shared_mutex rw_mutex_
RUNTIME_EXPORT DEVICE bool string_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:246
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
RUNTIME_EXPORT DEVICE bool string_like_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:43
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
std::map< std::string, int32_t > equal_cache_
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
void getOrAddBulkArray(const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
void translate_string_ids(std::vector< int32_t > &dest_ids, const LeafHostInfo &dict_server_host, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
Functions to support the LIKE and ILIKE operator in SQL. Only single-byte character set is supported ...
void appendToStorage(const String str) noexcept
int fsync(int fd)
Definition: omnisci_fs.cpp:60
int checked_open(const char *path, const bool recover)
bool g_enable_smem_group_by true
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
std::pair< char *, size_t > getStringBytes(int32_t string_id) const noexcept
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
bool checkpoint() noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:219
std::vector< int32_t > getLike(const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
#define CHECK_LE(x, y)
Definition: Logger.h:220
RUNTIME_EXPORT DEVICE bool string_ilike_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:59
size_t getNumStringsFromStorage(const size_t storage_slots) const noexcept
int msync(void *addr, size_t length, bool async)
Definition: omnisci_fs.cpp:55
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
std::string getString(int32_t string_id) const
void hashStrings(const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
bool g_cache_string_hash
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
void increaseHashTableCapacityFromStorageAndMemory(const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
ThreadId thread_id()
Definition: Logger.cpp:791
mapd_shared_lock< mapd_shared_mutex > read_lock
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)
static void populate_string_array_ids(std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
bool g_enable_watchdog false
Definition: Execute.cpp:76
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:209
const uint64_t round_up_p2(const uint64_t num)
std::vector< int32_t > string_id_string_dict_hash_table_
void sortCache(std::vector< int32_t > &cache)
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
mapd_unique_lock< mapd_shared_mutex > write_lock
int get_page_size()
Definition: omnisci_fs.cpp:27
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
bool g_enable_stringdict_parallel
void throw_encoding_error(std::string_view str, std::string_view folder)
PayloadString getStringFromStorage(const int string_id) const noexcept
int cpu_threads()
Definition: thread_count.h:24
RUNTIME_EXPORT DEVICE bool regexp_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: Regexp.cpp:41
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)
#define UINT32_MAX
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
std::vector< int32_t > sorted_cache
#define VLOG(n)
Definition: Logger.h:303
int32_t getOrAddImpl(const std::string_view &str) noexcept
~StringDictionary() noexcept
RUNTIME_EXPORT DEVICE bool string_ilike(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:257
std::shared_ptr< const std::vector< std::string > > copyStrings() const
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31