OmniSciDB  2e3a973ef4
StringDictionary.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <tbb/parallel_for.h>
20 #include <boost/filesystem/operations.hpp>
21 #include <boost/filesystem/path.hpp>
22 #include <boost/sort/spreadsort/string_sort.hpp>
23 #include <future>
24 #include <iostream>
25 #include <string_view>
26 #include <thread>
27 
28 // TODO(adb): fixup
29 #ifdef _MSC_VER
30 #include <fcntl.h>
31 #else
32 #include <sys/fcntl.h>
33 #endif
34 
35 #include "Logger/Logger.h"
36 #include "OSDependent/omnisci_fs.h"
37 #include "Shared/sqltypes.h"
38 #include "Shared/thread_count.h"
39 #include "StringDictionaryClient.h"
40 #include "Utils/Regexp.h"
41 #include "Utils/StringLike.h"
42 
43 #include "LeafHostInfo.h"
44 
46 
47 namespace {
48 
50 
51 int checked_open(const char* path, const bool recover) {
52  auto fd = omnisci::open(path, O_RDWR | O_CREAT | (recover ? O_APPEND : O_TRUNC), 0644);
53  if (fd > 0) {
54  return fd;
55  }
56  auto err = std::string("Dictionary path ") + std::string(path) +
57  std::string(" does not exist.");
58  LOG(ERROR) << err;
59  throw DictPayloadUnavailable(err);
60 }
61 
// Rounds num up to the next power of two, clamped to UINT32_MAX.
//
// Values that are already a power of two are returned unchanged. Any result that
// would exceed UINT32_MAX, and the wrap-around cases (num == 0 or num above 2^63),
// yield UINT32_MAX.
uint64_t round_up_p2(const uint64_t num) {
  uint64_t in = num;
  in--;
  // Smear the highest set bit into every lower position.
  in |= in >> 1;
  in |= in >> 2;
  in |= in >> 4;
  in |= in >> 8;
  in |= in >> 16;
  // The operand is 64 bits wide, so the upper half needs smearing as well; this keeps
  // the bit trick correct on its own instead of relying on the clamp below.
  in |= in >> 32;
  in++;
  // TODO MAT deal with case where filesize has been increased but reality is
  // we are constrained to 2^31.
  // In that situation this calculation will wrap to zero
  if (in == 0 || (in > (UINT32_MAX))) {
    in = UINT32_MAX;
  }
  return in;
}
79 
// Polynomial rolling hash of str: starts at 1 and, for each character in order,
// multiplies the running value by 997 and adds the character.
//
// The view is taken by value: std::string_view is a small, trivially copyable handle,
// so passing it by reference only adds an indirection.
uint32_t rk_hash(const std::string_view str) {
  uint32_t str_hash = 1;
  // rely on fact that unsigned overflow is defined and wraps
  for (const char c : str) {
    str_hash = str_hash * 997 + c;
  }
  return str_hash;
}
88 } // namespace
89 
// Namespace-scope definitions of StringDictionary's static constexpr data members.
// They carry no initializer here, so the values come from the in-class declarations
// (not visible in this file). Such definitions are what gives the members storage
// when they are odr-used under pre-C++17 rules.
91 constexpr int32_t StringDictionary::INVALID_STR_ID;
92 constexpr size_t StringDictionary::MAX_STRLEN;
93 constexpr size_t StringDictionary::MAX_STRCOUNT;
94 
// Constructs a local string dictionary.
//
// Parameters, as used by the code visible here:
//  - folder:            directory containing the "DictOffsets" and "DictPayload" files;
//                       only consulted when isTemp is false.
//  - isTemp:            when true no files are opened and nothing is mapped or recovered.
//  - recover:           forwarded to checked_open and, for non-temporary dictionaries,
//                       triggers rebuilding the hash table from the stored strings.
//  - materializeHashes: stored in materialize_hashes_; when set, rk_hashes_ is resized
//                       during recovery.
//  - initial_capacity:  initial size of string_id_hash_table_ and rk_hashes_; must be a
//                       power of two (checked below unless the early return is taken).
//
// NOTE(review): this listing has gaps (the embedded listing numbers skip 126-127, 132,
// 135 and 141), so some statements of this constructor - presumably the file size
// queries, capacity growth and the second mmap argument - are not visible here.
95 StringDictionary::StringDictionary(const std::string& folder,
96  const bool isTemp,
97  const bool recover,
98  const bool materializeHashes,
99  size_t initial_capacity)
100  : str_count_(0)
101  , string_id_hash_table_(initial_capacity, INVALID_STR_ID)
102  , rk_hashes_(initial_capacity)
103  , isTemp_(isTemp)
104  , materialize_hashes_(materializeHashes)
105  , payload_fd_(-1)
106  , offset_fd_(-1)
107  , offset_map_(nullptr)
108  , payload_map_(nullptr)
109  , offset_file_size_(0)
110  , payload_file_size_(0)
111  , payload_file_off_(0)
112  , strings_cache_(nullptr) {
// A non-temporary dictionary without a folder keeps only the member values set above.
113  if (!isTemp && folder.empty()) {
114  return;
115  }
116 
117  // initial capacity must be a power of two for efficient bucket computation
118  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
119  if (!isTemp_) {
120  boost::filesystem::path storage_path(folder);
121  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
122  const auto payload_path =
123  (storage_path / boost::filesystem::path("DictPayload")).string();
124  payload_fd_ = checked_open(payload_path.c_str(), recover);
125  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
128  }
129  bool storage_is_empty = false;
130  if (payload_file_size_ == 0) {
131  storage_is_empty = true;
133  }
134  if (offset_file_size_ == 0) {
136  }
137  if (!isTemp_) { // we never mmap or recover temp dictionaries
138  payload_map_ =
139  reinterpret_cast<char*>(omnisci::checked_mmap(payload_fd_, payload_file_size_));
140  offset_map_ = reinterpret_cast<StringIdxEntry*>(
142  if (recover) {
143  const size_t bytes = omnisci::file_size(offset_fd_);
// A size that is not a whole number of StringIdxEntry records is only warned about;
// the division below drops the partial trailing record.
144  if (bytes % sizeof(StringIdxEntry) != 0) {
145  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
146  }
147  const uint64_t str_count =
148  storage_is_empty ? 0 : getNumStringsFromStorage(bytes / sizeof(StringIdxEntry));
149  collisions_ = 0;
150  // at this point we know the size of the StringDict we need to load
151  // so lets reallocate the vector to the correct size
152  const uint64_t max_entries =
153  std::max(round_up_p2(str_count * 2 + 1),
154  round_up_p2(std::max(initial_capacity, static_cast<size_t>(1))));
155  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
156  string_id_hash_table_.swap(new_str_ids);
157  if (materialize_hashes_) {
158  std::vector<uint32_t> new_rk_hashes(max_entries / 2);
159  rk_hashes_.swap(new_rk_hashes);
160  }
161  // Bail early if we know we don't have strings to add (i.e. a new or empty
162  // dictionary)
163  if (str_count == 0) {
164  return;
165  }
166 
167  unsigned string_id = 0;
168  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
169 
170  uint32_t thread_inits = 0;
// NOTE(review): std::thread::hardware_concurrency() is allowed to return 0 when the
// value cannot be determined; thread_count is used as a divisor and as a modulus
// below without a guard for that case.
171  const auto thread_count = std::thread::hardware_concurrency();
// Each task handles between 2000 and 200000 consecutive string ids.
172  const uint32_t items_per_thread = std::max<uint32_t>(
173  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
174  std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>
175  dictionary_futures;
176  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
// Each asynchronous task returns (rk_hash, string length) pairs for its id range.
177  dictionary_futures.emplace_back(std::async(
178  std::launch::async, [string_id, str_count, items_per_thread, this] {
179  std::vector<std::pair<uint32_t, unsigned int>> hashVec;
180  for (uint32_t curr_id = string_id;
181  curr_id < string_id + items_per_thread && curr_id < str_count;
182  curr_id++) {
183  const auto recovered = getStringFromStorage(curr_id);
184  if (recovered.canary) {
185  // hit the canary, recovery finished
186  break;
187  } else {
188  std::string temp(recovered.c_str_ptr, recovered.size);
189  hashVec.emplace_back(std::make_pair(rk_hash(temp), temp.size()));
190  }
191  }
192  return hashVec;
193  }));
194  thread_inits++;
// Drain the outstanding futures once thread_count tasks have been launched.
195  if (thread_inits % thread_count == 0) {
196  processDictionaryFutures(dictionary_futures);
197  }
198  }
199  // gather last few threads
200  if (dictionary_futures.size() != 0) {
201  processDictionaryFutures(dictionary_futures);
202  }
203  VLOG(1) << "Opened string dictionary " << folder << " # Strings: " << str_count_
204  << " Hash table size: " << string_id_hash_table_.size() << " Fill rate: "
205  << static_cast<double>(str_count_) * 100.0 / string_id_hash_table_.size()
206  << "% Collisions: " << collisions_;
207  }
208  }
209 }
210 
212  std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>&
213  dictionary_futures) {
214  for (auto& dictionary_future : dictionary_futures) {
215  dictionary_future.wait();
216  auto hashVec = dictionary_future.get();
217  for (auto& hash : hashVec) {
218  uint32_t bucket = computeUniqueBucketWithHash(hash.first, string_id_hash_table_);
219  payload_file_off_ += hash.second;
220  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
221  if (materialize_hashes_) {
222  rk_hashes_[str_count_] = hash.first;
223  }
224  ++str_count_;
225  }
226  }
227  dictionary_futures.clear();
228 }
229 
237 size_t StringDictionary::getNumStringsFromStorage(const size_t storage_slots) const
238  noexcept {
239  if (storage_slots == 0) {
240  return 0;
241  }
242  // Must use signed integers since final binary search step can wrap to max size_t value
243  // if dictionary is empty
244  int64_t min_bound = 0;
245  int64_t max_bound = storage_slots - 1;
246  int64_t guess{0};
247  while (min_bound <= max_bound) {
248  guess = (max_bound + min_bound) / 2;
249  CHECK_GE(guess, 0);
250  if (getStringFromStorage(guess).canary) {
251  max_bound = guess - 1;
252  } else {
253  min_bound = guess + 1;
254  }
255  }
256  CHECK_GE(guess + (min_bound > guess ? 1 : 0), 0);
257  return guess + (min_bound > guess ? 1 : 0);
258 }
259 
261  : strings_cache_(nullptr)
262  , client_(new StringDictionaryClient(host, dict_ref, true))
263  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
264 
266  free(CANARY_BUFFER);
267  if (client_) {
268  return;
269  }
270  if (payload_map_) {
271  if (!isTemp_) {
275  CHECK_GE(payload_fd_, 0);
277  CHECK_GE(offset_fd_, 0);
279  } else {
281  free(payload_map_);
282  free(offset_map_);
283  }
284  }
285 }
286 
287 int32_t StringDictionary::getOrAdd(const std::string& str) noexcept {
288  if (client_) {
289  std::vector<int32_t> string_ids;
290  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
291  CHECK_EQ(size_t(1), string_ids.size());
292  return string_ids.front();
293  }
294  return getOrAddImpl(str);
295 }
296 
297 namespace {
298 
299 template <class T>
300 void log_encoding_error(std::string_view str) {
301  LOG(ERROR) << "Could not encode string: " << str
302  << ", the encoded value doesn't fit in " << sizeof(T) * 8
303  << " bits. Will store NULL instead.";
304 }
305 
306 } // namespace
307 
308 template <class String>
310  const std::vector<std::vector<String>>& string_array_vec,
311  std::vector<std::vector<int32_t>>& ids_array_vec) {
312  ids_array_vec.resize(string_array_vec.size());
313  for (size_t i = 0; i < string_array_vec.size(); i++) {
314  auto& strings = string_array_vec[i];
315  auto& ids = ids_array_vec[i];
316  ids.resize(strings.size());
317  getOrAddBulk(strings, &ids[0]);
318  }
319 }
320 
322  const std::vector<std::vector<std::string>>& string_array_vec,
323  std::vector<std::vector<int32_t>>& ids_array_vec);
324 
330 template <class String>
331 void StringDictionary::hashStrings(const std::vector<String>& string_vec,
332  std::vector<uint32_t>& hashes) const noexcept {
333  CHECK_EQ(string_vec.size(), hashes.size());
334 
335  tbb::parallel_for(tbb::blocked_range<size_t>(0, string_vec.size()),
336  [&string_vec, &hashes](const tbb::blocked_range<size_t>& r) {
337  for (size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
338  if (string_vec[curr_id].empty()) {
339  continue;
340  }
341  hashes[curr_id] = rk_hash(string_vec[curr_id]);
342  }
343  });
344 }
345 
// Looks up (adding when absent) every string of input_strings and writes the resulting
// ids, in input order, to output_string_ids, which must have room for one T per input.
//
// Visible behavior of the single-threaded path:
//  - with client_no_timeout_ set, the work is forwarded to getOrAddBulkRemote;
//  - empty strings are encoded as inline_int_null_value<T>();
//  - strings already present yield their existing id;
//  - when str_count_ has reached max_valid_int_value<T>() the string is logged through
//    log_encoding_error and encoded as NULL;
//  - otherwise the string is appended to storage and receives id str_count_.
//
// NOTE(review): this listing has gaps (embedded listing numbers 349, 382, 389 and 402
// are missing). The condition guarding the getOrAddBulkParallel call, the start of the
// statement that streams the "Maximum number" message, the statement inside the
// fill-rate branch and the final statement of the function are therefore not visible.
346 template <class T, class String>
347 void StringDictionary::getOrAddBulk(const std::vector<String>& input_strings,
348  T* output_string_ids) {
350  getOrAddBulkParallel(input_strings, output_string_ids);
351  return;
352  }
353  // Single-thread path.
354  if (client_no_timeout_) {
355  getOrAddBulkRemote(input_strings, output_string_ids);
356  return;
357  }
358  size_t out_idx{0};
359  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
360 
361  for (const auto& str : input_strings) {
// Empty strings are stored as NULL.
362  if (str.empty()) {
363  output_string_ids[out_idx++] = inline_int_null_value<T>();
364  continue;
365  }
366  CHECK(str.size() <= MAX_STRLEN);
367  uint32_t bucket;
368  const uint32_t hash = rk_hash(str);
369  bucket = computeBucket(hash, str, string_id_hash_table_);
// An occupied bucket holds the id of this string.
370  if (string_id_hash_table_[bucket] != INVALID_STR_ID) {
371  output_string_ids[out_idx++] = string_id_hash_table_[bucket];
372  continue;
373  }
374  // need to add record to dictionary
375  // check there is room
376  if (str_count_ == static_cast<size_t>(max_valid_int_value<T>())) {
377  log_encoding_error<T>(str);
378  output_string_ids[out_idx++] = inline_int_null_value<T>();
379  continue;
380  }
381  if (string_id_hash_table_[bucket] == INVALID_STR_ID) {
// NOTE(review): the statement these stream operators belong to starts on a line that
// is missing from this listing.
383  << "Maximum number (" << str_count_
384  << ") of Dictionary encoded Strings reached for this column, offset path "
385  "for column is "
386  << offsets_path_;
387  if (fillRateIsHigh(str_count_)) {
388  // resize when more than 50% is full
// The bucket is recomputed after the (not visible) statement of this branch.
390  bucket = computeBucket(hash, str, string_id_hash_table_);
391  }
392  appendToStorage(str);
393 
394  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
395  if (materialize_hashes_) {
396  rk_hashes_[str_count_] = hash;
397  }
398  ++str_count_;
399  }
400  output_string_ids[out_idx++] = string_id_hash_table_[bucket];
401  }
403 }
404 
// Bulk get-or-add that hashes all inputs in parallel before taking the write lock.
//
// Ids are written to output_string_ids in input order. New strings are not appended to
// storage one by one: their input indices are collected in string_memory_ids and they
// are written with one appendToStorageBulk call at the end, after which str_count_ is
// updated from the local shadow counter.
//
//  - with client_no_timeout_ set, the work is forwarded to getOrAddBulkRemote;
//  - empty strings are encoded as inline_int_null_value<T>();
//  - when the shadow count has reached max_valid_int_value<T>() the string is logged
//    through log_encoding_error and encoded as NULL.
//
// NOTE(review): this listing has gaps (embedded listing numbers 446 and 482 are
// missing), so one argument of the computeBucketFromStorageAndMemory call and the last
// statement of the function are not visible.
405 template <class T, class String>
406 void StringDictionary::getOrAddBulkParallel(const std::vector<String>& input_strings,
407  T* output_string_ids) {
408  if (client_no_timeout_) {
409  getOrAddBulkRemote(input_strings, output_string_ids);
410  return;
411  }
412  // Run rk_hash on the input strings up front, and in parallel,
413  // as the string hashing does not need to be behind the subsequent write_lock
414  std::vector<uint32_t> input_strings_rk_hashes(input_strings.size());
415  hashStrings(input_strings, input_strings_rk_hashes);
416 
417  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
418  size_t shadow_str_count =
419  str_count_; // Need to shadow str_count_ now with bulk add methods
// Ids below this mark were present when the call started; ids at or above it are
// assigned during this call.
420  const size_t storage_high_water_mark = shadow_str_count;
// Input indices of the strings that are new in this call, in id order.
421  std::vector<size_t> string_memory_ids;
422  size_t sum_new_string_lengths = 0;
423  string_memory_ids.reserve(input_strings.size());
424  size_t input_string_idx{0};
425  for (const auto& input_string : input_strings) {
426  // Currently we make empty strings null
427  if (input_string.empty()) {
428  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
429  continue;
430  }
431  // TODO: Recover gracefully if an input string is too long
432  CHECK(input_string.size() <= MAX_STRLEN);
433 
434  if (fillRateIsHigh(shadow_str_count)) {
435  // resize when more than 50% is full
436  increaseCapacityFromStorageAndMemory(storage_high_water_mark,
437  input_strings,
438  string_memory_ids,
439  input_strings_rk_hashes);
440  }
441  // Get the rk_hash for this input_string
442  const uint32_t input_string_rk_hash = input_strings_rk_hashes[input_string_idx];
443 
444  uint32_t hash_bucket = computeBucketFromStorageAndMemory(input_string_rk_hash,
445  input_string,
447  storage_high_water_mark,
448  input_strings,
449  string_memory_ids);
450 
451  // If the hash bucket is not empty, that is our string id
452  // (computeBucketFromStorageAndMemory) already checked to ensure the input string and
453  // bucket string are equal)
454  if (string_id_hash_table_[hash_bucket] != INVALID_STR_ID) {
455  output_string_ids[input_string_idx++] = string_id_hash_table_[hash_bucket];
456  continue;
457  }
458  // Did not find string, so need to add record to dictionary
459  // First check there is room
460  if (shadow_str_count == static_cast<size_t>(max_valid_int_value<T>())) {
461  log_encoding_error<T>(input_string);
462  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
463  continue;
464  }
465  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
466  << "Maximum number (" << shadow_str_count
467  << ") of Dictionary encoded Strings reached for this column, offset path "
468  "for column is "
469  << offsets_path_;
470 
// Record the new string and hand out the next id; the payload itself is written by the
// appendToStorageBulk call after the loop.
471  string_memory_ids.push_back(input_string_idx);
472  sum_new_string_lengths += input_string.size();
473  string_id_hash_table_[hash_bucket] = static_cast<int32_t>(shadow_str_count);
474  if (materialize_hashes_) {
475  rk_hashes_[shadow_str_count] = input_string_rk_hash;
476  }
477  output_string_ids[input_string_idx++] = shadow_str_count++;
478  }
479  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
480  str_count_ = shadow_str_count;
481 
483 }
// Explicit instantiations of getOrAddBulk for the supported combinations: inputs given
// as std::string or std::string_view, ids written as uint8_t, uint16_t or int32_t.
484 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
485  uint8_t* encoded_vec);
486 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
487  uint16_t* encoded_vec);
488 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
489  int32_t* encoded_vec);
490 
491 template void StringDictionary::getOrAddBulk(
492  const std::vector<std::string_view>& string_vec,
493  uint8_t* encoded_vec);
494 template void StringDictionary::getOrAddBulk(
495  const std::vector<std::string_view>& string_vec,
496  uint16_t* encoded_vec);
497 template void StringDictionary::getOrAddBulk(
498  const std::vector<std::string_view>& string_vec,
499  int32_t* encoded_vec);
500 
// Bulk get-or-add through the remote client client_no_timeout_.
//
// The client returns int32_t ids; each is narrowed to T and written to encoded_vec in
// order. An id larger than max_valid_int_value<T>() is logged through
// log_encoding_error and stored as inline_int_null_value<T>(); an id equal to the
// int32_t NULL value is stored as the NULL value of T without logging.
//
// NOTE(review): embedded listing number 504 is missing, so the first statement of the
// body is not visible here.
501 template <class T, class String>
502 void StringDictionary::getOrAddBulkRemote(const std::vector<String>& string_vec,
503  T* encoded_vec) {
505  std::vector<int32_t> string_ids;
506  client_no_timeout_->get_or_add_bulk(string_ids, string_vec);
507  size_t out_idx{0};
508  for (size_t i = 0; i < string_ids.size(); ++i) {
509  const auto string_id = string_ids[i];
// The id exists remotely but cannot be represented in T.
510  const bool invalid = string_id > max_valid_int_value<T>();
511  if (invalid || string_id == inline_int_null_value<int32_t>()) {
512  if (invalid) {
513  log_encoding_error<T>(string_vec[i]);
514  }
515  encoded_vec[out_idx++] = inline_int_null_value<T>();
516  continue;
517  }
518  encoded_vec[out_idx++] = string_id;
519  }
520 }
521 
523  const std::vector<std::string>& string_vec,
524  uint8_t* encoded_vec);
526  const std::vector<std::string>& string_vec,
527  uint16_t* encoded_vec);
529  const std::vector<std::string>& string_vec,
530  int32_t* encoded_vec);
531 
533  const std::vector<std::string_view>& string_vec,
534  uint8_t* encoded_vec);
536  const std::vector<std::string_view>& string_vec,
537  uint16_t* encoded_vec);
539  const std::vector<std::string_view>& string_vec,
540  int32_t* encoded_vec);
541 
542 int32_t StringDictionary::getIdOfString(const std::string& str) const {
543  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
544  if (client_) {
545  return client_->get(str);
546  }
547  return getUnlocked(str);
548 }
549 
550 int32_t StringDictionary::getUnlocked(const std::string& str) const noexcept {
551  const uint32_t hash = rk_hash(str);
552  auto str_id = string_id_hash_table_[computeBucket(hash, str, string_id_hash_table_)];
553  return str_id;
554 }
555 
556 std::string StringDictionary::getString(int32_t string_id) const {
557  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
558  if (client_) {
559  std::string ret;
560  client_->get_string(ret, string_id);
561  return ret;
562  }
563  return getStringUnlocked(string_id);
564 }
565 
566 std::string StringDictionary::getStringUnlocked(int32_t string_id) const noexcept {
567  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
568  return getStringChecked(string_id);
569 }
570 
// Returns the (pointer, length) pair that getStringBytesChecked yields for string_id.
// Runs under a shared (read) lock. Only valid for local dictionaries: a configured
// remote client, or an id outside [0, str_count_), fails a CHECK.
571 std::pair<char*, size_t> StringDictionary::getStringBytes(int32_t string_id) const
572  noexcept {
573  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
574  CHECK(!client_);
575  CHECK_LE(0, string_id);
576  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
577  return getStringBytesChecked(string_id);
578 }
579 
581  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
582  if (client_) {
583  return client_->storage_entry_count();
584  }
585  return str_count_;
586 }
587 
588 namespace {
589 
590 bool is_like(const std::string& str,
591  const std::string& pattern,
592  const bool icase,
593  const bool is_simple,
594  const char escape) {
595  return icase
596  ? (is_simple ? string_ilike_simple(
597  str.c_str(), str.size(), pattern.c_str(), pattern.size())
598  : string_ilike(str.c_str(),
599  str.size(),
600  pattern.c_str(),
601  pattern.size(),
602  escape))
603  : (is_simple ? string_like_simple(
604  str.c_str(), str.size(), pattern.c_str(), pattern.size())
605  : string_like(str.c_str(),
606  str.size(),
607  pattern.c_str(),
608  pattern.size(),
609  escape));
610 }
611 
612 } // namespace
613 
614 std::vector<int32_t> StringDictionary::getLike(const std::string& pattern,
615  const bool icase,
616  const bool is_simple,
617  const char escape,
618  const size_t generation) const {
619  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
620  if (client_) {
621  return client_->get_like(pattern, icase, is_simple, escape, generation);
622  }
623  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
624  const auto it = like_cache_.find(cache_key);
625  if (it != like_cache_.end()) {
626  return it->second;
627  }
628  std::vector<int32_t> result;
629  std::vector<std::thread> workers;
630  int worker_count = cpu_threads();
631  CHECK_GT(worker_count, 0);
632  std::vector<std::vector<int32_t>> worker_results(worker_count);
633  CHECK_LE(generation, str_count_);
634  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
635  workers.emplace_back([&worker_results,
636  &pattern,
637  generation,
638  icase,
639  is_simple,
640  escape,
641  worker_idx,
642  worker_count,
643  this]() {
644  for (size_t string_id = worker_idx; string_id < generation;
645  string_id += worker_count) {
646  const auto str = getStringUnlocked(string_id);
647  if (is_like(str, pattern, icase, is_simple, escape)) {
648  worker_results[worker_idx].push_back(string_id);
649  }
650  }
651  });
652  }
653  for (auto& worker : workers) {
654  worker.join();
655  }
656  for (const auto& worker_result : worker_results) {
657  result.insert(result.end(), worker_result.begin(), worker_result.end());
658  }
659  // place result into cache for reuse if similar query
660  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
661 
662  CHECK(it_ok.second);
663 
664  return result;
665 }
666 
667 std::vector<int32_t> StringDictionary::getEquals(std::string pattern,
668  std::string comp_operator,
669  size_t generation) {
670  std::vector<int32_t> result;
671  auto eq_id_itr = equal_cache_.find(pattern);
672  int32_t eq_id = MAX_STRLEN + 1;
673  int32_t cur_size = str_count_;
674  if (eq_id_itr != equal_cache_.end()) {
675  auto eq_id = eq_id_itr->second;
676  if (comp_operator == "=") {
677  result.push_back(eq_id);
678  } else {
679  for (int32_t idx = 0; idx <= cur_size; idx++) {
680  if (idx == eq_id) {
681  continue;
682  }
683  result.push_back(idx);
684  }
685  }
686  } else {
687  std::vector<std::thread> workers;
688  int worker_count = cpu_threads();
689  CHECK_GT(worker_count, 0);
690  std::vector<std::vector<int32_t>> worker_results(worker_count);
691  CHECK_LE(generation, str_count_);
692  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
693  workers.emplace_back(
694  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
695  for (size_t string_id = worker_idx; string_id < generation;
696  string_id += worker_count) {
697  const auto str = getStringUnlocked(string_id);
698  if (str == pattern) {
699  worker_results[worker_idx].push_back(string_id);
700  }
701  }
702  });
703  }
704  for (auto& worker : workers) {
705  worker.join();
706  }
707  for (const auto& worker_result : worker_results) {
708  result.insert(result.end(), worker_result.begin(), worker_result.end());
709  }
710  if (result.size() > 0) {
711  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
712  CHECK(it_ok.second);
713  eq_id = result[0];
714  }
715  if (comp_operator == "<>") {
716  for (int32_t idx = 0; idx <= cur_size; idx++) {
717  if (idx == eq_id) {
718  continue;
719  }
720  result.push_back(idx);
721  }
722  }
723  }
724  return result;
725 }
726 
// Returns the ids of the strings that satisfy `<string> comp_operator pattern` for the
// operators "<", "<=", ">", ">=", "=" and "<>".
//
// A configured remote client answers the request. Otherwise the work happens under the
// exclusive lock: while sorted_cache holds fewer entries than str_count_, "=" and "<>"
// are delegated to getEquals. The remaining logic locates pattern in sorted_cache (a
// vector of ids ordered by their strings), remembers the position in compare_cache_
// and builds the result from ranges of sorted_cache.
//
// NOTE(review): embedded listing number 743 is missing, so one statement of the
// `sorted_cache.size() < str_count_` branch (presumably the call that brings
// sorted_cache up to date) is not visible here.
727 std::vector<int32_t> StringDictionary::getCompare(const std::string& pattern,
728  const std::string& comp_operator,
729  const size_t generation) {
730  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
731  if (client_) {
732  return client_->get_compare(pattern, comp_operator, generation);
733  }
734  std::vector<int32_t> ret;
735  if (str_count_ == 0) {
736  return ret;
737  }
738  if (sorted_cache.size() < str_count_) {
739  if (comp_operator == "=" || comp_operator == "<>") {
740  return getEquals(pattern, comp_operator, generation);
741  }
742 
744  }
745  auto cache_index = compare_cache_.get(pattern);
746 
// Cache miss: find the position of pattern in sorted_cache and record it.
747  if (!cache_index) {
748  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
// First entry whose string is not less than pattern.
749  const auto cache_itr = std::lower_bound(
750  sorted_cache.begin(),
751  sorted_cache.end(),
752  pattern,
753  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
754  auto a_str = this->getStringFromStorage(a);
755  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
756  });
757 
// diff == 0 marks an exact match at `index`; diff == 1 marks that `index` refers to
// the entry just before the position where pattern would be inserted.
758  if (cache_itr == sorted_cache.end()) {
759  cache_index->index = sorted_cache.size() - 1;
760  cache_index->diff = 1;
761  } else {
762  const auto cache_str = getStringFromStorage(*cache_itr);
763  if (!string_eq(
764  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
765  cache_index->index = cache_itr - sorted_cache.begin() - 1;
766  cache_index->diff = 1;
767  } else {
768  cache_index->index = cache_itr - sorted_cache.begin();
769  cache_index->diff = 0;
770  }
771  }
772 
773  compare_cache_.put(pattern, cache_index);
774  }
775 
776  // since we have a cache in form of vector of ints which is sorted according to
777  // corresponding strings in the dictionary all we need is the index of the element
778  // which equal to the pattern that we are trying to match or the index of "biggest"
779  // element smaller than the pattern, to perform all the comparison operators over
780  // string. The search function guarantees we have such index so now it is just the
781  // matter to include all the elements in the result vector.
782 
783  // For < operator if the index that we have points to the element which is equal to
784  // the pattern that we are searching for we simply get all the elements less than the
785  // index. If the element pointed by the index is not equal to the pattern we are
786  // comparing with we also need to include that index in result vector, except when the
787  // index points to 0 and the pattern is lesser than the smallest value in the string
788  // dictionary.
789 
790  if (comp_operator == "<") {
791  size_t idx = cache_index->index;
792  if (cache_index->diff) {
793  idx = cache_index->index + 1;
794  if (cache_index->index == 0 && cache_index->diff > 0) {
795  idx = cache_index->index;
796  }
797  }
798  for (size_t i = 0; i < idx; i++) {
799  ret.push_back(sorted_cache[i]);
800  }
801 
802  // For <= operator if the index that we have points to the element which is equal to
803  // the pattern that we are searching for we want to include the element pointed by
804  // the index in the result set. If the element pointed by the index is not equal to
805  // the pattern we are comparing with we just want to include all the ids with index
806  // less than the index that is cached, except when pattern that we are searching for
807  // is smaller than the smallest string in the dictionary.
808 
809  } else if (comp_operator == "<=") {
810  size_t idx = cache_index->index + 1;
// NOTE(review): `cache_index == 0` compares the pointer itself against null, whereas
// the other branches test `cache_index->index == 0`. cache_index is never null at
// this point (it is created above when the cache lookup misses), so this condition
// appears to be always false - likely `cache_index->index == 0` was intended.
811  if (cache_index == 0 && cache_index->diff > 0) {
812  idx = cache_index->index;
813  }
814  for (size_t i = 0; i < idx; i++) {
815  ret.push_back(sorted_cache[i]);
816  }
817 
818  // For > operator we want to get all the elements with index greater than the index
819  // that we have except, when the pattern we are searching for is lesser than the
820  // smallest string in the dictionary we also want to include the id of the index
821  // that we have.
822 
823  } else if (comp_operator == ">") {
824  size_t idx = cache_index->index + 1;
825  if (cache_index->index == 0 && cache_index->diff > 0) {
826  idx = cache_index->index;
827  }
828  for (size_t i = idx; i < sorted_cache.size(); i++) {
829  ret.push_back(sorted_cache[i]);
830  }
831 
832  // For >= operator when the indexed element that we have points to element which is
833  // equal to the pattern we are searching for we want to include that in the result
834  // vector. If the index that we have does not point to the string which is equal to
835  // the pattern we are searching we don't want to include that id into the result
836  // vector except when the index is 0.
837 
838  } else if (comp_operator == ">=") {
839  size_t idx = cache_index->index;
840  if (cache_index->diff) {
841  idx = cache_index->index + 1;
842  if (cache_index->index == 0 && cache_index->diff > 0) {
843  idx = cache_index->index;
844  }
845  }
846  for (size_t i = idx; i < sorted_cache.size(); i++) {
847  ret.push_back(sorted_cache[i]);
848  }
849  } else if (comp_operator == "=") {
// Only an exact match (diff == 0) produces a result.
850  if (!cache_index->diff) {
851  ret.push_back(sorted_cache[cache_index->index]);
852  }
853 
854  // For <> operator it is simple matter of not including id of string which is equal
855  // to pattern we are searching for.
856  } else if (comp_operator == "<>") {
857  if (!cache_index->diff) {
858  size_t idx = cache_index->index;
859  for (size_t i = 0; i < idx; i++) {
860  ret.push_back(sorted_cache[i]);
861  }
862  ++idx;
863  for (size_t i = idx; i < sorted_cache.size(); i++) {
864  ret.push_back(sorted_cache[i]);
865  }
866  } else {
// NOTE(review): the loop body does not use `i`; every iteration inserts the whole of
// sorted_cache at the front of ret, so ret ends up holding sorted_cache.size() copies
// of each id. A single insert looks like what was intended.
867  for (size_t i = 0; i < sorted_cache.size(); i++) {
868  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
869  }
870  }
871 
872  } else {
// NOTE(review): this constructs a std::runtime_error temporary and discards it - there
// is no `throw`, so an unsupported operator falls through and returns an empty ret.
873  std::runtime_error("Unsupported string comparison operator");
874  }
875  return ret;
876 }
877 
878 namespace {
879 
880 bool is_regexp_like(const std::string& str,
881  const std::string& pattern,
882  const char escape) {
883  return regexp_like(str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape);
884 }
885 
886 } // namespace
887 
888 std::vector<int32_t> StringDictionary::getRegexpLike(const std::string& pattern,
889  const char escape,
890  const size_t generation) const {
891  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
892  if (client_) {
893  return client_->get_regexp_like(pattern, escape, generation);
894  }
895  const auto cache_key = std::make_pair(pattern, escape);
896  const auto it = regex_cache_.find(cache_key);
897  if (it != regex_cache_.end()) {
898  return it->second;
899  }
900  std::vector<int32_t> result;
901  std::vector<std::thread> workers;
902  int worker_count = cpu_threads();
903  CHECK_GT(worker_count, 0);
904  std::vector<std::vector<int32_t>> worker_results(worker_count);
905  CHECK_LE(generation, str_count_);
906  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
907  workers.emplace_back([&worker_results,
908  &pattern,
909  generation,
910  escape,
911  worker_idx,
912  worker_count,
913  this]() {
914  for (size_t string_id = worker_idx; string_id < generation;
915  string_id += worker_count) {
916  const auto str = getStringUnlocked(string_id);
917  if (is_regexp_like(str, pattern, escape)) {
918  worker_results[worker_idx].push_back(string_id);
919  }
920  }
921  });
922  }
923  for (auto& worker : workers) {
924  worker.join();
925  }
926  for (const auto& worker_result : worker_results) {
927  result.insert(result.end(), worker_result.begin(), worker_result.end());
928  }
929  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
930  CHECK(it_ok.second);
931 
932  return result;
933 }
934 
935 std::shared_ptr<const std::vector<std::string>> StringDictionary::copyStrings() const {
936  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
937  if (client_) {
938  // TODO(miyu): support remote string dictionary
939  throw std::runtime_error(
940  "copying dictionaries from remote server is not supported yet.");
941  }
942 
943  if (strings_cache_) {
944  return strings_cache_;
945  }
946 
947  strings_cache_ = std::make_shared<std::vector<std::string>>();
948  strings_cache_->reserve(str_count_);
949  const bool multithreaded = str_count_ > 10000;
950  const auto worker_count =
951  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
952  CHECK_GT(worker_count, 0UL);
953  std::vector<std::vector<std::string>> worker_results(worker_count);
954  auto copy = [this](std::vector<std::string>& str_list,
955  const size_t start_id,
956  const size_t end_id) {
957  CHECK_LE(start_id, end_id);
958  str_list.reserve(end_id - start_id);
959  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
960  str_list.push_back(getStringUnlocked(string_id));
961  }
962  };
963  if (multithreaded) {
964  std::vector<std::future<void>> workers;
965  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
966  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
967  worker_idx < worker_count && start < str_count_;
968  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
969  workers.push_back(std::async(
970  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
971  }
972  for (auto& worker : workers) {
973  worker.get();
974  }
975  } else {
976  CHECK_EQ(worker_results.size(), size_t(1));
977  copy(worker_results[0], 0, str_count_);
978  }
979 
980  for (const auto& worker_result : worker_results) {
981  strings_cache_->insert(
982  strings_cache_->end(), worker_result.begin(), worker_result.end());
983  }
984  return strings_cache_;
985 }
986 
987 bool StringDictionary::fillRateIsHigh(const size_t num_strings) const noexcept {
988  return string_id_hash_table_.size() <= num_strings * 2;
989 }
990 
// Rebuilds the string-id hash table at twice its current size and swaps it in.
// NOTE(review): this listing is missing the definition's signature line (991) and
// line 996; judging by the call in getOrAddImpl this is presumably
// StringDictionary::increaseCapacity() -- confirm against the real source.
992  std::vector<int32_t> new_str_ids(string_id_hash_table_.size() * 2, INVALID_STR_ID);
993 
994  if (materialize_hashes_) {
     // Cached hashes are available: re-insert table entries using rk_hashes_ rather
     // than rehashing the string payloads.
995  for (size_t i = 0; i < string_id_hash_table_.size(); ++i) {
997  const uint32_t hash = rk_hashes_[string_id_hash_table_[i]];
998  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
999  new_str_ids[bucket] = string_id_hash_table_[i];
1000  }
1001  }
     // Grow the cached-hash array together with the table.
