OmniSciDB  04ee39c94c
StringDictionary.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "StringDictionary.h"
18 #include "../Shared/sqltypes.h"
19 #include "../Utils/Regexp.h"
20 #include "../Utils/StringLike.h"
21 #include "LeafHostInfo.h"
22 #include "Shared/Logger.h"
23 #include "Shared/thread_count.h"
24 #include "StringDictionaryClient.h"
25 
26 #include <sys/fcntl.h>
27 #include <boost/filesystem/operations.hpp>
28 #include <boost/filesystem/path.hpp>
29 #include <boost/sort/spreadsort/string_sort.hpp>
30 
31 #include <future>
32 #include <thread>
33 
34 namespace {
35 const int SYSTEM_PAGE_SIZE = getpagesize();
36 
37 size_t file_size(const int fd) {
38  struct stat buf;
39  int err = fstat(fd, &buf);
40  CHECK_EQ(0, err);
41  return buf.st_size;
42 }
43 
44 int checked_open(const char* path, const bool recover) {
45  auto fd = open(path, O_RDWR | O_CREAT | (recover ? O_APPEND : O_TRUNC), 0644);
46  if (fd > 0) {
47  return fd;
48  }
49  auto err = std::string("Dictionary path ") + std::string(path) +
50  std::string(" does not exist.");
51  LOG(ERROR) << err;
52  throw DictPayloadUnavailable(err);
53 }
54 
55 void* checked_mmap(const int fd, const size_t sz) {
56  auto ptr = mmap(nullptr, sz, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
57  CHECK(ptr != reinterpret_cast<void*>(-1));
58 #ifdef __linux__
59 #ifdef MADV_HUGEPAGE
60  madvise(ptr, sz, MADV_RANDOM | MADV_WILLNEED | MADV_HUGEPAGE);
61 #else
62  madvise(ptr, sz, MADV_RANDOM | MADV_WILLNEED);
63 #endif
64 #endif
65  return ptr;
66 }
67 void checked_munmap(void* addr, size_t length) {
68  CHECK_EQ(0, munmap(addr, length));
69 }
70 
// Rounds `num` up to the next power of two (a value that already is a power of
// two is returned unchanged). Results that wrap to 0 or exceed UINT32_MAX are
// clamped to UINT32_MAX, which is itself NOT a power of two.
const uint64_t round_up_p2(const uint64_t num) {
  uint64_t rounded = num - 1;
  // Smear the highest set bit into the lower positions (shifts 1, 2, 4, 8, 16).
  for (unsigned shift = 1; shift <= 16; shift *= 2) {
    rounded |= rounded >> shift;
  }
  ++rounded;
  // TODO MAT deal with case where filesize has been increased but reality is
  // we are constrained to 2^31.
  // In that situation this calculation will wrap to zero
  const bool wrapped_or_too_large = (rounded == 0) || (rounded > UINT32_MAX);
  return wrapped_or_too_large ? static_cast<uint64_t>(UINT32_MAX) : rounded;
}
88 
// Polynomial rolling hash of `str`: starts at 1 and, for every character in
// order, multiplies the running value by 997 and adds the character.
// Arithmetic is unsigned 32-bit, so overflow is well defined and wraps.
uint32_t rk_hash(const std::string& str) {
  uint32_t running_hash = 1;
  for (const char ch : str) {
    running_hash = running_hash * 997 + ch;
  }
  return running_hash;
}
97 } // namespace
98 
// Namespace-scope definitions of StringDictionary's static constexpr data members.
// They give the constants storage when they are odr-used (needed before C++17,
// where such members are not implicitly inline). The values themselves are not
// set here, so they come from the in-class declarations.
99 constexpr int32_t StringDictionary::INVALID_STR_ID;
100 constexpr size_t StringDictionary::MAX_STRLEN;
101 constexpr size_t StringDictionary::MAX_STRCOUNT;
102 
103 StringDictionary::StringDictionary(const std::string& folder,
104  const bool isTemp,
105  const bool recover,
106  const bool materializeHashes,
107  size_t initial_capacity)
108  : str_count_(0)
109  , str_ids_(initial_capacity, INVALID_STR_ID)
110  , rk_hashes_(initial_capacity)
111  , isTemp_(isTemp)
112  , materialize_hashes_(materializeHashes)
113  , payload_fd_(-1)
114  , offset_fd_(-1)
115  , offset_map_(nullptr)
116  , payload_map_(nullptr)
117  , offset_file_size_(0)
118  , payload_file_size_(0)
119  , payload_file_off_(0)
120  , strings_cache_(nullptr) {
121  if (!isTemp && folder.empty()) {
122  return;
123  }
124 
125  // initial capacity must be a power of two for efficient bucket computation
126  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
127  if (!isTemp_) {
128  boost::filesystem::path storage_path(folder);
129  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
130  const auto payload_path =
131  (storage_path / boost::filesystem::path("DictPayload")).string();
132  payload_fd_ = checked_open(payload_path.c_str(), recover);
133  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
135  offset_file_size_ = file_size(offset_fd_);
136  }
137 
138  if (payload_file_size_ == 0) {
140  }
141  if (offset_file_size_ == 0) {
143  }
144  if (!isTemp_) { // we never mmap or recover temp dictionaries
145  payload_map_ = reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
146  offset_map_ =
148  if (recover) {
149  const size_t bytes = file_size(offset_fd_);
150  if (bytes % sizeof(StringIdxEntry) != 0) {
151  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
152  }
153  const uint64_t str_count = bytes / sizeof(StringIdxEntry);
154  // at this point we know the size of the StringDict we need to load
155  // so lets reallocate the vector to the correct size
156  const uint64_t max_entries = round_up_p2(str_count * 2 + 1);
157  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
158  str_ids_.swap(new_str_ids);
159  if (materialize_hashes_) {
160  std::vector<uint32_t> new_rk_hashes(max_entries / 2);
161  rk_hashes_.swap(new_rk_hashes);
162  }
163  unsigned string_id = 0;
164  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
165 
166  uint32_t thread_inits = 0;
167  const auto thread_count = std::thread::hardware_concurrency();
168  const uint32_t items_per_thread = std::max<uint32_t>(
169  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
170  std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>
171  dictionary_futures;
172  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
173  dictionary_futures.emplace_back(std::async(
174  std::launch::async, [string_id, str_count, items_per_thread, this] {
175  std::vector<std::pair<uint32_t, unsigned int>> hashVec;
176  for (uint32_t curr_id = string_id;
177  curr_id < string_id + items_per_thread && curr_id < str_count;
178  curr_id++) {
179  const auto recovered = getStringFromStorage(curr_id);
180  if (recovered.canary) {
181  // hit the canary, recovery finished
182  break;
183  } else {
184  std::string temp(recovered.c_str_ptr, recovered.size);
185  hashVec.emplace_back(std::make_pair(rk_hash(temp), temp.size()));
186  }
187  }
188  return hashVec;
189  }));
190  thread_inits++;
191  if (thread_inits % thread_count == 0) {
192  processDictionaryFutures(dictionary_futures);
193  }
194  }
195  // gather last few threads
196  if (dictionary_futures.size() != 0) {
197  processDictionaryFutures(dictionary_futures);
198  }
199  }
200  }
201 }
202 
204  std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>&
205  dictionary_futures) {
206  for (auto& dictionary_future : dictionary_futures) {
207  dictionary_future.wait();
208  auto hashVec = dictionary_future.get();
209  for (auto& hash : hashVec) {
210  uint32_t bucket = computeUniqueBucketWithHash(hash.first, str_ids_);
211  payload_file_off_ += hash.second;
212  str_ids_[bucket] = static_cast<int32_t>(str_count_);
213  if (materialize_hashes_) {
214  rk_hashes_[str_count_] = hash.first;
215  }
216  ++str_count_;
217  }
218  }
219  dictionary_futures.clear();
220 }
221 
223  : strings_cache_(nullptr)
224  , client_(new StringDictionaryClient(host, dict_ref, true))
225  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
226 
228  if (client_) {
229  return;
230  }
231  if (payload_map_) {
232  if (!isTemp_) {
236  CHECK_GE(payload_fd_, 0);
238  CHECK_GE(offset_fd_, 0);
239  close(offset_fd_);
240  } else {
242  free(payload_map_);
243  free(offset_map_);
244  }
245  }
246 }
247 
248 int32_t StringDictionary::getOrAdd(const std::string& str) noexcept {
249  if (client_) {
250  std::vector<int32_t> string_ids;
251  client_->get_or_add_bulk(string_ids, {str});
252  CHECK_EQ(size_t(1), string_ids.size());
253  return string_ids.front();
254  }
255  return getOrAddImpl(str);
256 }
257 
258 namespace {
259 
260 template <class T>
261 void log_encoding_error(const std::string& str) {
262  LOG(ERROR) << "Could not encode string: " << str
263  << ", the encoded value doesn't fit in " << sizeof(T) * 8
264  << " bits. Will store NULL instead.";
265 }
266 
267 } // namespace
268 
270  const std::vector<std::vector<std::string>>& string_array_vec,
271  std::vector<std::vector<int32_t>>& ids_array_vec) {
272  ids_array_vec.resize(string_array_vec.size());
273  for (size_t i = 0; i < string_array_vec.size(); i++) {
274  auto& strings = string_array_vec[i];
275  auto& ids = ids_array_vec[i];
276  ids.resize(strings.size());
277  getOrAddBulk(strings, &ids[0]);
278  }
279 }
280 
281 template <class T>
282 void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
283  T* encoded_vec) {
284  if (client_no_timeout_) {
285  getOrAddBulkRemote(string_vec, encoded_vec);
286  return;
287  }
288  size_t out_idx{0};
289  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
290 
291  for (const auto& str : string_vec) {
292  if (str.empty()) {
293  encoded_vec[out_idx++] = inline_int_null_value<T>();
294  continue;
295  }
296  CHECK(str.size() <= MAX_STRLEN);
297  uint32_t bucket;
298  const uint32_t hash = rk_hash(str);
299  bucket = computeBucket(hash, str, str_ids_, false);
300  if (str_ids_[bucket] != INVALID_STR_ID) {
301  encoded_vec[out_idx++] = str_ids_[bucket];
302  continue;
303  }
304  // need to add record to dictionary
305  // check there is room
306  if (str_count_ == static_cast<size_t>(max_valid_int_value<T>())) {
307  log_encoding_error<T>(str);
308  encoded_vec[out_idx++] = inline_int_null_value<T>();
309  continue;
310  }
311  if (str_ids_[bucket] == INVALID_STR_ID) {
313  << "Maximum number (" << str_count_
314  << ") of Dictionary encoded Strings reached for this column, offset path "
315  "for column is "
316  << offsets_path_;
317  if (fillRateIsHigh()) {
318  // resize when more than 50% is full
320  bucket = computeBucket(hash, str, str_ids_, false);
321  }
322  appendToStorage(str);
323 
324  str_ids_[bucket] = static_cast<int32_t>(str_count_);
325  if (materialize_hashes_) {
326  rk_hashes_[str_count_] = hash;
327  }
328  ++str_count_;
329  }
330  encoded_vec[out_idx++] = str_ids_[bucket];
331  }
333 }
334 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
335  uint8_t* encoded_vec);
336 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
337  uint16_t* encoded_vec);
338 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
339  int32_t* encoded_vec);
340 
341 template <class T>
342 void StringDictionary::getOrAddBulkRemote(const std::vector<std::string>& string_vec,
343  T* encoded_vec) {
345  std::vector<int32_t> string_ids;
346  client_no_timeout_->get_or_add_bulk(string_ids, string_vec);
347  size_t out_idx{0};
348  for (size_t i = 0; i < string_ids.size(); ++i) {
349  const auto string_id = string_ids[i];
350  const bool invalid = string_id > max_valid_int_value<T>();
351  if (invalid || string_id == inline_int_null_value<int32_t>()) {
352  if (invalid) {
353  log_encoding_error<T>(string_vec[i]);
354  }
355  encoded_vec[out_idx++] = inline_int_null_value<T>();
356  continue;
357  }
358  encoded_vec[out_idx++] = string_id;
359  }
360 }
361 
363  const std::vector<std::string>& string_vec,
364  uint8_t* encoded_vec);
366  const std::vector<std::string>& string_vec,
367  uint16_t* encoded_vec);
369  const std::vector<std::string>& string_vec,
370  int32_t* encoded_vec);
371 
372 int32_t StringDictionary::getIdOfString(const std::string& str) const {
373  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
374  if (client_) {
375  return client_->get(str);
376  }
377  return getUnlocked(str);
378 }
379 
380 int32_t StringDictionary::getUnlocked(const std::string& str) const noexcept {
381  const uint32_t hash = rk_hash(str);
382  auto str_id = str_ids_[computeBucket(hash, str, str_ids_, false)];
383  return str_id;
384 }
385 
386 std::string StringDictionary::getString(int32_t string_id) const {
387  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
388  if (client_) {
389  std::string ret;
390  client_->get_string(ret, string_id);
391  return ret;
392  }
393  return getStringUnlocked(string_id);
394 }
395 
396 std::string StringDictionary::getStringUnlocked(int32_t string_id) const noexcept {
397  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
398  return getStringChecked(string_id);
399 }
400 
401 std::pair<char*, size_t> StringDictionary::getStringBytes(int32_t string_id) const
402  noexcept {
403  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
404  CHECK(!client_);
405  CHECK_LE(0, string_id);
406  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
407  return getStringBytesChecked(string_id);
408 }
409 
411  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
412  if (client_) {
413  return client_->storage_entry_count();
414  }
415  return str_count_;
416 }
417 
418 namespace {
419 
420 bool is_like(const std::string& str,
421  const std::string& pattern,
422  const bool icase,
423  const bool is_simple,
424  const char escape) {
425  return icase
426  ? (is_simple ? string_ilike_simple(
427  str.c_str(), str.size(), pattern.c_str(), pattern.size())
428  : string_ilike(str.c_str(),
429  str.size(),
430  pattern.c_str(),
431  pattern.size(),
432  escape))
433  : (is_simple ? string_like_simple(
434  str.c_str(), str.size(), pattern.c_str(), pattern.size())
435  : string_like(str.c_str(),
436  str.size(),
437  pattern.c_str(),
438  pattern.size(),
439  escape));
440 }
441 
442 } // namespace
443 
444 std::vector<int32_t> StringDictionary::getLike(const std::string& pattern,
445  const bool icase,
446  const bool is_simple,
447  const char escape,
448  const size_t generation) const {
449  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
450  if (client_) {
451  return client_->get_like(pattern, icase, is_simple, escape, generation);
452  }
453  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
454  const auto it = like_cache_.find(cache_key);
455  if (it != like_cache_.end()) {
456  return it->second;
457  }
458  std::vector<int32_t> result;
459  std::vector<std::thread> workers;
460  int worker_count = cpu_threads();
461  CHECK_GT(worker_count, 0);
462  std::vector<std::vector<int32_t>> worker_results(worker_count);
463  CHECK_LE(generation, str_count_);
464  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
465  workers.emplace_back([&worker_results,
466  &pattern,
467  generation,
468  icase,
469  is_simple,
470  escape,
471  worker_idx,
472  worker_count,
473  this]() {
474  for (size_t string_id = worker_idx; string_id < generation;
475  string_id += worker_count) {
476  const auto str = getStringUnlocked(string_id);
477  if (is_like(str, pattern, icase, is_simple, escape)) {
478  worker_results[worker_idx].push_back(string_id);
479  }
480  }
481  });
482  }
483  for (auto& worker : workers) {
484  worker.join();
485  }
486  for (const auto& worker_result : worker_results) {
487  result.insert(result.end(), worker_result.begin(), worker_result.end());
488  }
489  // place result into cache for reuse if similar query
490  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
491 
492  CHECK(it_ok.second);
493 
494  return result;
495 }
496 
497 std::vector<int32_t> StringDictionary::getEquals(std::string pattern,
498  std::string comp_operator,
499  size_t generation) {
500  std::vector<int32_t> result;
501  auto eq_id_itr = equal_cache_.find(pattern);
502  int32_t eq_id = MAX_STRLEN + 1;
503  int32_t cur_size = str_count_;
504  if (eq_id_itr != equal_cache_.end()) {
505  auto eq_id = eq_id_itr->second;
506  if (comp_operator == "=") {
507  result.push_back(eq_id);
508  } else {
509  for (int32_t idx = 0; idx <= cur_size; idx++) {
510  if (idx == eq_id) {
511  continue;
512  }
513  result.push_back(idx);
514  }
515  }
516  } else {
517  std::vector<std::thread> workers;
518  int worker_count = cpu_threads();
519  CHECK_GT(worker_count, 0);
520  std::vector<std::vector<int32_t>> worker_results(worker_count);
521  CHECK_LE(generation, str_count_);
522  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
523  workers.emplace_back(
524  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
525  for (size_t string_id = worker_idx; string_id < generation;
526  string_id += worker_count) {
527  const auto str = getStringUnlocked(string_id);
528  if (str == pattern) {
529  worker_results[worker_idx].push_back(string_id);
530  }
531  }
532  });
533  }
534  for (auto& worker : workers) {
535  worker.join();
536  }
537  for (const auto& worker_result : worker_results) {
538  result.insert(result.end(), worker_result.begin(), worker_result.end());
539  }
540  if (result.size() > 0) {
541  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
542  CHECK(it_ok.second);
543  eq_id = result[0];
544  }
545  if (comp_operator == "<>") {
546  for (int32_t idx = 0; idx <= cur_size; idx++) {
547  if (idx == eq_id) {
548  continue;
549  }
550  result.push_back(idx);
551  }
552  }
553  }
554  return result;
555 }
556 
// Returns the ids of the dictionary strings that satisfy
// `<string> comp_operator pattern`, for comp_operator in
// "<", "<=", ">", ">=", "=", "<>".
//
// The exclusive lock is held for the whole call. A remote client, when
// configured, answers the query. An empty dictionary yields an empty result.
// While sorted_cache holds fewer entries than str_count_, "=" and "<>" are
// delegated to getEquals(). Otherwise the position of `pattern` inside
// sorted_cache is looked up (and memoised in compare_cache_) and the result is
// sliced out of sorted_cache according to the operator.
557 std::vector<int32_t> StringDictionary::getCompare(const std::string& pattern,
558  const std::string& comp_operator,
559  const size_t generation) {
560  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
561  if (client_) {
562  return client_->get_compare(pattern, comp_operator, generation);
563  }
564  std::vector<int32_t> ret;
565  if (str_count_ == 0) {
566  return ret;
567  }
568  if (sorted_cache.size() < str_count_) {
569  if (comp_operator == "=" || comp_operator == "<>") {
570  return getEquals(pattern, comp_operator, generation);
571  }
572 
// NOTE(review): no statement that refreshes sorted_cache is visible in this
// branch, although the code below assumes it is populated - confirm against the
// complete source.
574  }
575  auto cache_index = compare_cache_.get(pattern);
576 
577  if (!cache_index) {
// Cache miss: binary-search sorted_cache (ids ordered by their strings) for the
// first entry that is not less than `pattern`.
578  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
579  const auto cache_itr = std::lower_bound(
580  sorted_cache.begin(),
581  sorted_cache.end(),
582  pattern,
583  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
584  auto a_str = this->getStringFromStorage(a);
585  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
586  });
587 
// index = position of the exact match (diff == 0), or position of the last
// entry smaller than `pattern` (diff == 1).
588  if (cache_itr == sorted_cache.end()) {
589  cache_index->index = sorted_cache.size() - 1;
590  cache_index->diff = 1;
591  } else {
592  const auto cache_str = getStringFromStorage(*cache_itr);
593  if (!string_eq(
594  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
// NOTE(review): when cache_itr == begin() (pattern sorts before every entry)
// this evaluates to -1, not 0. The `index == 0 && diff > 0` special cases below
// therefore appear to fire when `pattern` lies between entries 0 and 1 instead,
// which looks like it drops or adds entry 0 incorrectly for "<", ">" and ">=" -
// verify against the declared type of `index` and add tests.
595  cache_index->index = cache_itr - sorted_cache.begin() - 1;
596  cache_index->diff = 1;
597  } else {
598  cache_index->index = cache_itr - sorted_cache.begin();
599  cache_index->diff = 0;
600  }
601  }
602 
603  compare_cache_.put(pattern, cache_index);
604  }
605 
606  // since we have a cache in form of vector of ints which is sorted according to
607  // corresponding strings in the dictionary all we need is the index of the element
608  // which equal to the pattern that we are trying to match or the index of “biggest”
609  // element smaller than the pattern, to perform all the comparison operators over
610  // string. The search function guarantees we have such index so now it is just the
611  // matter to include all the elements in the result vector.
612 
613  // For < operator if the index that we have points to the element which is equal to
614  // the pattern that we are searching for we simply get all the elements less than the
615  // index. If the element pointed by the index is not equal to the pattern we are
616  // comparing with we also need to include that index in result vector, except when the
617  // index points to 0 and the pattern is lesser than the smallest value in the string
618  // dictionary.
619 
620  if (comp_operator == "<") {
621  size_t idx = cache_index->index;
622  if (cache_index->diff) {
623  idx = cache_index->index + 1;
624  if (cache_index->index == 0 && cache_index->diff > 0) {
625  idx = cache_index->index;
626  }
627  }
628  for (size_t i = 0; i < idx; i++) {
629  ret.push_back(sorted_cache[i]);
630  }
631 
632  // For <= operator if the index that we have points to the element which is equal to
633  // the pattern that we are searching for we want to include the element pointed by
634  // the index in the result set. If the element pointed by the index is not equal to
635  // the pattern we are comparing with we just want to include all the ids with index
636  // less than the index that is cached, except when pattern that we are searching for
637  // is smaller than the smallest string in the dictionary.
638 
639  } else if (comp_operator == "<=") {
640  size_t idx = cache_index->index + 1;
// NOTE(review): `cache_index == 0` compares the pointer itself with null, which
// can never be true here because cache_index is dereferenced just above. The
// sibling branches test `cache_index->index == 0`; this one presumably should too.
641  if (cache_index == 0 && cache_index->diff > 0) {
642  idx = cache_index->index;
643  }
644  for (size_t i = 0; i < idx; i++) {
645  ret.push_back(sorted_cache[i]);
646  }
647 
648  // For > operator we want to get all the elements with index greater than the index
649  // that we have except, when the pattern we are searching for is lesser than the
650  // smallest string in the dictionary we also want to include the id of the index
651  // that we have.
652 
653  } else if (comp_operator == ">") {
654  size_t idx = cache_index->index + 1;
655  if (cache_index->index == 0 && cache_index->diff > 0) {
656  idx = cache_index->index;
657  }
658  for (size_t i = idx; i < sorted_cache.size(); i++) {
659  ret.push_back(sorted_cache[i]);
660  }
661 
662  // For >= operator when the indexed element that we have points to element which is
663  // equal to the pattern we are searching for we want to include that in the result
664  // vector. If the index that we have does not point to the string which is equal to
665  // the pattern we are searching we don’t want to include that id into the result
666  // vector except when the index is 0.
667 
668  } else if (comp_operator == ">=") {
669  size_t idx = cache_index->index;
670  if (cache_index->diff) {
671  idx = cache_index->index + 1;
672  if (cache_index->index == 0 && cache_index->diff > 0) {
673  idx = cache_index->index;
674  }
675  }
676  for (size_t i = idx; i < sorted_cache.size(); i++) {
677  ret.push_back(sorted_cache[i]);
678  }
679  } else if (comp_operator == "=") {
680  if (!cache_index->diff) {
681  ret.push_back(sorted_cache[cache_index->index]);
682  }
683 
684  // For <> operator it is simple matter of not including id of string which is equal
685  // to pattern we are searching for.
686  } else if (comp_operator == "<>") {
687  if (!cache_index->diff) {
688  size_t idx = cache_index->index;
689  for (size_t i = 0; i < idx; i++) {
690  ret.push_back(sorted_cache[i]);
691  }
692  ++idx;
693  for (size_t i = idx; i < sorted_cache.size(); i++) {
694  ret.push_back(sorted_cache[i]);
695  }
696  } else {
// NOTE(review): the loop variable is unused, so the ENTIRE cache is inserted
// once per iteration - sorted_cache.size() copies of every id, at quadratic
// cost. A single insert (no loop) looks like the intent.
697  for (size_t i = 0; i < sorted_cache.size(); i++) {
698  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
699  }
700  }
701 
702  } else {
// NOTE(review): this only constructs and discards a temporary; without `throw`
// an unsupported operator silently yields an empty result.
703  std::runtime_error("Unsupported string comparison operator");
704  }
705  return ret;
706 }
707 
708 namespace {
709 
710 bool is_regexp_like(const std::string& str,
711  const std::string& pattern,
712  const char escape) {
713  return regexp_like(str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape);
714 }
715 
716 } // namespace
717 
718 std::vector<int32_t> StringDictionary::getRegexpLike(const std::string& pattern,
719  const char escape,
720  const size_t generation) const {
721  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
722  if (client_) {
723  return client_->get_regexp_like(pattern, escape, generation);
724  }
725  const auto cache_key = std::make_pair(pattern, escape);
726  const auto it = regex_cache_.find(cache_key);
727  if (it != regex_cache_.end()) {
728  return it->second;
729  }
730  std::vector<int32_t> result;
731  std::vector<std::thread> workers;
732  int worker_count = cpu_threads();
733  CHECK_GT(worker_count, 0);
734  std::vector<std::vector<int32_t>> worker_results(worker_count);
735  CHECK_LE(generation, str_count_);
736  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
737  workers.emplace_back([&worker_results,
738  &pattern,
739  generation,
740  escape,
741  worker_idx,
742  worker_count,
743  this]() {
744  for (size_t string_id = worker_idx; string_id < generation;
745  string_id += worker_count) {
746  const auto str = getStringUnlocked(string_id);
747  if (is_regexp_like(str, pattern, escape)) {
748  worker_results[worker_idx].push_back(string_id);
749  }
750  }
751  });
752  }
753  for (auto& worker : workers) {
754  worker.join();
755  }
756  for (const auto& worker_result : worker_results) {
757  result.insert(result.end(), worker_result.begin(), worker_result.end());
758  }
759  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
760  CHECK(it_ok.second);
761 
762  return result;
763 }
764 
765 std::shared_ptr<const std::vector<std::string>> StringDictionary::copyStrings() const {
766  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
767  if (client_) {
768  // TODO(miyu): support remote string dictionary
769  throw std::runtime_error(
770  "copying dictionaries from remote server is not supported yet.");
771  }
772 
773  if (strings_cache_) {
774  return strings_cache_;
775  }
776 
777  strings_cache_ = std::make_shared<std::vector<std::string>>();
778  strings_cache_->reserve(str_count_);
779  const bool multithreaded = str_count_ > 10000;
780  const auto worker_count =
781  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
782  CHECK_GT(worker_count, 0UL);
783  std::vector<std::vector<std::string>> worker_results(worker_count);
784  auto copy = [this](std::vector<std::string>& str_list,
785  const size_t start_id,
786  const size_t end_id) {
787  CHECK_LE(start_id, end_id);
788  str_list.reserve(end_id - start_id);
789  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
790  str_list.push_back(getStringUnlocked(string_id));
791  }
792  };
793  if (multithreaded) {
794  std::vector<std::future<void>> workers;
795  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
796  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
797  worker_idx < worker_count && start < str_count_;
798  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
799  workers.push_back(std::async(
800  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
801  }
802  for (auto& worker : workers) {
803  worker.get();
804  }
805  } else {
806  CHECK_EQ(worker_results.size(), size_t(1));
807  copy(worker_results[0], 0, str_count_);
808  }
809 
810  for (const auto& worker_result : worker_results) {
811  strings_cache_->insert(
812  strings_cache_->end(), worker_result.begin(), worker_result.end());
813  }
814  return strings_cache_;
815 }
816 
817 bool StringDictionary::fillRateIsHigh() const noexcept {
818  return str_ids_.size() < str_count_ * 2;
819 }
820 
822  std::vector<int32_t> new_str_ids(str_ids_.size() * 2, INVALID_STR_ID);
823 
824  if (materialize_hashes_) {
825  for (size_t i = 0; i < str_ids_.size(); ++i) {
826  if (str_ids_[i] != INVALID_STR_ID) {
827  const uint32_t hash = rk_hashes_[str_ids_[i]];
828  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
829  new_str_ids[bucket] = str_ids_[i];
830  }
831  }
832  rk_hashes_.resize(rk_hashes_.size() * 2);
833  } else {
834  for (size_t i = 0; i < str_count_; ++i) {
835  const auto str = getStringChecked(i);
836  const uint32_t hash = rk_hash(str);
837  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
838  new_str_ids[bucket] = i;
839  }
840  }
841  str_ids_.swap(new_str_ids);
842 }
843 
844 int32_t StringDictionary::getOrAddImpl(const std::string& str) noexcept {
845  // @TODO(wei) treat empty string as NULL for now
846  if (str.size() == 0) {
847  return inline_int_null_value<int32_t>();
848  }
849  CHECK(str.size() <= MAX_STRLEN);
850  uint32_t bucket;
851  const uint32_t hash = rk_hash(str);
852  {
853  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
854  bucket = computeBucket(hash, str, str_ids_, false);
855  if (str_ids_[bucket] != INVALID_STR_ID) {
856  return str_ids_[bucket];
857  }
858  }
859  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
860  // need to recalculate the bucket in case it changed before
861  // we got the lock
862  bucket = computeBucket(hash, str, str_ids_, false);
863  if (str_ids_[bucket] == INVALID_STR_ID) {
865  << "Maximum number (" << str_count_
866  << ") of Dictionary encoded Strings reached for this column, offset path "
867  "for column is "
868  << offsets_path_;
869  if (fillRateIsHigh()) {
870  // resize when more than 50% is full
872  bucket = computeBucket(hash, str, str_ids_, false);
873  }
874  appendToStorage(str);
875  str_ids_[bucket] = static_cast<int32_t>(str_count_);
876  if (materialize_hashes_) {
877  rk_hashes_[str_count_] = hash;
878  }
879  ++str_count_;
881  }
882  return str_ids_[bucket];
883 }
884 
885 std::string StringDictionary::getStringChecked(const int string_id) const noexcept {
886  const auto str_canary = getStringFromStorage(string_id);
887  CHECK(!str_canary.canary);
888  return std::string(str_canary.c_str_ptr, str_canary.size);
889 }
890 
892  const int string_id) const noexcept {
893  const auto str_canary = getStringFromStorage(string_id);
894  CHECK(!str_canary.canary);
895  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
896 }
897 
898 uint32_t StringDictionary::computeBucket(const uint32_t hash,
899  const std::string str,
900  const std::vector<int32_t>& data,
901  const bool unique) const noexcept {
902  auto bucket = hash & (data.size() - 1);
903  while (true) {
904  if (data[bucket] ==
905  INVALID_STR_ID) { // In this case it means the slot is available for use
906  break;
907  }
908  // if records are unique I don't need to do this test as I know it will not be the
909  // same
910  if (!unique) {
911  if (materialize_hashes_) {
912  if (hash == rk_hashes_[data[bucket]]) {
913  // can't be the same string if hash is different
914  const auto old_str = getStringFromStorage(data[bucket]);
915  if (str.size() == old_str.size &&
916  !memcmp(str.c_str(), old_str.c_str_ptr, str.size())) {
917  // found the string
918  break;
919  }
920  }
921  } else {
922  const auto old_str = getStringFromStorage(data[bucket]);
923  if (str.size() == old_str.size &&
924  !memcmp(str.c_str(), old_str.c_str_ptr, str.size())) {
925  // found the string
926  break;
927  }
928  }
929  }
930  // wrap around
931  if (++bucket == data.size()) {
932  bucket = 0;
933  }
934  }
935  return bucket;
936 }
937 
939  const uint32_t hash,
940  const std::vector<int32_t>& data) const noexcept {
941  auto bucket = hash & (data.size() - 1);
942  while (true) {
943  if (data[bucket] ==
944  INVALID_STR_ID) { // In this case it means the slot is available for use
945  break;
946  }
947  // wrap around
948  if (++bucket == data.size()) {
949  bucket = 0;
950  }
951  }
952  return bucket;
953 }
954 
// Appends one string to the dictionary storage: the raw bytes of `str` are copied
// into payload_map_ at payload_file_off_, and a StringIdxEntry {offset, size} is
// copied into offset_map_ at index str_count_.
//
// payload_file_off_ is advanced by str.size(). str_count_ is only read in the
// lines visible here.
//
// NOTE(review): the embedded line numbering skips 963-964, 969, 979-980, 983 and
// 985, so the statements on those lines are missing from this listing. Given the
// CHECKs that follow them they presumably grow the backing storage -- confirm
// against the original file before editing this function.
955 void StringDictionary::appendToStorage(const std::string& str) noexcept {
 // Persistent (non-temp) dictionaries must have both descriptors open.
956  if (!isTemp_) {
957  CHECK_GE(payload_fd_, 0);
958  CHECK_GE(offset_fd_, 0);
959  }
960  // write the payload
 // Capacity is needed only when the new bytes would run past the current size.
961  if (payload_file_off_ + str.size() > payload_file_size_) {
962  if (!isTemp_) {
 // After the (missing) growth step the string must fit; the payload file is then
 // mapped again at its new size.
965  CHECK(payload_file_off_ + str.size() <= payload_file_size_);
966  payload_map_ =
967  reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
968  } else {
970  }
971  }
972  memcpy(payload_map_ + payload_file_off_, str.c_str(), str.size());
973  // write the offset and length
974  size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
 // The entry records where the payload starts (offset before it is advanced) and
 // how long it is.
975  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
976  payload_file_off_ += str.size();
 // NOTE(review): this test uses >= whereas the payload test above uses >.
977  if (offset_file_off + sizeof(str_meta) >= offset_file_size_) {
978  if (!isTemp_) {
981  CHECK(offset_file_off + sizeof(str_meta) <= offset_file_size_);
982  offset_map_ =
984  } else {
986  }
987  }
988  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
989 }
990 
992  const int string_id) const noexcept {
993  if (!isTemp_) {
994  CHECK_GE(payload_fd_, 0);
995  CHECK_GE(offset_fd_, 0);
996  }
997  CHECK_GE(string_id, 0);
998  const StringIdxEntry* str_meta = offset_map_ + string_id;
999  if (str_meta->size == 0xffff) {
1000  // hit the canary
1001  return {nullptr, 0, true};
1002  }
1003  return {payload_map_ + str_meta->off, str_meta->size, false};
1004 }
1005 
 // Grows the payload storage.
 // For temp dictionaries the in-memory buffer is extended via addMemoryCapacity,
 // which receives payload_file_size_ as well as the buffer pointer.
 // NOTE(review): the embedded numbering skips 1006 (the function header) and 1008
 // (the body of the non-temp branch); both are missing from this listing.
