OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
StringDictionary.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Shared/DatumFetchers.h"
19 #include "StringOps/StringOps.h"
20 
21 #include <tbb/parallel_for.h>
22 #include <tbb/task_arena.h>
23 #include <algorithm>
24 #include <boost/filesystem/operations.hpp>
25 #include <boost/filesystem/path.hpp>
26 #include <boost/sort/spreadsort/string_sort.hpp>
27 #include <functional>
28 #include <future>
29 #include <iostream>
30 #include <numeric>
31 #include <string_view>
32 #include <thread>
33 #include <type_traits>
34 
35 // TODO(adb): fixup
36 #ifdef _WIN32
37 #include <fcntl.h>
38 #include <io.h>
39 #else
40 #include <sys/fcntl.h>
41 #endif
42 
43 #include "Logger/Logger.h"
44 #include "OSDependent/heavyai_fs.h"
45 #include "Shared/sqltypes.h"
46 #include "Shared/thread_count.h"
47 #include "StringDictionaryClient.h"
48 #include "Utils/Regexp.h"
49 #include "Utils/StringLike.h"
50 
51 #include "LeafHostInfo.h"
52 
54 
55 namespace {
56 
58 
59 int checked_open(const char* path, const bool recover) {
60  auto fd = heavyai::open(path, O_RDWR | O_CREAT | (recover ? O_APPEND : O_TRUNC), 0644);
61  if (fd > 0) {
62  return fd;
63  }
64  auto err = std::string("Dictionary path ") + std::string(path) +
65  std::string(" does not exist.");
66  LOG(ERROR) << err;
67  throw DictPayloadUnavailable(err);
68 }
69 
// Rounds `num` up to the next power of two, limited to the 32-bit range.
// Returns UINT32_MAX (which is NOT a power of two) when `num` is 0, or when the
// rounded value would exceed UINT32_MAX (i.e. `num` > 2^31).
// The bit-smearing only needs shifts up to 16 because any input whose rounded result
// would need more than 32 bits is clamped anyway.
// TODO MAT deal with case where filesize has been increased but reality is
// we are constrained to 2^31.
uint64_t round_up_p2(const uint64_t num) {
  uint64_t in = num;
  in--;  // wraps to UINT64_MAX for num == 0, which ends up clamped below
  in |= in >> 1;
  in |= in >> 2;
  in |= in >> 4;
  in |= in >> 8;
  in |= in >> 16;
  in++;
  // In the overflow situation above this calculation wraps to zero.
  if (in == 0 || in > UINT32_MAX) {
    in = UINT32_MAX;
  }
  return in;
}
87 
88 string_dict_hash_t hash_string(const std::string_view& str) {
89  string_dict_hash_t str_hash = 1;
90  // rely on fact that unsigned overflow is defined and wraps
91  for (size_t i = 0; i < str.size(); ++i) {
92  str_hash = str_hash * 997 + str[i];
93  }
94  return str_hash;
95 }
96 
97 struct ThreadInfo {
98  int64_t num_threads{0};
100 
101  ThreadInfo(const int64_t max_thread_count,
102  const int64_t num_elems,
103  const int64_t target_elems_per_thread) {
104  num_threads =
105  std::min(std::max(max_thread_count, int64_t(1)),
106  ((num_elems + target_elems_per_thread - 1) / target_elems_per_thread));
107  num_elems_per_thread =
108  std::max((num_elems + num_threads - 1) / num_threads, int64_t(1));
109  }
110 };
111 
112 } // namespace
113 
115 constexpr int32_t StringDictionary::INVALID_STR_ID;
116 constexpr size_t StringDictionary::MAX_STRLEN;
117 constexpr size_t StringDictionary::MAX_STRCOUNT;
118 
120  const std::string& folder,
121  const bool isTemp,
122  const bool recover,
123  const bool materializeHashes,
124  size_t initial_capacity)
125  : dict_key_(dict_key)
126  , folder_(folder)
127  , str_count_(0)
128  , string_id_string_dict_hash_table_(initial_capacity, INVALID_STR_ID)
129  , hash_cache_(initial_capacity)
130  , isTemp_(isTemp)
131  , materialize_hashes_(materializeHashes)
132  , payload_fd_(-1)
133  , offset_fd_(-1)
134  , offset_map_(nullptr)
135  , payload_map_(nullptr)
136  , offset_file_size_(0)
137  , payload_file_size_(0)
138  , payload_file_off_(0)
139  , like_cache_size_(0)
140  , regex_cache_size_(0)
141  , equal_cache_size_(0)
142  , compare_cache_size_(0)
143  , strings_cache_(nullptr)
144  , strings_cache_size_(0) {
145  if (!isTemp && folder.empty()) {
146  return;
147  }
148 
149  // initial capacity must be a power of two for efficient bucket computation
150  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
151  if (!isTemp_) {
152  boost::filesystem::path storage_path(folder);
153  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
154  const auto payload_path =
155  (storage_path / boost::filesystem::path("DictPayload")).string();
156  payload_fd_ = checked_open(payload_path.c_str(), recover);
157  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
160  }
161  bool storage_is_empty = false;
162  if (payload_file_size_ == 0) {
163  storage_is_empty = true;
165  }
166  if (offset_file_size_ == 0) {
168  }
169  if (!isTemp_) { // we never mmap or recover temp dictionaries
170  payload_map_ =
171  reinterpret_cast<char*>(heavyai::checked_mmap(payload_fd_, payload_file_size_));
172  offset_map_ = reinterpret_cast<StringIdxEntry*>(
174  if (recover) {
175  const size_t bytes = heavyai::file_size(offset_fd_);
176  if (bytes % sizeof(StringIdxEntry) != 0) {
177  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
178  }
179  const uint64_t str_count =
180  storage_is_empty ? 0 : getNumStringsFromStorage(bytes / sizeof(StringIdxEntry));
181  collisions_ = 0;
182  // at this point we know the size of the StringDict we need to load
183  // so lets reallocate the vector to the correct size
184  const uint64_t max_entries =
185  std::max(round_up_p2(str_count * 2 + 1),
186  round_up_p2(std::max(initial_capacity, static_cast<size_t>(1))));
187  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
188  string_id_string_dict_hash_table_.swap(new_str_ids);
189  if (materialize_hashes_) {
190  std::vector<string_dict_hash_t> new_hash_cache(max_entries / 2);
191  hash_cache_.swap(new_hash_cache);
192  }
193  // Bail early if we know we don't have strings to add (i.e. a new or empty
194  // dictionary)
195  if (str_count == 0) {
196  return;
197  }
198 
199  unsigned string_id = 0;
200  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
201 
202  uint32_t thread_inits = 0;
203  const auto thread_count = std::thread::hardware_concurrency();
204  const uint32_t items_per_thread = std::max<uint32_t>(
205  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
206  std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>
207  dictionary_futures;
208  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
209  dictionary_futures.emplace_back(std::async(
210  std::launch::async, [string_id, str_count, items_per_thread, this] {
211  std::vector<std::pair<string_dict_hash_t, unsigned int>> hashVec;
212  for (uint32_t curr_id = string_id;
213  curr_id < string_id + items_per_thread && curr_id < str_count;
214  curr_id++) {
215  const auto recovered = getStringFromStorage(curr_id);
216  if (recovered.canary) {
217  // hit the canary, recovery finished
218  break;
219  } else {
220  std::string_view temp(recovered.c_str_ptr, recovered.size);
221  hashVec.emplace_back(std::make_pair(hash_string(temp), temp.size()));
222  }
223  }
224  return hashVec;
225  }));
226  thread_inits++;
227  if (thread_inits % thread_count == 0) {
228  processDictionaryFutures(dictionary_futures);
229  }
230  }
231  // gather last few threads
232  if (dictionary_futures.size() != 0) {
233  processDictionaryFutures(dictionary_futures);
234  }
235  VLOG(1) << "Opened string dictionary " << folder << " # Strings: " << str_count_
236  << " Hash table size: " << string_id_string_dict_hash_table_.size()
237  << " Fill rate: "
238  << static_cast<double>(str_count_) * 100.0 /
240  << "% Collisions: " << collisions_;
241  }
242  }
243 }
244 
245 namespace {
247  std::unordered_map<std::string, int32_t> map_;
248 
249  public:
250  void operator()(std::string const& str, int32_t const string_id) override {
251  auto const emplaced = map_.emplace(str, string_id);
252  CHECK(emplaced.second) << "str(" << str << ") string_id(" << string_id << ')';
253  }
254  void operator()(std::string_view const, int32_t const string_id) override {
255  UNREACHABLE() << "MapMaker must be called with a std::string.";
256  }
257  std::unordered_map<std::string, int32_t> moveMap() { return std::move(map_); }
258 };
259 } // namespace
260 
261 std::function<int32_t(std::string const&)> StringDictionary::makeLambdaStringToId()
262  const {
263  CHECK(isClient());
264  constexpr size_t big_gen = static_cast<size_t>(std::numeric_limits<size_t>::max());
265  MapMaker map_maker;
266  eachStringSerially(big_gen, map_maker);
267  return [map{map_maker.moveMap()}](std::string const& str) {
268  auto const itr = map.find(str);
269  return itr == map.cend() ? INVALID_STR_ID : itr->second;
270  };
271 }
272 
273 // Call serial_callback for each (string/_view, string_id). Must be called serially.
274 void StringDictionary::eachStringSerially(int64_t const generation,
275  StringCallback& serial_callback) const {
276  if (isClient()) {
277  // copyStrings() is not supported when isClient().
278  std::string str; // Import buffer. Placing outside of loop should reduce allocations.
279  size_t const n = std::min(static_cast<size_t>(generation), storageEntryCount());
280  CHECK_LE(n, static_cast<size_t>(std::numeric_limits<int32_t>::max()) + 1);
281  for (unsigned id = 0; id < n; ++id) {
282  {
283  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
284  client_->get_string(str, id);
285  }
286  serial_callback(str, id);
287  }
288  } else {
289  size_t const n = std::min(static_cast<size_t>(generation), str_count_);
290  CHECK_LE(n, static_cast<size_t>(std::numeric_limits<int32_t>::max()) + 1);
291  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
292  for (unsigned id = 0; id < n; ++id) {
293  serial_callback(getStringFromStorageFast(static_cast<int>(id)), id);
294  }
295  }
296 }
297 
299  std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>&
300  dictionary_futures) {
301  for (auto& dictionary_future : dictionary_futures) {
302  dictionary_future.wait();
303  const auto hashVec = dictionary_future.get();
304  for (const auto& hash : hashVec) {
305  const uint32_t bucket =
307  payload_file_off_ += hash.second;
308  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
309  if (materialize_hashes_) {
310  hash_cache_[str_count_] = hash.first;
311  }
312  ++str_count_;
313  }
314  }
315  dictionary_futures.clear();
316 }
317 
319  return dict_key_;
320 }
321 
330  const size_t storage_slots) const noexcept {
331  if (storage_slots == 0) {
332  return 0;
333  }
334  // Must use signed integers since final binary search step can wrap to max size_t value
335  // if dictionary is empty
336  int64_t min_bound = 0;
337  int64_t max_bound = storage_slots - 1;
338  int64_t guess{0};
339  while (min_bound <= max_bound) {
340  guess = (max_bound + min_bound) / 2;
341  CHECK_GE(guess, 0);
342  if (getStringFromStorage(guess).canary) {
343  max_bound = guess - 1;
344  } else {
345  min_bound = guess + 1;
346  }
347  }
348  CHECK_GE(guess + (min_bound > guess ? 1 : 0), 0);
349  return guess + (min_bound > guess ? 1 : 0);
350 }
351 
353  const shared::StringDictKey& dict_key)
354  : dict_key_(dict_key)
355  , folder_("DB_" + std::to_string(dict_key.db_id) + "_DICT_" +
356  std::to_string(dict_key.dict_id))
357  , strings_cache_(nullptr)
358  , client_(new StringDictionaryClient(host, {dict_key.db_id, dict_key.dict_id}, true))
360  new StringDictionaryClient(host, {dict_key.db_id, dict_key.dict_id}, false)) {}
361 
363  free(CANARY_BUFFER);
364  if (isClient()) {
365  return;
366  }
367  if (payload_map_) {
368  if (!isTemp_) {
372  CHECK_GE(payload_fd_, 0);
374  CHECK_GE(offset_fd_, 0);
376  } else {
378  free(payload_map_);
379  free(offset_map_);
380  }
381  }
382 }
383 
385 
386 int32_t StringDictionary::getOrAdd(const std::string& str) noexcept {
387  if (isClient()) {
388  std::vector<int32_t> string_ids;
389  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
390  CHECK_EQ(size_t(1), string_ids.size());
391  return string_ids.front();
392  }
393  return getOrAddImpl(str);
394 }
395 
396 namespace {
397 
398 template <class T>
399 void throw_encoding_error(std::string_view str, const shared::StringDictKey& dict_key) {
400  std::ostringstream oss;
401  oss << "The text encoded column using dictionary " << dict_key
402  << " has exceeded it's limit of " << sizeof(T) * 8 << " bits ("
403  << static_cast<size_t>(max_valid_int_value<T>() + 1) << " unique values) "
404  << "while attempting to add the new string '" << str << "'. ";
405 
406  if (sizeof(T) < 4) {
407  // Todo: Implement automatic type widening for dictionary-encoded text
408  // columns/all fixed length columm types (at least if not defined
409  // with fixed encoding size), or short of that, ALTER TABLE
410  // COLUMN TYPE to at least allow the user to do this manually
411  // without re-creating the table
412 
413  oss << "To load more data, please re-create the table with "
414  << "this column as type TEXT ENCODING DICT(" << sizeof(T) * 2 * 8 << ") ";
415  if (sizeof(T) == 1) {
416  oss << "or TEXT ENCODING DICT(32) ";
417  }
418  oss << "and reload your data.";
419  } else {
420  // Todo: Implement TEXT ENCODING DICT(64) type which should essentially
421  // preclude overflows.
422  oss << "Currently dictionary-encoded text columns support a maximum of "
424  << " strings. Consider recreating the table with "
425  << "this column as type TEXT ENCODING NONE and reloading your data.";
426  }
427  LOG(ERROR) << oss.str();
428  throw std::runtime_error(oss.str());
429 }
430 
431 void throw_string_too_long_error(std::string_view str,
432  const shared::StringDictKey& dict_key) {
433  std::ostringstream oss;
434  oss << "The string '" << str << " could not be inserted into the dictionary "
435  << dict_key << " because it exceeded the maximum allowable "
436  << "length of " << StringDictionary::MAX_STRLEN << " characters (string was "
437  << str.size() << " characters).";
438  LOG(ERROR) << oss.str();
439  throw std::runtime_error(oss.str());
440 }
441 
442 } // namespace
443 
444 template <class String>
446  const std::vector<std::vector<String>>& string_array_vec,
447  std::vector<std::vector<int32_t>>& ids_array_vec) {
448  if (client_no_timeout_) {
449  client_no_timeout_->get_or_add_bulk_array(ids_array_vec, string_array_vec);
450  return;
451  }
452 
453  ids_array_vec.resize(string_array_vec.size());
454  for (size_t i = 0; i < string_array_vec.size(); i++) {
455  auto& strings = string_array_vec[i];
456  auto& ids = ids_array_vec[i];
457  ids.resize(strings.size());
458  getOrAddBulk(strings, &ids[0]);
459  }
460 }
461 
463  const std::vector<std::vector<std::string>>& string_array_vec,
464  std::vector<std::vector<int32_t>>& ids_array_vec);
465 
467  const std::vector<std::vector<std::string_view>>& string_array_vec,
468  std::vector<std::vector<int32_t>>& ids_array_vec);
469 
475 template <class String>
477  const std::vector<String>& string_vec,
478  std::vector<string_dict_hash_t>& hashes) const noexcept {
479  CHECK_EQ(string_vec.size(), hashes.size());
480 
481  tbb::parallel_for(tbb::blocked_range<size_t>(0, string_vec.size()),
482  [&string_vec, &hashes](const tbb::blocked_range<size_t>& r) {
483  for (size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
484  if (string_vec[curr_id].empty()) {
485  continue;
486  }
487  hashes[curr_id] = hash_string(string_vec[curr_id]);
488  }
489  });
490 }
491 
492 template <class T, class String>
493 size_t StringDictionary::getBulk(const std::vector<String>& string_vec,
494  T* encoded_vec) const {
495  return getBulk(string_vec, encoded_vec, -1L /* generation */);
496 }
497 
498 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
499  uint8_t* encoded_vec) const;
500 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
501  uint16_t* encoded_vec) const;
502 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
503  int32_t* encoded_vec) const;
504 
505 template <class T, class String>
506 size_t StringDictionary::getBulk(const std::vector<String>& string_vec,
507  T* encoded_vec,
508  const int64_t generation) const {
509  constexpr int64_t target_strings_per_thread{1000};
510  const int64_t num_lookup_strings = string_vec.size();
511  if (num_lookup_strings == 0) {
512  return 0;
513  }
514 
515  const ThreadInfo thread_info(
516  std::thread::hardware_concurrency(), num_lookup_strings, target_strings_per_thread);
517  CHECK_GE(thread_info.num_threads, 1L);
518  CHECK_GE(thread_info.num_elems_per_thread, 1L);
519 
520  std::vector<size_t> num_strings_not_found_per_thread(thread_info.num_threads, 0UL);
521 
522  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
523  const int64_t num_dict_strings = generation >= 0 ? generation : storageEntryCount();
524  const bool dictionary_is_empty = (num_dict_strings == 0);
525  if (dictionary_is_empty) {
526  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_lookup_strings),
527  [&](const tbb::blocked_range<int64_t>& r) {
528  const int64_t start_idx = r.begin();
529  const int64_t end_idx = r.end();
530  for (int64_t string_idx = start_idx; string_idx < end_idx;
531  ++string_idx) {
532  encoded_vec[string_idx] = StringDictionary::INVALID_STR_ID;
533  }
534  });
535  return num_lookup_strings;
536  }
537  // If we're here the generation-capped dictionary has strings in it
538  // that we need to look up against
539 
540  tbb::task_arena limited_arena(thread_info.num_threads);
541  limited_arena.execute([&] {
543  tbb::blocked_range<int64_t>(
544  0, num_lookup_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
545  [&](const tbb::blocked_range<int64_t>& r) {
546  const int64_t start_idx = r.begin();
547  const int64_t end_idx = r.end();
548  size_t num_strings_not_found = 0;
549  for (int64_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
550  const auto& input_string = string_vec[string_idx];
551  if (input_string.empty()) {
552  encoded_vec[string_idx] = inline_int_null_value<T>();
553  continue;
554  }
555  if (input_string.size() > StringDictionary::MAX_STRLEN) {
556  throw_string_too_long_error(input_string, dict_key_);
557  }
558  const string_dict_hash_t input_string_hash = hash_string(input_string);
559  uint32_t hash_bucket = computeBucket(
560  input_string_hash, input_string, string_id_string_dict_hash_table_);
561  // Will either be legit id or INVALID_STR_ID
562  const auto string_id = string_id_string_dict_hash_table_[hash_bucket];
563  if (string_id == StringDictionary::INVALID_STR_ID ||
564  string_id >= num_dict_strings) {
565  encoded_vec[string_idx] = StringDictionary::INVALID_STR_ID;
566  num_strings_not_found++;
567  continue;
568  }
569  encoded_vec[string_idx] = string_id;
570  }
571  const size_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
572  num_strings_not_found_per_thread[tbb_thread_idx] = num_strings_not_found;
573  },
574  tbb::simple_partitioner());
575  });
576 
577  size_t num_strings_not_found = 0;
578  for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
579  num_strings_not_found += num_strings_not_found_per_thread[thread_idx];
580  }
581  return num_strings_not_found;
582 }
583 
584 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
585  uint8_t* encoded_vec,
586  const int64_t generation) const;
587 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
588  uint16_t* encoded_vec,
589  const int64_t generation) const;
590 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
591  int32_t* encoded_vec,
592  const int64_t generation) const;
593 
594 template <class T, class String>
595 void StringDictionary::getOrAddBulk(const std::vector<String>& input_strings,
596  T* output_string_ids) {
598  getOrAddBulkParallel(input_strings, output_string_ids);
599  return;
600  }
601  // Single-thread path.
602  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
603 
604  const size_t initial_str_count = str_count_;
605  size_t idx = 0;
606  for (const auto& input_string : input_strings) {
607  if (input_string.empty()) {
608  output_string_ids[idx++] = inline_int_null_value<T>();
609  continue;
610  }
611  CHECK(input_string.size() <= MAX_STRLEN);
612 
613  const string_dict_hash_t input_string_hash = hash_string(input_string);
614  uint32_t hash_bucket =
615  computeBucket(input_string_hash, input_string, string_id_string_dict_hash_table_);
617  output_string_ids[idx++] = string_id_string_dict_hash_table_[hash_bucket];
618  continue;
619  }
620  // need to add record to dictionary
621  // check there is room
622  if (str_count_ > static_cast<size_t>(max_valid_int_value<T>())) {
623  throw_encoding_error<T>(input_string, dict_key_);
624  }
626  << "Maximum number (" << str_count_
627  << ") of Dictionary encoded Strings reached for this column, offset path "
628  "for column is "
629  << offsets_path_;
630  if (fillRateIsHigh(str_count_)) {
631  // resize when more than 50% is full
633  hash_bucket = computeBucket(
634  input_string_hash, input_string, string_id_string_dict_hash_table_);
635  }
636  appendToStorage(input_string);
637 
638  if (materialize_hashes_) {
639  hash_cache_[str_count_] = input_string_hash;
640  }
641  const int32_t string_id = static_cast<int32_t>(str_count_);
642  string_id_string_dict_hash_table_[hash_bucket] = string_id;
643  output_string_ids[idx++] = string_id;
644  ++str_count_;
645  }
646  const size_t num_strings_added = str_count_ - initial_str_count;
647  if (num_strings_added > 0) {
649  }
650 }
651 
652 template <class T, class String>
653 void StringDictionary::getOrAddBulkParallel(const std::vector<String>& input_strings,
654  T* output_string_ids) {
655  // Compute hashes of the input strings up front, and in parallel,
656  // as the string hashing does not need to be behind the subsequent write_lock
657  std::vector<string_dict_hash_t> input_strings_hashes(input_strings.size());
658  hashStrings(input_strings, input_strings_hashes);
659 
660  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
661  size_t shadow_str_count =
662  str_count_; // Need to shadow str_count_ now with bulk add methods
663  const size_t storage_high_water_mark = shadow_str_count;
664  std::vector<size_t> string_memory_ids;
665  size_t sum_new_string_lengths = 0;
666  string_memory_ids.reserve(input_strings.size());
667  size_t input_string_idx{0};
668  for (const auto& input_string : input_strings) {
669  // Currently we make empty strings null
670  if (input_string.empty()) {
671  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
672  continue;
673  }
674  // TODO: Recover gracefully if an input string is too long
675  CHECK(input_string.size() <= MAX_STRLEN);
676 
677  if (fillRateIsHigh(shadow_str_count)) {
678  // resize when more than 50% is full
680  storage_high_water_mark,
681  input_strings,
682  string_memory_ids,
683  input_strings_hashes);
684  }
685  // Compute the hash for this input_string
686  const string_dict_hash_t input_string_hash = input_strings_hashes[input_string_idx];
687 
688  const uint32_t hash_bucket =
689  computeBucketFromStorageAndMemory(input_string_hash,
690  input_string,
692  storage_high_water_mark,
693  input_strings,
694  string_memory_ids);
695 
696  // If the hash bucket is not empty, that is our string id
697  // (computeBucketFromStorageAndMemory) already checked to ensure the input string and
698  // bucket string are equal)
700  output_string_ids[input_string_idx++] =
702  continue;
703  }
704  // Did not find string, so need to add record to dictionary
705  // First check there is room
706  if (shadow_str_count > static_cast<size_t>(max_valid_int_value<T>())) {
707  throw_encoding_error<T>(input_string, dict_key_);
708  }
709  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
710  << "Maximum number (" << shadow_str_count
711  << ") of Dictionary encoded Strings reached for this column, offset path "
712  "for column is "
713  << offsets_path_;
714 
715  string_memory_ids.push_back(input_string_idx);
716  sum_new_string_lengths += input_string.size();
717  string_id_string_dict_hash_table_[hash_bucket] =
718  static_cast<int32_t>(shadow_str_count);
719  if (materialize_hashes_) {
720  hash_cache_[shadow_str_count] = input_string_hash;
721  }
722  output_string_ids[input_string_idx++] = shadow_str_count++;
723  }
724  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
725  const size_t num_strings_added = shadow_str_count - str_count_;
726  str_count_ = shadow_str_count;
727  if (num_strings_added > 0) {
729  }
730 }
731 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
732  uint8_t* encoded_vec);
733 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
734  uint16_t* encoded_vec);
735 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
736  int32_t* encoded_vec);
737 
738 template void StringDictionary::getOrAddBulk(
739  const std::vector<std::string_view>& string_vec,
740  uint8_t* encoded_vec);
741 template void StringDictionary::getOrAddBulk(
742  const std::vector<std::string_view>& string_vec,
743  uint16_t* encoded_vec);
744 template void StringDictionary::getOrAddBulk(
745  const std::vector<std::string_view>& string_vec,
746  int32_t* encoded_vec);
747 
748 template <class String>
749 int32_t StringDictionary::getIdOfString(const String& str) const {
750  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
751  if (isClient()) {
752  if constexpr (std::is_same_v<std::string, std::decay_t<String>>) {
753  return client_->get(str);
754  } else {
755  return client_->get(std::string(str));
756  }
757  }
758  return getUnlocked(str);
759 }
760 
761 template int32_t StringDictionary::getIdOfString(const std::string&) const;
762 template int32_t StringDictionary::getIdOfString(const std::string_view&) const;
763 
764 int32_t StringDictionary::getUnlocked(const std::string_view sv) const noexcept {
765  const string_dict_hash_t hash = hash_string(sv);
766  auto str_id = string_id_string_dict_hash_table_[computeBucket(
767  hash, sv, string_id_string_dict_hash_table_)];
768  return str_id;
769 }
770 
771 std::string StringDictionary::getString(int32_t string_id) const {
772  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
773  if (isClient()) {
774  std::string ret;
775  client_->get_string(ret, string_id);
776  return ret;
777  }
778  return getStringUnlocked(string_id);
779 }
780 
781 std::string_view StringDictionary::getStringView(int32_t string_id) const {
782  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
783  CHECK(!isClient()) << "use of this function is unsupported in distributed";
784  return getStringViewUnlocked(string_id);
785 }
786 
787 std::string StringDictionary::getStringUnlocked(int32_t string_id) const noexcept {
788  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
789  return getStringChecked(string_id);
790 }
791 
793  int32_t string_id) const noexcept {
794  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
795  return getStringViewChecked(string_id);
796 }
797 
798 std::pair<char*, size_t> StringDictionary::getStringBytes(
799  int32_t string_id) const noexcept {
800  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
801  CHECK(!isClient());
802  CHECK_LE(0, string_id);
803  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
804  return getStringBytesChecked(string_id);
805 }
806 
808  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
809  if (isClient()) {
810  return client_->storage_entry_count();
811  }
812  return str_count_;
813 }
814 
815 namespace {
816 
817 bool is_like(const std::string& str,
818  const std::string& pattern,
819  const bool icase,
820  const bool is_simple,
821  const char escape) {
822  return icase
823  ? (is_simple ? string_ilike_simple(
824  str.c_str(), str.size(), pattern.c_str(), pattern.size())
825  : string_ilike(str.c_str(),
826  str.size(),
827  pattern.c_str(),
828  pattern.size(),
829  escape))
830  : (is_simple ? string_like_simple(
831  str.c_str(), str.size(), pattern.c_str(), pattern.size())
832  : string_like(str.c_str(),
833  str.size(),
834  pattern.c_str(),
835  pattern.size(),
836  escape));
837 }
838 
839 } // namespace
840 
841 std::vector<int32_t> StringDictionary::getLike(const std::string& pattern,
842  const bool icase,
843  const bool is_simple,
844  const char escape,
845  const size_t generation) const {
846  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
847  if (isClient()) {
848  return client_->get_like(pattern, icase, is_simple, escape, generation);
849  }
850  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
851  const auto it = like_cache_.find(cache_key);
852  if (it != like_cache_.end()) {
853  return it->second;
854  }
855  std::vector<int32_t> result;
856  std::vector<std::thread> workers;
857  int worker_count = cpu_threads();
858  CHECK_GT(worker_count, 0);
859  std::vector<std::vector<int32_t>> worker_results(worker_count);
860  CHECK_LE(generation, str_count_);
861  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
862  workers.emplace_back([&worker_results,
863  &pattern,
864  generation,
865  icase,
866  is_simple,
867  escape,
868  worker_idx,
869  worker_count,
870  this]() {
871  for (size_t string_id = worker_idx; string_id < generation;
872  string_id += worker_count) {
873  const auto str = getStringUnlocked(string_id);
874  if (is_like(str, pattern, icase, is_simple, escape)) {
875  worker_results[worker_idx].push_back(string_id);
876  }
877  }
878  });
879  }
880  for (auto& worker : workers) {
881  worker.join();
882  }
883  for (const auto& worker_result : worker_results) {
884  result.insert(result.end(), worker_result.begin(), worker_result.end());
885  }
886  // place result into cache for reuse if similar query
887  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
888  like_cache_size_ += (pattern.size() + 3 + (result.size() * sizeof(int32_t)));
889 
890  CHECK(it_ok.second);
891 
892  return result;
893 }
894 
895 std::vector<int32_t> StringDictionary::getEquals(std::string pattern,
896  std::string comp_operator,
897  size_t generation) {
898  std::vector<int32_t> result;
899  auto eq_id_itr = equal_cache_.find(pattern);
900  int32_t eq_id = MAX_STRLEN + 1;
901  int32_t cur_size = str_count_;
902  if (eq_id_itr != equal_cache_.end()) {
903  eq_id = eq_id_itr->second;
904  if (comp_operator == "=") {
905  result.push_back(eq_id);
906  } else {
907  for (int32_t idx = 0; idx <= cur_size; idx++) {
908  if (idx == eq_id) {
909  continue;
910  }
911  result.push_back(idx);
912  }
913  }
914  } else {
915  std::vector<std::thread> workers;
916  int worker_count = cpu_threads();
917  CHECK_GT(worker_count, 0);
918  std::vector<std::vector<int32_t>> worker_results(worker_count);
919  CHECK_LE(generation, str_count_);
920  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
921  workers.emplace_back(
922  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
923  for (size_t string_id = worker_idx; string_id < generation;
924  string_id += worker_count) {
925  const auto str = getStringUnlocked(string_id);
926  if (str == pattern) {
927  worker_results[worker_idx].push_back(string_id);
928  }
929  }
930  });
931  }
932  for (auto& worker : workers) {
933  worker.join();
934  }
935  for (const auto& worker_result : worker_results) {
936  result.insert(result.end(), worker_result.begin(), worker_result.end());
937  }
938  if (result.size() > 0) {
939  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
940  equal_cache_size_ += (pattern.size() + (result.size() * sizeof(int32_t)));
941  CHECK(it_ok.second);
942  eq_id = result[0];
943  }
944  if (comp_operator == "<>") {
945  for (int32_t idx = 0; idx <= cur_size; idx++) {
946  if (idx == eq_id) {
947  continue;
948  }
949  result.push_back(idx);
950  }
951  }
952  }
953  return result;
954 }
955 
956 std::vector<int32_t> StringDictionary::getCompare(const std::string& pattern,
957  const std::string& comp_operator,
958  const size_t generation) {
959  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
960  if (isClient()) {
961  return client_->get_compare(pattern, comp_operator, generation);
962  }
963  std::vector<int32_t> ret;
964  if (str_count_ == 0) {
965  return ret;
966  }
967  if (sorted_cache.size() < str_count_) {
968  if (comp_operator == "=" || comp_operator == "<>") {
969  return getEquals(pattern, comp_operator, generation);
970  }
971 
973  }
974  auto cache_index = compare_cache_.get(pattern);
975 
976  if (!cache_index) {
977  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
978  const auto cache_itr = std::lower_bound(
979  sorted_cache.begin(),
980  sorted_cache.end(),
981  pattern,
982  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
983  auto a_str = this->getStringFromStorage(a);
984  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
985  });
986 
987  if (cache_itr == sorted_cache.end()) {
988  cache_index->index = sorted_cache.size() - 1;
989  cache_index->diff = 1;
990  } else {
991  const auto cache_str = getStringFromStorage(*cache_itr);
992  if (!string_eq(
993  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
994  cache_index->index = cache_itr - sorted_cache.begin() - 1;
995  cache_index->diff = 1;
996  } else {
997  cache_index->index = cache_itr - sorted_cache.begin();
998  cache_index->diff = 0;
999  }
1000  }
1001 
1002  compare_cache_.put(pattern, cache_index);
1003  compare_cache_size_ += (pattern.size() + sizeof(cache_index));
1004  }
1005 
1006  // since we have a cache in form of vector of ints which is sorted according to
1007  // corresponding strings in the dictionary all we need is the index of the element
1008  // which equal to the pattern that we are trying to match or the index of “biggest”
1009  // element smaller than the pattern, to perform all the comparison operators over
1010  // string. The search function guarantees we have such index so now it is just the
1011  // matter to include all the elements in the result vector.
1012 
1013  // For < operator if the index that we have points to the element which is equal to
1014  // the pattern that we are searching for we simply get all the elements less than the
1015  // index. If the element pointed by the index is not equal to the pattern we are
1016  // comparing with we also need to include that index in result vector, except when the
1017  // index points to 0 and the pattern is lesser than the smallest value in the string
1018  // dictionary.
1019 
1020  if (comp_operator == "<") {
1021  size_t idx = cache_index->index;
1022  if (cache_index->diff) {
1023  idx = cache_index->index + 1;
1024  if (cache_index->index == 0 && cache_index->diff > 0) {
1025  idx = cache_index->index;
1026  }
1027  }
1028  for (size_t i = 0; i < idx; i++) {
1029  ret.push_back(sorted_cache[i]);
1030  }
1031 
1032  // For <= operator if the index that we have points to the element which is equal to
1033  // the pattern that we are searching for we want to include the element pointed by
1034  // the index in the result set. If the element pointed by the index is not equal to
1035  // the pattern we are comparing with we just want to include all the ids with index
1036  // less than the index that is cached, except when pattern that we are searching for
1037  // is smaller than the smallest string in the dictionary.
1038 
1039  } else if (comp_operator == "<=") {
1040  size_t idx = cache_index->index + 1;
1041  if (cache_index == 0 && cache_index->diff > 0) {
1042  idx = cache_index->index;
1043  }
1044  for (size_t i = 0; i < idx; i++) {
1045  ret.push_back(sorted_cache[i]);
1046  }
1047 
1048  // For > operator we want to get all the elements with index greater than the index
1049  // that we have except, when the pattern we are searching for is lesser than the
1050  // smallest string in the dictionary we also want to include the id of the index
1051  // that we have.
1052 
1053  } else if (comp_operator == ">") {
1054  size_t idx = cache_index->index + 1;
1055  if (cache_index->index == 0 && cache_index->diff > 0) {
1056  idx = cache_index->index;
1057  }
1058  for (size_t i = idx; i < sorted_cache.size(); i++) {
1059  ret.push_back(sorted_cache[i]);
1060  }
1061 
1062  // For >= operator when the indexed element that we have points to element which is
1063  // equal to the pattern we are searching for we want to include that in the result
1064  // vector. If the index that we have does not point to the string which is equal to
1065  // the pattern we are searching we don’t want to include that id into the result
1066  // vector except when the index is 0.
1067 
1068  } else if (comp_operator == ">=") {
1069  size_t idx = cache_index->index;
1070  if (cache_index->diff) {
1071  idx = cache_index->index + 1;
1072  if (cache_index->index == 0 && cache_index->diff > 0) {
1073  idx = cache_index->index;
1074  }
1075  }
1076  for (size_t i = idx; i < sorted_cache.size(); i++) {
1077  ret.push_back(sorted_cache[i]);
1078  }
1079  } else if (comp_operator == "=") {
1080  if (!cache_index->diff) {
1081  ret.push_back(sorted_cache[cache_index->index]);
1082  }
1083 
1084  // For <> operator it is simple matter of not including id of string which is equal
1085  // to pattern we are searching for.
1086  } else if (comp_operator == "<>") {
1087  if (!cache_index->diff) {
1088  size_t idx = cache_index->index;
1089  for (size_t i = 0; i < idx; i++) {
1090  ret.push_back(sorted_cache[i]);
1091  }
1092  ++idx;
1093  for (size_t i = idx; i < sorted_cache.size(); i++) {
1094  ret.push_back(sorted_cache[i]);
1095  }
1096  } else {
1097  for (size_t i = 0; i < sorted_cache.size(); i++) {
1098  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
1099  }
1100  }
1101 
1102  } else {
1103  std::runtime_error("Unsupported string comparison operator");
1104  }
1105  return ret;
1106 }
1107 
1108 namespace {
1109 
1110 bool is_regexp_like(const std::string& str,
1111  const std::string& pattern,
1112  const char escape) {
1113  return regexp_like(str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape);
1114 }
1115 
1116 } // namespace
1117 
1118 std::vector<int32_t> StringDictionary::getRegexpLike(const std::string& pattern,
1119  const char escape,
1120  const size_t generation) const {
1121  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1122  if (isClient()) {
1123  return client_->get_regexp_like(pattern, escape, generation);
1124  }
1125  const auto cache_key = std::make_pair(pattern, escape);
1126  const auto it = regex_cache_.find(cache_key);
1127  if (it != regex_cache_.end()) {
1128  return it->second;
1129  }
1130  std::vector<int32_t> result;
1131  std::vector<std::thread> workers;
1132  int worker_count = cpu_threads();
1133  CHECK_GT(worker_count, 0);
1134  std::vector<std::vector<int32_t>> worker_results(worker_count);
1135  CHECK_LE(generation, str_count_);
1136  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
1137  workers.emplace_back([&worker_results,
1138  &pattern,
1139  generation,
1140  escape,
1141  worker_idx,
1142  worker_count,
1143  this]() {
1144  for (size_t string_id = worker_idx; string_id < generation;
1145  string_id += worker_count) {
1146  const auto str = getStringUnlocked(string_id);
1147  if (is_regexp_like(str, pattern, escape)) {
1148  worker_results[worker_idx].push_back(string_id);
1149  }
1150  }
1151  });
1152  }
1153  for (auto& worker : workers) {
1154  worker.join();
1155  }
1156  for (const auto& worker_result : worker_results) {
1157  result.insert(result.end(), worker_result.begin(), worker_result.end());
1158  }
1159  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
1160  regex_cache_size_ += (pattern.size() + 1 + (result.size() * sizeof(int32_t)));
1161  CHECK(it_ok.second);
1162 
1163  return result;
1164 }
1165 
1166 std::vector<std::string> StringDictionary::copyStrings() const {
1167  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1168  if (isClient()) {
1169  // TODO(miyu): support remote string dictionary
1170  throw std::runtime_error(
1171  "copying dictionaries from remote server is not supported yet.");
1172  }
1173 
1174  if (strings_cache_) {
1175  return *strings_cache_;
1176  }
1177 
1178  strings_cache_ = std::make_shared<std::vector<std::string>>();
1179  strings_cache_->reserve(str_count_);
1180  const bool multithreaded = str_count_ > 10000;
1181  const auto worker_count =
1182  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
1183  CHECK_GT(worker_count, 0UL);
1184  std::vector<std::vector<std::string>> worker_results(worker_count);
1185  std::vector<size_t> string_size(worker_count, 0);
1186  auto copy = [this, &string_size](std::vector<std::string>& str_list,
1187  const size_t worker_idx,
1188  const size_t start_id,
1189  const size_t end_id) {
1190  CHECK_LE(start_id, end_id);
1191  str_list.reserve(end_id - start_id);
1192  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
1193  auto str = getStringUnlocked(string_id);
1194  string_size[worker_idx] += str.size();
1195  str_list.push_back(str);
1196  }
1197  };
1198  if (multithreaded) {
1199  std::vector<std::future<void>> workers;
1200  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
1201  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
1202  worker_idx < worker_count && start < str_count_;
1203  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
1204  workers.push_back(std::async(std::launch::async,
1205  copy,
1206  std::ref(worker_results[worker_idx]),
1207  worker_idx,
1208  start,
1209  end));
1210  }
1211  for (auto& worker : workers) {
1212  worker.get();
1213  }
1214  } else {
1215  CHECK_EQ(worker_results.size(), size_t(1));
1216  copy(worker_results[0], 0, 0, str_count_);
1217  }
1218 
1219  for (const auto& worker_result : worker_results) {
1220  strings_cache_->insert(
1221  strings_cache_->end(), worker_result.begin(), worker_result.end());
1222  }
1224  std::accumulate(string_size.begin(), string_size.end(), size_t(0));
1225  return *strings_cache_;
1226 }
1227 
1228 bool StringDictionary::fillRateIsHigh(const size_t num_strings) const noexcept {
1229  return string_id_string_dict_hash_table_.size() <= num_strings * 2;
1230 }
1231 
1233  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1234  INVALID_STR_ID);
1235 
1236  if (materialize_hashes_) {
1237  for (size_t i = 0; i != str_count_; ++i) {
1238  const string_dict_hash_t hash = hash_cache_[i];
1239  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1240  new_str_ids[bucket] = i;
1241  }
1242  hash_cache_.resize(hash_cache_.size() * 2);
1243  } else {
1244  for (size_t i = 0; i != str_count_; ++i) {
1245  const auto str = getStringChecked(i);
1246  const string_dict_hash_t hash = hash_string(str);
1247  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1248  new_str_ids[bucket] = i;
1249  }
1250  }
1251  string_id_string_dict_hash_table_.swap(new_str_ids);
1252 }
1253 
1254 template <class String>
1256  const size_t str_count, // str_count_ is only persisted strings, so need transient
1257  // shadow count
1258  const size_t storage_high_water_mark,
1259  const std::vector<String>& input_strings,
1260  const std::vector<size_t>& string_memory_ids,
1261  const std::vector<string_dict_hash_t>& input_strings_hashes) noexcept {
1262  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1263  INVALID_STR_ID);
1264  if (materialize_hashes_) {
1265  for (size_t i = 0; i != str_count; ++i) {
1266  const string_dict_hash_t hash = hash_cache_[i];
1267  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1268  new_str_ids[bucket] = i;
1269  }
1270  hash_cache_.resize(hash_cache_.size() * 2);
1271  } else {
1272  for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1273  const auto storage_string = getStringChecked(storage_idx);
1274  const string_dict_hash_t hash = hash_string(storage_string);
1275  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1276  new_str_ids[bucket] = storage_idx;
1277  }
1278  for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1279  const size_t string_memory_id = string_memory_ids[memory_idx];
1280  const uint32_t bucket = computeUniqueBucketWithHash(
1281  input_strings_hashes[string_memory_id], new_str_ids);
1282  new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1283  }
1284  }
1285  string_id_string_dict_hash_table_.swap(new_str_ids);
1286 }
1287 
1288 int32_t StringDictionary::getOrAddImpl(const std::string_view& str) noexcept {
1289  // @TODO(wei) treat empty string as NULL for now
1290  if (str.size() == 0) {
1291  return inline_int_null_value<int32_t>();
1292  }
1293  CHECK(str.size() <= MAX_STRLEN);
1294  const string_dict_hash_t hash = hash_string(str);
1295  {
1296  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
1297  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1298  if (string_id_string_dict_hash_table_[bucket] != INVALID_STR_ID) {
1299  return string_id_string_dict_hash_table_[bucket];
1300  }
1301  }
1302  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1303  if (fillRateIsHigh(str_count_)) {
1304  // resize when more than 50% is full
1305  increaseHashTableCapacity();
1306  }
1307  // need to recalculate the bucket in case it changed before
1308  // we got the lock
1309  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1310  if (string_id_string_dict_hash_table_[bucket] == INVALID_STR_ID) {
1311  CHECK_LT(str_count_, MAX_STRCOUNT)
1312  << "Maximum number (" << str_count_
1313  << ") of Dictionary encoded Strings reached for this column, offset path "
1314  "for column is "
1315  << offsets_path_;
1316  appendToStorage(str);
1317  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
1318  if (materialize_hashes_) {
1319  hash_cache_[str_count_] = hash;
1320  }
1321  ++str_count_;
1322  invalidateInvertedIndex();
1323  }
1324  return string_id_string_dict_hash_table_[bucket];
1325 }
1326 
1327 std::string StringDictionary::getStringChecked(const int string_id) const noexcept {
1328  const auto str_canary = getStringFromStorage(string_id);
1329  CHECK(!str_canary.canary);
1330  return std::string(str_canary.c_str_ptr, str_canary.size);
1331 }
1332 
1334  const int string_id) const noexcept {
1335  const auto str_canary = getStringFromStorage(string_id);
1336  CHECK(!str_canary.canary);
1337  return std::string_view{str_canary.c_str_ptr, str_canary.size};
1338 }
1339 
1341  const int string_id) const noexcept {
1342  const auto str_canary = getStringFromStorage(string_id);
1343  CHECK(!str_canary.canary);
1344  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1345 }
1346 
1347 template <class String>
1349  const string_dict_hash_t hash,
1350  const String& input_string,
1351  const std::vector<int32_t>& string_id_string_dict_hash_table) const noexcept {
1352  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1353  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1354  while (true) {
1355  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1356  if (candidate_string_id ==
1357  INVALID_STR_ID) { // In this case it means the slot is available for use
1358  break;
1359  }
1360  if ((materialize_hashes_ && hash == hash_cache_[candidate_string_id]) ||
1361  !materialize_hashes_) {
1362  const auto candidate_string = getStringFromStorageFast(candidate_string_id);
1363  if (input_string.size() == candidate_string.size() &&
1364  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1365  // found the string
1366  break;
1367  }
1368  }
1369  // wrap around
1370  if (++bucket == string_dict_hash_table_size) {
1371  bucket = 0;
1372  }
1373  }
1374  return bucket;
1375 }
1376 
1377 template <class String>
1379  const string_dict_hash_t input_string_hash,
1380  const String& input_string,
1381  const std::vector<int32_t>& string_id_string_dict_hash_table,
1382  const size_t storage_high_water_mark,
1383  const std::vector<String>& input_strings,
1384  const std::vector<size_t>& string_memory_ids) const noexcept {
1385  uint32_t bucket = input_string_hash & (string_id_string_dict_hash_table.size() - 1);
1386  while (true) {
1387  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1388  if (candidate_string_id ==
1389  INVALID_STR_ID) { // In this case it means the slot is available for use
1390  break;
1391  }
1392  if (!materialize_hashes_ || (input_string_hash == hash_cache_[candidate_string_id])) {
1393  if (candidate_string_id > 0 &&
1394  static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1395  // The candidate string is not in storage yet but in our string_memory_ids temp
1396  // buffer
1397  size_t memory_offset =
1398  static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1399  const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1400  if (input_string.size() == candidate_string.size() &&
1401  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1402  // found the string in the temp memory buffer
1403  break;
1404  }
1405  } else {
1406  // The candidate string is in storage, need to fetch it for comparison
1407  const auto candidate_storage_string =
1408  getStringFromStorageFast(candidate_string_id);
1409  if (input_string.size() == candidate_storage_string.size() &&
1410  !memcmp(input_string.data(),
1411  candidate_storage_string.data(),
1412  input_string.size())) {
1415  // found the string in storage
1416  break;
1417  }
1418  }
1419  }
1420  if (++bucket == string_id_string_dict_hash_table.size()) {
1421  bucket = 0;
1422  }
1423  }
1424  return bucket;
1425 }
1426 
1428  const string_dict_hash_t hash,
1429  const std::vector<int32_t>& string_id_string_dict_hash_table) noexcept {
1430  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1431  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1432  while (true) {
1433  if (string_id_string_dict_hash_table[bucket] ==
1434  INVALID_STR_ID) { // In this case it means the slot is available for use
1435  break;
1436  }
1437  collisions_++;
1438  // wrap around
1439  if (++bucket == string_dict_hash_table_size) {
1440  bucket = 0;
1441  }
1442  }
1443  return bucket;
1444 }
1445 
1447  const size_t write_length) {
1448  if (payload_file_off_ + write_length > payload_file_size_) {
1449  const size_t min_capacity_needed =
1450  write_length - (payload_file_size_ - payload_file_off_);
1451  if (!isTemp_) {
1452  CHECK_GE(payload_fd_, 0);
1454  addPayloadCapacity(min_capacity_needed);
1455  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1456  payload_map_ =
1457  reinterpret_cast<char*>(heavyai::checked_mmap(payload_fd_, payload_file_size_));
1458  } else {
1459  addPayloadCapacity(min_capacity_needed);
1460  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1461  }
1462  }
1463 }
1464 
1466  const size_t write_length) {
1467  const size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
1468  if (offset_file_off + write_length >= offset_file_size_) {
1469  const size_t min_capacity_needed =
1470  write_length - (offset_file_size_ - offset_file_off);
1471  if (!isTemp_) {
1472  CHECK_GE(offset_fd_, 0);
1474  addOffsetCapacity(min_capacity_needed);
1475  CHECK(offset_file_off + write_length <= offset_file_size_);
1476  offset_map_ = reinterpret_cast<StringIdxEntry*>(
1478  } else {
1479  addOffsetCapacity(min_capacity_needed);
1480  CHECK(offset_file_off + write_length <= offset_file_size_);
1481  }
1482  }
1483 }
1484 
1485 template <class String>
1486 void StringDictionary::appendToStorage(const String str) noexcept {
1487  // write the payload
1488  checkAndConditionallyIncreasePayloadCapacity(str.size());
1489  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1490 
1491  // write the offset and length
1492  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1493  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1494 
1495  checkAndConditionallyIncreaseOffsetCapacity(sizeof(str_meta));
1496  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1497 }
1498 
1499 template <class String>
1501  const std::vector<String>& input_strings,
1502  const std::vector<size_t>& string_memory_ids,
1503  const size_t sum_new_strings_lengths) noexcept {
1504  const size_t num_strings = string_memory_ids.size();
1505 
1506  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1507  checkAndConditionallyIncreaseOffsetCapacity(sizeof(StringIdxEntry) * num_strings);
1508 
1509  for (size_t i = 0; i < num_strings; ++i) {
1510  const size_t string_idx = string_memory_ids[i];
1511  const String str = input_strings[string_idx];
1512  const size_t str_size(str.size());
1513  memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1514  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
1515  payload_file_off_ += str_size; // Need to increment after we've defined str_meta
1516  memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
1517  }
1518 }
1519 
1521  const int string_id) const noexcept {
1522  const StringIdxEntry* str_meta = offset_map_ + string_id;
1523  return {payload_map_ + str_meta->off, str_meta->size};
1524 }
1525 
1527  const int string_id) const noexcept {
1528  if (!isTemp_) {
1529  CHECK_GE(payload_fd_, 0);
1530  CHECK_GE(offset_fd_, 0);
1531  }
1532  CHECK_GE(string_id, 0);
1533  const StringIdxEntry* str_meta = offset_map_ + string_id;
1534  if (str_meta->size == 0xffff) {
1535  // hit the canary
1536  return {nullptr, 0, true};
1537  }
1538  return {payload_map_ + str_meta->off, str_meta->size, false};
1539 }
1540 
1541 void StringDictionary::addPayloadCapacity(const size_t min_capacity_requested) noexcept {
1542  if (!isTemp_) {
1543  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1544  } else {
1545  payload_map_ = static_cast<char*>(
1546  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1547  }
1548 }
1549 
1550 void StringDictionary::addOffsetCapacity(const size_t min_capacity_requested) noexcept {
1551  if (!isTemp_) {
1552  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1553  } else {
1554  offset_map_ = static_cast<StringIdxEntry*>(
1555  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1556  }
1557 }
1558 
1560  int fd,
1561  const size_t min_capacity_requested) noexcept {
1562  const size_t canary_buff_size_to_add =
1563  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1564  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1565 
1566  if (canary_buffer_size < canary_buff_size_to_add) {
1567  CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1568  canary_buffer_size = canary_buff_size_to_add;
1569  CHECK(CANARY_BUFFER);
1570  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1571  }
1572 
1573  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1574  const auto write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
1575  CHECK(write_return > 0 &&
1576  (static_cast<size_t>(write_return) == canary_buff_size_to_add));
1577  return canary_buff_size_to_add;
1578 }
1579 
1581  size_t& mem_size,
1582  const size_t min_capacity_requested) noexcept {
1583  const size_t canary_buff_size_to_add =
1584  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1585  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1586  if (canary_buffer_size < canary_buff_size_to_add) {
1587  CANARY_BUFFER =
1588  reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1589  canary_buffer_size = canary_buff_size_to_add;
1590  CHECK(CANARY_BUFFER);
1591  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1592  }
1593  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
1594  CHECK(new_addr);
1595  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
1596  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
1597  mem_size += canary_buff_size_to_add;
1598  return new_addr;
1599 }
1600 
1602  if (!like_cache_.empty()) {
1603  decltype(like_cache_)().swap(like_cache_);
1604  }
1605  if (!regex_cache_.empty()) {
1606  decltype(regex_cache_)().swap(regex_cache_);
1607  }
1608  if (!equal_cache_.empty()) {
1609  decltype(equal_cache_)().swap(equal_cache_);
1610  }
1611  compare_cache_.invalidateInvertedIndex();
1612 
1613  like_cache_size_ = 0;
1614  regex_cache_size_ = 0;
1615  equal_cache_size_ = 0;
1616  compare_cache_size_ = 0;
1617 }
1618 
// TODO 5 Mar 2021: Nothing currently undoes writes to the dictionary after a failed
// load. The next dictionary write that checkpoints will cause the uncheckpointed data
// to be written to disk. The only remedy is a table truncate, and that assumes the
// dictionary is not replicated.
1624  if (isClient()) {
1625  try {
1626  return client_->checkpoint();
1627  } catch (...) {
1628  return false;
1629  }
1630  }
1631  CHECK(!isTemp_);
1632  bool ret = true;
1633  ret = ret &&
1634  (heavyai::msync((void*)offset_map_, offset_file_size_, /*async=*/false) == 0);
1635  ret = ret &&
1636  (heavyai::msync((void*)payload_map_, payload_file_size_, /*async=*/false) == 0);
1637  ret = ret && (heavyai::fsync(offset_fd_) == 0);
1638  ret = ret && (heavyai::fsync(payload_fd_) == 0);
1639  return ret;
1640 }
1641 
1642 bool StringDictionary::isClient() const noexcept {
1643  return static_cast<bool>(client_);
1644 }
1645 
1647  // This method is not thread-safe.
1648  const auto cur_cache_size = sorted_cache.size();
1649  std::vector<int32_t> temp_sorted_cache;
1650  for (size_t i = cur_cache_size; i < str_count_; i++) {
1651  temp_sorted_cache.push_back(i);
1652  }
1653  sortCache(temp_sorted_cache);
1654  mergeSortedCache(temp_sorted_cache);
1655 }
1656 
1657 void StringDictionary::sortCache(std::vector<int32_t>& cache) {
1658  // This method is not thread-safe.
1659 
1660  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1661  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1662 
1663  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1664  auto a_str = this->getStringFromStorage(a);
1665  auto b_str = this->getStringFromStorage(b);
1666  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1667  });
1668 }
1669 
1670 void StringDictionary::mergeSortedCache(std::vector<int32_t>& temp_sorted_cache) {
1671  // this method is not thread safe
1672  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1673  size_t t_idx = 0, s_idx = 0, idx = 0;
1674  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1675  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1676  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1677  const auto insert_from_temp_cache =
1678  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1679  if (insert_from_temp_cache) {
1680  updated_cache[idx] = temp_sorted_cache[t_idx++];
1681  } else {
1682  updated_cache[idx] = sorted_cache[s_idx++];
1683  }
1684  }
1685  while (t_idx < temp_sorted_cache.size()) {
1686  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1687  }
1688  while (s_idx < sorted_cache.size()) {
1689  updated_cache[idx++] = sorted_cache[s_idx++];
1690  }
1691  sorted_cache.swap(updated_cache);
1692 }
1693 
1695  std::vector<int32_t>& dest_ids,
1696  StringDictionary* dest_dict,
1697  const std::vector<int32_t>& source_ids,
1698  const StringDictionary* source_dict,
1699  const std::vector<std::string const*>& transient_string_vec) {
1700  std::vector<std::string> strings;
1701 
1702  for (const int32_t source_id : source_ids) {
1703  if (source_id == std::numeric_limits<int32_t>::min()) {
1704  strings.emplace_back("");
1705  } else if (source_id < 0) {
1706  unsigned const string_index = StringDictionaryProxy::transientIdToIndex(source_id);
1707  CHECK_LT(string_index, transient_string_vec.size()) << "source_id=" << source_id;
1708  strings.emplace_back(*transient_string_vec[string_index]);
1709  } else {
1710  strings.push_back(source_dict->getString(source_id));
1711  }
1712  }
1713 
1714  dest_ids.resize(strings.size());
1715  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1716 }
1717 
1719  std::vector<std::vector<int32_t>>& dest_array_ids,
1720  StringDictionary* dest_dict,
1721  const std::vector<std::vector<int32_t>>& source_array_ids,
1722  const StringDictionary* source_dict) {
1723  dest_array_ids.resize(source_array_ids.size());
1724 
1725  std::atomic<size_t> row_idx{0};
1726  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1727  int thread_id) {
1728  for (;;) {
1729  auto row = row_idx.fetch_add(1);
1730 
1731  if (row >= dest_array_ids.size()) {
1732  return;
1733  }
1734  const auto& source_ids = source_array_ids[row];
1735  auto& dest_ids = dest_array_ids[row];
1736  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1737  }
1738  };
1739 
1740  const int num_worker_threads = std::thread::hardware_concurrency();
1741 
1742  if (source_array_ids.size() / num_worker_threads > 10) {
1743  std::vector<std::future<void>> worker_threads;
1744  for (int i = 0; i < num_worker_threads; ++i) {
1745  worker_threads.push_back(std::async(std::launch::async, processor, i));
1746  }
1747 
1748  for (auto& child : worker_threads) {
1749  child.wait();
1750  }
1751  for (auto& child : worker_threads) {
1752  child.get();
1753  }
1754  } else {
1755  processor(0);
1756  }
1757 }
1758 
1759 std::vector<std::string_view> StringDictionary::getStringViews(
1760  const size_t generation) const {
1761  auto timer = DEBUG_TIMER(__func__);
1762  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
1763  const int64_t num_strings = generation >= 0 ? generation : storageEntryCount();
1764  CHECK_LE(num_strings, static_cast<int64_t>(StringDictionary::MAX_STRCOUNT));
1765  // The CHECK_LE below is currently redundant with the check
1766  // above against MAX_STRCOUNT, however given we iterate using
1767  // int32_t types for efficiency (to match type expected by
1768  // getStringFromStorageFast, check that the # of strings is also
1769  // in the int32_t range in case MAX_STRCOUNT is changed
1770 
1771  // Todo(todd): consider aliasing the max logical type width
1772  // (currently int32_t) throughout StringDictionary
1773  CHECK_LE(num_strings, std::numeric_limits<int32_t>::max());
1774 
1775  std::vector<std::string_view> string_views(num_strings);
1776  // We can bail early if the generation-specified dictionary is empty
1777  if (num_strings == 0) {
1778  return string_views;
1779  }
1780  constexpr int64_t tbb_parallel_threshold{1000};
1781  if (num_strings < tbb_parallel_threshold) {
1782  // Use int32_t to match type expected by getStringFromStorageFast
1783  for (int32_t string_idx = 0; string_idx < num_strings; ++string_idx) {
1784  string_views[string_idx] = getStringFromStorageFast(string_idx);
1785  }
1786  } else {
1787  constexpr int64_t target_strings_per_thread{1000};
1788  const ThreadInfo thread_info(
1789  std::thread::hardware_concurrency(), num_strings, target_strings_per_thread);
1790  CHECK_GE(thread_info.num_threads, 1L);
1791  CHECK_GE(thread_info.num_elems_per_thread, 1L);
1792 
1793  tbb::task_arena limited_arena(thread_info.num_threads);
1794  limited_arena.execute([&] {
1796  tbb::blocked_range<int64_t>(
1797  0, num_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
1798  [&](const tbb::blocked_range<int64_t>& r) {
1799  // r should be in range of int32_t per CHECK above
1800  const int32_t start_idx = r.begin();
1801  const int32_t end_idx = r.end();
1802  for (int32_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
1803  string_views[string_idx] = getStringFromStorageFast(string_idx);
1804  }
1805  },
1806  tbb::simple_partitioner());
1807  });
1808  }
1809  return string_views;
1810 }
1811 
1812 std::vector<std::string_view> StringDictionary::getStringViews() const {
1814 }
1815 
1817  const std::shared_ptr<StringDictionary> dest_dict,
1818  StringLookupCallback const& dest_transient_lookup_callback) const {
1819  auto timer = DEBUG_TIMER(__func__);
1820  const size_t num_source_strings = storageEntryCount();
1821  const size_t num_dest_strings = dest_dict->storageEntryCount();
1822  std::vector<int32_t> translated_ids(num_source_strings);
1823 
1824  buildDictionaryTranslationMap(dest_dict.get(),
1825  translated_ids.data(),
1826  num_source_strings,
1827  num_dest_strings,
1828  true, // Just assume true for dest_has_transients as this
1829  // function is only used for testing currently
1830  dest_transient_lookup_callback,
1831  {});
1832  return translated_ids;
1833 }
1834 
1836  const shared::StringDictKey& dest_dict_key,
1837  std::shared_lock<std::shared_mutex>& source_read_lock,
1838  std::shared_lock<std::shared_mutex>& dest_read_lock) {
1839  const bool dicts_are_same = (source_dict_key == dest_dict_key);
1840  const bool source_dict_is_locked_first = (source_dict_key < dest_dict_key);
1841  if (dicts_are_same) {
1842  // dictionaries are same, only take one write lock
1843  dest_read_lock.lock();
1844  } else if (source_dict_is_locked_first) {
1845  source_read_lock.lock();
1846  dest_read_lock.lock();
1847  } else {
1848  dest_read_lock.lock();
1849  source_read_lock.lock();
1850  }
1851 }
1852 
1854  const StringDictionary* dest_dict,
1855  int32_t* translated_ids,
1856  const int64_t source_generation,
1857  const int64_t dest_generation,
1858  const bool dest_has_transients,
1859  StringLookupCallback const& dest_transient_lookup_callback,
1860  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) const {
1861  auto timer = DEBUG_TIMER(__func__);
1862  CHECK_GE(source_generation, 0L);
1863  CHECK_GE(dest_generation, 0L);
1864  const int64_t num_source_strings = source_generation;
1865  const int64_t num_dest_strings = dest_generation;
1866 
1867  // We can bail early if there are no source strings to translate
1868  if (num_source_strings == 0L) {
1869  return 0;
1870  }
1871 
1872  // If here we should should have local dictionaries.
1873  // Note case of transient source dictionaries that aren't
1874  // seen as remote (they have no client_no_timeout_) is covered
1875  // by early bail above on num_source_strings == 0
1876  if (dest_dict->client_no_timeout_) {
1877  throw std::runtime_error(
1878  "Cannot translate between a local source and remote destination dictionary.");
1879  }
1880 
1881  // Sort this/source dict and dest dict on folder_ so we can enforce
1882  // lock ordering and avoid deadlocks
1883  std::shared_lock<std::shared_mutex> source_read_lock(rw_mutex_, std::defer_lock);
1884  std::shared_lock<std::shared_mutex> dest_read_lock(dest_dict->rw_mutex_,
1885  std::defer_lock);
1887  getDictKey(), dest_dict->getDictKey(), source_read_lock, dest_read_lock);
1888 
1889  // For both source and destination dictionaries we cap the max
1890  // entries to be translated/translated to at the supplied
1891  // generation arguments, if valid (i.e. >= 0), otherwise just the
1892  // size of each dictionary
1893 
1894  CHECK_LE(num_source_strings, static_cast<int64_t>(str_count_));
1895  CHECK_LE(num_dest_strings, static_cast<int64_t>(dest_dict->str_count_));
1896  const bool dest_dictionary_is_empty = (num_dest_strings == 0);
1897 
1898  constexpr int64_t target_strings_per_thread{1000};
1899  const ThreadInfo thread_info(
1900  std::thread::hardware_concurrency(), num_source_strings, target_strings_per_thread);
1901  CHECK_GE(thread_info.num_threads, 1L);
1902  CHECK_GE(thread_info.num_elems_per_thread, 1L);
1903 
1904  // We use a tbb::task_arena to cap the number of threads, has been
1905  // in other contexts been shown to exhibit better performance when low
1906  // numbers of threads are needed than just letting tbb figure the number of threads,
1907  // but should benchmark in this specific context
1908 
1909  const StringOps_Namespace::StringOps string_ops(string_op_infos);
1910  const bool has_string_ops = string_ops.size();
1911 
1912  tbb::task_arena limited_arena(thread_info.num_threads);
1913  std::vector<size_t> num_strings_not_translated_per_thread(thread_info.num_threads, 0UL);
1914  constexpr bool short_circuit_empty_dictionary_translations{false};
1915  limited_arena.execute([&] {
1916  if (short_circuit_empty_dictionary_translations && dest_dictionary_is_empty) {
1918  tbb::blocked_range<int32_t>(
1919  0,
1920  num_source_strings,
1921  thread_info.num_elems_per_thread /* tbb grain_size */),
1922  [&](const tbb::blocked_range<int32_t>& r) {
1923  const int32_t start_idx = r.begin();
1924  const int32_t end_idx = r.end();
1925  for (int32_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
1926  translated_ids[string_idx] = INVALID_STR_ID;
1927  }
1928  },
1929  tbb::simple_partitioner());
1930  num_strings_not_translated_per_thread[0] += num_source_strings;
1931  } else {
1932  // The below logic, by executing low-level private variable accesses on both
1933  // dictionaries, is less clean than a previous variant that simply called
1934  // `getStringViews` from the source dictionary and then called `getBulk` on the
1935  // destination dictionary, but this version gets significantly better performance
1936  // (~2X), likely due to eliminating the overhead of writing out the string views and
1937  // then reading them back in (along with the associated cache misses)
1939  tbb::blocked_range<int32_t>(
1940  0,
1941  num_source_strings,
1942  thread_info.num_elems_per_thread /* tbb grain_size */),
1943  [&](const tbb::blocked_range<int32_t>& r) {
1944  const int32_t start_idx = r.begin();
1945  const int32_t end_idx = r.end();
1946  size_t num_strings_not_translated = 0;
1947  std::string string_ops_storage; // Needs to be thread local to back
1948  // string_view returned by string_ops()
1949  for (int32_t source_string_id = start_idx; source_string_id != end_idx;
1950  ++source_string_id) {
1951  const std::string_view source_str =
1952  has_string_ops ? string_ops(getStringFromStorageFast(source_string_id),
1953  string_ops_storage)
1954  : getStringFromStorageFast(source_string_id);
1955 
1956  if (source_str.empty()) {
1957  translated_ids[source_string_id] = inline_int_null_value<int32_t>();
1958  continue;
1959  }
1960  // Get the hash from this/the source dictionary's cache, as the function
1961  // will be the same for the dest_dict, sparing us having to recompute it
1962 
1963  // Todo(todd): Remove option to turn string hash cache off or at least
1964  // make a constexpr to avoid these branches when we expect it to be always
1965  // on going forward
1966  const string_dict_hash_t hash = (materialize_hashes_ && !has_string_ops)
1967  ? hash_cache_[source_string_id]
1968  : hash_string(source_str);
1969  const uint32_t hash_bucket = dest_dict->computeBucket(
1970  hash, source_str, dest_dict->string_id_string_dict_hash_table_);
1971  const auto translated_string_id =
1972  dest_dict->string_id_string_dict_hash_table_[hash_bucket];
1973  translated_ids[source_string_id] = translated_string_id;
1974 
1975  if (translated_string_id == StringDictionary::INVALID_STR_ID ||
1976  translated_string_id >= num_dest_strings) {
1977  if (dest_has_transients) {
1978  num_strings_not_translated +=
1979  dest_transient_lookup_callback(source_str, source_string_id);
1980  } else {
1981  num_strings_not_translated++;
1982  }
1983  continue;
1984  }
1985  }
1986  const size_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
1987  num_strings_not_translated_per_thread[tbb_thread_idx] +=
1988  num_strings_not_translated;
1989  },
1990  tbb::simple_partitioner());
1991  }
1992  });
1993  size_t total_num_strings_not_translated = 0;
1994  for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
1995  total_num_strings_not_translated += num_strings_not_translated_per_thread[thread_idx];
1996  }
1997  return total_num_strings_not_translated;
1998 }
1999 
2001  Datum* translated_ids,
2002  const int64_t source_generation,
2003  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) const {
2004  auto timer = DEBUG_TIMER(__func__);
2005  CHECK_GE(source_generation, 0L);
2006  CHECK_GT(string_op_infos.size(), 0UL);
2007  CHECK(!string_op_infos.back().getReturnType().is_string());
2008  const int64_t num_source_strings = source_generation;
2009 
2010  // We can bail early if there are no source strings to translate
2011  if (num_source_strings == 0L) {
2012  return;
2013  }
2014 
2015  // If here we should should have a local dictionary
2016  // Note case of transient source dictionaries that aren't
2017  // seen as remote (they have no client_no_timeout_) is covered
2018  // by early bail above on num_source_strings == 0
2019 
2020  std::shared_lock<std::shared_mutex> source_read_lock(rw_mutex_);
2021 
2022  // For source dictionary we cap the number of entries
2023  // to be translated/translated to at the supplied
2024  // generation arguments, if valid (i.e. >= 0), otherwise
2025  // just the size of each dictionary
2026 
2027  CHECK_LE(num_source_strings, static_cast<int64_t>(str_count_));
2028 
2029  constexpr int64_t target_strings_per_thread{1000};
2030  const ThreadInfo thread_info(
2031  std::thread::hardware_concurrency(), num_source_strings, target_strings_per_thread);
2032  CHECK_GE(thread_info.num_threads, 1L);
2033  CHECK_GE(thread_info.num_elems_per_thread, 1L);
2034 
2035  // We use a tbb::task_arena to cap the number of threads, has been
2036  // in other contexts been shown to exhibit better performance when low
2037  // numbers of threads are needed than just letting tbb figure the number of threads,
2038  // but should benchmark in this specific context
2039 
2040  const StringOps_Namespace::StringOps string_ops(string_op_infos);
2041  CHECK_GT(string_ops.size(), 0UL);
2042 
2043  tbb::task_arena limited_arena(thread_info.num_threads);
2044  // The below logic, by executing low-level private variable accesses on both
2045  // dictionaries, is less clean than a previous variant that simply called
2046  // `getStringViews` from the source dictionary and then called `getBulk` on the
2047  // destination dictionary, but this version gets significantly better performance
2048  // (~2X), likely due to eliminating the overhead of writing out the string views and
2049  // then reading them back in (along with the associated cache misses)
2050  limited_arena.execute([&] {
2052  tbb::blocked_range<int32_t>(
2053  0, num_source_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
2054  [&](const tbb::blocked_range<int32_t>& r) {
2055  const int32_t start_idx = r.begin();
2056  const int32_t end_idx = r.end();
2057  for (int32_t source_string_id = start_idx; source_string_id != end_idx;
2058  ++source_string_id) {
2059  const std::string source_str =
2060  std::string(getStringFromStorageFast(source_string_id));
2061  translated_ids[source_string_id] = string_ops.numericEval(source_str);
2062  }
2063  });
2064  });
2065 }
2066 
2067 void translate_string_ids(std::vector<int32_t>& dest_ids,
2068  const LeafHostInfo& dict_server_host,
2069  const shared::StringDictKey& dest_dict_key,
2070  const std::vector<int32_t>& source_ids,
2071  const shared::StringDictKey& source_dict_key,
2072  const int32_t dest_generation) {
2073  shared::StringDictKey temp_dict_key(-1, -1);
2074  StringDictionaryClient string_client(
2075  dict_server_host, {temp_dict_key.db_id, temp_dict_key.dict_id}, false);
2076  string_client.translate_string_ids(dest_ids,
2077  {dest_dict_key.db_id, dest_dict_key.dict_id},
2078  source_ids,
2079  {source_dict_key.db_id, source_dict_key.dict_id},
2080  dest_generation);
2081 }
2082 
2084  return string_id_string_dict_hash_table_.size() * sizeof(int32_t) +
2085  hash_cache_.size() * sizeof(string_dict_hash_t) +
2086  sorted_cache.size() * sizeof(int32_t) + like_cache_size_ + regex_cache_size_ +
2088 }
void throw_string_too_long_error(std::string_view str, const shared::StringDictKey &dict_key)
StringIdxEntry * offset_map_
void translate_string_ids(std::vector< int32_t > &dest_ids, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
#define CHECK_EQ(x, y)
Definition: Logger.h:301
bool isClient() const noexcept
void increaseHashTableCapacity() noexcept
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
string_dict_hash_t hash_string(const std::string_view &str)
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
std::vector< int32_t > getRegexpLike(const std::string &pattern, const char escape, const size_t generation) const
void operator()(std::string_view const, int32_t const string_id) override
size_t getBulk(const std::vector< String > &string_vec, T *encoded_vec) const
int64_t num_elems_per_thread
Definition: ThreadInfo.h:23
client_no_timeout_(new StringDictionaryClient(host,{dict_key.db_id, dict_key.dict_id}, false))
std::vector< int32_t > buildDictionaryTranslationMap(const std::shared_ptr< StringDictionary > dest_dict, StringLookupCallback const &dest_transient_lookup_callback) const
RUNTIME_EXPORT DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:330
std::vector< std::string > copyStrings() const
void buildDictionaryNumericTranslationMap(Datum *translated_ids, const int64_t source_generation, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
uint64_t off
const shared::StringDictKey & getDictKey() const noexcept
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
uint64_t size
size_t storageEntryCount() const
#define LOG(tag)
Definition: Logger.h:285
StringDictionary(const shared::StringDictKey &dict_key, const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
uint32_t computeBucketFromStorageAndMemory(const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
#define UNREACHABLE()
Definition: Logger.h:338
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
bool fillRateIsHigh(const size_t num_strings) const noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:306
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
Constants for Builtin SQL Types supported by HEAVY.AI.
std::string offsets_path_
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::vector< std::string const * > &transient_string_vec={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
std::unordered_map< std::string, int32_t > map_
std::shared_mutex rw_mutex_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
#define CHECK_GT(x, y)
Definition: Logger.h:305
int32_t getOrAdd(const std::string &str) noexcept
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::string to_string(char const *&&v)
size_t computeCacheSize() const
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:143
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:302
constexpr double a
Definition: Utm.h:32
int32_t getIdOfString(const String &) const
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
std::string_view getStringView(int32_t string_id) const
static constexpr size_t MAX_STRCOUNT
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
future< Result > async(Fn &&fn, Args &&...args)
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
int64_t num_threads
Definition: ThreadInfo.h:22
std::vector< int32_t > getCompare(const std::string &pattern, const std::string &comp_operator, const size_t generation)
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
#define CHECK_NE(x, y)
Definition: Logger.h:302
int open(const char *path, int flags, int mode)
Definition: heavyai_fs.cpp:66
RUNTIME_EXPORT DEVICE bool string_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:244
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
RUNTIME_EXPORT DEVICE bool string_like_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:41
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
std::map< std::string, int32_t > equal_cache_
DEVICE auto accumulate(ARGS &&...args)
Definition: gpu_enabled.h:42
void order_translation_locks(const shared::StringDictKey &source_dict_key, const shared::StringDictKey &dest_dict_key, std::shared_lock< std::shared_mutex > &source_read_lock, std::shared_lock< std::shared_mutex > &dest_read_lock)
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
void getOrAddBulkArray(const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
Functions to support the LIKE and ILIKE operator in SQL. Only single-byte character set is supported ...
int32_t getUnlocked(const std::string_view sv) const noexcept
void appendToStorage(const String str) noexcept
int checked_open(const char *path, const bool recover)
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
const shared::StringDictKey dict_key_
std::unordered_map< std::string, int32_t > moveMap()
std::pair< char *, size_t > getStringBytes(int32_t string_id) const noexcept
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
void translate_string_ids(std::vector< int32_t > &dest_ids, const LeafHostInfo &dict_server_host, const shared::StringDictKey &dest_dict_key, const std::vector< int32_t > &source_ids, const shared::StringDictKey &source_dict_key, const int32_t dest_generation)
bool checkpoint() noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:303
void throw_encoding_error(std::string_view str, const shared::StringDictKey &dict_key)
std::vector< int32_t > getLike(const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
#define CHECK_LE(x, y)
Definition: Logger.h:304
void eachStringSerially(int64_t const generation, StringCallback &) const
RUNTIME_EXPORT DEVICE bool string_ilike_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:57
size_t getNumStringsFromStorage(const size_t storage_slots) const noexcept
void update_leaf(const LeafHostInfo &host_info)
std::string getString(int32_t string_id) const
void hashStrings(const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
bool g_cache_string_hash
std::vector< std::string_view > getStringViews() const
int msync(void *addr, size_t length, bool async)
Definition: heavyai_fs.cpp:57
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
void increaseHashTableCapacityFromStorageAndMemory(const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
ThreadId thread_id()
Definition: Logger.cpp:877
int fsync(int fd)
Definition: heavyai_fs.cpp:62
std::function< int32_t(std::string const &)> makeLambdaStringToId() const
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)
static void populate_string_array_ids(std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:412
void checked_munmap(void *addr, size_t length)
Definition: heavyai_fs.cpp:53
const uint64_t round_up_p2(const uint64_t num)
std::string_view getStringViewUnlocked(int32_t string_id) const noexcept
std::vector< int32_t > string_id_string_dict_hash_table_
ThreadInfo(const int64_t max_thread_count, const int64_t num_elems, const int64_t target_elems_per_thread)
void sortCache(std::vector< int32_t > &cache)
std::string_view getStringViewChecked(const int string_id) const noexcept
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
std::function< bool(std::string_view, int32_t string_id)> StringLookupCallback
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
bool g_enable_stringdict_parallel
PayloadString getStringFromStorage(const int string_id) const noexcept
void close(const int fd)
Definition: heavyai_fs.cpp:70
constexpr double n
Definition: Utm.h:38
int cpu_threads()
Definition: thread_count.h:25
int get_page_size()
Definition: heavyai_fs.cpp:29
Definition: Datum.h:69
RUNTIME_EXPORT DEVICE bool regexp_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: Regexp.cpp:39
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33
static unsigned transientIdToIndex(int32_t const id)
void operator()(std::string const &str, int32_t const string_id) override
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
std::vector< int32_t > sorted_cache
#define VLOG(n)
Definition: Logger.h:388
int32_t getOrAddImpl(const std::string_view &str) noexcept
~StringDictionary() noexcept
void * checked_mmap(const int fd, const size_t sz)
Definition: heavyai_fs.cpp:40
RUNTIME_EXPORT DEVICE bool string_ilike(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:255