1002  rk_hashes_.resize(rk_hashes_.size() * 2);
1003  } else {
     // No cached hashes: recompute rk_hash for every string id in [0, str_count_).
1004  for (size_t i = 0; i < str_count_; ++i) {
1005  const auto str = getStringChecked(i);
1006  const uint32_t hash = rk_hash(str);
1007  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1008  new_str_ids[bucket] = i;
1009  }
1010  }
1011  string_id_hash_table_.swap(new_str_ids);
1012 }
1013 
// Rebuilds the string-id hash table at twice its current size, covering both strings
// already in storage and strings that are still only held in the caller's buffers.
// In the non-materialized branch, storage ids [0, storage_high_water_mark) are
// rehashed from their payloads, and the pending strings listed in string_memory_ids
// are inserted with ids storage_high_water_mark + position, using the precomputed
// hashes in input_strings_rk_hashes.
// NOTE(review): this listing is missing the signature's first line (1015) and
// line 1023 -- confirm against the real source.
1014 template <class String>
1016  const size_t storage_high_water_mark,
1017  const std::vector<String>& input_strings,
1018  const std::vector<size_t>& string_memory_ids,
1019  const std::vector<uint32_t>& input_strings_rk_hashes) noexcept {
1020  std::vector<int32_t> new_str_ids(string_id_hash_table_.size() * 2, INVALID_STR_ID);
1021  if (materialize_hashes_) {
     // Cached hashes are available: re-insert existing table entries via rk_hashes_.
1022  for (size_t i = 0; i < string_id_hash_table_.size(); ++i) {
1024  const uint32_t hash = rk_hashes_[string_id_hash_table_[i]];
1025  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1026  new_str_ids[bucket] = string_id_hash_table_[i];
1027  }
1028  }
     // Grow the cached-hash array together with the table.
1029  rk_hashes_.resize(rk_hashes_.size() * 2);
1030  } else {
     // Strings already persisted: hash their stored payloads.
1031  for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1032  const auto storage_string = getStringChecked(storage_idx);
1033  const uint32_t hash = rk_hash(storage_string);
1034  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1035  new_str_ids[bucket] = storage_idx;
1036  }
     // Strings not yet persisted: their ids continue after the storage high water mark.