1007  if (!isTemp_) {
1009  } else {
1010  payload_map_ =
1011  static_cast<char*>(addMemoryCapacity(payload_map_, payload_file_size_));
1012  }
1013 }
1014 
 // Grows the offset (index) storage; the temp branch reassigns offset_map_.
 // NOTE(review): the embedded numbering skips 1015 (the function header), 1017
 // (the body of the non-temp branch) and 1020 (the right-hand side of the
 // assignment below); all three are missing from this listing.
1016  if (!isTemp_) {
1018  } else {
1019  offset_map_ =
1021  }
1022 }
1023 
1024 size_t StringDictionary::addStorageCapacity(int fd) noexcept {
1025  static const ssize_t CANARY_BUFF_SIZE = 1024 * SYSTEM_PAGE_SIZE;
1026  if (!CANARY_BUFFER) {
1027  CANARY_BUFFER = static_cast<char*>(malloc(CANARY_BUFF_SIZE));
1029  memset(CANARY_BUFFER, 0xff, CANARY_BUFF_SIZE);
1030  }
1031  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1032  CHECK(write(fd, CANARY_BUFFER, CANARY_BUFF_SIZE) == CANARY_BUFF_SIZE);
1033  return CANARY_BUFF_SIZE;
1034 }
1035 
1036 void* StringDictionary::addMemoryCapacity(void* addr, size_t& mem_size) noexcept {
1037  static const ssize_t CANARY_BUFF_SIZE = 1024 * SYSTEM_PAGE_SIZE;
1038  if (!CANARY_BUFFER) {
1039  CANARY_BUFFER = reinterpret_cast<char*>(malloc(CANARY_BUFF_SIZE));
1041  memset(CANARY_BUFFER, 0xff, CANARY_BUFF_SIZE);
1042  }
1043  void* new_addr = realloc(addr, mem_size + CANARY_BUFF_SIZE);
1044  CHECK(new_addr);
1045  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
1046  CHECK(memcpy(write_addr, CANARY_BUFFER, CANARY_BUFF_SIZE));
1047  mem_size += CANARY_BUFF_SIZE;
1048  return new_addr;
1049 }
1050 
1052  if (!like_cache_.empty()) {
1053  decltype(like_cache_)().swap(like_cache_);
1054  }
1055  if (!regex_cache_.empty()) {
1056  decltype(regex_cache_)().swap(regex_cache_);
1057  }
1058  if (!equal_cache_.empty()) {
1059  decltype(equal_cache_)().swap(equal_cache_);
1060  }
1061  compare_cache_.invalidateInvertedIndex();
1062 }
1063 
// Shared canary block. Starts out null; addStorageCapacity / addMemoryCapacity
// allocate it on first use and fill it with 0xff bytes.
1064 char* StringDictionary::CANARY_BUFFER{nullptr};
1065 
1067  if (client_) {
1068  try {
1069  return client_->checkpoint();
1070  } catch (...) {
1071  return false;
1072  }
1073  }
1074  CHECK(!isTemp_);
1075  bool ret = true;
1076  ret = ret && (msync((void*)offset_map_, offset_file_size_, MS_SYNC) == 0);
1077  ret = ret && (msync((void*)payload_map_, payload_file_size_, MS_SYNC) == 0);
1078  ret = ret && (fsync(offset_fd_) == 0);
1079  ret = ret && (fsync(payload_fd_) == 0);
1080  return ret;
1081 }
1082 
 // Brings sorted_cache up to date with the strings added since it was last
 // built: ids from sorted_cache.size() up to (not including) str_count_ are
 // collected, sorted with sortCache, and merged in via mergeSortedCache.
 // NOTE(review): the embedded numbering skips 1083, so the function header is
 // missing from this listing.
