OmniSciDB  29e35f4d58
StringDictionary.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "StringDictionary.h"
18 #include "../Shared/sqltypes.h"
19 #include "../Utils/Regexp.h"
20 #include "../Utils/StringLike.h"
21 #include "LeafHostInfo.h"
22 #include "Shared/Logger.h"
23 #include "Shared/thread_count.h"
24 #include "StringDictionaryClient.h"
25 
26 #include <sys/fcntl.h>
27 #include <sys/mman.h>
28 #include <sys/stat.h>
29 
30 #include <boost/filesystem/operations.hpp>
31 #include <boost/filesystem/path.hpp>
32 #include <boost/sort/spreadsort/string_sort.hpp>
33 
34 #include <future>
35 #include <thread>
36 
37 namespace {
38 const int SYSTEM_PAGE_SIZE = getpagesize();
39 
40 size_t file_size(const int fd) {
41  struct stat buf;
42  int err = fstat(fd, &buf);
43  CHECK_EQ(0, err);
44  return buf.st_size;
45 }
46 
47 int checked_open(const char* path, const bool recover) {
48  auto fd = open(path, O_RDWR | O_CREAT | (recover ? O_APPEND : O_TRUNC), 0644);
49  if (fd > 0) {
50  return fd;
51  }
52  auto err = std::string("Dictionary path ") + std::string(path) +
53  std::string(" does not exist.");
54  LOG(ERROR) << err;
55  throw DictPayloadUnavailable(err);
56 }
57 
58 void* checked_mmap(const int fd, const size_t sz) {
59  auto ptr = mmap(nullptr, sz, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
60  CHECK(ptr != reinterpret_cast<void*>(-1));
61 #ifdef __linux__
62 #ifdef MADV_HUGEPAGE
63  madvise(ptr, sz, MADV_RANDOM | MADV_WILLNEED | MADV_HUGEPAGE);
64 #else
65  madvise(ptr, sz, MADV_RANDOM | MADV_WILLNEED);
66 #endif
67 #endif
68  return ptr;
69 }
70 void checked_munmap(void* addr, size_t length) {
71  CHECK_EQ(0, munmap(addr, length));
72 }
73 
// Rounds `num` up to the next power of two (powers of two map to themselves).
// The highest set bit of num - 1 is smeared into the lower bits with shifts of
// 1, 2, 4, 8 and 16, then one is added.
// A result that wraps to zero or exceeds UINT32_MAX is clamped to UINT32_MAX; note
// that the clamped value is not itself a power of two.
const uint64_t round_up_p2(const uint64_t num) {
  uint64_t rounded = num - 1;
  for (unsigned shift = 1; shift <= 16; shift <<= 1) {
    rounded |= rounded >> shift;
  }
  ++rounded;
  // TODO MAT deal with case where filesize has been increased but reality is
  // we are constrained to 2^31.
  // In that situation this calculation will wrap to zero
  const bool wrapped = (rounded == 0);
  const bool too_large = (rounded > (UINT32_MAX));
  if (wrapped || too_large) {
    return UINT32_MAX;
  }
  return rounded;
}
91 
// Polynomial rolling hash of `str`: starts at 1 and, for every character, multiplies
// the running value by 997 and adds the character.
uint32_t rk_hash(const std::string& str) {
  uint32_t running_hash = 1;
  // unsigned arithmetic wraps on overflow, which is what the hash relies on
  for (const char ch : str) {
    running_hash = running_hash * 997 + ch;
  }
  return running_hash;
}
100 } // namespace
101 
102 constexpr int32_t StringDictionary::INVALID_STR_ID;
103 constexpr size_t StringDictionary::MAX_STRLEN;
104 constexpr size_t StringDictionary::MAX_STRCOUNT;
105 
106 StringDictionary::StringDictionary(const std::string& folder,
107  const bool isTemp,
108  const bool recover,
109  const bool materializeHashes,
110  size_t initial_capacity)
111  : str_count_(0)
112  , str_ids_(initial_capacity, INVALID_STR_ID)
113  , rk_hashes_(initial_capacity)
114  , isTemp_(isTemp)
115  , materialize_hashes_(materializeHashes)
116  , payload_fd_(-1)
117  , offset_fd_(-1)
118  , offset_map_(nullptr)
119  , payload_map_(nullptr)
120  , offset_file_size_(0)
121  , payload_file_size_(0)
122  , payload_file_off_(0)
123  , strings_cache_(nullptr) {
124  if (!isTemp && folder.empty()) {
125  return;
126  }
127 
128  // initial capacity must be a power of two for efficient bucket computation
129  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
130  if (!isTemp_) {
131  boost::filesystem::path storage_path(folder);
132  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
133  const auto payload_path =
134  (storage_path / boost::filesystem::path("DictPayload")).string();
135  payload_fd_ = checked_open(payload_path.c_str(), recover);
136  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
138  offset_file_size_ = file_size(offset_fd_);
139  }
140 
141  if (payload_file_size_ == 0) {
143  }
144  if (offset_file_size_ == 0) {
146  }
147  if (!isTemp_) { // we never mmap or recover temp dictionaries
148  payload_map_ = reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
149  offset_map_ =
151  if (recover) {
152  const size_t bytes = file_size(offset_fd_);
153  if (bytes % sizeof(StringIdxEntry) != 0) {
154  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
155  }
156  const uint64_t str_count = bytes / sizeof(StringIdxEntry);
157  // at this point we know the size of the StringDict we need to load
158  // so lets reallocate the vector to the correct size
159  const uint64_t max_entries = round_up_p2(str_count * 2 + 1);
160  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
161  str_ids_.swap(new_str_ids);
162  if (materialize_hashes_) {
163  std::vector<uint32_t> new_rk_hashes(max_entries / 2);
164  rk_hashes_.swap(new_rk_hashes);
165  }
166  unsigned string_id = 0;
167  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
168 
169  uint32_t thread_inits = 0;
170  const auto thread_count = std::thread::hardware_concurrency();
171  const uint32_t items_per_thread = std::max<uint32_t>(
172  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
173  std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>
174  dictionary_futures;
175  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
176  dictionary_futures.emplace_back(std::async(
177  std::launch::async, [string_id, str_count, items_per_thread, this] {
178  std::vector<std::pair<uint32_t, unsigned int>> hashVec;
179  for (uint32_t curr_id = string_id;
180  curr_id < string_id + items_per_thread && curr_id < str_count;
181  curr_id++) {
182  const auto recovered = getStringFromStorage(curr_id);
183  if (recovered.canary) {
184  // hit the canary, recovery finished
185  break;
186  } else {
187  std::string temp(recovered.c_str_ptr, recovered.size);
188  hashVec.emplace_back(std::make_pair(rk_hash(temp), temp.size()));
189  }
190  }
191  return hashVec;
192  }));
193  thread_inits++;
194  if (thread_inits % thread_count == 0) {
195  processDictionaryFutures(dictionary_futures);
196  }
197  }
198  // gather last few threads
199  if (dictionary_futures.size() != 0) {
200  processDictionaryFutures(dictionary_futures);
201  }
202  }
203  }
204 }
205 
207  std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>&
208  dictionary_futures) {
209  for (auto& dictionary_future : dictionary_futures) {
210  dictionary_future.wait();
211  auto hashVec = dictionary_future.get();
212  for (auto& hash : hashVec) {
213  uint32_t bucket = computeUniqueBucketWithHash(hash.first, str_ids_);
214  payload_file_off_ += hash.second;
215  str_ids_[bucket] = static_cast<int32_t>(str_count_);
216  if (materialize_hashes_) {
217  rk_hashes_[str_count_] = hash.first;
218  }
219  ++str_count_;
220  }
221  }
222  dictionary_futures.clear();
223 }
224 
226  : strings_cache_(nullptr)
227  , client_(new StringDictionaryClient(host, dict_ref, true))
228  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
229 
231  if (client_) {
232  return;
233  }
234  if (payload_map_) {
235  if (!isTemp_) {
239  CHECK_GE(payload_fd_, 0);
241  CHECK_GE(offset_fd_, 0);
242  close(offset_fd_);
243  } else {
245  free(payload_map_);
246  free(offset_map_);
247  }
248  }
249 }
250 
251 int32_t StringDictionary::getOrAdd(const std::string& str) noexcept {
252  if (client_) {
253  std::vector<int32_t> string_ids;
254  client_->get_or_add_bulk(string_ids, {str});
255  CHECK_EQ(size_t(1), string_ids.size());
256  return string_ids.front();
257  }
258  return getOrAddImpl(str);
259 }
260 
261 namespace {
262 
263 template <class T>
264 void log_encoding_error(const std::string& str) {
265  LOG(ERROR) << "Could not encode string: " << str
266  << ", the encoded value doesn't fit in " << sizeof(T) * 8
267  << " bits. Will store NULL instead.";
268 }
269 
270 } // namespace
271 
273  const std::vector<std::vector<std::string>>& string_array_vec,
274  std::vector<std::vector<int32_t>>& ids_array_vec) {
275  ids_array_vec.resize(string_array_vec.size());
276  for (size_t i = 0; i < string_array_vec.size(); i++) {
277  auto& strings = string_array_vec[i];
278  auto& ids = ids_array_vec[i];
279  ids.resize(strings.size());
280  getOrAddBulk(strings, &ids[0]);
281  }
282 }
283 
284 template <class T>
285 void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
286  T* encoded_vec) {
287  if (client_no_timeout_) {
288  getOrAddBulkRemote(string_vec, encoded_vec);
289  return;
290  }
291  size_t out_idx{0};
292  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
293 
294  for (const auto& str : string_vec) {
295  if (str.empty()) {
296  encoded_vec[out_idx++] = inline_int_null_value<T>();
297  continue;
298  }
299  CHECK(str.size() <= MAX_STRLEN);
300  uint32_t bucket;
301  const uint32_t hash = rk_hash(str);
302  bucket = computeBucket(hash, str, str_ids_, false);
303  if (str_ids_[bucket] != INVALID_STR_ID) {
304  encoded_vec[out_idx++] = str_ids_[bucket];
305  continue;
306  }
307  // need to add record to dictionary
308  // check there is room
309  if (str_count_ == static_cast<size_t>(max_valid_int_value<T>())) {
310  log_encoding_error<T>(str);
311  encoded_vec[out_idx++] = inline_int_null_value<T>();
312  continue;
313  }
314  if (str_ids_[bucket] == INVALID_STR_ID) {
316  << "Maximum number (" << str_count_
317  << ") of Dictionary encoded Strings reached for this column, offset path "
318  "for column is "
319  << offsets_path_;
320  if (fillRateIsHigh()) {
321  // resize when more than 50% is full
323  bucket = computeBucket(hash, str, str_ids_, false);
324  }
325  appendToStorage(str);
326 
327  str_ids_[bucket] = static_cast<int32_t>(str_count_);
328  if (materialize_hashes_) {
329  rk_hashes_[str_count_] = hash;
330  }
331  ++str_count_;
332  }
333  encoded_vec[out_idx++] = str_ids_[bucket];
334  }
336 }
337 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
338  uint8_t* encoded_vec);
339 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
340  uint16_t* encoded_vec);
341 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
342  int32_t* encoded_vec);
343 
344 template <class T>
345 void StringDictionary::getOrAddBulkRemote(const std::vector<std::string>& string_vec,
346  T* encoded_vec) {
348  std::vector<int32_t> string_ids;
349  client_no_timeout_->get_or_add_bulk(string_ids, string_vec);
350  size_t out_idx{0};
351  for (size_t i = 0; i < string_ids.size(); ++i) {
352  const auto string_id = string_ids[i];
353  const bool invalid = string_id > max_valid_int_value<T>();
354  if (invalid || string_id == inline_int_null_value<int32_t>()) {
355  if (invalid) {
356  log_encoding_error<T>(string_vec[i]);
357  }
358  encoded_vec[out_idx++] = inline_int_null_value<T>();
359  continue;
360  }
361  encoded_vec[out_idx++] = string_id;
362  }
363 }
364 
366  const std::vector<std::string>& string_vec,
367  uint8_t* encoded_vec);
369  const std::vector<std::string>& string_vec,
370  uint16_t* encoded_vec);
372  const std::vector<std::string>& string_vec,
373  int32_t* encoded_vec);
374 
375 int32_t StringDictionary::getIdOfString(const std::string& str) const {
376  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
377  if (client_) {
378  return client_->get(str);
379  }
380  return getUnlocked(str);
381 }
382 
383 int32_t StringDictionary::getUnlocked(const std::string& str) const noexcept {
384  const uint32_t hash = rk_hash(str);
385  auto str_id = str_ids_[computeBucket(hash, str, str_ids_, false)];
386  return str_id;
387 }
388 
389 std::string StringDictionary::getString(int32_t string_id) const {
390  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
391  if (client_) {
392  std::string ret;
393  client_->get_string(ret, string_id);
394  return ret;
395  }
396  return getStringUnlocked(string_id);
397 }
398 
399 std::string StringDictionary::getStringUnlocked(int32_t string_id) const noexcept {
400  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
401  return getStringChecked(string_id);
402 }
403 
404 std::pair<char*, size_t> StringDictionary::getStringBytes(int32_t string_id) const
405  noexcept {
406  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
407  CHECK(!client_);
408  CHECK_LE(0, string_id);
409  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
410  return getStringBytesChecked(string_id);
411 }
412 
414  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
415  if (client_) {
416  return client_->storage_entry_count();
417  }
418  return str_count_;
419 }
420 
421 namespace {
422 
423 bool is_like(const std::string& str,
424  const std::string& pattern,
425  const bool icase,
426  const bool is_simple,
427  const char escape) {
428  return icase
429  ? (is_simple ? string_ilike_simple(
430  str.c_str(), str.size(), pattern.c_str(), pattern.size())
431  : string_ilike(str.c_str(),
432  str.size(),
433  pattern.c_str(),
434  pattern.size(),
435  escape))
436  : (is_simple ? string_like_simple(
437  str.c_str(), str.size(), pattern.c_str(), pattern.size())
438  : string_like(str.c_str(),
439  str.size(),
440  pattern.c_str(),
441  pattern.size(),
442  escape));
443 }
444 
445 } // namespace
446 
447 std::vector<int32_t> StringDictionary::getLike(const std::string& pattern,
448  const bool icase,
449  const bool is_simple,
450  const char escape,
451  const size_t generation) const {
452  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
453  if (client_) {
454  return client_->get_like(pattern, icase, is_simple, escape, generation);
455  }
456  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
457  const auto it = like_cache_.find(cache_key);
458  if (it != like_cache_.end()) {
459  return it->second;
460  }
461  std::vector<int32_t> result;
462  std::vector<std::thread> workers;
463  int worker_count = cpu_threads();
464  CHECK_GT(worker_count, 0);
465  std::vector<std::vector<int32_t>> worker_results(worker_count);
466  CHECK_LE(generation, str_count_);
467  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
468  workers.emplace_back([&worker_results,
469  &pattern,
470  generation,
471  icase,
472  is_simple,
473  escape,
474  worker_idx,
475  worker_count,
476  this]() {
477  for (size_t string_id = worker_idx; string_id < generation;
478  string_id += worker_count) {
479  const auto str = getStringUnlocked(string_id);
480  if (is_like(str, pattern, icase, is_simple, escape)) {
481  worker_results[worker_idx].push_back(string_id);
482  }
483  }
484  });
485  }
486  for (auto& worker : workers) {
487  worker.join();
488  }
489  for (const auto& worker_result : worker_results) {
490  result.insert(result.end(), worker_result.begin(), worker_result.end());
491  }
492  // place result into cache for reuse if similar query
493  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
494 
495  CHECK(it_ok.second);
496 
497  return result;
498 }
499 
500 std::vector<int32_t> StringDictionary::getEquals(std::string pattern,
501  std::string comp_operator,
502  size_t generation) {
503  std::vector<int32_t> result;
504  auto eq_id_itr = equal_cache_.find(pattern);
505  int32_t eq_id = MAX_STRLEN + 1;
506  int32_t cur_size = str_count_;
507  if (eq_id_itr != equal_cache_.end()) {
508  auto eq_id = eq_id_itr->second;
509  if (comp_operator == "=") {
510  result.push_back(eq_id);
511  } else {
512  for (int32_t idx = 0; idx <= cur_size; idx++) {
513  if (idx == eq_id) {
514  continue;
515  }
516  result.push_back(idx);
517  }
518  }
519  } else {
520  std::vector<std::thread> workers;
521  int worker_count = cpu_threads();
522  CHECK_GT(worker_count, 0);
523  std::vector<std::vector<int32_t>> worker_results(worker_count);
524  CHECK_LE(generation, str_count_);
525  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
526  workers.emplace_back(
527  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
528  for (size_t string_id = worker_idx; string_id < generation;
529  string_id += worker_count) {
530  const auto str = getStringUnlocked(string_id);
531  if (str == pattern) {
532  worker_results[worker_idx].push_back(string_id);
533  }
534  }
535  });
536  }
537  for (auto& worker : workers) {
538  worker.join();
539  }
540  for (const auto& worker_result : worker_results) {
541  result.insert(result.end(), worker_result.begin(), worker_result.end());
542  }
543  if (result.size() > 0) {
544  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
545  CHECK(it_ok.second);
546  eq_id = result[0];
547  }
548  if (comp_operator == "<>") {
549  for (int32_t idx = 0; idx <= cur_size; idx++) {
550  if (idx == eq_id) {
551  continue;
552  }
553  result.push_back(idx);
554  }
555  }
556  }
557  return result;
558 }
559 
560 std::vector<int32_t> StringDictionary::getCompare(const std::string& pattern,
561  const std::string& comp_operator,
562  const size_t generation) {
563  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
564  if (client_) {
565  return client_->get_compare(pattern, comp_operator, generation);
566  }
567  std::vector<int32_t> ret;
568  if (str_count_ == 0) {
569  return ret;
570  }
571  if (sorted_cache.size() < str_count_) {
572  if (comp_operator == "=" || comp_operator == "<>") {
573  return getEquals(pattern, comp_operator, generation);
574  }
575 
577  }
578  auto cache_index = compare_cache_.get(pattern);
579 
580  if (!cache_index) {
581  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
582  const auto cache_itr = std::lower_bound(
583  sorted_cache.begin(),
584  sorted_cache.end(),
585  pattern,
586  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
587  auto a_str = this->getStringFromStorage(a);
588  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
589  });
590 
591  if (cache_itr == sorted_cache.end()) {
592  cache_index->index = sorted_cache.size() - 1;
593  cache_index->diff = 1;
594  } else {
595  const auto cache_str = getStringFromStorage(*cache_itr);
596  if (!string_eq(
597  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
598  cache_index->index = cache_itr - sorted_cache.begin() - 1;
599  cache_index->diff = 1;
600  } else {
601  cache_index->index = cache_itr - sorted_cache.begin();
602  cache_index->diff = 0;
603  }
604  }
605 
606  compare_cache_.put(pattern, cache_index);
607  }
608 
609  // since we have a cache in form of vector of ints which is sorted according to
610  // corresponding strings in the dictionary all we need is the index of the element
611  // which equal to the pattern that we are trying to match or the index of “biggest”
612  // element smaller than the pattern, to perform all the comparison operators over
613  // string. The search function guarantees we have such index so now it is just the
614  // matter to include all the elements in the result vector.
615 
616  // For < operator if the index that we have points to the element which is equal to
617  // the pattern that we are searching for we simply get all the elements less than the
618  // index. If the element pointed by the index is not equal to the pattern we are
619  // comparing with we also need to include that index in result vector, except when the
620  // index points to 0 and the pattern is lesser than the smallest value in the string
621  // dictionary.
622 
623  if (comp_operator == "<") {
624  size_t idx = cache_index->index;
625  if (cache_index->diff) {
626  idx = cache_index->index + 1;
627  if (cache_index->index == 0 && cache_index->diff > 0) {
628  idx = cache_index->index;
629  }
630  }
631  for (size_t i = 0; i < idx; i++) {
632  ret.push_back(sorted_cache[i]);
633  }
634 
635  // For <= operator if the index that we have points to the element which is equal to
636  // the pattern that we are searching for we want to include the element pointed by
637  // the index in the result set. If the element pointed by the index is not equal to
638  // the pattern we are comparing with we just want to include all the ids with index
639  // less than the index that is cached, except when pattern that we are searching for
640  // is smaller than the smallest string in the dictionary.
641 
642  } else if (comp_operator == "<=") {
643  size_t idx = cache_index->index + 1;
644  if (cache_index == 0 && cache_index->diff > 0) {
645  idx = cache_index->index;
646  }
647  for (size_t i = 0; i < idx; i++) {
648  ret.push_back(sorted_cache[i]);
649  }
650 
651  // For > operator we want to get all the elements with index greater than the index
652  // that we have except, when the pattern we are searching for is lesser than the
653  // smallest string in the dictionary we also want to include the id of the index
654  // that we have.
655 
656  } else if (comp_operator == ">") {
657  size_t idx = cache_index->index + 1;
658  if (cache_index->index == 0 && cache_index->diff > 0) {
659  idx = cache_index->index;
660  }
661  for (size_t i = idx; i < sorted_cache.size(); i++) {
662  ret.push_back(sorted_cache[i]);
663  }
664 
665  // For >= operator when the indexed element that we have points to element which is
666  // equal to the pattern we are searching for we want to include that in the result
667  // vector. If the index that we have does not point to the string which is equal to
668  // the pattern we are searching we don’t want to include that id into the result
669  // vector except when the index is 0.
670 
671  } else if (comp_operator == ">=") {
672  size_t idx = cache_index->index;
673  if (cache_index->diff) {
674  idx = cache_index->index + 1;
675  if (cache_index->index == 0 && cache_index->diff > 0) {
676  idx = cache_index->index;
677  }
678  }
679  for (size_t i = idx; i < sorted_cache.size(); i++) {
680  ret.push_back(sorted_cache[i]);
681  }
682  } else if (comp_operator == "=") {
683  if (!cache_index->diff) {
684  ret.push_back(sorted_cache[cache_index->index]);
685  }
686 
687  // For <> operator it is simple matter of not including id of string which is equal
688  // to pattern we are searching for.
689  } else if (comp_operator == "<>") {
690  if (!cache_index->diff) {
691  size_t idx = cache_index->index;
692  for (size_t i = 0; i < idx; i++) {
693  ret.push_back(sorted_cache[i]);
694  }
695  ++idx;
696  for (size_t i = idx; i < sorted_cache.size(); i++) {
697  ret.push_back(sorted_cache[i]);
698  }
699  } else {
700  for (size_t i = 0; i < sorted_cache.size(); i++) {
701  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
702  }
703  }
704 
705  } else {
706  std::runtime_error("Unsupported string comparison operator");
707  }
708  return ret;
709 }
710 
711 namespace {
712 
713 bool is_regexp_like(const std::string& str,
714  const std::string& pattern,
715  const char escape) {
716  return regexp_like(str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape);
717 }
718 
719 } // namespace
720 
721 std::vector<int32_t> StringDictionary::getRegexpLike(const std::string& pattern,
722  const char escape,
723  const size_t generation) const {
724  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
725  if (client_) {
726  return client_->get_regexp_like(pattern, escape, generation);
727  }
728  const auto cache_key = std::make_pair(pattern, escape);
729  const auto it = regex_cache_.find(cache_key);
730  if (it != regex_cache_.end()) {
731  return it->second;
732  }
733  std::vector<int32_t> result;
734  std::vector<std::thread> workers;
735  int worker_count = cpu_threads();
736  CHECK_GT(worker_count, 0);
737  std::vector<std::vector<int32_t>> worker_results(worker_count);
738  CHECK_LE(generation, str_count_);
739  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
740  workers.emplace_back([&worker_results,
741  &pattern,
742  generation,
743  escape,
744  worker_idx,
745  worker_count,
746  this]() {
747  for (size_t string_id = worker_idx; string_id < generation;
748  string_id += worker_count) {
749  const auto str = getStringUnlocked(string_id);
750  if (is_regexp_like(str, pattern, escape)) {
751  worker_results[worker_idx].push_back(string_id);
752  }
753  }
754  });
755  }
756  for (auto& worker : workers) {
757  worker.join();
758  }
759  for (const auto& worker_result : worker_results) {
760  result.insert(result.end(), worker_result.begin(), worker_result.end());
761  }
762  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
763  CHECK(it_ok.second);
764 
765  return result;
766 }
767 
768 std::shared_ptr<const std::vector<std::string>> StringDictionary::copyStrings() const {
769  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
770  if (client_) {
771  // TODO(miyu): support remote string dictionary
772  throw std::runtime_error(
773  "copying dictionaries from remote server is not supported yet.");
774  }
775 
776  if (strings_cache_) {
777  return strings_cache_;
778  }
779 
780  strings_cache_ = std::make_shared<std::vector<std::string>>();
781  strings_cache_->reserve(str_count_);
782  const bool multithreaded = str_count_ > 10000;
783  const auto worker_count =
784  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
785  CHECK_GT(worker_count, 0UL);
786  std::vector<std::vector<std::string>> worker_results(worker_count);
787  auto copy = [this](std::vector<std::string>& str_list,
788  const size_t start_id,
789  const size_t end_id) {
790  CHECK_LE(start_id, end_id);
791  str_list.reserve(end_id - start_id);
792  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
793  str_list.push_back(getStringUnlocked(string_id));
794  }
795  };
796  if (multithreaded) {
797  std::vector<std::future<void>> workers;
798  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
799  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
800  worker_idx < worker_count && start < str_count_;
801  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
802  workers.push_back(std::async(
803  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
804  }
805  for (auto& worker : workers) {
806  worker.get();
807  }
808  } else {
809  CHECK_EQ(worker_results.size(), size_t(1));
810  copy(worker_results[0], 0, str_count_);
811  }
812 
813  for (const auto& worker_result : worker_results) {
814  strings_cache_->insert(
815  strings_cache_->end(), worker_result.begin(), worker_result.end());
816  }
817  return strings_cache_;
818 }
819 
820 bool StringDictionary::fillRateIsHigh() const noexcept {
821  return str_ids_.size() < str_count_ * 2;
822 }
823 
825  std::vector<int32_t> new_str_ids(str_ids_.size() * 2, INVALID_STR_ID);
826 
827  if (materialize_hashes_) {
828  for (size_t i = 0; i < str_ids_.size(); ++i) {
829  if (str_ids_[i] != INVALID_STR_ID) {
830  const uint32_t hash = rk_hashes_[str_ids_[i]];
831  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
832  new_str_ids[bucket] = str_ids_[i];
833  }
834  }
835  rk_hashes_.resize(rk_hashes_.size() * 2);
836  } else {
837  for (size_t i = 0; i < str_count_; ++i) {
838  const auto str = getStringChecked(i);
839  const uint32_t hash = rk_hash(str);
840  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
841  new_str_ids[bucket] = i;
842  }
843  }
844  str_ids_.swap(new_str_ids);
845 }
846 
847 int32_t StringDictionary::getOrAddImpl(const std::string& str) noexcept {
848  // @TODO(wei) treat empty string as NULL for now
849  if (str.size() == 0) {
850  return inline_int_null_value<int32_t>();
851  }
852  CHECK(str.size() <= MAX_STRLEN);
853  uint32_t bucket;
854  const uint32_t hash = rk_hash(str);
855  {
856  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
857  bucket = computeBucket(hash, str, str_ids_, false);
858  if (str_ids_[bucket] != INVALID_STR_ID) {
859  return str_ids_[bucket];
860  }
861  }
862  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
863  // need to recalculate the bucket in case it changed before
864  // we got the lock
865  bucket = computeBucket(hash, str, str_ids_, false);
866  if (str_ids_[bucket] == INVALID_STR_ID) {
868  << "Maximum number (" << str_count_
869  << ") of Dictionary encoded Strings reached for this column, offset path "
870  "for column is "
871  << offsets_path_;
872  if (fillRateIsHigh()) {
873  // resize when more than 50% is full
875  bucket = computeBucket(hash, str, str_ids_, false);
876  }
877  appendToStorage(str);
878  str_ids_[bucket] = static_cast<int32_t>(str_count_);
879  if (materialize_hashes_) {
880  rk_hashes_[str_count_] = hash;
881  }
882  ++str_count_;
884  }
885  return str_ids_[bucket];
886 }
887 
888 std::string StringDictionary::getStringChecked(const int string_id) const noexcept {
889  const auto str_canary = getStringFromStorage(string_id);
890  CHECK(!str_canary.canary);
891  return std::string(str_canary.c_str_ptr, str_canary.size);
892 }
893 
895  const int string_id) const noexcept {
896  const auto str_canary = getStringFromStorage(string_id);
897  CHECK(!str_canary.canary);
898  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
899 }
900 
901 uint32_t StringDictionary::computeBucket(const uint32_t hash,
902  const std::string str,
903  const std::vector<int32_t>& data,
904  const bool unique) const noexcept {
905  auto bucket = hash & (data.size() - 1);
906  while (true) {
907  if (data[bucket] ==
908  INVALID_STR_ID) { // In this case it means the slot is available for use
909  break;
910  }
911  // if records are unique I don't need to do this test as I know it will not be the
912  // same
913  if (!unique) {
914  if (materialize_hashes_) {
915  if (hash == rk_hashes_[data[bucket]]) {
916  // can't be the same string if hash is different
917  const auto old_str = getStringFromStorage(data[bucket]);
918  if (str.size() == old_str.size &&
919  !memcmp(str.c_str(), old_str.c_str_ptr, str.size())) {
920  // found the string
921  break;
922  }
923  }
924  } else {
925  const auto old_str = getStringFromStorage(data[bucket]);
926  if (str.size() == old_str.size &&
927  !memcmp(str.c_str(), old_str.c_str_ptr, str.size())) {
928  // found the string
929  break;
930  }
931  }
932  }
933  // wrap around
934  if (++bucket == data.size()) {
935  bucket = 0;
936  }
937  }
938  return bucket;
939 }
940 
942  const uint32_t hash,
943  const std::vector<int32_t>& data) const noexcept {
944  auto bucket = hash & (data.size() - 1);
945  while (true) {
946  if (data[bucket] ==
947  INVALID_STR_ID) { // In this case it means the slot is available for use
948  break;
949  }
950  // wrap around
951  if (++bucket == data.size()) {
952  bucket = 0;
953  }
954  }
955  return bucket;
956 }
957 
958 void StringDictionary::appendToStorage(const std::string& str) noexcept {
959  if (!isTemp_) {
960  CHECK_GE(payload_fd_, 0);
961  CHECK_GE(offset_fd_, 0);
962  }
963  // write the payload
964  if (payload_file_off_ + str.size() > payload_file_size_) {
965  if (!isTemp_) {
968  CHECK(payload_file_off_ + str.size() <= payload_file_size_);
969  payload_map_ =
970  reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
971  } else {
973  }
974  }
975  memcpy(payload_map_ + payload_file_off_, str.c_str(), str.size());
976  // write the offset and length
977  size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
978  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
979  payload_file_off_ += str.size();
980  if (offset_file_off + sizeof(str_meta) >= offset_file_size_) {
981  if (!isTemp_) {
984  CHECK(offset_file_off + sizeof(str_meta) <= offset_file_size_);
985  offset_map_ =
987  } else {
989  }
990  }
991  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
992 }
993 
995  const int string_id) const noexcept {
996  if (!isTemp_) {
997  CHECK_GE(payload_fd_, 0);
998  CHECK_GE(offset_fd_, 0);
999  }
1000  CHECK_GE(string_id, 0);
1001  const StringIdxEntry* str_meta = offset_map_ + string_id;
1002  if (str_meta->size == 0xffff) {
1003  // hit the canary
1004  return {nullptr, 0, true};
1005  }
1006  return {payload_map_ + str_meta->off, str_meta->size, false};
1007 }
1008 
1010  if (!isTemp_) {
1012  } else {
1013  payload_map_ =
1014  static_cast<char*>(addMemoryCapacity(payload_map_, payload_file_size_));
1015  }
1016 }
1017 
1019  if (!isTemp_) {
1021  } else {
1022  offset_map_ =
1024  }
1025 }
1026 
1027 size_t StringDictionary::addStorageCapacity(int fd) noexcept {
1028  static const ssize_t CANARY_BUFF_SIZE = 1024 * SYSTEM_PAGE_SIZE;
1029  if (!CANARY_BUFFER) {
1030  CANARY_BUFFER = static_cast<char*>(malloc(CANARY_BUFF_SIZE));
1032  memset(CANARY_BUFFER, 0xff, CANARY_BUFF_SIZE);
1033  }
1034  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1035  CHECK(write(fd, CANARY_BUFFER, CANARY_BUFF_SIZE) == CANARY_BUFF_SIZE);
1036  return CANARY_BUFF_SIZE;
1037 }
1038 
1039 void* StringDictionary::addMemoryCapacity(void* addr, size_t& mem_size) noexcept {
1040  static const ssize_t CANARY_BUFF_SIZE = 1024 * SYSTEM_PAGE_SIZE;
1041  if (!CANARY_BUFFER) {
1042  CANARY_BUFFER = reinterpret_cast<char*>(malloc(CANARY_BUFF_SIZE));
1044  memset(CANARY_BUFFER, 0xff, CANARY_BUFF_SIZE);
1045  }
1046  void* new_addr = realloc(addr, mem_size + CANARY_BUFF_SIZE);
1047  CHECK(new_addr);
1048  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
1049  CHECK(memcpy(write_addr, CANARY_BUFFER, CANARY_BUFF_SIZE));
1050  mem_size += CANARY_BUFF_SIZE;
1051  return new_addr;
1052 }
1053 
1055  if (!like_cache_.empty()) {
1056  decltype(like_cache_)().swap(like_cache_);
1057  }
1058  if (!regex_cache_.empty()) {
1059  decltype(regex_cache_)().swap(regex_cache_);
1060  }
1061  if (!equal_cache_.empty()) {
1062  decltype(equal_cache_)().swap(equal_cache_);
1063  }
1064  compare_cache_.invalidateInvertedIndex();
1065 }
1066 
// Buffer of 0xff bytes shared by all StringDictionary instances. It starts out null and
// is allocated and filled on first use by addStorageCapacity / addMemoryCapacity, which
// copy it into newly added file or memory capacity.
char* StringDictionary::CANARY_BUFFER{nullptr};
1068 
1070  if (client_) {
1071  try {
1072  return client_->checkpoint();
1073  } catch (...) {
1074  return false;
1075  }
1076  }
1077  CHECK(!isTemp_);
1078  bool ret = true;
1079  ret = ret && (msync((void*)offset_map_, offset_file_size_, MS_SYNC) == 0);
1080  ret = ret && (msync((void*)payload_map_, payload_file_size_, MS_SYNC) == 0);
1081  ret = ret && (fsync(offset_fd_) == 0);
1082  ret = ret && (fsync(payload_fd_) == 0);
1083  return ret;
1084 }
1085 
1087  // This method is not thread-safe.
1088  const auto cur_cache_size = sorted_cache.size();
1089  std::vector<int32_t> temp_sorted_cache;
1090  for (size_t i = cur_cache_size; i < str_count_; i++) {
1091  temp_sorted_cache.push_back(i);
1092  }
1093  sortCache(temp_sorted_cache);
1094  mergeSortedCache(temp_sorted_cache);
1095 }
1096 
1097 void StringDictionary::sortCache(std::vector<int32_t>& cache) {
1098  // This method is not thread-safe.
1099 
1100  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1101  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1102 
1103  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1104  auto a_str = this->getStringFromStorage(a);
1105  auto b_str = this->getStringFromStorage(b);
1106  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1107  });
1108 }
1109 
1110 void StringDictionary::mergeSortedCache(std::vector<int32_t>& temp_sorted_cache) {
1111  // this method is not thread safe
1112  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1113  size_t t_idx = 0, s_idx = 0, idx = 0;
1114  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1115  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1116  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1117  const auto insert_from_temp_cache =
1118  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1119  if (insert_from_temp_cache) {
1120  updated_cache[idx] = temp_sorted_cache[t_idx++];
1121  } else {
1122  updated_cache[idx] = sorted_cache[s_idx++];
1123  }
1124  }
1125  while (t_idx < temp_sorted_cache.size()) {
1126  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1127  }
1128  while (s_idx < sorted_cache.size()) {
1129  updated_cache[idx++] = sorted_cache[s_idx++];
1130  }
1131  sorted_cache.swap(updated_cache);
1132 }
1133 
1135  std::vector<int32_t>& dest_ids,
1136  StringDictionary* dest_dict,
1137  const std::vector<int32_t>& source_ids,
1138  const StringDictionary* source_dict,
1139  const std::map<int32_t, std::string> transient_mapping) {
1140  std::vector<std::string> strings;
1141 
1142  for (const int32_t source_id : source_ids) {
1143  if (source_id == std::numeric_limits<int32_t>::min()) {
1144  strings.emplace_back("");
1145  } else if (source_id < 0) {
1146  if (auto string_itr = transient_mapping.find(source_id);
1147  string_itr != transient_mapping.end()) {
1148  strings.emplace_back(string_itr->second);
1149  } else {
1150  throw std::runtime_error("Unexpected negative source ID");
1151  }
1152  } else {
1153  strings.push_back(source_dict->getString(source_id));
1154  }
1155  }
1156 
1157  dest_ids.resize(strings.size());
1158  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1159 }
1160 
1162  std::vector<std::vector<int32_t>>& dest_array_ids,
1163  StringDictionary* dest_dict,
1164  const std::vector<std::vector<int32_t>>& source_array_ids,
1165  const StringDictionary* source_dict) {
1166  dest_array_ids.resize(source_array_ids.size());
1167 
1168  std::atomic<size_t> row_idx{0};
1169  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1170  int thread_id) {
1171  for (;;) {
1172  auto row = row_idx.fetch_add(1);
1173 
1174  if (row >= dest_array_ids.size()) {
1175  return;
1176  }
1177  const auto& source_ids = source_array_ids[row];
1178  auto& dest_ids = dest_array_ids[row];
1179  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1180  }
1181  };
1182 
1183  const int num_worker_threads = std::thread::hardware_concurrency();
1184 
1185  if (source_array_ids.size() / num_worker_threads > 10) {
1186  std::vector<std::future<void>> worker_threads;
1187  for (int i = 0; i < num_worker_threads; ++i) {
1188  worker_threads.push_back(std::async(std::launch::async, processor, i));
1189  }
1190 
1191  for (auto& child : worker_threads) {
1192  child.wait();
1193  }
1194  for (auto& child : worker_threads) {
1195  child.get();
1196  }
1197  } else {
1198  processor(0);
1199  }
1200 }
1201 
1202 void translate_string_ids(std::vector<int32_t>& dest_ids,
1203  const LeafHostInfo& dict_server_host,
1204  const DictRef dest_dict_ref,
1205  const std::vector<int32_t>& source_ids,
1206  const DictRef source_dict_ref,
1207  const int32_t dest_generation) {
1208  DictRef temp_dict_ref(-1, -1);
1209  StringDictionaryClient string_client(dict_server_host, temp_dict_ref, false);
1210  string_client.translate_string_ids(
1211  dest_ids, dest_dict_ref, source_ids, source_dict_ref, dest_generation);
1212 }
StringIdxEntry * offset_map_
void translate_string_ids(std::vector< int32_t > &dest_ids, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
#define CHECK_EQ(x, y)
Definition: Logger.h:201
int32_t getIdOfString(const std::string &str) const
void * addMemoryCapacity(void *addr, size_t &mem_size) noexcept
std::vector< int32_t > getRegexpLike(const std::string &pattern, const char escape, const size_t generation) const
uint64_t off
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
uint64_t size
#define LOG(tag)
Definition: Logger.h:188
void checked_munmap(void *addr, size_t length)
std::string getStringChecked(const int string_id) const noexcept
std::string getString(int32_t string_id) const
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
#define CHECK_GE(x, y)
Definition: Logger.h:206
int64_t const int32_t sz
DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
std::string offsets_path_
#define CHECK_GT(x, y)
Definition: Logger.h:205
int32_t getOrAddImpl(const std::string &str) noexcept
int32_t getOrAdd(const std::string &str) noexcept
int32_t getUnlocked(const std::string &str) const noexcept
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
size_t storageEntryCount() const
void increaseCapacity() noexcept
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
StringDictionary(const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
static constexpr size_t MAX_STRCOUNT
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
DEVICE bool regexp_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: Regexp.cpp:40
DEVICE bool string_ilike(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:257
std::vector< int32_t > getCompare(const std::string &pattern, const std::string &comp_operator, const size_t generation)
#define CHECK_NE(x, y)
Definition: Logger.h:202
std::vector< int32_t > getLike(const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
void addPayloadCapacity() noexcept
mapd_shared_mutex rw_mutex_
DEVICE bool string_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:246
void getOrAddBulk(const std::vector< std::string > &string_vec, T *encoded_vec)
size_t addStorageCapacity(int fd) noexcept
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
std::map< std::string, int32_t > equal_cache_
void translate_string_ids(std::vector< int32_t > &dest_ids, const LeafHostInfo &dict_server_host, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
DEVICE bool string_ilike_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:59
int checked_open(const char *path, const bool recover)
void addOffsetCapacity() noexcept
std::pair< char *, size_t > getStringBytes(int32_t string_id) const noexcept
void getOrAddBulkArray(const std::vector< std::vector< std::string >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
bool checkpoint() noexcept
uint32_t rk_hash(const std::string &str)
#define CHECK_LT(x, y)
Definition: Logger.h:203
static char * CANARY_BUFFER
void * checked_mmap(const int fd, const size_t sz)
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
#define CHECK_LE(x, y)
Definition: Logger.h:204
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
std::shared_ptr< const std::vector< std::string > > copyStrings() const
std::unique_ptr< StringDictionaryClient > client_no_timeout_
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) const noexcept
void getOrAddBulkRemote(const std::vector< std::string > &string_vec, T *encoded_vec)
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:82
static void populate_string_array_ids(std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
void invalidateInvertedIndex() noexcept
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:120
#define CHECK(condition)
Definition: Logger.h:193
const uint64_t round_up_p2(const uint64_t num)
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:101
std::vector< int32_t > str_ids_
void log_encoding_error(const std::string &str)
void sortCache(std::vector< int32_t > &cache)
static constexpr size_t MAX_STRLEN
void appendToStorage(const std::string &str) noexcept
uint32_t computeBucket(const uint32_t hash, const std::string str, const std::vector< int32_t > &data, const bool unique) const noexcept
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
PayloadString getStringFromStorage(const int string_id) const noexcept
int cpu_threads()
Definition: thread_count.h:25
DEVICE bool string_like_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:43
std::vector< int32_t > sorted_cache
~StringDictionary() noexcept
std::vector< uint32_t > rk_hashes_
bool fillRateIsHigh() const noexcept