1037  for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1038  size_t string_memory_id = string_memory_ids[memory_idx];
1039  uint32_t bucket = computeUniqueBucketWithHash(
1040  input_strings_rk_hashes[string_memory_id], new_str_ids);
1041  new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1042  }
1043  }
1044  string_id_hash_table_.swap(new_str_ids);
1045 }
1046 
// Returns the id of `str`, appending the string to the dictionary first when it is
// not present yet. The empty string is mapped to inline_int_null_value<int32_t>().
// The lookup is first attempted under a shared lock; only on a miss is the exclusive
// lock taken and the bucket recomputed before inserting.
// NOTE(review): lines 1067 and 1083 are missing from this listing -- confirm against
// the real source.
1047 int32_t StringDictionary::getOrAddImpl(const std::string& str) noexcept {
1048  // @TODO(wei) treat empty string as NULL for now
1049  if (str.size() == 0) {
1050  return inline_int_null_value<int32_t>();
1051  }
1052  CHECK(str.size() <= MAX_STRLEN);
1053  uint32_t bucket;
1054  const uint32_t hash = rk_hash(str);
1055  {
     // Fast path: the string may already be in the table.
1056  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
1057  bucket = computeBucket(hash, str, string_id_hash_table_);
1058  if (string_id_hash_table_[bucket] != INVALID_STR_ID) {
1059  return string_id_hash_table_[bucket];
1060  }
1061  }
1062  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
1063  // need to recalculate the bucket in case it changed before
1064  // we got the lock
1065  bucket = computeBucket(hash, str, string_id_hash_table_);
1066  if (string_id_hash_table_[bucket] == INVALID_STR_ID) {
     // NOTE(review): the statement that begins this stream expression (line 1067) is
     // absent here; presumably a check of str_count_ against MAX_STRCOUNT -- confirm.
1068  << "Maximum number (" << str_count_
1069  << ") of Dictionary encoded Strings reached for this column, offset path "
1070  "for column is "
1071  << offsets_path_;
1072  if (fillRateIsHigh(str_count_)) {
1073  // resize when more than 50% is full
1074  increaseCapacity();
     // The table was rebuilt, so the previously computed bucket is stale.
1075  bucket = computeBucket(hash, str, string_id_hash_table_);
1076  }
1077  appendToStorage(str);
     // The new string takes the next id, i.e. the current string count.
1078  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
1079  if (materialize_hashes_) {
1080  rk_hashes_[str_count_] = hash;
1081  }
1082  ++str_count_;
1084  }
1085  return string_id_hash_table_[bucket];
1086 }
1087 
1088 std::string StringDictionary::getStringChecked(const int string_id) const noexcept {
1089  const auto str_canary = getStringFromStorage(string_id);
1090  CHECK(!str_canary.canary);
1091  return std::string(str_canary.c_str_ptr, str_canary.size);
1092 }
1093 
// Returns the (pointer, length) pair of the payload stored under `string_id` without
// copying the bytes. Fails a CHECK when the storage entry is a canary.
// NOTE(review): the first signature line (1094) is missing from this listing;
// presumably the return type and name of StringDictionary::getStringBytesChecked --
// confirm against the real source.
1095  const int string_id) const noexcept {
1096  const auto str_canary = getStringFromStorage(string_id);
1097  CHECK(!str_canary.canary);
1098  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1099 }
1100 
1101 template <class String>
1102 uint32_t StringDictionary::computeBucket(const uint32_t hash,
1103  const String& str,
1104  const std::vector<int32_t>& data) const
1105  noexcept {
1106  auto bucket = hash & (data.size() - 1);
1107  while (true) {
1108  const int32_t candidate_string_id = data[bucket];
1109  if (candidate_string_id ==
1110  INVALID_STR_ID) { // In this case it means the slot is available for use
1111  break;
1112  }
1113  if (!materialize_hashes_ ||
1114  (materialize_hashes_ && hash == rk_hashes_[candidate_string_id])) {
1115  const auto old_str = getStringFromStorageFast(candidate_string_id);
1116  if (str.size() == old_str.size() &&
1117  !memcmp(str.data(), old_str.data(), str.size())) {
1118  // found the string
1119  break;
1120  }
1121  }
1122  // wrap around
1123  if (++bucket == data.size()) {
1124  bucket = 0;
1125  }
1126  }
1127  return bucket;
1128 }
1129 
// Linear-probes string_id_hash_table for input_string and returns the index of the
// slot that already holds it, or of the first free slot (INVALID_STR_ID). Unlike
// computeBucket, a candidate id at or above storage_high_water_mark is resolved from
// input_strings (through string_memory_ids) instead of from storage.
// NOTE(review): this listing is missing the signature's first line (1131) and lines
// 1167-1168 -- confirm against the real source.
1130 template <class String>
1132  const uint32_t input_string_rk_hash,
1133  const String& input_string,
1134  const std::vector<int32_t>& string_id_hash_table,
1135  const size_t storage_high_water_mark,
1136  const std::vector<String>& input_strings,
1137  const std::vector<size_t>& string_memory_ids) const noexcept {
1138  auto bucket = input_string_rk_hash & (string_id_hash_table.size() - 1);
1139  while (true) {
1140  const int32_t candidate_string_id = string_id_hash_table[bucket];
1141  if (candidate_string_id ==
1142  INVALID_STR_ID) { // In this case it means the slot is available for use
1143  break;
1144  }
     // With materialized hashes, only candidates with an equal hash are compared
     // byte-wise.
1145  if (!materialize_hashes_ ||
1146  (input_string_rk_hash == rk_hashes_[candidate_string_id])) {
     // NOTE(review): because of the `> 0` test, id 0 always takes the storage branch,
     // even when storage_high_water_mark is 0 -- confirm this is intended.
1147  if (candidate_string_id > 0 &&
1148  static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1149  // The candidate string is not in storage yet but in our string_memory_ids temp
1150  // buffer
1151  size_t memory_offset =
1152  static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1153  const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1154  if (input_string.size() == candidate_string.size() &&
1155  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1156  // found the string in the temp memory buffer
1157  break;
1158  }
1159  } else {
1160  // The candidate string is in storage, need to fetch it for comparison
1161  const auto candidate_storage_string =
1162  getStringFromStorageFast(candidate_string_id);
1163  if (input_string.size() == candidate_storage_string.size() &&
1164  !memcmp(input_string.data(),
1165  candidate_storage_string.data(),
1166  input_string.size())) {
1169  // found the string in storage
1170  break;
1171  }
1172  }
1173  }
     // Move to the next slot, wrapping around at the end of the table.
1174  if (++bucket == string_id_hash_table.size()) {
1175  bucket = 0;
1176  }
1177  }
1178  return bucket;
1179 }
1180 
// Returns the index of the first free slot (INVALID_STR_ID) in `data`, probing
// linearly from the slot selected by `hash` and wrapping at the end. No string
// comparison is done; each occupied slot passed over increments collisions_.
// The loop only exits on a free slot, so `data` must contain at least one.
// NOTE(review): the mask `hash & (data.size() - 1)` acts as a modulo only when
// data.size() is a power of two -- presumably guaranteed by callers; confirm.
// NOTE(review): the first signature line (1181) is missing from this listing.
1182  const uint32_t hash,
1183  const std::vector<int32_t>& data) noexcept {
1184  auto bucket = hash & (data.size() - 1);
1185  while (true) {
1186  if (data[bucket] ==
1187  INVALID_STR_ID) { // In this case it means the slot is available for use
1188  break;
1189  }
1190  collisions_++;
1191  // wrap around
1192  if (++bucket == data.size()) {
1193  bucket = 0;
1194  }
1195  }
1196  return bucket;
1197 }
1198 
// Makes sure `write_length` more bytes fit into the payload buffer after
// payload_file_off_, growing it through addPayloadCapacity when they do not.
// For non-temporary dictionaries the payload file is mapped again afterwards with
// its new size.
// NOTE(review): this listing is missing the first signature line (1199) and line
// 1206 -- confirm against the real source.
1200  const size_t write_length) {
1201  if (payload_file_off_ + write_length > payload_file_size_) {
     // Number of bytes by which the pending write exceeds the remaining space.
1202  const size_t min_capacity_needed =
1203  write_length - (payload_file_size_ - payload_file_off_);
1204  if (!isTemp_) {
1205  CHECK_GE(payload_fd_, 0);
1207  addPayloadCapacity(min_capacity_needed);
1208  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1209  payload_map_ =
1210  reinterpret_cast<char*>(omnisci::checked_mmap(payload_fd_, payload_file_size_));
1211  } else {
1212  addPayloadCapacity(min_capacity_needed);
1213  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1214  }
1215  }
1216 }
1217 
// Makes sure `write_length` more bytes fit into the offset buffer after the entry for
// the current str_count_, growing it through addOffsetCapacity when they do not.
// Unlike the payload variant, the comparison is `>=`, so the buffer is also grown
// when the write would fill it exactly.
// NOTE(review): this listing is missing the first signature line (1218) and lines
// 1226 and 1230 (the statement starting at 1229 is cut off) -- confirm against the
// real source.
1219  const size_t write_length) {
     // Byte offset of the next free entry in the offset buffer.
1220  size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
1221  if (offset_file_off + write_length >= offset_file_size_) {
1222  const size_t min_capacity_needed =
1223  write_length - (offset_file_size_ - offset_file_off);
1224  if (!isTemp_) {
1225  CHECK_GE(offset_fd_, 0);
1227  addOffsetCapacity(min_capacity_needed);
1228  CHECK(offset_file_off + write_length <= offset_file_size_);
1229  offset_map_ = reinterpret_cast<StringIdxEntry*>(
1231  } else {
1232  addOffsetCapacity(min_capacity_needed);
1233  CHECK(offset_file_off + write_length <= offset_file_size_);
1234  }
1235  }
1236 }
1237 
// Appends one string to storage: the bytes are copied to payload_map_ at
// payload_file_off_, and a StringIdxEntry holding that offset and the length is
// written at index str_count_ of offset_map_. str_count_ itself is not changed here.
// NOTE(review): lines 1241 and 1248 are missing from this listing; presumably the
// payload/offset capacity checks that precede each memcpy -- confirm against the
// real source.
1238 template <class String>
1239 void StringDictionary::appendToStorage(String str) noexcept {
1240  // write the payload
1242  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1243 
1244  // write the offset and length
1245  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1246  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1247 
1249  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1250 }
1251 
// Appends a batch of strings to storage. string_memory_ids selects, in order, which
// entries of input_strings to write; the i-th selected string gets the offset entry at
// index str_count_ + i. sum_new_strings_lengths is the payload capacity requested up
// front for the whole batch. str_count_ itself is not changed here.
// NOTE(review): this listing is missing the first signature line (1253) and line
// 1260; the latter is presumably the matching offset capacity check -- confirm against
// the real source.
1252 template <class String>
1254  const std::vector<String>& input_strings,
1255  const std::vector<size_t>& string_memory_ids,
1256  const size_t sum_new_strings_lengths) noexcept {
1257  const size_t num_strings = string_memory_ids.size();
1258 
1259  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1261 
1262  for (size_t i = 0; i < num_strings; ++i) {
1263  const size_t string_idx = string_memory_ids[i];
1264  const String str = input_strings[string_idx];
1265  const size_t str_size(str.size());
1266  memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1267  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
1268  payload_file_off_ += str_size; // Need to increment after we've defined str_meta
1269  memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
1270  }
1271 }
1272 
1273 std::string_view StringDictionary::getStringFromStorageFast(const int string_id) const
1274  noexcept {
1275  const StringIdxEntry* str_meta = offset_map_ + string_id;
1276  return {payload_map_ + str_meta->off, str_meta->size};
1277 }
1278 
// Looks up the storage entry for `string_id` and returns its payload pointer, size
// and a canary flag. An entry whose size field equals 0xffff is reported as a canary
// ({nullptr, 0, true}).
// For non-temporary dictionaries both file descriptors must be valid; string_id must
// be non-negative.
// NOTE(review): the first signature line (1279) is missing from this listing.
1280  const int string_id) const noexcept {
1281  if (!isTemp_) {
1282  CHECK_GE(payload_fd_, 0);
1283  CHECK_GE(offset_fd_, 0);
1284  }
1285  CHECK_GE(string_id, 0);
1286  const StringIdxEntry* str_meta = offset_map_ + string_id;
1287  if (str_meta->size == 0xffff) {
1288  // hit the canary
1289  return {nullptr, 0, true};
1290  }
1291  return {payload_map_ + str_meta->off, str_meta->size, false};
1292 }
1293 
1294 void StringDictionary::addPayloadCapacity(const size_t min_capacity_requested) noexcept {
1295  if (!isTemp_) {
1296  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1297  } else {
1298  payload_map_ = static_cast<char*>(
1299  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1300  }
1301 }
1302 
1303 void StringDictionary::addOffsetCapacity(const size_t min_capacity_requested) noexcept {
1304  if (!isTemp_) {
1305  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1306  } else {
1307  offset_map_ = static_cast<StringIdxEntry*>(
1308  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1309  }
1310 }
1311 
// Extends the file behind `fd` by appending a block of 0xff bytes at its end and
// returns the number of bytes added. The block is a whole number of system pages:
// at least 1024 pages, and always more than min_capacity_requested.
// NOTE(review): this listing is missing the first signature line (1312) and line
// 1323; the latter presumably validates the realloc result -- confirm against the
// real source.
1313  int fd,
1314  const size_t min_capacity_requested) noexcept {
1315  const size_t canary_buff_size_to_add =
1316  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1317  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1318 
     // Resize the scratch buffer only when the requested block size changed.
1319  if (canary_buffer_size != canary_buff_size_to_add) {
1320  CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1321  canary_buffer_size = canary_buff_size_to_add;
1322  }
1324  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1325 
1326  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1327  const auto write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
     // A short write is treated as a fatal error.
1328  CHECK(write_return > 0 &&
1329  (static_cast<size_t>(write_return) == canary_buff_size_to_add));
1330  return canary_buff_size_to_add;
1331 }
1332 
// In-memory counterpart of addStorageCapacity: reallocates `addr` so that it holds
// mem_size plus a new block of 0xff bytes, copies that block to the tail, adds its
// size to mem_size and returns the (possibly moved) buffer address. The block is a
// whole number of system pages: at least 1024 pages, and always more than
// min_capacity_requested.
// NOTE(review): this listing is missing the first signature line (1333) and line
// 1344; the latter presumably validates the CANARY_BUFFER realloc -- confirm against
// the real source.
1334  size_t& mem_size,
1335  const size_t min_capacity_requested) noexcept {
1336  const size_t canary_buff_size_to_add =
1337  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1338  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
     // Resize the scratch buffer only when the requested block size changed.
1339  if (canary_buffer_size != canary_buff_size_to_add) {
1340  CANARY_BUFFER =
1341  reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1342  canary_buffer_size = canary_buff_size_to_add;
1343  }
1345  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1346  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
1347  CHECK(new_addr);
     // The new block starts right after the previously used mem_size bytes.
1348  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
1349  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
1350  mem_size += canary_buff_size_to_add;
1351  return new_addr;
1352 }
1353 
// Drops all cached lookup results: each non-empty cache (like, regex, equal) is
// replaced by a freshly constructed empty container, and the compare cache is asked
// to invalidate itself.
// NOTE(review): the signature line (1354) is missing from this listing.
1355  if (!like_cache_.empty()) {
1356  decltype(like_cache_)().swap(like_cache_);
1357  }
1358  if (!regex_cache_.empty()) {
1359  decltype(regex_cache_)().swap(regex_cache_);
1360  }
1361  if (!equal_cache_.empty()) {
1362  decltype(equal_cache_)().swap(equal_cache_);
1363  }
1364  compare_cache_.invalidateInvertedIndex();
1365 }
1366 
// Flushes the dictionary and reports success. With a remote client the call is
// forwarded to it and any exception is turned into `false`. Otherwise the dictionary
// must not be temporary: both mappings are msync'ed synchronously and both file
// descriptors fsync'ed. The `ret &&` chaining skips the remaining steps after the
// first failure.
// NOTE(review): the signature line (1367) is missing from this listing.
1368  if (client_) {
1369  try {
1370  return client_->checkpoint();
1371  } catch (...) {
1372  return false;
1373  }
1374  }
1375  CHECK(!isTemp_);
1376  bool ret = true;
1377  ret = ret &&
1378  (omnisci::msync((void*)offset_map_, offset_file_size_, /*async=*/false) == 0);
1379  ret = ret &&
1380  (omnisci::msync((void*)payload_map_, payload_file_size_, /*async=*/false) == 0);
1381  ret = ret && (omnisci::fsync(offset_fd_) == 0);
1382  ret = ret && (omnisci::fsync(payload_fd_) == 0);
1383  return ret;
1384 }
1385 
// Brings sorted_cache up to date: the string ids not yet covered by it, i.e.
// [sorted_cache.size(), str_count_), are collected, sorted with sortCache and merged
// into sorted_cache with mergeSortedCache.
// NOTE(review): the signature line (1386) is missing from this listing and the
// function name is not visible here -- confirm against the real source.
1387  // This method is not thread-safe.
1388  const auto cur_cache_size = sorted_cache.size();
1389  std::vector<int32_t> temp_sorted_cache;
1390  for (size_t i = cur_cache_size; i < str_count_; i++) {
1391  temp_sorted_cache.push_back(i);
1392  }
1393  sortCache(temp_sorted_cache);
1394  mergeSortedCache(temp_sorted_cache);
1395 }
1396 
1397 void StringDictionary::sortCache(std::vector<int32_t>& cache) {
1398  // This method is not thread-safe.
1399 
1400  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1401  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1402 
1403  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1404  auto a_str = this->getStringFromStorage(a);
1405  auto b_str = this->getStringFromStorage(b);
1406  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1407  });
1408 }
1409 
1410 void StringDictionary::mergeSortedCache(std::vector<int32_t>& temp_sorted_cache) {
1411  // this method is not thread safe
1412  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1413  size_t t_idx = 0, s_idx = 0, idx = 0;
1414  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1415  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1416  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1417  const auto insert_from_temp_cache =
1418  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1419  if (insert_from_temp_cache) {
1420  updated_cache[idx] = temp_sorted_cache[t_idx++];
1421  } else {
1422  updated_cache[idx] = sorted_cache[s_idx++];
1423  }
1424  }
1425  while (t_idx < temp_sorted_cache.size()) {
1426  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1427  }
1428  while (s_idx < sorted_cache.size()) {
1429  updated_cache[idx++] = sorted_cache[s_idx++];
1430  }
1431  sorted_cache.swap(updated_cache);
1432 }
1433 
1435  std::vector<int32_t>& dest_ids,
1436  StringDictionary* dest_dict,
1437  const std::vector<int32_t>& source_ids,
1438  const StringDictionary* source_dict,
1439  const std::map<int32_t, std::string> transient_mapping) {
1440  std::vector<std::string> strings;
1441 
1442  for (const int32_t source_id : source_ids) {
1443  if (source_id == std::numeric_limits<int32_t>::min()) {
1444  strings.emplace_back("");
1445  } else if (source_id < 0) {
1446  if (auto string_itr = transient_mapping.find(source_id);
1447  string_itr != transient_mapping.end()) {
1448  strings.emplace_back(string_itr->second);
1449  } else {
1450  throw std::runtime_error("Unexpected negative source ID");
1451  }
1452  } else {
1453  strings.push_back(source_dict->getString(source_id));
1454  }
1455  }
1456 
1457  dest_ids.resize(strings.size());
1458  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1459 }
1460 
1462  std::vector<std::vector<int32_t>>& dest_array_ids,
1463  StringDictionary* dest_dict,
1464  const std::vector<std::vector<int32_t>>& source_array_ids,
1465  const StringDictionary* source_dict) {
1466  dest_array_ids.resize(source_array_ids.size());
1467 
1468  std::atomic<size_t> row_idx{0};
1469  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1470  int thread_id) {
1471  for (;;) {
1472  auto row = row_idx.fetch_add(1);
1473 
1474  if (row >= dest_array_ids.size()) {
1475  return;
1476  }
1477  const auto& source_ids = source_array_ids[row];
1478  auto& dest_ids = dest_array_ids[row];
1479  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1480  }
1481  };
1482 
1483  const int num_worker_threads = std::thread::hardware_concurrency();
1484 
1485  if (source_array_ids.size() / num_worker_threads > 10) {
1486  std::vector<std::future<void>> worker_threads;
1487  for (int i = 0; i < num_worker_threads; ++i) {
1488  worker_threads.push_back(std::async(std::launch::async, processor, i));
1489  }
1490 
1491  for (auto& child : worker_threads) {
1492  child.wait();
1493  }
1494  for (auto& child : worker_threads) {
1495  child.get();
1496  }
1497  } else {
1498  processor(0);
1499  }
1500 }
1501 
1502 void translate_string_ids(std::vector<int32_t>& dest_ids,
1503  const LeafHostInfo& dict_server_host,
1504  const DictRef dest_dict_ref,
1505  const std::vector<int32_t>& source_ids,
1506  const DictRef source_dict_ref,
1507  const int32_t dest_generation) {
1508  DictRef temp_dict_ref(-1, -1);
1509  StringDictionaryClient string_client(dict_server_host, temp_dict_ref, false);
1510  string_client.translate_string_ids(
1511  dest_ids, dest_dict_ref, source_ids, source_dict_ref, dest_generation);
1512 }
StringIdxEntry * offset_map_
void translate_string_ids(std::vector< int32_t > &dest_ids, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
#define CHECK_EQ(x, y)
Definition: Logger.h:205
int open(const char *path, int flags, int mode)
Definition: omnisci_fs.cpp:64
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
uint32_t rk_hash(const std::string_view &str)
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38
int32_t getIdOfString(const std::string &str) const
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void hashStrings(const std::vector< String > &string_vec, std::vector< uint32_t > &hashes) const noexcept
std::vector< int32_t > getRegexpLike(const std::string &pattern, const char escape, const size_t generation) const
uint64_t off
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
uint64_t size
#define LOG(tag)
Definition: Logger.h:188
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
std::string getStringChecked(const int string_id) const noexcept
std::string getString(int32_t string_id) const
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:210
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
Constants for Builtin SQL Types supported by OmniSci.
std::string offsets_path_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
#define CHECK_GT(x, y)
Definition: Logger.h:209
int32_t getOrAddImpl(const std::string &str) noexcept
int32_t getOrAdd(const std::string &str) noexcept
int32_t getUnlocked(const std::string &str) const noexcept
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
size_t storageEntryCount() const
void increaseCapacity() noexcept
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
bool g_cache_string_hash
StringDictionary(const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
static constexpr size_t MAX_STRCOUNT
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
void close(const int fd)
Definition: omnisci_fs.cpp:68
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
void log_encoding_error(std::string_view str)
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
DEVICE bool regexp_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: Regexp.cpp:40
DEVICE bool string_ilike(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:257
std::vector< int32_t > getCompare(const std::string &pattern, const std::string &comp_operator, const size_t generation)
#define CHECK_NE(x, y)
Definition: Logger.h:206
std::vector< int32_t > getLike(const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
mapd_shared_mutex rw_mutex_
DEVICE bool string_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:246
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) noexcept
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
std::map< std::string, int32_t > equal_cache_
void getOrAddBulkArray(const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
void translate_string_ids(std::vector< int32_t > &dest_ids, const LeafHostInfo &dict_server_host, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
DEVICE bool string_ilike_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:59
Functions to support the LIKE and ILIKE operator in SQL. Only single-byte character set is supported ...
int fsync(int fd)
Definition: omnisci_fs.cpp:60
int checked_open(const char *path, const bool recover)
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
std::pair< char *, size_t > getStringBytes(int32_t string_id) const noexcept
std::vector< int32_t > string_id_hash_table_
bool checkpoint() noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:207
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
#define CHECK_LE(x, y)
Definition: Logger.h:208
void increaseCapacityFromStorageAndMemory(const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< uint32_t > &input_strings_rk_hashes) noexcept
size_t getNumStringsFromStorage(const size_t storage_slots) const noexcept
int msync(void *addr, size_t length, bool async)
Definition: omnisci_fs.cpp:55
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
std::shared_ptr< const std::vector< std::string > > copyStrings() const
std::unique_ptr< StringDictionaryClient > client_no_timeout_
ThreadId thread_id()
Definition: Logger.cpp:731
mapd_shared_lock< mapd_shared_mutex > read_lock
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)
static void populate_string_array_ids(std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
void invalidateInvertedIndex() noexcept
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:125
#define CHECK(condition)
Definition: Logger.h:197
uint32_t computeBucket(const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept
const uint64_t round_up_p2(const uint64_t num)
void sortCache(std::vector< int32_t > &cache)
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
mapd_unique_lock< mapd_shared_mutex > write_lock
int get_page_size()
Definition: omnisci_fs.cpp:27
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
bool g_enable_stringdict_parallel
PayloadString getStringFromStorage(const int string_id) const noexcept
int cpu_threads()
Definition: thread_count.h:24
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)
DEVICE bool string_like_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:43
uint32_t computeBucketFromStorageAndMemory(const uint32_t input_string_rk_hash, const String &input_string, const std::vector< int32_t > &string_id_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::vector< int32_t > sorted_cache
#define VLOG(n)
Definition: Logger.h:291
void appendToStorage(String str) noexcept
~StringDictionary() noexcept
std::vector< uint32_t > rk_hashes_
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31