1084  // This method is not thread-safe.
1085  const auto cur_cache_size = sorted_cache.size();
1086  std::vector<int32_t> temp_sorted_cache;
 // Only ids not yet covered by sorted_cache are gathered.
1087  for (size_t i = cur_cache_size; i < str_count_; i++) {
1088  temp_sorted_cache.push_back(i);
1089  }
1090  sortCache(temp_sorted_cache);
1091  mergeSortedCache(temp_sorted_cache);
1092 }
1093 
1094 void StringDictionary::sortCache(std::vector<int32_t>& cache) {
1095  // This method is not thread-safe.
1096 
1097  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1098  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1099 
1100  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1101  auto a_str = this->getStringFromStorage(a);
1102  auto b_str = this->getStringFromStorage(b);
1103  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1104  });
1105 }
1106 
1107 void StringDictionary::mergeSortedCache(std::vector<int32_t>& temp_sorted_cache) {
1108  // this method is not thread safe
1109  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1110  size_t t_idx = 0, s_idx = 0, idx = 0;
1111  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1112  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1113  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1114  const auto insert_from_temp_cache =
1115  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1116  if (insert_from_temp_cache) {
1117  updated_cache[idx] = temp_sorted_cache[t_idx++];
1118  } else {
1119  updated_cache[idx] = sorted_cache[s_idx++];
1120  }
1121  }
1122  while (t_idx < temp_sorted_cache.size()) {
1123  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1124  }
1125  while (s_idx < sorted_cache.size()) {
1126  updated_cache[idx++] = sorted_cache[s_idx++];
1127  }
1128  sorted_cache.swap(updated_cache);
1129 }
1130 
1131 void StringDictionary::populate_string_ids(std::vector<int32_t>& dest_ids,
1132  StringDictionary* dest_dict,
1133  const std::vector<int32_t>& source_ids,
1134  const StringDictionary* source_dict) {
1135  std::vector<std::string> strings;
1136 
1137  for (const int32_t source_id : source_ids) {
1138  if (source_id == std::numeric_limits<int32_t>::min()) {
1139  strings.emplace_back("");
1140  } else if (source_id < 0) {
1141  throw std::runtime_error("Unexpected negative source ID");
1142  } else {
1143  strings.push_back(source_dict->getString(source_id));
1144  }
1145  }
1146 
1147  dest_ids.resize(strings.size());
1148  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1149 }
1150 
1152  std::vector<std::vector<int32_t>>& dest_array_ids,
1153  StringDictionary* dest_dict,
1154  const std::vector<std::vector<int32_t>>& source_array_ids,
1155  const StringDictionary* source_dict) {
1156  dest_array_ids.resize(source_array_ids.size());
1157 
1158  std::atomic<size_t> row_idx{0};
1159  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1160  int thread_id) {
1161  for (;;) {
1162  auto row = row_idx.fetch_add(1);
1163 
1164  if (row >= dest_array_ids.size()) {
1165  return;
1166  }
1167  const auto& source_ids = source_array_ids[row];
1168  auto& dest_ids = dest_array_ids[row];
1169  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1170  }
1171  };
1172 
1173  const int num_worker_threads = std::thread::hardware_concurrency();
1174 
1175  if (source_array_ids.size() / num_worker_threads > 10) {
1176  std::vector<std::future<void>> worker_threads;
1177  for (int i = 0; i < num_worker_threads; ++i) {
1178  worker_threads.push_back(std::async(std::launch::async, processor, i));
1179  }
1180 
1181  for (auto& child : worker_threads) {
1182  child.wait();
1183  }
1184  for (auto& child : worker_threads) {
1185  child.get();
1186  }
1187  } else {
1188  processor(0);
1189  }
1190 }
1191 
1192 void translate_string_ids(std::vector<int32_t>& dest_ids,
1193  const LeafHostInfo& dict_server_host,
1194  const DictRef dest_dict_ref,
1195  const std::vector<int32_t>& source_ids,
1196  const DictRef source_dict_ref,
1197  const int32_t dest_generation) {
1198  DictRef temp_dict_ref(-1, -1);
1199  StringDictionaryClient string_client(dict_server_host, temp_dict_ref, false);
1200  string_client.translate_string_ids(
1201  dest_ids, dest_dict_ref, source_ids, source_dict_ref, dest_generation);
1202 }
StringIdxEntry * offset_map_
void translate_string_ids(std::vector< int32_t > &dest_ids, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
#define CHECK_EQ(x, y)
Definition: Logger.h:195
int32_t getIdOfString(const std::string &str) const
void * addMemoryCapacity(void *addr, size_t &mem_size) noexcept
std::vector< int32_t > getRegexpLike(const std::string &pattern, const char escape, const size_t generation) const
uint64_t off
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
uint64_t size
#define LOG(tag)
Definition: Logger.h:182
void checked_munmap(void *addr, size_t length)
std::string getStringChecked(const int string_id) const noexcept
std::string getString(int32_t string_id) const
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
#define CHECK_GE(x, y)
Definition: Logger.h:200
int64_t const int32_t sz
DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
std::string offsets_path_
#define CHECK_GT(x, y)
Definition: Logger.h:199
int32_t getOrAddImpl(const std::string &str) noexcept
int32_t getOrAdd(const std::string &str) noexcept
int32_t getUnlocked(const std::string &str) const noexcept
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
size_t storageEntryCount() const
void increaseCapacity() noexcept
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
StringDictionary(const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
static constexpr size_t MAX_STRCOUNT
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
DEVICE bool regexp_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: Regexp.cpp:40
DEVICE bool string_ilike(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:257
std::vector< int32_t > getCompare(const std::string &pattern, const std::string &comp_operator, const size_t generation)
#define CHECK_NE(x, y)
Definition: Logger.h:196
std::vector< int32_t > getLike(const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
void addPayloadCapacity() noexcept
mapd_shared_mutex rw_mutex_
DEVICE bool string_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:246
void getOrAddBulk(const std::vector< std::string > &string_vec, T *encoded_vec)
size_t addStorageCapacity(int fd) noexcept
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
std::map< std::string, int32_t > equal_cache_
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict)
void translate_string_ids(std::vector< int32_t > &dest_ids, const LeafHostInfo &dict_server_host, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
DEVICE bool string_ilike_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:59
int checked_open(const char *path, const bool recover)
void addOffsetCapacity() noexcept
std::pair< char *, size_t > getStringBytes(int32_t string_id) const noexcept
void getOrAddBulkArray(const std::vector< std::vector< std::string >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
bool checkpoint() noexcept
uint32_t rk_hash(const std::string &str)
#define CHECK_LT(x, y)
Definition: Logger.h:197
static char * CANARY_BUFFER
void * checked_mmap(const int fd, const size_t sz)
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
#define CHECK_LE(x, y)
Definition: Logger.h:198
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
std::shared_ptr< const std::vector< std::string > > copyStrings() const
std::unique_ptr< StringDictionaryClient > client_no_timeout_
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) const noexcept
void getOrAddBulkRemote(const std::vector< std::string > &string_vec, T *encoded_vec)
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:83
static void populate_string_array_ids(std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
void invalidateInvertedIndex() noexcept
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:121
#define CHECK(condition)
Definition: Logger.h:187
const uint64_t round_up_p2(const uint64_t num)
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:102
std::vector< int32_t > str_ids_
void log_encoding_error(const std::string &str)
void sortCache(std::vector< int32_t > &cache)
static constexpr size_t MAX_STRLEN
void appendToStorage(const std::string &str) noexcept
uint32_t computeBucket(const uint32_t hash, const std::string str, const std::vector< int32_t > &data, const bool unique) const noexcept
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
PayloadString getStringFromStorage(const int string_id) const noexcept
int cpu_threads()
Definition: thread_count.h:23
DEVICE bool string_like_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:43
std::vector< int32_t > sorted_cache
~StringDictionary() noexcept
std::vector< uint32_t > rk_hashes_
bool fillRateIsHigh() const noexcept