OmniSciDB  ca0c39ec8f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
StringDictionary.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Shared/DatumFetchers.h"
19 #include "StringOps/StringOps.h"
20 
21 #include <tbb/parallel_for.h>
22 #include <tbb/task_arena.h>
23 #include <algorithm>
24 #include <boost/filesystem/operations.hpp>
25 #include <boost/filesystem/path.hpp>
26 #include <boost/sort/spreadsort/string_sort.hpp>
27 #include <functional>
28 #include <future>
29 #include <iostream>
30 #include <string_view>
31 #include <thread>
32 #include <type_traits>
33 
34 // TODO(adb): fixup
35 #ifdef _WIN32
36 #include <fcntl.h>
37 #include <io.h>
38 #else
39 #include <sys/fcntl.h>
40 #endif
41 
42 #include "Logger/Logger.h"
43 #include "OSDependent/heavyai_fs.h"
44 #include "Shared/sqltypes.h"
45 #include "Shared/thread_count.h"
46 #include "StringDictionaryClient.h"
47 #include "Utils/Regexp.h"
48 #include "Utils/StringLike.h"
49 
50 #include "LeafHostInfo.h"
51 
53 
54 namespace {
55 
57 
58 int checked_open(const char* path, const bool recover) {
59  auto fd = heavyai::open(path, O_RDWR | O_CREAT | (recover ? O_APPEND : O_TRUNC), 0644);
60  if (fd > 0) {
61  return fd;
62  }
63  auto err = std::string("Dictionary path ") + std::string(path) +
64  std::string(" does not exist.");
65  LOG(ERROR) << err;
66  throw DictPayloadUnavailable(err);
67 }
68 
// Rounds `num` up to the next power of two; used to size the hash table.
// Results larger than UINT32_MAX are clamped to UINT32_MAX (which is itself not
// a power of two). Inputs 0 and 1 both yield 1.
uint64_t round_up_p2(const uint64_t num) {
  // Without this guard, num == 0 underflows below and the result wraps to 0,
  // which the clamp then turns into UINT32_MAX.
  if (num <= 1) {
    return 1;
  }
  uint64_t in = num - 1;
  // Smear the highest set bit into every lower bit position.
  in |= in >> 1;
  in |= in >> 2;
  in |= in >> 4;
  in |= in >> 8;
  in |= in >> 16;
  in |= in >> 32;
  in++;
  // TODO MAT deal with case where filesize has been increased but reality is
  // we are constrained to 2^31.
  // in == 0 only when num > 2^63 and the increment above wraps.
  if (in == 0 || in > UINT32_MAX) {
    in = UINT32_MAX;
  }
  return in;
}
86 
87 string_dict_hash_t hash_string(const std::string_view& str) {
88  string_dict_hash_t str_hash = 1;
89  // rely on fact that unsigned overflow is defined and wraps
90  for (size_t i = 0; i < str.size(); ++i) {
91  str_hash = str_hash * 997 + str[i];
92  }
93  return str_hash;
94 }
95 
// Splits `num_elems` units of work across at most `max_thread_count` threads,
// aiming for about `target_elems_per_thread` units per thread. Both computed
// fields are always >= 1 (getBulk CHECKs this), including for empty input.
struct ThreadInfo {
  int64_t num_threads{0};
  int64_t num_elems_per_thread{0};

  ThreadInfo(const int64_t max_thread_count,
             const int64_t num_elems,
             const int64_t target_elems_per_thread) {
    // Guard the divisor: a zero target would otherwise divide by zero.
    const int64_t target = std::max(target_elems_per_thread, int64_t(1));
    const int64_t threads_wanted = (num_elems + target - 1) / target;
    // Clamp to >= 1 so that num_elems == 0 does not produce zero threads (and a
    // division by zero on the next statement).
    num_threads = std::max(
        std::min(std::max(max_thread_count, int64_t(1)), threads_wanted), int64_t(1));
    num_elems_per_thread =
        std::max((num_elems + num_threads - 1) / num_threads, int64_t(1));
  }
};
110 
111 } // namespace
112 
114 constexpr int32_t StringDictionary::INVALID_STR_ID;
115 constexpr size_t StringDictionary::MAX_STRLEN;
116 constexpr size_t StringDictionary::MAX_STRCOUNT;
117 
119  const std::string& folder,
120  const bool isTemp,
121  const bool recover,
122  const bool materializeHashes,
123  size_t initial_capacity)
124  : dict_ref_(dict_ref)
125  , folder_(folder)
126  , str_count_(0)
127  , string_id_string_dict_hash_table_(initial_capacity, INVALID_STR_ID)
128  , hash_cache_(initial_capacity)
129  , isTemp_(isTemp)
130  , materialize_hashes_(materializeHashes)
131  , payload_fd_(-1)
132  , offset_fd_(-1)
133  , offset_map_(nullptr)
134  , payload_map_(nullptr)
135  , offset_file_size_(0)
136  , payload_file_size_(0)
137  , payload_file_off_(0)
138  , strings_cache_(nullptr) {
139  if (!isTemp && folder.empty()) {
140  return;
141  }
142 
143  // initial capacity must be a power of two for efficient bucket computation
144  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
145  if (!isTemp_) {
146  boost::filesystem::path storage_path(folder);
147  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
148  const auto payload_path =
149  (storage_path / boost::filesystem::path("DictPayload")).string();
150  payload_fd_ = checked_open(payload_path.c_str(), recover);
151  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
154  }
155  bool storage_is_empty = false;
156  if (payload_file_size_ == 0) {
157  storage_is_empty = true;
159  }
160  if (offset_file_size_ == 0) {
162  }
163  if (!isTemp_) { // we never mmap or recover temp dictionaries
164  payload_map_ =
165  reinterpret_cast<char*>(heavyai::checked_mmap(payload_fd_, payload_file_size_));
166  offset_map_ = reinterpret_cast<StringIdxEntry*>(
168  if (recover) {
169  const size_t bytes = heavyai::file_size(offset_fd_);
170  if (bytes % sizeof(StringIdxEntry) != 0) {
171  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
172  }
173  const uint64_t str_count =
174  storage_is_empty ? 0 : getNumStringsFromStorage(bytes / sizeof(StringIdxEntry));
175  collisions_ = 0;
176  // at this point we know the size of the StringDict we need to load
177  // so lets reallocate the vector to the correct size
178  const uint64_t max_entries =
179  std::max(round_up_p2(str_count * 2 + 1),
180  round_up_p2(std::max(initial_capacity, static_cast<size_t>(1))));
181  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
182  string_id_string_dict_hash_table_.swap(new_str_ids);
183  if (materialize_hashes_) {
184  std::vector<string_dict_hash_t> new_hash_cache(max_entries / 2);
185  hash_cache_.swap(new_hash_cache);
186  }
187  // Bail early if we know we don't have strings to add (i.e. a new or empty
188  // dictionary)
189  if (str_count == 0) {
190  return;
191  }
192 
193  unsigned string_id = 0;
194  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
195 
196  uint32_t thread_inits = 0;
197  const auto thread_count = std::thread::hardware_concurrency();
198  const uint32_t items_per_thread = std::max<uint32_t>(
199  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
200  std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>
201  dictionary_futures;
202  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
203  dictionary_futures.emplace_back(std::async(
204  std::launch::async, [string_id, str_count, items_per_thread, this] {
205  std::vector<std::pair<string_dict_hash_t, unsigned int>> hashVec;
206  for (uint32_t curr_id = string_id;
207  curr_id < string_id + items_per_thread && curr_id < str_count;
208  curr_id++) {
209  const auto recovered = getStringFromStorage(curr_id);
210  if (recovered.canary) {
211  // hit the canary, recovery finished
212  break;
213  } else {
214  std::string_view temp(recovered.c_str_ptr, recovered.size);
215  hashVec.emplace_back(std::make_pair(hash_string(temp), temp.size()));
216  }
217  }
218  return hashVec;
219  }));
220  thread_inits++;
221  if (thread_inits % thread_count == 0) {
222  processDictionaryFutures(dictionary_futures);
223  }
224  }
225  // gather last few threads
226  if (dictionary_futures.size() != 0) {
227  processDictionaryFutures(dictionary_futures);
228  }
229  VLOG(1) << "Opened string dictionary " << folder << " # Strings: " << str_count_
230  << " Hash table size: " << string_id_string_dict_hash_table_.size()
231  << " Fill rate: "
232  << static_cast<double>(str_count_) * 100.0 /
234  << "% Collisions: " << collisions_;
235  }
236  }
237 }
238 
239 namespace {
241  std::unordered_map<std::string, int32_t> map_;
242 
243  public:
244  void operator()(std::string const& str, int32_t const string_id) override {
245  auto const emplaced = map_.emplace(str, string_id);
246  CHECK(emplaced.second) << "str(" << str << ") string_id(" << string_id << ')';
247  }
248  void operator()(std::string_view const, int32_t const string_id) override {
249  UNREACHABLE() << "MapMaker must be called with a std::string.";
250  }
251  std::unordered_map<std::string, int32_t> moveMap() { return std::move(map_); }
252 };
253 } // namespace
254 
255 std::function<int32_t(std::string const&)> StringDictionary::makeLambdaStringToId()
256  const {
257  CHECK(isClient());
258  constexpr size_t big_gen = static_cast<size_t>(std::numeric_limits<size_t>::max());
259  MapMaker map_maker;
260  eachStringSerially(big_gen, map_maker);
261  return [map{map_maker.moveMap()}](std::string const& str) {
262  auto const itr = map.find(str);
263  return itr == map.cend() ? INVALID_STR_ID : itr->second;
264  };
265 }
266 
267 // Call serial_callback for each (string/_view, string_id). Must be called serially.
268 void StringDictionary::eachStringSerially(int64_t const generation,
269  StringCallback& serial_callback) const {
270  if (isClient()) {
271  // copyStrings() is not supported when isClient().
272  std::string str; // Import buffer. Placing outside of loop should reduce allocations.
273  size_t const n = std::min(static_cast<size_t>(generation), storageEntryCount());
274  CHECK_LE(n, static_cast<size_t>(std::numeric_limits<int32_t>::max()) + 1);
275  for (unsigned id = 0; id < n; ++id) {
276  {
277  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
278  client_->get_string(str, id);
279  }
280  serial_callback(str, id);
281  }
282  } else {
283  size_t const n = std::min(static_cast<size_t>(generation), str_count_);
284  CHECK_LE(n, static_cast<size_t>(std::numeric_limits<int32_t>::max()) + 1);
285  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
286  for (unsigned id = 0; id < n; ++id) {
287  serial_callback(getStringFromStorageFast(static_cast<int>(id)), id);
288  }
289  }
290 }
291 
293  std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>&
294  dictionary_futures) {
295  for (auto& dictionary_future : dictionary_futures) {
296  dictionary_future.wait();
297  const auto hashVec = dictionary_future.get();
298  for (const auto& hash : hashVec) {
299  const uint32_t bucket =
301  payload_file_off_ += hash.second;
302  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
303  if (materialize_hashes_) {
304  hash_cache_[str_count_] = hash.first;
305  }
306  ++str_count_;
307  }
308  }
309  dictionary_futures.clear();
310 }
311 
312 int32_t StringDictionary::getDbId() const noexcept {
313  return dict_ref_.dbId;
314 }
315 
316 int32_t StringDictionary::getDictId() const noexcept {
317  return dict_ref_.dictId;
318 }
319 
327 size_t StringDictionary::getNumStringsFromStorage(const size_t storage_slots) const
328  noexcept {
329  if (storage_slots == 0) {
330  return 0;
331  }
332  // Must use signed integers since final binary search step can wrap to max size_t value
333  // if dictionary is empty
334  int64_t min_bound = 0;
335  int64_t max_bound = storage_slots - 1;
336  int64_t guess{0};
337  while (min_bound <= max_bound) {
338  guess = (max_bound + min_bound) / 2;
339  CHECK_GE(guess, 0);
340  if (getStringFromStorage(guess).canary) {
341  max_bound = guess - 1;
342  } else {
343  min_bound = guess + 1;
344  }
345  }
346  CHECK_GE(guess + (min_bound > guess ? 1 : 0), 0);
347  return guess + (min_bound > guess ? 1 : 0);
348 }
349 
351  : dict_ref_(dict_ref)
352  , folder_("DB_" + std::to_string(dict_ref.dbId) + "_DICT_" +
353  std::to_string(dict_ref.dictId))
354  , strings_cache_(nullptr)
355  , client_(new StringDictionaryClient(host, dict_ref, true))
356  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
357 
359  free(CANARY_BUFFER);
360  if (isClient()) {
361  return;
362  }
363  if (payload_map_) {
364  if (!isTemp_) {
368  CHECK_GE(payload_fd_, 0);
370  CHECK_GE(offset_fd_, 0);
372  } else {
374  free(payload_map_);
375  free(offset_map_);
376  }
377  }
378 }
379 
381 
382 int32_t StringDictionary::getOrAdd(const std::string& str) noexcept {
383  if (isClient()) {
384  std::vector<int32_t> string_ids;
385  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
386  CHECK_EQ(size_t(1), string_ids.size());
387  return string_ids.front();
388  }
389  return getOrAddImpl(str);
390 }
391 
392 namespace {
393 
394 template <class T>
395 void throw_encoding_error(std::string_view str, const DictRef& dict_ref) {
396  std::ostringstream oss;
397  oss << "The text encoded column using dictionary " << dict_ref.toString()
398  << " has exceeded it's limit of " << sizeof(T) * 8 << " bits ("
399  << static_cast<size_t>(max_valid_int_value<T>() + 1) << " unique values) "
400  << "while attempting to add the new string '" << str << "'. ";
401 
402  if (sizeof(T) < 4) {
403  // Todo: Implement automatic type widening for dictionary-encoded text
404  // columns/all fixed length columm types (at least if not defined
405  // with fixed encoding size), or short of that, ALTER TABLE
406  // COLUMN TYPE to at least allow the user to do this manually
407  // without re-creating the table
408 
409  oss << "To load more data, please re-create the table with "
410  << "this column as type TEXT ENCODING DICT(" << sizeof(T) * 2 * 8 << ") ";
411  if (sizeof(T) == 1) {
412  oss << "or TEXT ENCODING DICT(32) ";
413  }
414  oss << "and reload your data.";
415  } else {
416  // Todo: Implement TEXT ENCODING DICT(64) type which should essentially
417  // preclude overflows.
418  oss << "Currently dictionary-encoded text columns support a maximum of "
420  << " strings. Consider recreating the table with "
421  << "this column as type TEXT ENCODING NONE and reloading your data.";
422  }
423  LOG(ERROR) << oss.str();
424  throw std::runtime_error(oss.str());
425 }
426 
427 void throw_string_too_long_error(std::string_view str, const DictRef& dict_ref) {
428  std::ostringstream oss;
429  oss << "The string '" << str << " could not be inserted into the dictionary "
430  << dict_ref.toString() << " because it exceeded the maximum allowable "
431  << "length of " << StringDictionary::MAX_STRLEN << " characters (string was "
432  << str.size() << " characters).";
433  LOG(ERROR) << oss.str();
434  throw std::runtime_error(oss.str());
435 }
436 
437 } // namespace
438 
439 template <class String>
441  const std::vector<std::vector<String>>& string_array_vec,
442  std::vector<std::vector<int32_t>>& ids_array_vec) {
443  if (client_no_timeout_) {
444  client_no_timeout_->get_or_add_bulk_array(ids_array_vec, string_array_vec);
445  return;
446  }
447 
448  ids_array_vec.resize(string_array_vec.size());
449  for (size_t i = 0; i < string_array_vec.size(); i++) {
450  auto& strings = string_array_vec[i];
451  auto& ids = ids_array_vec[i];
452  ids.resize(strings.size());
453  getOrAddBulk(strings, &ids[0]);
454  }
455 }
456 
458  const std::vector<std::vector<std::string>>& string_array_vec,
459  std::vector<std::vector<int32_t>>& ids_array_vec);
460 
462  const std::vector<std::vector<std::string_view>>& string_array_vec,
463  std::vector<std::vector<int32_t>>& ids_array_vec);
464 
470 template <class String>
471 void StringDictionary::hashStrings(const std::vector<String>& string_vec,
472  std::vector<string_dict_hash_t>& hashes) const
473  noexcept {
474  CHECK_EQ(string_vec.size(), hashes.size());
475 
476  tbb::parallel_for(tbb::blocked_range<size_t>(0, string_vec.size()),
477  [&string_vec, &hashes](const tbb::blocked_range<size_t>& r) {
478  for (size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
479  if (string_vec[curr_id].empty()) {
480  continue;
481  }
482  hashes[curr_id] = hash_string(string_vec[curr_id]);
483  }
484  });
485 }
486 
487 template <class T, class String>
488 size_t StringDictionary::getBulk(const std::vector<String>& string_vec,
489  T* encoded_vec) const {
490  return getBulk(string_vec, encoded_vec, -1L /* generation */);
491 }
492 
493 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
494  uint8_t* encoded_vec) const;
495 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
496  uint16_t* encoded_vec) const;
497 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
498  int32_t* encoded_vec) const;
499 
500 template <class T, class String>
501 size_t StringDictionary::getBulk(const std::vector<String>& string_vec,
502  T* encoded_vec,
503  const int64_t generation) const {
504  constexpr int64_t target_strings_per_thread{1000};
505  const int64_t num_lookup_strings = string_vec.size();
506  if (num_lookup_strings == 0) {
507  return 0;
508  }
509 
510  const ThreadInfo thread_info(
511  std::thread::hardware_concurrency(), num_lookup_strings, target_strings_per_thread);
512  CHECK_GE(thread_info.num_threads, 1L);
513  CHECK_GE(thread_info.num_elems_per_thread, 1L);
514 
515  std::vector<size_t> num_strings_not_found_per_thread(thread_info.num_threads, 0UL);
516 
517  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
518  const int64_t num_dict_strings = generation >= 0 ? generation : storageEntryCount();
519  const bool dictionary_is_empty = (num_dict_strings == 0);
520  if (dictionary_is_empty) {
521  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_lookup_strings),
522  [&](const tbb::blocked_range<int64_t>& r) {
523  const int64_t start_idx = r.begin();
524  const int64_t end_idx = r.end();
525  for (int64_t string_idx = start_idx; string_idx < end_idx;
526  ++string_idx) {
527  encoded_vec[string_idx] = StringDictionary::INVALID_STR_ID;
528  }
529  });
530  return num_lookup_strings;
531  }
532  // If we're here the generation-capped dictionary has strings in it
533  // that we need to look up against
534 
535  tbb::task_arena limited_arena(thread_info.num_threads);
536  limited_arena.execute([&] {
537  CHECK_LE(tbb::this_task_arena::max_concurrency(), thread_info.num_threads);
539  tbb::blocked_range<int64_t>(
540  0, num_lookup_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
541  [&](const tbb::blocked_range<int64_t>& r) {
542  const int64_t start_idx = r.begin();
543  const int64_t end_idx = r.end();
544  size_t num_strings_not_found = 0;
545  for (int64_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
546  const auto& input_string = string_vec[string_idx];
547  if (input_string.empty()) {
548  encoded_vec[string_idx] = inline_int_null_value<T>();
549  continue;
550  }
551  if (input_string.size() > StringDictionary::MAX_STRLEN) {
552  throw_string_too_long_error(input_string, dict_ref_);
553  }
554  const string_dict_hash_t input_string_hash = hash_string(input_string);
555  uint32_t hash_bucket = computeBucket(
556  input_string_hash, input_string, string_id_string_dict_hash_table_);
557  // Will either be legit id or INVALID_STR_ID
558  const auto string_id = string_id_string_dict_hash_table_[hash_bucket];
559  if (string_id == StringDictionary::INVALID_STR_ID ||
560  string_id >= num_dict_strings) {
561  encoded_vec[string_idx] = StringDictionary::INVALID_STR_ID;
562  num_strings_not_found++;
563  continue;
564  }
565  encoded_vec[string_idx] = string_id;
566  }
567  const size_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
568  num_strings_not_found_per_thread[tbb_thread_idx] = num_strings_not_found;
569  },
570  tbb::simple_partitioner());
571  });
572 
573  size_t num_strings_not_found = 0;
574  for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
575  num_strings_not_found += num_strings_not_found_per_thread[thread_idx];
576  }
577  return num_strings_not_found;
578 }
579 
580 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
581  uint8_t* encoded_vec,
582  const int64_t generation) const;
583 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
584  uint16_t* encoded_vec,
585  const int64_t generation) const;
586 template size_t StringDictionary::getBulk(const std::vector<std::string>& string_vec,
587  int32_t* encoded_vec,
588  const int64_t generation) const;
589 
590 template <class T, class String>
591 void StringDictionary::getOrAddBulk(const std::vector<String>& input_strings,
592  T* output_string_ids) {
594  getOrAddBulkParallel(input_strings, output_string_ids);
595  return;
596  }
597  // Single-thread path.
598  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
599 
600  const size_t initial_str_count = str_count_;
601  size_t idx = 0;
602  for (const auto& input_string : input_strings) {
603  if (input_string.empty()) {
604  output_string_ids[idx++] = inline_int_null_value<T>();
605  continue;
606  }
607  CHECK(input_string.size() <= MAX_STRLEN);
608 
609  const string_dict_hash_t input_string_hash = hash_string(input_string);
610  uint32_t hash_bucket =
611  computeBucket(input_string_hash, input_string, string_id_string_dict_hash_table_);
613  output_string_ids[idx++] = string_id_string_dict_hash_table_[hash_bucket];
614  continue;
615  }
616  // need to add record to dictionary
617  // check there is room
618  if (str_count_ > static_cast<size_t>(max_valid_int_value<T>())) {
619  throw_encoding_error<T>(input_string, dict_ref_);
620  }
622  << "Maximum number (" << str_count_
623  << ") of Dictionary encoded Strings reached for this column, offset path "
624  "for column is "
625  << offsets_path_;
626  if (fillRateIsHigh(str_count_)) {
627  // resize when more than 50% is full
629  hash_bucket = computeBucket(
630  input_string_hash, input_string, string_id_string_dict_hash_table_);
631  }
632  appendToStorage(input_string);
633 
634  if (materialize_hashes_) {
635  hash_cache_[str_count_] = input_string_hash;
636  }
637  const int32_t string_id = static_cast<int32_t>(str_count_);
638  string_id_string_dict_hash_table_[hash_bucket] = string_id;
639  output_string_ids[idx++] = string_id;
640  ++str_count_;
641  }
642  const size_t num_strings_added = str_count_ - initial_str_count;
643  if (num_strings_added > 0) {
645  }
646 }
647 
648 template <class T, class String>
649 void StringDictionary::getOrAddBulkParallel(const std::vector<String>& input_strings,
650  T* output_string_ids) {
651  // Compute hashes of the input strings up front, and in parallel,
652  // as the string hashing does not need to be behind the subsequent write_lock
653  std::vector<string_dict_hash_t> input_strings_hashes(input_strings.size());
654  hashStrings(input_strings, input_strings_hashes);
655 
656  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
657  size_t shadow_str_count =
658  str_count_; // Need to shadow str_count_ now with bulk add methods
659  const size_t storage_high_water_mark = shadow_str_count;
660  std::vector<size_t> string_memory_ids;
661  size_t sum_new_string_lengths = 0;
662  string_memory_ids.reserve(input_strings.size());
663  size_t input_string_idx{0};
664  for (const auto& input_string : input_strings) {
665  // Currently we make empty strings null
666  if (input_string.empty()) {
667  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
668  continue;
669  }
670  // TODO: Recover gracefully if an input string is too long
671  CHECK(input_string.size() <= MAX_STRLEN);
672 
673  if (fillRateIsHigh(shadow_str_count)) {
674  // resize when more than 50% is full
676  storage_high_water_mark,
677  input_strings,
678  string_memory_ids,
679  input_strings_hashes);
680  }
681  // Compute the hash for this input_string
682  const string_dict_hash_t input_string_hash = input_strings_hashes[input_string_idx];
683 
684  const uint32_t hash_bucket =
685  computeBucketFromStorageAndMemory(input_string_hash,
686  input_string,
688  storage_high_water_mark,
689  input_strings,
690  string_memory_ids);
691 
692  // If the hash bucket is not empty, that is our string id
693  // (computeBucketFromStorageAndMemory) already checked to ensure the input string and
694  // bucket string are equal)
696  output_string_ids[input_string_idx++] =
698  continue;
699  }
700  // Did not find string, so need to add record to dictionary
701  // First check there is room
702  if (shadow_str_count > static_cast<size_t>(max_valid_int_value<T>())) {
703  throw_encoding_error<T>(input_string, dict_ref_);
704  }
705  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
706  << "Maximum number (" << shadow_str_count
707  << ") of Dictionary encoded Strings reached for this column, offset path "
708  "for column is "
709  << offsets_path_;
710 
711  string_memory_ids.push_back(input_string_idx);
712  sum_new_string_lengths += input_string.size();
713  string_id_string_dict_hash_table_[hash_bucket] =
714  static_cast<int32_t>(shadow_str_count);
715  if (materialize_hashes_) {
716  hash_cache_[shadow_str_count] = input_string_hash;
717  }
718  output_string_ids[input_string_idx++] = shadow_str_count++;
719  }
720  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
721  const size_t num_strings_added = shadow_str_count - str_count_;
722  str_count_ = shadow_str_count;
723  if (num_strings_added > 0) {
725  }
726 }
727 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
728  uint8_t* encoded_vec);
729 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
730  uint16_t* encoded_vec);
731 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
732  int32_t* encoded_vec);
733 
734 template void StringDictionary::getOrAddBulk(
735  const std::vector<std::string_view>& string_vec,
736  uint8_t* encoded_vec);
737 template void StringDictionary::getOrAddBulk(
738  const std::vector<std::string_view>& string_vec,
739  uint16_t* encoded_vec);
740 template void StringDictionary::getOrAddBulk(
741  const std::vector<std::string_view>& string_vec,
742  int32_t* encoded_vec);
743 
744 template <class String>
745 int32_t StringDictionary::getIdOfString(const String& str) const {
746  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
747  if (isClient()) {
748  if constexpr (std::is_same_v<std::string, std::decay_t<String>>) {
749  return client_->get(str);
750  } else {
751  return client_->get(std::string(str));
752  }
753  }
754  return getUnlocked(str);
755 }
756 
757 template int32_t StringDictionary::getIdOfString(const std::string&) const;
758 template int32_t StringDictionary::getIdOfString(const std::string_view&) const;
759 
760 int32_t StringDictionary::getUnlocked(const std::string_view sv) const noexcept {
761  const string_dict_hash_t hash = hash_string(sv);
762  auto str_id = string_id_string_dict_hash_table_[computeBucket(
763  hash, sv, string_id_string_dict_hash_table_)];
764  return str_id;
765 }
766 
767 std::string StringDictionary::getString(int32_t string_id) const {
768  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
769  if (isClient()) {
770  std::string ret;
771  client_->get_string(ret, string_id);
772  return ret;
773  }
774  return getStringUnlocked(string_id);
775 }
776 
777 std::string StringDictionary::getStringUnlocked(int32_t string_id) const noexcept {
778  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
779  return getStringChecked(string_id);
780 }
781 
782 std::pair<char*, size_t> StringDictionary::getStringBytes(int32_t string_id) const
783  noexcept {
784  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
785  CHECK(!isClient());
786  CHECK_LE(0, string_id);
787  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
788  return getStringBytesChecked(string_id);
789 }
790 
792  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
793  if (isClient()) {
794  return client_->storage_entry_count();
795  }
796  return str_count_;
797 }
798 
799 namespace {
800 
801 bool is_like(const std::string& str,
802  const std::string& pattern,
803  const bool icase,
804  const bool is_simple,
805  const char escape) {
806  return icase
807  ? (is_simple ? string_ilike_simple(
808  str.c_str(), str.size(), pattern.c_str(), pattern.size())
809  : string_ilike(str.c_str(),
810  str.size(),
811  pattern.c_str(),
812  pattern.size(),
813  escape))
814  : (is_simple ? string_like_simple(
815  str.c_str(), str.size(), pattern.c_str(), pattern.size())
816  : string_like(str.c_str(),
817  str.size(),
818  pattern.c_str(),
819  pattern.size(),
820  escape));
821 }
822 
823 } // namespace
824 
825 std::vector<int32_t> StringDictionary::getLike(const std::string& pattern,
826  const bool icase,
827  const bool is_simple,
828  const char escape,
829  const size_t generation) const {
830  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
831  if (isClient()) {
832  return client_->get_like(pattern, icase, is_simple, escape, generation);
833  }
834  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
835  const auto it = like_cache_.find(cache_key);
836  if (it != like_cache_.end()) {
837  return it->second;
838  }
839  std::vector<int32_t> result;
840  std::vector<std::thread> workers;
841  int worker_count = cpu_threads();
842  CHECK_GT(worker_count, 0);
843  std::vector<std::vector<int32_t>> worker_results(worker_count);
844  CHECK_LE(generation, str_count_);
845  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
846  workers.emplace_back([&worker_results,
847  &pattern,
848  generation,
849  icase,
850  is_simple,
851  escape,
852  worker_idx,
853  worker_count,
854  this]() {
855  for (size_t string_id = worker_idx; string_id < generation;
856  string_id += worker_count) {
857  const auto str = getStringUnlocked(string_id);
858  if (is_like(str, pattern, icase, is_simple, escape)) {
859  worker_results[worker_idx].push_back(string_id);
860  }
861  }
862  });
863  }
864  for (auto& worker : workers) {
865  worker.join();
866  }
867  for (const auto& worker_result : worker_results) {
868  result.insert(result.end(), worker_result.begin(), worker_result.end());
869  }
870  // place result into cache for reuse if similar query
871  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
872 
873  CHECK(it_ok.second);
874 
875  return result;
876 }
877 
878 std::vector<int32_t> StringDictionary::getEquals(std::string pattern,
879  std::string comp_operator,
880  size_t generation) {
881  std::vector<int32_t> result;
882  auto eq_id_itr = equal_cache_.find(pattern);
883  int32_t eq_id = MAX_STRLEN + 1;
884  int32_t cur_size = str_count_;
885  if (eq_id_itr != equal_cache_.end()) {
886  auto eq_id = eq_id_itr->second;
887  if (comp_operator == "=") {
888  result.push_back(eq_id);
889  } else {
890  for (int32_t idx = 0; idx <= cur_size; idx++) {
891  if (idx == eq_id) {
892  continue;
893  }
894  result.push_back(idx);
895  }
896  }
897  } else {
898  std::vector<std::thread> workers;
899  int worker_count = cpu_threads();
900  CHECK_GT(worker_count, 0);
901  std::vector<std::vector<int32_t>> worker_results(worker_count);
902  CHECK_LE(generation, str_count_);
903  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
904  workers.emplace_back(
905  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
906  for (size_t string_id = worker_idx; string_id < generation;
907  string_id += worker_count) {
908  const auto str = getStringUnlocked(string_id);
909  if (str == pattern) {
910  worker_results[worker_idx].push_back(string_id);
911  }
912  }
913  });
914  }
915  for (auto& worker : workers) {
916  worker.join();
917  }
918  for (const auto& worker_result : worker_results) {
919  result.insert(result.end(), worker_result.begin(), worker_result.end());
920  }
921  if (result.size() > 0) {
922  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
923  CHECK(it_ok.second);
924  eq_id = result[0];
925  }
926  if (comp_operator == "<>") {
927  for (int32_t idx = 0; idx <= cur_size; idx++) {
928  if (idx == eq_id) {
929  continue;
930  }
931  result.push_back(idx);
932  }
933  }
934  }
935  return result;
936 }
937 
938 std::vector<int32_t> StringDictionary::getCompare(const std::string& pattern,
939  const std::string& comp_operator,
940  const size_t generation) {
941  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
942  if (isClient()) {
943  return client_->get_compare(pattern, comp_operator, generation);
944  }
945  std::vector<int32_t> ret;
946  if (str_count_ == 0) {
947  return ret;
948  }
949  if (sorted_cache.size() < str_count_) {
950  if (comp_operator == "=" || comp_operator == "<>") {
951  return getEquals(pattern, comp_operator, generation);
952  }
953 
955  }
956  auto cache_index = compare_cache_.get(pattern);
957 
958  if (!cache_index) {
959  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
960  const auto cache_itr = std::lower_bound(
961  sorted_cache.begin(),
962  sorted_cache.end(),
963  pattern,
964  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
965  auto a_str = this->getStringFromStorage(a);
966  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
967  });
968 
969  if (cache_itr == sorted_cache.end()) {
970  cache_index->index = sorted_cache.size() - 1;
971  cache_index->diff = 1;
972  } else {
973  const auto cache_str = getStringFromStorage(*cache_itr);
974  if (!string_eq(
975  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
976  cache_index->index = cache_itr - sorted_cache.begin() - 1;
977  cache_index->diff = 1;
978  } else {
979  cache_index->index = cache_itr - sorted_cache.begin();
980  cache_index->diff = 0;
981  }
982  }
983 
984  compare_cache_.put(pattern, cache_index);
985  }
986 
987  // since we have a cache in form of vector of ints which is sorted according to
988  // corresponding strings in the dictionary all we need is the index of the element
989  // which equal to the pattern that we are trying to match or the index of “biggest”
990  // element smaller than the pattern, to perform all the comparison operators over
991  // string. The search function guarantees we have such index so now it is just the
992  // matter to include all the elements in the result vector.
993 
994  // For < operator if the index that we have points to the element which is equal to
995  // the pattern that we are searching for we simply get all the elements less than the
996  // index. If the element pointed by the index is not equal to the pattern we are
997  // comparing with we also need to include that index in result vector, except when the
998  // index points to 0 and the pattern is lesser than the smallest value in the string
999  // dictionary.
1000 
1001  if (comp_operator == "<") {
1002  size_t idx = cache_index->index;
1003  if (cache_index->diff) {
1004  idx = cache_index->index + 1;
1005  if (cache_index->index == 0 && cache_index->diff > 0) {
1006  idx = cache_index->index;
1007  }
1008  }
1009  for (size_t i = 0; i < idx; i++) {
1010  ret.push_back(sorted_cache[i]);
1011  }
1012 
1013  // For <= operator if the index that we have points to the element which is equal to
1014  // the pattern that we are searching for we want to include the element pointed by
1015  // the index in the result set. If the element pointed by the index is not equal to
1016  // the pattern we are comparing with we just want to include all the ids with index
1017  // less than the index that is cached, except when pattern that we are searching for
1018  // is smaller than the smallest string in the dictionary.
1019 
1020  } else if (comp_operator == "<=") {
1021  size_t idx = cache_index->index + 1;
1022  if (cache_index == 0 && cache_index->diff > 0) {
1023  idx = cache_index->index;
1024  }
1025  for (size_t i = 0; i < idx; i++) {
1026  ret.push_back(sorted_cache[i]);
1027  }
1028 
1029  // For > operator we want to get all the elements with index greater than the index
1030  // that we have except, when the pattern we are searching for is lesser than the
1031  // smallest string in the dictionary we also want to include the id of the index
1032  // that we have.
1033 
1034  } else if (comp_operator == ">") {
1035  size_t idx = cache_index->index + 1;
1036  if (cache_index->index == 0 && cache_index->diff > 0) {
1037  idx = cache_index->index;
1038  }
1039  for (size_t i = idx; i < sorted_cache.size(); i++) {
1040  ret.push_back(sorted_cache[i]);
1041  }
1042 
1043  // For >= operator when the indexed element that we have points to element which is
1044  // equal to the pattern we are searching for we want to include that in the result
1045  // vector. If the index that we have does not point to the string which is equal to
1046  // the pattern we are searching we don’t want to include that id into the result
1047  // vector except when the index is 0.
1048 
1049  } else if (comp_operator == ">=") {
1050  size_t idx = cache_index->index;
1051  if (cache_index->diff) {
1052  idx = cache_index->index + 1;
1053  if (cache_index->index == 0 && cache_index->diff > 0) {
1054  idx = cache_index->index;
1055  }
1056  }
1057  for (size_t i = idx; i < sorted_cache.size(); i++) {
1058  ret.push_back(sorted_cache[i]);
1059  }
1060  } else if (comp_operator == "=") {
1061  if (!cache_index->diff) {
1062  ret.push_back(sorted_cache[cache_index->index]);
1063  }
1064 
1065  // For <> operator it is simple matter of not including id of string which is equal
1066  // to pattern we are searching for.
1067  } else if (comp_operator == "<>") {
1068  if (!cache_index->diff) {
1069  size_t idx = cache_index->index;
1070  for (size_t i = 0; i < idx; i++) {
1071  ret.push_back(sorted_cache[i]);
1072  }
1073  ++idx;
1074  for (size_t i = idx; i < sorted_cache.size(); i++) {
1075  ret.push_back(sorted_cache[i]);
1076  }
1077  } else {
1078  for (size_t i = 0; i < sorted_cache.size(); i++) {
1079  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
1080  }
1081  }
1082 
1083  } else {
1084  std::runtime_error("Unsupported string comparison operator");
1085  }
1086  return ret;
1087 }
1088 
1089 namespace {
1090 
1091 bool is_regexp_like(const std::string& str,
1092  const std::string& pattern,
1093  const char escape) {
1094  return regexp_like(str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape);
1095 }
1096 
1097 } // namespace
1098 
1099 std::vector<int32_t> StringDictionary::getRegexpLike(const std::string& pattern,
1100  const char escape,
1101  const size_t generation) const {
1102  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1103  if (isClient()) {
1104  return client_->get_regexp_like(pattern, escape, generation);
1105  }
1106  const auto cache_key = std::make_pair(pattern, escape);
1107  const auto it = regex_cache_.find(cache_key);
1108  if (it != regex_cache_.end()) {
1109  return it->second;
1110  }
1111  std::vector<int32_t> result;
1112  std::vector<std::thread> workers;
1113  int worker_count = cpu_threads();
1114  CHECK_GT(worker_count, 0);
1115  std::vector<std::vector<int32_t>> worker_results(worker_count);
1116  CHECK_LE(generation, str_count_);
1117  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
1118  workers.emplace_back([&worker_results,
1119  &pattern,
1120  generation,
1121  escape,
1122  worker_idx,
1123  worker_count,
1124  this]() {
1125  for (size_t string_id = worker_idx; string_id < generation;
1126  string_id += worker_count) {
1127  const auto str = getStringUnlocked(string_id);
1128  if (is_regexp_like(str, pattern, escape)) {
1129  worker_results[worker_idx].push_back(string_id);
1130  }
1131  }
1132  });
1133  }
1134  for (auto& worker : workers) {
1135  worker.join();
1136  }
1137  for (const auto& worker_result : worker_results) {
1138  result.insert(result.end(), worker_result.begin(), worker_result.end());
1139  }
1140  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
1141  CHECK(it_ok.second);
1142 
1143  return result;
1144 }
1145 
1146 std::vector<std::string> StringDictionary::copyStrings() const {
1147  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1148  if (isClient()) {
1149  // TODO(miyu): support remote string dictionary
1150  throw std::runtime_error(
1151  "copying dictionaries from remote server is not supported yet.");
1152  }
1153 
1154  if (strings_cache_) {
1155  return *strings_cache_;
1156  }
1157 
1158  strings_cache_ = std::make_shared<std::vector<std::string>>();
1159  strings_cache_->reserve(str_count_);
1160  const bool multithreaded = str_count_ > 10000;
1161  const auto worker_count =
1162  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
1163  CHECK_GT(worker_count, 0UL);
1164  std::vector<std::vector<std::string>> worker_results(worker_count);
1165  auto copy = [this](std::vector<std::string>& str_list,
1166  const size_t start_id,
1167  const size_t end_id) {
1168  CHECK_LE(start_id, end_id);
1169  str_list.reserve(end_id - start_id);
1170  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
1171  str_list.push_back(getStringUnlocked(string_id));
1172  }
1173  };
1174  if (multithreaded) {
1175  std::vector<std::future<void>> workers;
1176  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
1177  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
1178  worker_idx < worker_count && start < str_count_;
1179  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
1180  workers.push_back(std::async(
1181  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
1182  }
1183  for (auto& worker : workers) {
1184  worker.get();
1185  }
1186  } else {
1187  CHECK_EQ(worker_results.size(), size_t(1));
1188  copy(worker_results[0], 0, str_count_);
1189  }
1190 
1191  for (const auto& worker_result : worker_results) {
1192  strings_cache_->insert(
1193  strings_cache_->end(), worker_result.begin(), worker_result.end());
1194  }
1195  return *strings_cache_;
1196 }
1197 
1198 bool StringDictionary::fillRateIsHigh(const size_t num_strings) const noexcept {
1199  return string_id_string_dict_hash_table_.size() <= num_strings * 2;
1200 }
1201 
// NOTE(review): the signature line of this definition is elided from this
// listing; the call in getOrAddImpl suggests it is
// StringDictionary::increaseHashTableCapacity() — confirm.
// Rebuilds string_id_string_dict_hash_table_ at twice its current size and
// re-inserts ids 0 .. str_count_ - 1 into the new table.
1203  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1204  INVALID_STR_ID);
1205 
// With materialized hashes, reuse each string's cached hash instead of
// rehashing it, and double hash_cache_ alongside the table.
1206  if (materialize_hashes_) {
1207  for (size_t i = 0; i != str_count_; ++i) {
1208  const string_dict_hash_t hash = hash_cache_[i];
1209  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1210  new_str_ids[bucket] = i;
1211  }
1212  hash_cache_.resize(hash_cache_.size() * 2);
1213  } else {
// No hash cache: read every string back from storage and rehash it.
1214  for (size_t i = 0; i != str_count_; ++i) {
1215  const auto str = getStringChecked(i);
1216  const string_dict_hash_t hash = hash_string(str);
1217  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1218  new_str_ids[bucket] = i;
1219  }
1220  }
// Publish the new table; after the swap, new_str_ids holds the old table and
// releases it at scope exit.
1221  string_id_string_dict_hash_table_.swap(new_str_ids);
1222 }
1223 
// Doubles string_id_string_dict_hash_table_ during a bulk insert. Ids already
// in storage are re-bucketed, and so are the pending in-memory strings
// (string_memory_ids), which receive ids storage_high_water_mark + i.
// NOTE(review): the signature line (function name) is elided from this
// listing; presumably increaseHashTableCapacityFromStorageAndMemory — confirm.
1224 template <class String>
1226  const size_t str_count, // str_count_ is only persisted strings, so need transient
1227  // shadow count
1228  const size_t storage_high_water_mark,
// NOTE(review): input_strings is not referenced in the visible body; only the
// precomputed input_strings_hashes are used.
1229  const std::vector<String>& input_strings,
1230  const std::vector<size_t>& string_memory_ids,
1231  const std::vector<string_dict_hash_t>& input_strings_hashes) noexcept {
1232  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1233  INVALID_STR_ID);
// With materialized hashes, hash_cache_ is assumed to already hold hashes for
// all str_count ids (storage and pending); hash_cache_ is doubled alongside.
1234  if (materialize_hashes_) {
1235  for (size_t i = 0; i != str_count; ++i) {
1236  const string_dict_hash_t hash = hash_cache_[i];
1237  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1238  new_str_ids[bucket] = i;
1239  }
1240  hash_cache_.resize(hash_cache_.size() * 2);
1241  } else {
// Rehash strings already written to storage...
1242  for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1243  const auto storage_string = getStringChecked(storage_idx);
1244  const string_dict_hash_t hash = hash_string(storage_string);
1245  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1246  new_str_ids[bucket] = storage_idx;
1247  }
// ...then the pending strings, whose ids follow the storage ids in order.
1248  for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1249  const size_t string_memory_id = string_memory_ids[memory_idx];
1250  const uint32_t bucket = computeUniqueBucketWithHash(
1251  input_strings_hashes[string_memory_id], new_str_ids);
1252  new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1253  }
1254  }
1255  string_id_string_dict_hash_table_.swap(new_str_ids);
1256 }
1257 
1258 int32_t StringDictionary::getOrAddImpl(const std::string_view& str) noexcept {
1259  // @TODO(wei) treat empty string as NULL for now
1260  if (str.size() == 0) {
1261  return inline_int_null_value<int32_t>();
1262  }
1263  CHECK(str.size() <= MAX_STRLEN);
1264  const string_dict_hash_t hash = hash_string(str);
1265  {
1266  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
1267  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1268  if (string_id_string_dict_hash_table_[bucket] != INVALID_STR_ID) {
1269  return string_id_string_dict_hash_table_[bucket];
1270  }
1271  }
1272  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1273  if (fillRateIsHigh(str_count_)) {
1274  // resize when more than 50% is full
1275  increaseHashTableCapacity();
1276  }
1277  // need to recalculate the bucket in case it changed before
1278  // we got the lock
1279  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1280  if (string_id_string_dict_hash_table_[bucket] == INVALID_STR_ID) {
1281  CHECK_LT(str_count_, MAX_STRCOUNT)
1282  << "Maximum number (" << str_count_
1283  << ") of Dictionary encoded Strings reached for this column, offset path "
1284  "for column is "
1285  << offsets_path_;
1286  appendToStorage(str);
1287  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
1288  if (materialize_hashes_) {
1289  hash_cache_[str_count_] = hash;
1290  }
1291  ++str_count_;
1292  invalidateInvertedIndex();
1293  }
1294  return string_id_string_dict_hash_table_[bucket];
1295 }
1296 
1297 std::string StringDictionary::getStringChecked(const int string_id) const noexcept {
1298  const auto str_canary = getStringFromStorage(string_id);
1299  CHECK(!str_canary.canary);
1300  return std::string(str_canary.c_str_ptr, str_canary.size);
1301 }
1302 
// NOTE(review): the first signature line (return type and name) is elided
// from this listing; presumably StringDictionary::getStringBytesChecked —
// confirm.
// Returns a (pointer, size) pair for string `string_id` without copying it.
// Fails a CHECK if the stored entry is the canary.
1304  const int string_id) const noexcept {
1305  const auto str_canary = getStringFromStorage(string_id);
1306  CHECK(!str_canary.canary);
1307  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1308 }
1309 
// Linear-probing lookup. Returns the bucket holding `input_string`, or, if the
// string is absent, the first empty (INVALID_STR_ID) bucket on its probe path.
// NOTE(review): the signature line is elided from this listing; the calls in
// getOrAddImpl suggest this is StringDictionary::computeBucket — confirm.
1310 template <class String>
1312  const string_dict_hash_t hash,
1313  const String& input_string,
1314  const std::vector<int32_t>& string_id_string_dict_hash_table) const noexcept {
1315  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
// Masking with (size - 1) is a modulo only for power-of-two table sizes;
// growth in this file always doubles the table.
1316  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
// Terminates only if the table has an empty slot; presumably guaranteed by
// growing the table once it is half full (fillRateIsHigh).
1317  while (true) {
1318  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1319  if (candidate_string_id ==
1320  INVALID_STR_ID) { // In this case it means the slot is available for use
1321  break;
1322  }
// With materialized hashes, a hash mismatch skips the byte comparison.
1323  if ((materialize_hashes_ && hash == hash_cache_[candidate_string_id]) ||
1324  !materialize_hashes_) {
1325  const auto candidate_string = getStringFromStorageFast(candidate_string_id);
1326  if (input_string.size() == candidate_string.size() &&
1327  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1328  // found the string
1329  break;
1330  }
1331  }
1332  // wrap around
1333  if (++bucket == string_dict_hash_table_size) {
1334  bucket = 0;
1335  }
1336  }
1337  return bucket;
1338 }
1339 
// Linear-probing lookup used during bulk inserts. Candidate ids at or above
// storage_high_water_mark refer to pending strings that are not yet written;
// those are compared against input_strings via string_memory_ids. Returns the
// bucket holding `input_string` or the first empty bucket on its probe path.
// NOTE(review): the signature line and two lines inside the storage
// comparison below are elided from this listing.
1340 template <class String>
1342  const string_dict_hash_t input_string_hash,
1343  const String& input_string,
1344  const std::vector<int32_t>& string_id_string_dict_hash_table,
1345  const size_t storage_high_water_mark,
1346  const std::vector<String>& input_strings,
1347  const std::vector<size_t>& string_memory_ids) const noexcept {
1348  uint32_t bucket = input_string_hash & (string_id_string_dict_hash_table.size() - 1);
1349  while (true) {
1350  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1351  if (candidate_string_id ==
1352  INVALID_STR_ID) { // In this case it means the slot is available for use
1353  break;
1354  }
1355  if (!materialize_hashes_ || (input_string_hash == hash_cache_[candidate_string_id])) {
// NOTE(review): `> 0` excludes id 0. When storage_high_water_mark == 0 (empty
// dictionary), id 0 is a pending in-memory string but would be looked up in
// storage instead. This looks like it should be `>= 0` — confirm.
1356  if (candidate_string_id > 0 &&
1357  static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1358  // The candidate string is not in storage yet but in our string_memory_ids temp
1359  // buffer
1360  size_t memory_offset =
1361  static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1362  const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1363  if (input_string.size() == candidate_string.size() &&
1364  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1365  // found the string in the temp memory buffer
1366  break;
1367  }
1368  } else {
1369  // The candidate string is in storage, need to fetch it for comparison
1370  const auto candidate_storage_string =
1371  getStringFromStorageFast(candidate_string_id);
1372  if (input_string.size() == candidate_storage_string.size() &&
1373  !memcmp(input_string.data(),
1374  candidate_storage_string.data(),
1375  input_string.size())) {
1378  // found the string in storage
1379  break;
1380  }
1381  }
1382  }
1383  if (++bucket == string_id_string_dict_hash_table.size()) {
1384  bucket = 0;
1385  }
1386  }
1387  return bucket;
1388 }
1389 
// Returns the first empty bucket on the probe path of `hash`, without
// comparing strings. This is only valid when the key is known to be absent,
// as when re-inserting distinct ids into a freshly allocated table. Each
// occupied slot probed is counted in collisions_.
// NOTE(review): the signature line is elided from this listing; the calls in
// the growth routines suggest StringDictionary::computeUniqueBucketWithHash.
1391  const string_dict_hash_t hash,
1392  const std::vector<int32_t>& string_id_string_dict_hash_table) noexcept {
1393  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1394  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1395  while (true) {
1396  if (string_id_string_dict_hash_table[bucket] ==
1397  INVALID_STR_ID) { // In this case it means the slot is available for use
1398  break;
1399  }
1400  collisions_++;
1401  // wrap around
1402  if (++bucket == string_dict_hash_table_size) {
1403  bucket = 0;
1404  }
1405  }
1406  return bucket;
1407 }
1408 
// Ensures write_length more bytes fit in the payload store past
// payload_file_off_, growing it if needed. For persistent dictionaries the
// file is extended and payload_map_ is re-mapped at the new size.
// NOTE(review): the signature line and one line before addPayloadCapacity in
// the persistent branch (presumably an unmap of the old mapping) are elided
// from this listing — confirm.
1410  const size_t write_length) {
1411  if (payload_file_off_ + write_length > payload_file_size_) {
// Bytes missing beyond the currently free tail of the store.
1412  const size_t min_capacity_needed =
1413  write_length - (payload_file_size_ - payload_file_off_);
1414  if (!isTemp_) {
1415  CHECK_GE(payload_fd_, 0);
1417  addPayloadCapacity(min_capacity_needed);
1418  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1419  payload_map_ =
1420  reinterpret_cast<char*>(heavyai::checked_mmap(payload_fd_, payload_file_size_));
1421  } else {
1422  addPayloadCapacity(min_capacity_needed);
1423  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1424  }
1425  }
1426 }
1427 
// Ensures write_length more bytes of StringIdxEntry data fit after the
// str_count_ entries already in the offset store, growing it if needed.
// For persistent dictionaries, offset_map_ is re-mapped afterwards.
// NOTE(review): the signature line and two lines in the persistent branch
// (presumably unmap, and the checked_mmap argument line) are elided from this
// listing. This check uses `>=` whereas the payload variant uses `>`, so it
// also grows when the store would be exactly full — confirm whether intended.
1429  const size_t write_length) {
1430  const size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
1431  if (offset_file_off + write_length >= offset_file_size_) {
1432  const size_t min_capacity_needed =
1433  write_length - (offset_file_size_ - offset_file_off);
1434  if (!isTemp_) {
1435  CHECK_GE(offset_fd_, 0);
1437  addOffsetCapacity(min_capacity_needed);
1438  CHECK(offset_file_off + write_length <= offset_file_size_);
1439  offset_map_ = reinterpret_cast<StringIdxEntry*>(
1441  } else {
1442  addOffsetCapacity(min_capacity_needed);
1443  CHECK(offset_file_off + write_length <= offset_file_size_);
1444  }
1445  }
1446 }
1447 
1448 template <class String>
1449 void StringDictionary::appendToStorage(const String str) noexcept {
1450  // write the payload
1451  checkAndConditionallyIncreasePayloadCapacity(str.size());
1452  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1453 
1454  // write the offset and length
1455  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1456  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1457 
1458  checkAndConditionallyIncreaseOffsetCapacity(sizeof(str_meta));
1459  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1460 }
1461 
// Appends the strings input_strings[string_memory_ids[i]], in order, to the
// payload store. Their entries go at offset indices str_count_ + i. Both
// stores are grown once up front, using the precomputed total byte length.
// NOTE(review): the signature line is elided from this listing; presumably
// StringDictionary::appendToStorageBulk — confirm.
1462 template <class String>
1464  const std::vector<String>& input_strings,
1465  const std::vector<size_t>& string_memory_ids,
1466  const size_t sum_new_strings_lengths) noexcept {
1467  const size_t num_strings = string_memory_ids.size();
1468 
1469  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1470  checkAndConditionallyIncreaseOffsetCapacity(sizeof(StringIdxEntry) * num_strings);
1471 
1472  for (size_t i = 0; i < num_strings; ++i) {
1473  const size_t string_idx = string_memory_ids[i];
// NOTE(review): this copies the element; if String is std::string, a const
// reference would avoid one allocation per string.
1474  const String str = input_strings[string_idx];
1475  const size_t str_size(str.size());
1476  memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1477  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
1478  payload_file_off_ += str_size; // Need to increment after we've defined str_meta
1479  memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
1480  }
1481 }
1482 
1483 std::string_view StringDictionary::getStringFromStorageFast(const int string_id) const
1484  noexcept {
1485  const StringIdxEntry* str_meta = offset_map_ + string_id;
1486  return {payload_map_ + str_meta->off, str_meta->size};
1487 }
1488 
// Checked lookup of string `string_id`. Returns {pointer, size, false}, or
// {nullptr, 0, true} when the entry's size field holds the canary value.
// NOTE(review): the signature line is elided from this listing; presumably
// StringDictionary::getStringFromStorage — confirm.
1490  const int string_id) const noexcept {
1491  if (!isTemp_) {
1492  CHECK_GE(payload_fd_, 0);
1493  CHECK_GE(offset_fd_, 0);
1494  }
1495  CHECK_GE(string_id, 0);
1496  const StringIdxEntry* str_meta = offset_map_ + string_id;
// Fresh capacity is filled with 0xff bytes (see the capacity-growth routines),
// so an unwritten entry presumably reads back as an all-ones size. 0xffff
// matches that only if StringIdxEntry::size is 16 bits wide — confirm.
1497  if (str_meta->size == 0xffff) {
1498  // hit the canary
1499  return {nullptr, 0, true};
1500  }
1501  return {payload_map_ + str_meta->off, str_meta->size, false};
1502 }
1503 
1504 void StringDictionary::addPayloadCapacity(const size_t min_capacity_requested) noexcept {
1505  if (!isTemp_) {
1506  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1507  } else {
1508  payload_map_ = static_cast<char*>(
1509  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1510  }
1511 }
1512 
1513 void StringDictionary::addOffsetCapacity(const size_t min_capacity_requested) noexcept {
1514  if (!isTemp_) {
1515  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1516  } else {
1517  offset_map_ = static_cast<StringIdxEntry*>(
1518  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1519  }
1520 }
1521 
// Appends a block of 0xff canary bytes to the end of file `fd` and returns its
// size. The size is at least 1024 pages, and at least min_capacity_requested
// rounded up to the next whole page.
// NOTE(review): the signature line is elided from this listing; presumably
// StringDictionary::addStorageCapacity — confirm.
1523  int fd,
1524  const size_t min_capacity_requested) noexcept {
1525  const size_t canary_buff_size_to_add =
1526  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1527  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1528 
// Lazily grow the shared canary source buffer. NOTE(review): CANARY_BUFFER and
// canary_buffer_size are not declared in this block and are mutated without a
// visible lock — confirm they are not shared across concurrent writers.
1529  if (canary_buffer_size < canary_buff_size_to_add) {
1530  CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1531  canary_buffer_size = canary_buff_size_to_add;
1532  CHECK(CANARY_BUFFER);
1533  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1534  }
1535 
// A short write is treated as fatal rather than retried.
1536  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1537  const auto write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
1538  CHECK(write_return > 0 &&
1539  (static_cast<size_t>(write_return) == canary_buff_size_to_add));
1540  return canary_buff_size_to_add;
1541 }
1542 
// In-memory counterpart of addStorageCapacity. Reallocates `addr` to grow it by
// a canary block of the same size rule, fills the new tail with 0xff bytes,
// advances mem_size by the added amount, and returns the (possibly moved)
// buffer.
// NOTE(review): the signature line is elided from this listing; presumably
// StringDictionary::addMemoryCapacity(void* addr, ...) — confirm.
1544  size_t& mem_size,
1545  const size_t min_capacity_requested) noexcept {
1546  const size_t canary_buff_size_to_add =
1547  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1548  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1549  if (canary_buffer_size < canary_buff_size_to_add) {
1550  CANARY_BUFFER =
1551  reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1552  canary_buffer_size = canary_buff_size_to_add;
1553  CHECK(CANARY_BUFFER);
1554  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1555  }
1556  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
1557  CHECK(new_addr);
1558  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
// memcpy returns its (non-null) destination, so this CHECK only serves to
// perform the copy inline.
1559  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
1560  mem_size += canary_buff_size_to_add;
1561  return new_addr;
1562 }
1563 
// Drops the memoized LIKE, REGEXP and equality results and the compare
// cache's inverted index. Swapping with a fresh container also releases the
// old containers' memory.
// NOTE(review): the signature line is elided from this listing; the call in
// getOrAddImpl suggests StringDictionary::invalidateInvertedIndex — confirm.
// strings_cache_ (used by copyStrings) is not reset here; confirm it is
// invalidated elsewhere when strings are added.
1565  if (!like_cache_.empty()) {
1566  decltype(like_cache_)().swap(like_cache_);
1567  }
1568  if (!regex_cache_.empty()) {
1569  decltype(regex_cache_)().swap(regex_cache_);
1570  }
1571  if (!equal_cache_.empty()) {
1572  decltype(equal_cache_)().swap(equal_cache_);
1573  }
1574  compare_cache_.invalidateInvertedIndex();
1575 }
1576 
1577 // TODO 5 Mar 2021 Nothing will undo the writes to dictionary currently on a failed
1578 // load. The next write to the dictionary that does checkpoint will make the
1579 // uncheckpointed data be written to disk. Only option is a table truncate, and that's
1580 // assuming not a replicated dictionary
// Flushes the offset and payload mappings (synchronous msync) and fsyncs both
// descriptors. Returns true only if every step succeeds; evaluation stops at
// the first failure. Client dictionaries delegate to client_->checkpoint() and
// report false if it throws (the exception itself is swallowed).
// NOTE(review): the signature line is elided from this listing; presumably
// StringDictionary::checkpoint() — confirm.
1582  if (isClient()) {
1583  try {
1584  return client_->checkpoint();
1585  } catch (...) {
1586  return false;
1587  }
1588  }
1589  CHECK(!isTemp_);
1590  bool ret = true;
1591  ret = ret &&
1592  (heavyai::msync((void*)offset_map_, offset_file_size_, /*async=*/false) == 0);
1593  ret = ret &&
1594  (heavyai::msync((void*)payload_map_, payload_file_size_, /*async=*/false) == 0);
1595  ret = ret && (heavyai::fsync(offset_fd_) == 0);
1596  ret = ret && (heavyai::fsync(payload_fd_) == 0);
1597  return ret;
1598 }
1599 
1600 bool StringDictionary::isClient() const noexcept {
1601  return static_cast<bool>(client_);
1602 }
1603 
// Extends sorted_cache to cover every id up to str_count_. The ids added since
// the last build are sorted by string, then merged into the existing order.
// NOTE(review): the signature line is elided from this listing; presumably
// StringDictionary::buildSortedCache() — confirm.
1605  // This method is not thread-safe.
1606  const auto cur_cache_size = sorted_cache.size();
1607  std::vector<int32_t> temp_sorted_cache;
// Ids are assigned densely, so the new ones are exactly
// [cur_cache_size, str_count_).
1608  for (size_t i = cur_cache_size; i < str_count_; i++) {
1609  temp_sorted_cache.push_back(i);
1610  }
1611  sortCache(temp_sorted_cache);
1612  mergeSortedCache(temp_sorted_cache);
1613 }
1614 
1615 void StringDictionary::sortCache(std::vector<int32_t>& cache) {
1616  // This method is not thread-safe.
1617 
1618  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1619  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1620 
1621  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1622  auto a_str = this->getStringFromStorage(a);
1623  auto b_str = this->getStringFromStorage(b);
1624  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1625  });
1626 }
1627 
1628 void StringDictionary::mergeSortedCache(std::vector<int32_t>& temp_sorted_cache) {
1629  // this method is not thread safe
1630  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1631  size_t t_idx = 0, s_idx = 0, idx = 0;
1632  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1633  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1634  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1635  const auto insert_from_temp_cache =
1636  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1637  if (insert_from_temp_cache) {
1638  updated_cache[idx] = temp_sorted_cache[t_idx++];
1639  } else {
1640  updated_cache[idx] = sorted_cache[s_idx++];
1641  }
1642  }
1643  while (t_idx < temp_sorted_cache.size()) {
1644  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1645  }
1646  while (s_idx < sorted_cache.size()) {
1647  updated_cache[idx++] = sorted_cache[s_idx++];
1648  }
1649  sorted_cache.swap(updated_cache);
1650 }
1651 
// Translates source_ids into ids of dest_dict, adding strings to dest_dict as
// needed. The int32 minimum maps to the empty string. Other negative ids are
// transient ids resolved through transient_string_vec. Non-negative ids are
// read from source_dict.
// NOTE(review): the signature line is elided from this listing; presumably
// StringDictionary::populate_string_ids — confirm.
1653  std::vector<int32_t>& dest_ids,
1654  StringDictionary* dest_dict,
1655  const std::vector<int32_t>& source_ids,
1656  const StringDictionary* source_dict,
1657  const std::vector<std::string const*>& transient_string_vec) {
1658  std::vector<std::string> strings;
1659 
1660  for (const int32_t source_id : source_ids) {
1661  if (source_id == std::numeric_limits<int32_t>::min()) {
1662  strings.emplace_back("");
1663  } else if (source_id < 0) {
1664  unsigned const string_index = StringDictionaryProxy::transientIdToIndex(source_id);
1665  CHECK_LT(string_index, transient_string_vec.size()) << "source_id=" << source_id;
1666  strings.emplace_back(*transient_string_vec[string_index]);
1667  } else {
1668  strings.push_back(source_dict->getString(source_id));
1669  }
1670  }
1671 
1672  dest_ids.resize(strings.size());
// NOTE(review): when source_ids is empty, dest_ids is empty and `&dest_ids[0]`
// indexes an empty vector (undefined behavior); dest_ids.data() would be safe.
1673  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1674 }
1675 
// Array variant of populate_string_ids: translates each row of source ids into
// the corresponding row of dest_array_ids. Rows are handed out through an
// atomic counter. Workers run via std::async only when there are more than 10
// rows per hardware thread; otherwise everything runs on the calling thread.
// NOTE(review): the signature line is elided from this listing; presumably
// StringDictionary::populate_string_array_ids — confirm.
1677  std::vector<std::vector<int32_t>>& dest_array_ids,
1678  StringDictionary* dest_dict,
1679  const std::vector<std::vector<int32_t>>& source_array_ids,
1680  const StringDictionary* source_dict) {
1681  dest_array_ids.resize(source_array_ids.size());
1682 
1683  std::atomic<size_t> row_idx{0};
// thread_id is unused; each worker claims rows until none remain.
1684  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1685  int thread_id) {
1686  for (;;) {
1687  auto row = row_idx.fetch_add(1);
1688 
1689  if (row >= dest_array_ids.size()) {
1690  return;
1691  }
1692  const auto& source_ids = source_array_ids[row];
1693  auto& dest_ids = dest_array_ids[row];
1694  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1695  }
1696  };
1697 
// NOTE(review): std::thread::hardware_concurrency() may return 0 when the
// value is not computable; the division below would then divide by zero.
// Guarding with std::max(1, ...) would avoid this.
1698  const int num_worker_threads = std::thread::hardware_concurrency();
1699 
1700  if (source_array_ids.size() / num_worker_threads > 10) {
1701  std::vector<std::future<void>> worker_threads;
1702  for (int i = 0; i < num_worker_threads; ++i) {
1703  worker_threads.push_back(std::async(std::launch::async, processor, i));
1704  }
1705 
1706  for (auto& child : worker_threads) {
1707  child.wait();
1708  }
// get() rethrows any exception raised inside a worker.
1709  for (auto& child : worker_threads) {
1710  child.get();
1711  }
1712  } else {
1713  processor(0);
1714  }
1715 }
1716 
// Returns string_views (into this dictionary's payload storage) of the first
// `generation` strings, indexed by string id. Below 1000 strings the views are
// filled serially; otherwise with TBB in a task arena limited by ThreadInfo.
// The views alias internal storage, which growth routines may reallocate or
// remap.
1717 std::vector<std::string_view> StringDictionary::getStringViews(
1718  const size_t generation) const {
1719  auto timer = DEBUG_TIMER(__func__);
1720  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
// NOTE(review): `generation` is size_t, so `generation >= 0` is always true
// and storageEntryCount() is never used here. A "no generation" sentinel
// cannot be expressed through this parameter type — confirm intent.
1721  const int64_t num_strings = generation >= 0 ? generation : storageEntryCount();
1722  CHECK_LE(num_strings, static_cast<int64_t>(StringDictionary::MAX_STRCOUNT));
1723  // The CHECK_LE below is currently redundant with the check
1724  // above against MAX_STRCOUNT, however given we iterate using
1725  // int32_t types for efficiency (to match type expected by
1726  // getStringFromStorageFast), check that the # of strings is also
1727  // in the int32_t range in case MAX_STRCOUNT is changed
1728 
1729  // Todo(todd): consider aliasing the max logical type width
1730  // (currently int32_t) throughout StringDictionary
1731  CHECK_LE(num_strings, std::numeric_limits<int32_t>::max());
1732 
1733  std::vector<std::string_view> string_views(num_strings);
1734  // We can bail early if the generation-specified dictionary is empty
1735  if (num_strings == 0) {
1736  return string_views;
1737  }
1738  constexpr int64_t tbb_parallel_threshold{1000};
1739  if (num_strings < tbb_parallel_threshold) {
1740  // Use int32_t to match type expected by getStringFromStorageFast
1741  for (int32_t string_idx = 0; string_idx < num_strings; ++string_idx) {
1742  string_views[string_idx] = getStringFromStorageFast(string_idx);
1743  }
1744  } else {
1745  constexpr int64_t target_strings_per_thread{1000};
1746  const ThreadInfo thread_info(
1747  std::thread::hardware_concurrency(), num_strings, target_strings_per_thread);
1748  CHECK_GE(thread_info.num_threads, 1L);
1749  CHECK_GE(thread_info.num_elems_per_thread, 1L);
1750 
1751  tbb::task_arena limited_arena(thread_info.num_threads);
// NOTE(review): this runs before limited_arena.execute(), so it queries the
// calling thread's arena, not limited_arena. In the default arena it can fail
// whenever thread_info.num_threads is below the machine's concurrency — confirm.
1752  CHECK_LE(tbb::this_task_arena::max_concurrency(), thread_info.num_threads);
1753  limited_arena.execute([&] {
// NOTE(review): one line, presumably the head of the tbb::parallel_for call,
// is elided from this listing here.
1755  tbb::blocked_range<int64_t>(
1756  0, num_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
1757  [&](const tbb::blocked_range<int64_t>& r) {
1758  // r should be in range of int32_t per CHECK above
1759  const int32_t start_idx = r.begin();
1760  const int32_t end_idx = r.end();
1761  for (int32_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
1762  string_views[string_idx] = getStringFromStorageFast(string_idx);
1763  }
1764  },
1765  tbb::simple_partitioner());
1766  });
1767  }
1768  return string_views;
1769 }
1770 
1771 std::vector<std::string_view> StringDictionary::getStringViews() const {
1773 }
1774 
1776  const std::shared_ptr<StringDictionary> dest_dict,
1777  StringLookupCallback const& dest_transient_lookup_callback) const {
1778  auto timer = DEBUG_TIMER(__func__);
1779  const size_t num_source_strings = storageEntryCount();
1780  const size_t num_dest_strings = dest_dict->storageEntryCount();
1781  std::vector<int32_t> translated_ids(num_source_strings);
1782 
1783  buildDictionaryTranslationMap(dest_dict.get(),
1784  translated_ids.data(),
1785  num_source_strings,
1786  num_dest_strings,
1787  true, // Just assume true for dest_has_transients as this
1788  // function is only used for testing currently
1789  dest_transient_lookup_callback,
1790  {});
1791  return translated_ids;
1792 }
1793 
/**
 * Acquire shared (read) locks on a source and a destination string dictionary
 * in a consistent global order.
 *
 * Dictionaries are ordered lexicographically by (db_id, dict_id), and the lock
 * belonging to the dictionary that sorts first is acquired first. Callers use
 * this consistent ordering to avoid deadlocks when translating between
 * dictionaries.
 *
 * When (db_id, dict_id) match, the two locks are presumed to guard the same
 * mutex. In that case only `dest_read_lock` is acquired, because re-locking a
 * std::shared_mutex already owned by the calling thread is undefined behavior.
 *
 * @param source_db_id      database id of the source dictionary
 * @param source_dict_id    dictionary id of the source dictionary
 * @param dest_db_id        database id of the destination dictionary
 * @param dest_dict_id      dictionary id of the destination dictionary
 * @param source_read_lock  deferred (not yet owning) shared lock on the source
 *                          dictionary's mutex; left unlocked when the source
 *                          and destination dictionaries are the same
 * @param dest_read_lock    deferred (not yet owning) shared lock on the
 *                          destination dictionary's mutex; always acquired
 */
void order_translation_locks(const int32_t source_db_id,
                             const int32_t source_dict_id,
                             const int32_t dest_db_id,
                             const int32_t dest_dict_id,
                             std::shared_lock<std::shared_mutex>& source_read_lock,
                             std::shared_lock<std::shared_mutex>& dest_read_lock) {
  const bool dicts_are_same =
      source_db_id == dest_db_id && source_dict_id == dest_dict_id;
  if (dicts_are_same) {
    // Same dictionary: take a single shared (read) lock; locking the same
    // shared_mutex twice from one thread would be undefined behavior.
    dest_read_lock.lock();
    return;
  }

  // Lexicographic (db_id, dict_id) comparison decides which lock goes first.
  const bool source_dict_is_locked_first =
      source_db_id < dest_db_id ||
      (source_db_id == dest_db_id && source_dict_id < dest_dict_id);
  if (source_dict_is_locked_first) {
    source_read_lock.lock();
    dest_read_lock.lock();
  } else {
    dest_read_lock.lock();
    source_read_lock.lock();
  }
}
1816 
1818  const StringDictionary* dest_dict,
1819  int32_t* translated_ids,
1820  const int64_t source_generation,
1821  const int64_t dest_generation,
1822  const bool dest_has_transients,
1823  StringLookupCallback const& dest_transient_lookup_callback,
1824  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) const {
1825  auto timer = DEBUG_TIMER(__func__);
1826  CHECK_GE(source_generation, 0L);
1827  CHECK_GE(dest_generation, 0L);
1828  const int64_t num_source_strings = source_generation;
1829  const int64_t num_dest_strings = dest_generation;
1830 
1831  // We can bail early if there are no source strings to translate
1832  if (num_source_strings == 0L) {
1833  return 0;
1834  }
1835 
1836  // If we get here, we should have local dictionaries.
1837  // Note case of transient source dictionaries that aren't
1838  // seen as remote (they have no client_no_timeout_) is covered
1839  // by early bail above on num_source_strings == 0
1840  if (dest_dict->client_no_timeout_) {
1841  throw std::runtime_error(
1842  "Cannot translate between a local source and remote destination dictionary.");
1843  }
1844 
1845  // Order the locks on this/source dict and dest dict by (db id, dict id) so we
1846  // can enforce a consistent lock ordering and avoid deadlocks
1847  std::shared_lock<std::shared_mutex> source_read_lock(rw_mutex_, std::defer_lock);
1848  std::shared_lock<std::shared_mutex> dest_read_lock(dest_dict->rw_mutex_,
1849  std::defer_lock);
1851  getDictId(),
1852  dest_dict->getDbId(),
1853  dest_dict->getDictId(),
1854  source_read_lock,
1855  dest_read_lock);
1856 
1857  // For both source and destination dictionaries we cap the max
1858  // entries to be translated/translated to at the supplied
1859  // generation arguments, if valid (i.e. >= 0), otherwise just the
1860  // size of each dictionary
1861 
1862  CHECK_LE(num_source_strings, static_cast<int64_t>(str_count_));
1863  CHECK_LE(num_dest_strings, static_cast<int64_t>(dest_dict->str_count_));
1864  const bool dest_dictionary_is_empty = (num_dest_strings == 0);
1865 
1866  constexpr int64_t target_strings_per_thread{1000};
1867  const ThreadInfo thread_info(
1868  std::thread::hardware_concurrency(), num_source_strings, target_strings_per_thread);
1869  CHECK_GE(thread_info.num_threads, 1L);
1870  CHECK_GE(thread_info.num_elems_per_thread, 1L);
1871 
1872  // We use a tbb::task_arena to cap the number of threads. In other contexts
1873  // this has been shown to perform better when low numbers of threads are
1874  // needed than just letting tbb pick the number of threads, but it should
1875  // be benchmarked in this specific context
1876 
1877  const StringOps_Namespace::StringOps string_ops(string_op_infos);
1878  const bool has_string_ops = string_ops.size();
1879 
1880  tbb::task_arena limited_arena(thread_info.num_threads);
1881  std::vector<size_t> num_strings_not_translated_per_thread(thread_info.num_threads, 0UL);
1882  constexpr bool short_circuit_empty_dictionary_translations{false};
1883  limited_arena.execute([&] {
1884  CHECK_LE(tbb::this_task_arena::max_concurrency(), thread_info.num_threads);
1885  if (short_circuit_empty_dictionary_translations && dest_dictionary_is_empty) {
1887  tbb::blocked_range<int32_t>(
1888  0,
1889  num_source_strings,
1890  thread_info.num_elems_per_thread /* tbb grain_size */),
1891  [&](const tbb::blocked_range<int32_t>& r) {
1892  const int32_t start_idx = r.begin();
1893  const int32_t end_idx = r.end();
1894  for (int32_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
1895  translated_ids[string_idx] = INVALID_STR_ID;
1896  }
1897  },
1898  tbb::simple_partitioner());
1899  num_strings_not_translated_per_thread[0] += num_source_strings;
1900  } else {
1901  // The below logic, by executing low-level private variable accesses on both
1902  // dictionaries, is less clean than a previous variant that simply called
1903  // `getStringViews` from the source dictionary and then called `getBulk` on the
1904  // destination dictionary, but this version gets significantly better performance
1905  // (~2X), likely due to eliminating the overhead of writing out the string views and
1906  // then reading them back in (along with the associated cache misses)
1908  tbb::blocked_range<int32_t>(
1909  0,
1910  num_source_strings,
1911  thread_info.num_elems_per_thread /* tbb grain_size */),
1912  [&](const tbb::blocked_range<int32_t>& r) {
1913  const int32_t start_idx = r.begin();
1914  const int32_t end_idx = r.end();
1915  size_t num_strings_not_translated = 0;
1916  std::string string_ops_storage; // Needs to be thread local to back
1917  // string_view returned by string_ops()
1918  for (int32_t source_string_id = start_idx; source_string_id != end_idx;
1919  ++source_string_id) {
1920  const std::string_view source_str =
1921  has_string_ops ? string_ops(getStringFromStorageFast(source_string_id),
1922  string_ops_storage)
1923  : getStringFromStorageFast(source_string_id);
1924 
1925  if (source_str.empty()) {
1926  translated_ids[source_string_id] = inline_int_null_value<int32_t>();
1927  continue;
1928  }
1929  // Get the hash from this/the source dictionary's cache, as the function
1930  // will be the same for the dest_dict, sparing us having to recompute it
1931 
1932  // Todo(todd): Remove option to turn string hash cache off or at least
1933  // make a constexpr to avoid these branches when we expect it to be always
1934  // on going forward
1935  const string_dict_hash_t hash = (materialize_hashes_ && !has_string_ops)
1936  ? hash_cache_[source_string_id]
1937  : hash_string(source_str);
1938  const uint32_t hash_bucket = dest_dict->computeBucket(
1939  hash, source_str, dest_dict->string_id_string_dict_hash_table_);
1940  const auto translated_string_id =
1941  dest_dict->string_id_string_dict_hash_table_[hash_bucket];
1942  translated_ids[source_string_id] = translated_string_id;
1943 
1944  if (translated_string_id == StringDictionary::INVALID_STR_ID ||
1945  translated_string_id >= num_dest_strings) {
1946  if (dest_has_transients) {
1947  num_strings_not_translated +=
1948  dest_transient_lookup_callback(source_str, source_string_id);
1949  } else {
1950  num_strings_not_translated++;
1951  }
1952  continue;
1953  }
1954  }
1955  const size_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
1956  num_strings_not_translated_per_thread[tbb_thread_idx] +=
1957  num_strings_not_translated;
1958  },
1959  tbb::simple_partitioner());
1960  }
1961  });
1962  size_t total_num_strings_not_translated = 0;
1963  for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
1964  total_num_strings_not_translated += num_strings_not_translated_per_thread[thread_idx];
1965  }
1966  return total_num_strings_not_translated;
1967 }
1968 
1970  Datum* translated_ids,
1971  const int64_t source_generation,
1972  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) const {
1973  auto timer = DEBUG_TIMER(__func__);
1974  CHECK_GE(source_generation, 0L);
1975  CHECK_GT(string_op_infos.size(), 0UL);
1976  CHECK(!string_op_infos.back().getReturnType().is_string());
1977  const int64_t num_source_strings = source_generation;
1978 
1979  // We can bail early if there are no source strings to translate
1980  if (num_source_strings == 0L) {
1981  return;
1982  }
1983 
1984  // If we get here, we should have a local dictionary
1985  // Note case of transient source dictionaries that aren't
1986  // seen as remote (they have no client_no_timeout_) is covered
1987  // by early bail above on num_source_strings == 0
1988 
1989  std::shared_lock<std::shared_mutex> source_read_lock(rw_mutex_);
1990 
1991  // For source dictionary we cap the number of entries
1992  // to be translated/translated to at the supplied
1993  // generation arguments, if valid (i.e. >= 0), otherwise
1994  // just the size of each dictionary
1995 
1996  CHECK_LE(num_source_strings, static_cast<int64_t>(str_count_));
1997 
1998  constexpr int64_t target_strings_per_thread{1000};
1999  const ThreadInfo thread_info(
2000  std::thread::hardware_concurrency(), num_source_strings, target_strings_per_thread);
2001  CHECK_GE(thread_info.num_threads, 1L);
2002  CHECK_GE(thread_info.num_elems_per_thread, 1L);
2003 
2004  // We use a tbb::task_arena to cap the number of threads. In other contexts
2005  // this has been shown to perform better when low numbers of threads are
2006  // needed than just letting tbb pick the number of threads, but it should
2007  // be benchmarked in this specific context
2008 
2009  const StringOps_Namespace::StringOps string_ops(string_op_infos);
2010  CHECK_GT(string_ops.size(), 0UL);
2011 
2012  tbb::task_arena limited_arena(thread_info.num_threads);
2013  // The below logic, by executing low-level private variable accesses on both
2014  // dictionaries, is less clean than a previous variant that simply called
2015  // `getStringViews` from the source dictionary and then called `getBulk` on the
2016  // destination dictionary, but this version gets significantly better performance
2017  // (~2X), likely due to eliminating the overhead of writing out the string views and
2018  // then reading them back in (along with the associated cache misses)
2020  tbb::blocked_range<int32_t>(
2021  0, num_source_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
2022  [&](const tbb::blocked_range<int32_t>& r) {
2023  const int32_t start_idx = r.begin();
2024  const int32_t end_idx = r.end();
2025  for (int32_t source_string_id = start_idx; source_string_id != end_idx;
2026  ++source_string_id) {
2027  const std::string source_str =
2028  std::string(getStringFromStorageFast(source_string_id));
2029  translated_ids[source_string_id] = string_ops.numericEval(source_str);
2030  }
2031  });
2032 }
2033 
2034 void translate_string_ids(std::vector<int32_t>& dest_ids,
2035  const LeafHostInfo& dict_server_host,
2036  const DictRef dest_dict_ref,
2037  const std::vector<int32_t>& source_ids,
2038  const DictRef source_dict_ref,
2039  const int32_t dest_generation) {
2040  DictRef temp_dict_ref(-1, -1);
2041  StringDictionaryClient string_client(dict_server_host, temp_dict_ref, false);
2042  string_client.translate_string_ids(
2043  dest_ids, dest_dict_ref, source_ids, source_dict_ref, dest_generation);
2044 }
StringIdxEntry * offset_map_
void translate_string_ids(std::vector< int32_t > &dest_ids, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
#define CHECK_EQ(x, y)
Definition: Logger.h:230
bool isClient() const noexcept
void increaseHashTableCapacity() noexcept
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
string_dict_hash_t hash_string(const std::string_view &str)
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
std::vector< int32_t > getRegexpLike(const std::string &pattern, const char escape, const size_t generation) const
void operator()(std::string_view const, int32_t const string_id) override
size_t getBulk(const std::vector< String > &string_vec, T *encoded_vec) const
int64_t num_elems_per_thread
Definition: ThreadInfo.h:23
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::vector< int32_t > buildDictionaryTranslationMap(const std::shared_ptr< StringDictionary > dest_dict, StringLookupCallback const &dest_transient_lookup_callback) const
RUNTIME_EXPORT DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:330
std::vector< std::string > copyStrings() const
void buildDictionaryNumericTranslationMap(Datum *translated_ids, const int64_t source_generation, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
uint64_t off
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
uint64_t size
size_t storageEntryCount() const
#define LOG(tag)
Definition: Logger.h:216
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
uint32_t computeBucketFromStorageAndMemory(const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
#define UNREACHABLE()
Definition: Logger.h:266
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
bool fillRateIsHigh(const size_t num_strings) const noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:235
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
Constants for Builtin SQL Types supported by HEAVY.AI.
std::string offsets_path_
heavyai::unique_lock< heavyai::shared_mutex > write_lock
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::vector< std::string const * > &transient_string_vec={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
const DictRef dict_ref_
std::unordered_map< std::string, int32_t > map_
std::shared_mutex rw_mutex_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
#define CHECK_GT(x, y)
Definition: Logger.h:234
int32_t getOrAdd(const std::string &str) noexcept
void throw_string_too_long_error(std::string_view str, const DictRef &dict_ref)
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::string to_string(char const *&&v)
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:150
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:302
constexpr double a
Definition: Utm.h:32
int32_t getIdOfString(const String &) const
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
static constexpr size_t MAX_STRCOUNT
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
future< Result > async(Fn &&fn, Args &&...args)
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
int64_t num_threads
Definition: ThreadInfo.h:22
std::vector< int32_t > getCompare(const std::string &pattern, const std::string &comp_operator, const size_t generation)
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
#define CHECK_NE(x, y)
Definition: Logger.h:231
int open(const char *path, int flags, int mode)
Definition: heavyai_fs.cpp:66
StringDictionary(const DictRef &dict_ref, const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
void order_translation_locks(const int32_t source_db_id, const int32_t source_dict_id, const int32_t dest_db_id, const int32_t dest_dict_id, std::shared_lock< std::shared_mutex > &source_read_lock, std::shared_lock< std::shared_mutex > &dest_read_lock)
RUNTIME_EXPORT DEVICE bool string_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:244
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
RUNTIME_EXPORT DEVICE bool string_like_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:41
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
std::map< std::string, int32_t > equal_cache_
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
void getOrAddBulkArray(const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
void translate_string_ids(std::vector< int32_t > &dest_ids, const LeafHostInfo &dict_server_host, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
Functions to support the LIKE and ILIKE operator in SQL. Only single-byte character set is supported ...
int32_t getUnlocked(const std::string_view sv) const noexcept
int32_t dictId
Definition: DictRef.h:11
void appendToStorage(const String str) noexcept
void throw_encoding_error(std::string_view str, const DictRef &dict_ref)
int checked_open(const char *path, const bool recover)
bool g_enable_smem_group_by true
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
std::unordered_map< std::string, int32_t > moveMap()
std::pair< char *, size_t > getStringBytes(int32_t string_id) const noexcept
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
bool checkpoint() noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:232
std::vector< int32_t > getLike(const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
#define CHECK_LE(x, y)
Definition: Logger.h:233
void eachStringSerially(int64_t const generation, StringCallback &) const
int32_t getDictId() const noexcept
RUNTIME_EXPORT DEVICE bool string_ilike_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:57
size_t getNumStringsFromStorage(const size_t storage_slots) const noexcept
void update_leaf(const LeafHostInfo &host_info)
std::string getString(int32_t string_id) const
void hashStrings(const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
bool g_cache_string_hash
std::vector< std::string_view > getStringViews() const
int msync(void *addr, size_t length, bool async)
Definition: heavyai_fs.cpp:57
int32_t getDbId() const noexcept
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
void increaseHashTableCapacityFromStorageAndMemory(const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
ThreadId thread_id()
Definition: Logger.cpp:820
int fsync(int fd)
Definition: heavyai_fs.cpp:62
std::function< int32_t(std::string const &)> makeLambdaStringToId() const
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)
static void populate_string_array_ids(std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
bool g_enable_watchdog false
Definition: Execute.cpp:79
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
int32_t dbId
Definition: DictRef.h:10
#define CHECK(condition)
Definition: Logger.h:222
#define DEBUG_TIMER(name)
Definition: Logger.h:371
void checked_munmap(void *addr, size_t length)
Definition: heavyai_fs.cpp:53
const uint64_t round_up_p2(const uint64_t num)
std::string toString() const
Definition: DictRef.h:41
std::vector< int32_t > string_id_string_dict_hash_table_
ThreadInfo(const int64_t max_thread_count, const int64_t num_elems, const int64_t target_elems_per_thread)
void sortCache(std::vector< int32_t > &cache)
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
std::function< bool(std::string_view, int32_t string_id)> StringLookupCallback
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
bool g_enable_stringdict_parallel
PayloadString getStringFromStorage(const int string_id) const noexcept
void close(const int fd)
Definition: heavyai_fs.cpp:70
constexpr double n
Definition: Utm.h:38
int cpu_threads()
Definition: thread_count.h:25
int get_page_size()
Definition: heavyai_fs.cpp:29
Definition: Datum.h:44
RUNTIME_EXPORT DEVICE bool regexp_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: Regexp.cpp:39
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33
static unsigned transientIdToIndex(int32_t const id)
void operator()(std::string const &str, int32_t const string_id) override
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
std::vector< int32_t > sorted_cache
#define VLOG(n)
Definition: Logger.h:316
int32_t getOrAddImpl(const std::string_view &str) noexcept
~StringDictionary() noexcept
void * checked_mmap(const int fd, const size_t sz)
Definition: heavyai_fs.cpp:40
RUNTIME_EXPORT DEVICE bool string_ilike(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:255