OmniSciDB  0fdbebe030
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
StringDictionary.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "StringDictionary.h"
18 #include "../Shared/sqltypes.h"
19 #include "../Utils/Regexp.h"
20 #include "../Utils/StringLike.h"
21 #include "LeafHostInfo.h"
22 #include "Shared/Logger.h"
23 #include "Shared/thread_count.h"
24 #include "StringDictionaryClient.h"
25 
26 #include <sys/fcntl.h>
27 #include <sys/mman.h>
28 #include <sys/stat.h>
29 
30 #include <boost/filesystem/operations.hpp>
31 #include <boost/filesystem/path.hpp>
32 #include <boost/sort/spreadsort/string_sort.hpp>
33 
34 #include <future>
35 #include <iostream>
36 #include <string_view>
37 #include <thread>
38 
39 namespace {
// File-local constant holding the value returned by getpagesize(); it is
// evaluated once, when this translation unit's namespace-scope objects are
// initialized.
40 const int SYSTEM_PAGE_SIZE = getpagesize();
41 
42 size_t file_size(const int fd) {
43  struct stat buf;
44  int err = fstat(fd, &buf);
45  CHECK_EQ(0, err);
46  return buf.st_size;
47 }
48 
49 int checked_open(const char* path, const bool recover) {
50  auto fd = open(path, O_RDWR | O_CREAT | (recover ? O_APPEND : O_TRUNC), 0644);
51  if (fd > 0) {
52  return fd;
53  }
54  auto err = std::string("Dictionary path ") + std::string(path) +
55  std::string(" does not exist.");
56  LOG(ERROR) << err;
57  throw DictPayloadUnavailable(err);
58 }
59 
60 void* checked_mmap(const int fd, const size_t sz) {
61  auto ptr = mmap(nullptr, sz, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
62  CHECK(ptr != reinterpret_cast<void*>(-1));
63 #ifdef __linux__
64 #ifdef MADV_HUGEPAGE
65  madvise(ptr, sz, MADV_RANDOM | MADV_WILLNEED | MADV_HUGEPAGE);
66 #else
67  madvise(ptr, sz, MADV_RANDOM | MADV_WILLNEED);
68 #endif
69 #endif
70  return ptr;
71 }
72 void checked_munmap(void* addr, size_t length) {
73  CHECK_EQ(0, munmap(addr, length));
74 }
75 
// Rounds `num` up to the next power of two (values that already are a power of
// two are returned unchanged). If the rounded value wraps to zero or exceeds
// UINT32_MAX, UINT32_MAX is returned instead.
const uint64_t round_up_p2(const uint64_t num) {
  uint64_t rounded = num - 1;
  // Smear the highest set bit into every lower position (shifts 1, 2, 4, 8, 16).
  for (unsigned shift = 1; shift <= 16; shift <<= 1) {
    rounded |= rounded >> shift;
  }
  ++rounded;
  // TODO MAT deal with case where filesize has been increased but reality is
  // we are constrained to 2^31.
  // In that situation this calculation will wrap to zero
  const bool wrapped_or_too_large = (rounded == 0) || (rounded > (UINT32_MAX));
  return wrapped_or_too_large ? UINT32_MAX : rounded;
}
93 
// Polynomial rolling hash of `str`: starts at 1 and, for every character in
// order, multiplies the accumulator by 997 and adds the character.
uint32_t rk_hash(const std::string_view& str) {
  uint32_t hash_acc = 1;
  // Unsigned arithmetic wraps on overflow by definition, which is relied on here.
  for (const char ch : str) {
    hash_acc = hash_acc * 997 + ch;
  }
  return hash_acc;
}
102 } // namespace
103 
// Namespace-scope definitions for the class's static constexpr data members
// INVALID_STR_ID, MAX_STRLEN and MAX_STRCOUNT (their values come from the
// in-class declarations, which are not part of this file).
105 constexpr int32_t StringDictionary::INVALID_STR_ID;
106 constexpr size_t StringDictionary::MAX_STRLEN;
107 constexpr size_t StringDictionary::MAX_STRCOUNT;
108 
109 StringDictionary::StringDictionary(const std::string& folder,
110  const bool isTemp,
111  const bool recover,
112  const bool materializeHashes,
113  size_t initial_capacity)
114  : str_count_(0)
115  , string_id_hash_table_(initial_capacity, INVALID_STR_ID)
116  , rk_hashes_(initial_capacity)
117  , isTemp_(isTemp)
118  , materialize_hashes_(materializeHashes)
119  , payload_fd_(-1)
120  , offset_fd_(-1)
121  , offset_map_(nullptr)
122  , payload_map_(nullptr)
123  , offset_file_size_(0)
124  , payload_file_size_(0)
125  , payload_file_off_(0)
126  , strings_cache_(nullptr) {
127  if (!isTemp && folder.empty()) {
128  return;
129  }
130 
131  // initial capacity must be a power of two for efficient bucket computation
132  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
133  if (!isTemp_) {
134  boost::filesystem::path storage_path(folder);
135  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
136  const auto payload_path =
137  (storage_path / boost::filesystem::path("DictPayload")).string();
138  payload_fd_ = checked_open(payload_path.c_str(), recover);
139  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
141  offset_file_size_ = file_size(offset_fd_);
142  }
143 
144  if (payload_file_size_ == 0) {
146  }
147  if (offset_file_size_ == 0) {
149  }
150  if (!isTemp_) { // we never mmap or recover temp dictionaries
151  payload_map_ = reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
152  offset_map_ =
154  if (recover) {
155  const size_t bytes = file_size(offset_fd_);
156  if (bytes % sizeof(StringIdxEntry) != 0) {
157  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
158  }
159  const uint64_t str_count = bytes / sizeof(StringIdxEntry);
160  // at this point we know the size of the StringDict we need to load
161  // so lets reallocate the vector to the correct size
162  const uint64_t max_entries = round_up_p2(str_count * 2 + 1);
163  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
164  string_id_hash_table_.swap(new_str_ids);
165  if (materialize_hashes_) {
166  std::vector<uint32_t> new_rk_hashes(max_entries / 2);
167  rk_hashes_.swap(new_rk_hashes);
168  }
169  unsigned string_id = 0;
170  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
171 
172  uint32_t thread_inits = 0;
173  const auto thread_count = std::thread::hardware_concurrency();
174  const uint32_t items_per_thread = std::max<uint32_t>(
175  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
176  std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>
177  dictionary_futures;
178  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
179  dictionary_futures.emplace_back(std::async(
180  std::launch::async, [string_id, str_count, items_per_thread, this] {
181  std::vector<std::pair<uint32_t, unsigned int>> hashVec;
182  for (uint32_t curr_id = string_id;
183  curr_id < string_id + items_per_thread && curr_id < str_count;
184  curr_id++) {
185  const auto recovered = getStringFromStorage(curr_id);
186  if (recovered.canary) {
187  // hit the canary, recovery finished
188  break;
189  } else {
190  std::string temp(recovered.c_str_ptr, recovered.size);
191  hashVec.emplace_back(std::make_pair(rk_hash(temp), temp.size()));
192  }
193  }
194  return hashVec;
195  }));
196  thread_inits++;
197  if (thread_inits % thread_count == 0) {
198  processDictionaryFutures(dictionary_futures);
199  }
200  }
201  // gather last few threads
202  if (dictionary_futures.size() != 0) {
203  processDictionaryFutures(dictionary_futures);
204  }
205  }
206  }
207 }
208 
210  std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>&
211  dictionary_futures) {
212  for (auto& dictionary_future : dictionary_futures) {
213  dictionary_future.wait();
214  auto hashVec = dictionary_future.get();
215  for (auto& hash : hashVec) {
216  uint32_t bucket = computeUniqueBucketWithHash(hash.first, string_id_hash_table_);
217  payload_file_off_ += hash.second;
218  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
219  if (materialize_hashes_) {
220  rk_hashes_[str_count_] = hash.first;
221  }
222  ++str_count_;
223  }
224  }
225  dictionary_futures.clear();
226 }
227 
229  : strings_cache_(nullptr)
230  , client_(new StringDictionaryClient(host, dict_ref, true))
231  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
232 
234  free(CANARY_BUFFER);
235  if (client_) {
236  return;
237  }
238  if (payload_map_) {
239  if (!isTemp_) {
243  CHECK_GE(payload_fd_, 0);
245  CHECK_GE(offset_fd_, 0);
246  close(offset_fd_);
247  } else {
249  free(payload_map_);
250  free(offset_map_);
251  }
252  }
253 }
254 
255 int32_t StringDictionary::getOrAdd(const std::string& str) noexcept {
256  if (client_) {
257  std::vector<int32_t> string_ids;
258  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
259  CHECK_EQ(size_t(1), string_ids.size());
260  return string_ids.front();
261  }
262  return getOrAddImpl(str);
263 }
264 
265 namespace {
266 
267 template <class T>
268 void log_encoding_error(std::string_view str) {
269  LOG(ERROR) << "Could not encode string: " << str
270  << ", the encoded value doesn't fit in " << sizeof(T) * 8
271  << " bits. Will store NULL instead.";
272 }
273 
274 } // namespace
275 
276 template <class String>
278  const std::vector<std::vector<String>>& string_array_vec,
279  std::vector<std::vector<int32_t>>& ids_array_vec) {
280  ids_array_vec.resize(string_array_vec.size());
281  for (size_t i = 0; i < string_array_vec.size(); i++) {
282  auto& strings = string_array_vec[i];
283  auto& ids = ids_array_vec[i];
284  ids.resize(strings.size());
285  getOrAddBulk(strings, &ids[0]);
286  }
287 }
288 
290  const std::vector<std::vector<std::string>>& string_array_vec,
291  std::vector<std::vector<int32_t>>& ids_array_vec);
292 
298 template <class String>
299 void StringDictionary::hashStrings(const std::vector<String>& string_vec,
300  std::vector<uint32_t>& hashes) const noexcept {
301  CHECK_EQ(string_vec.size(), hashes.size());
302  const size_t min_target_strings_per_thread{2000};
303  const size_t str_count = string_vec.size();
304  const size_t max_thread_count = std::thread::hardware_concurrency();
305  const size_t items_per_thread =
306  std::max<size_t>(min_target_strings_per_thread, str_count / max_thread_count + 1);
307 
308  std::vector<std::thread> workers;
309  for (size_t string_id = 0; string_id < str_count; string_id += items_per_thread) {
310  workers.emplace_back(
311  [&string_vec, &hashes, string_id, str_count, items_per_thread]() {
312  const size_t end_id = std::min(string_id + items_per_thread, str_count);
313  for (size_t curr_id = string_id; curr_id < end_id; ++curr_id) {
314  if (string_vec[curr_id].empty()) {
315  continue;
316  }
317  hashes[curr_id] = rk_hash(string_vec[curr_id]);
318  }
319  });
320  }
321  for (auto& worker : workers) {
322  worker.join();
323  }
324 }
325 
326 template <class T, class String>
327 void StringDictionary::getOrAddBulk(const std::vector<String>& input_strings,
328  T* output_string_ids) {
330  getOrAddBulkParallel(input_strings, output_string_ids);
331  return;
332  }
333  // Single-thread path.
334  if (client_no_timeout_) {
335  getOrAddBulkRemote(input_strings, output_string_ids);
336  return;
337  }
338  size_t out_idx{0};
339  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
340 
341  for (const auto& str : input_strings) {
342  if (str.empty()) {
343  output_string_ids[out_idx++] = inline_int_null_value<T>();
344  continue;
345  }
346  CHECK(str.size() <= MAX_STRLEN);
347  uint32_t bucket;
348  const uint32_t hash = rk_hash(str);
349  bucket = computeBucket(hash, str, string_id_hash_table_);
350  if (string_id_hash_table_[bucket] != INVALID_STR_ID) {
351  output_string_ids[out_idx++] = string_id_hash_table_[bucket];
352  continue;
353  }
354  // need to add record to dictionary
355  // check there is room
356  if (str_count_ == static_cast<size_t>(max_valid_int_value<T>())) {
357  log_encoding_error<T>(str);
358  output_string_ids[out_idx++] = inline_int_null_value<T>();
359  continue;
360  }
361  if (string_id_hash_table_[bucket] == INVALID_STR_ID) {
363  << "Maximum number (" << str_count_
364  << ") of Dictionary encoded Strings reached for this column, offset path "
365  "for column is "
366  << offsets_path_;
367  if (fillRateIsHigh(str_count_)) {
368  // resize when more than 50% is full
370  bucket = computeBucket(hash, str, string_id_hash_table_);
371  }
372  appendToStorage(str);
373 
374  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
375  if (materialize_hashes_) {
376  rk_hashes_[str_count_] = hash;
377  }
378  ++str_count_;
379  }
380  output_string_ids[out_idx++] = string_id_hash_table_[bucket];
381  }
383 }
384 
385 template <class T, class String>
386 void StringDictionary::getOrAddBulkParallel(const std::vector<String>& input_strings,
387  T* output_string_ids) {
388  if (client_no_timeout_) {
389  getOrAddBulkRemote(input_strings, output_string_ids);
390  return;
391  }
392  // Run rk_hash on the input strings up front, and in parallel,
393  // as the string hashing does not need to be behind the subsequent write_lock
394  std::vector<uint32_t> input_strings_rk_hashes(input_strings.size());
395  hashStrings(input_strings, input_strings_rk_hashes);
396 
397  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
398  size_t shadow_str_count =
399  str_count_; // Need to shadow str_count_ now with bulk add methods
400  const size_t storage_high_water_mark = shadow_str_count;
401  std::vector<size_t> string_memory_ids;
402  size_t sum_new_string_lengths = 0;
403  string_memory_ids.reserve(input_strings.size());
404  size_t input_string_idx{0};
405  for (const auto& input_string : input_strings) {
406  // Currently we make empty strings null
407  if (input_string.empty()) {
408  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
409  continue;
410  }
411  // TODO: Recover gracefully if an input string is too long
412  CHECK(input_string.size() <= MAX_STRLEN);
413 
414  if (fillRateIsHigh(shadow_str_count)) {
415  // resize when more than 50% is full
416  increaseCapacityFromStorageAndMemory(storage_high_water_mark,
417  input_strings,
418  string_memory_ids,
419  input_strings_rk_hashes);
420  }
421  // Get the rk_hash for this input_string
422  const uint32_t input_string_rk_hash = input_strings_rk_hashes[input_string_idx];
423 
424  uint32_t hash_bucket = computeBucketFromStorageAndMemory(input_string_rk_hash,
425  input_string,
427  storage_high_water_mark,
428  input_strings,
429  string_memory_ids);
430 
431  // If the hash bucket is not empty, that is our string id
432  // (computeBucketFromStorageAndMemory) already checked to ensure the input string and
433  // bucket string are equal)
434  if (string_id_hash_table_[hash_bucket] != INVALID_STR_ID) {
435  output_string_ids[input_string_idx++] = string_id_hash_table_[hash_bucket];
436  continue;
437  }
438  // Did not find string, so need to add record to dictionary
439  // First check there is room
440  if (shadow_str_count == static_cast<size_t>(max_valid_int_value<T>())) {
441  log_encoding_error<T>(input_string);
442  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
443  continue;
444  }
445  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
446  << "Maximum number (" << shadow_str_count
447  << ") of Dictionary encoded Strings reached for this column, offset path "
448  "for column is "
449  << offsets_path_;
450 
451  string_memory_ids.push_back(input_string_idx);
452  sum_new_string_lengths += input_string.size();
453  string_id_hash_table_[hash_bucket] = static_cast<int32_t>(shadow_str_count);
454  if (materialize_hashes_) {
455  rk_hashes_[shadow_str_count] = input_string_rk_hash;
456  }
457  output_string_ids[input_string_idx++] = shadow_str_count++;
458  }
459  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
460  str_count_ = shadow_str_count;
461 
463 }
464 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
465  uint8_t* encoded_vec);
466 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
467  uint16_t* encoded_vec);
468 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
469  int32_t* encoded_vec);
470 
471 template void StringDictionary::getOrAddBulk(
472  const std::vector<std::string_view>& string_vec,
473  uint8_t* encoded_vec);
474 template void StringDictionary::getOrAddBulk(
475  const std::vector<std::string_view>& string_vec,
476  uint16_t* encoded_vec);
477 template void StringDictionary::getOrAddBulk(
478  const std::vector<std::string_view>& string_vec,
479  int32_t* encoded_vec);
480 
481 template <class T, class String>
482 void StringDictionary::getOrAddBulkRemote(const std::vector<String>& string_vec,
483  T* encoded_vec) {
485  std::vector<int32_t> string_ids;
486  client_no_timeout_->get_or_add_bulk(string_ids, string_vec);
487  size_t out_idx{0};
488  for (size_t i = 0; i < string_ids.size(); ++i) {
489  const auto string_id = string_ids[i];
490  const bool invalid = string_id > max_valid_int_value<T>();
491  if (invalid || string_id == inline_int_null_value<int32_t>()) {
492  if (invalid) {
493  log_encoding_error<T>(string_vec[i]);
494  }
495  encoded_vec[out_idx++] = inline_int_null_value<T>();
496  continue;
497  }
498  encoded_vec[out_idx++] = string_id;
499  }
500 }
501 
503  const std::vector<std::string>& string_vec,
504  uint8_t* encoded_vec);
506  const std::vector<std::string>& string_vec,
507  uint16_t* encoded_vec);
509  const std::vector<std::string>& string_vec,
510  int32_t* encoded_vec);
511 
513  const std::vector<std::string_view>& string_vec,
514  uint8_t* encoded_vec);
516  const std::vector<std::string_view>& string_vec,
517  uint16_t* encoded_vec);
519  const std::vector<std::string_view>& string_vec,
520  int32_t* encoded_vec);
521 
522 int32_t StringDictionary::getIdOfString(const std::string& str) const {
523  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
524  if (client_) {
525  return client_->get(str);
526  }
527  return getUnlocked(str);
528 }
529 
530 int32_t StringDictionary::getUnlocked(const std::string& str) const noexcept {
531  const uint32_t hash = rk_hash(str);
532  auto str_id = string_id_hash_table_[computeBucket(hash, str, string_id_hash_table_)];
533  return str_id;
534 }
535 
536 std::string StringDictionary::getString(int32_t string_id) const {
537  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
538  if (client_) {
539  std::string ret;
540  client_->get_string(ret, string_id);
541  return ret;
542  }
543  return getStringUnlocked(string_id);
544 }
545 
546 std::string StringDictionary::getStringUnlocked(int32_t string_id) const noexcept {
547  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
548  return getStringChecked(string_id);
549 }
550 
551 std::pair<char*, size_t> StringDictionary::getStringBytes(int32_t string_id) const
552  noexcept {
553  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
554  CHECK(!client_);
555  CHECK_LE(0, string_id);
556  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
557  return getStringBytesChecked(string_id);
558 }
559 
561  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
562  if (client_) {
563  return client_->storage_entry_count();
564  }
565  return str_count_;
566 }
567 
568 namespace {
569 
570 bool is_like(const std::string& str,
571  const std::string& pattern,
572  const bool icase,
573  const bool is_simple,
574  const char escape) {
575  return icase
576  ? (is_simple ? string_ilike_simple(
577  str.c_str(), str.size(), pattern.c_str(), pattern.size())
578  : string_ilike(str.c_str(),
579  str.size(),
580  pattern.c_str(),
581  pattern.size(),
582  escape))
583  : (is_simple ? string_like_simple(
584  str.c_str(), str.size(), pattern.c_str(), pattern.size())
585  : string_like(str.c_str(),
586  str.size(),
587  pattern.c_str(),
588  pattern.size(),
589  escape));
590 }
591 
592 } // namespace
593 
594 std::vector<int32_t> StringDictionary::getLike(const std::string& pattern,
595  const bool icase,
596  const bool is_simple,
597  const char escape,
598  const size_t generation) const {
599  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
600  if (client_) {
601  return client_->get_like(pattern, icase, is_simple, escape, generation);
602  }
603  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
604  const auto it = like_cache_.find(cache_key);
605  if (it != like_cache_.end()) {
606  return it->second;
607  }
608  std::vector<int32_t> result;
609  std::vector<std::thread> workers;
610  int worker_count = cpu_threads();
611  CHECK_GT(worker_count, 0);
612  std::vector<std::vector<int32_t>> worker_results(worker_count);
613  CHECK_LE(generation, str_count_);
614  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
615  workers.emplace_back([&worker_results,
616  &pattern,
617  generation,
618  icase,
619  is_simple,
620  escape,
621  worker_idx,
622  worker_count,
623  this]() {
624  for (size_t string_id = worker_idx; string_id < generation;
625  string_id += worker_count) {
626  const auto str = getStringUnlocked(string_id);
627  if (is_like(str, pattern, icase, is_simple, escape)) {
628  worker_results[worker_idx].push_back(string_id);
629  }
630  }
631  });
632  }
633  for (auto& worker : workers) {
634  worker.join();
635  }
636  for (const auto& worker_result : worker_results) {
637  result.insert(result.end(), worker_result.begin(), worker_result.end());
638  }
639  // place result into cache for reuse if similar query
640  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
641 
642  CHECK(it_ok.second);
643 
644  return result;
645 }
646 
647 std::vector<int32_t> StringDictionary::getEquals(std::string pattern,
648  std::string comp_operator,
649  size_t generation) {
650  std::vector<int32_t> result;
651  auto eq_id_itr = equal_cache_.find(pattern);
652  int32_t eq_id = MAX_STRLEN + 1;
653  int32_t cur_size = str_count_;
654  if (eq_id_itr != equal_cache_.end()) {
655  auto eq_id = eq_id_itr->second;
656  if (comp_operator == "=") {
657  result.push_back(eq_id);
658  } else {
659  for (int32_t idx = 0; idx <= cur_size; idx++) {
660  if (idx == eq_id) {
661  continue;
662  }
663  result.push_back(idx);
664  }
665  }
666  } else {
667  std::vector<std::thread> workers;
668  int worker_count = cpu_threads();
669  CHECK_GT(worker_count, 0);
670  std::vector<std::vector<int32_t>> worker_results(worker_count);
671  CHECK_LE(generation, str_count_);
672  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
673  workers.emplace_back(
674  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
675  for (size_t string_id = worker_idx; string_id < generation;
676  string_id += worker_count) {
677  const auto str = getStringUnlocked(string_id);
678  if (str == pattern) {
679  worker_results[worker_idx].push_back(string_id);
680  }
681  }
682  });
683  }
684  for (auto& worker : workers) {
685  worker.join();
686  }
687  for (const auto& worker_result : worker_results) {
688  result.insert(result.end(), worker_result.begin(), worker_result.end());
689  }
690  if (result.size() > 0) {
691  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
692  CHECK(it_ok.second);
693  eq_id = result[0];
694  }
695  if (comp_operator == "<>") {
696  for (int32_t idx = 0; idx <= cur_size; idx++) {
697  if (idx == eq_id) {
698  continue;
699  }
700  result.push_back(idx);
701  }
702  }
703  }
704  return result;
705 }
706 
707 std::vector<int32_t> StringDictionary::getCompare(const std::string& pattern,
708  const std::string& comp_operator,
709  const size_t generation) {
710  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
711  if (client_) {
712  return client_->get_compare(pattern, comp_operator, generation);
713  }
714  std::vector<int32_t> ret;
715  if (str_count_ == 0) {
716  return ret;
717  }
718  if (sorted_cache.size() < str_count_) {
719  if (comp_operator == "=" || comp_operator == "<>") {
720  return getEquals(pattern, comp_operator, generation);
721  }
722 
724  }
725  auto cache_index = compare_cache_.get(pattern);
726 
727  if (!cache_index) {
728  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
729  const auto cache_itr = std::lower_bound(
730  sorted_cache.begin(),
731  sorted_cache.end(),
732  pattern,
733  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
734  auto a_str = this->getStringFromStorage(a);
735  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
736  });
737 
738  if (cache_itr == sorted_cache.end()) {
739  cache_index->index = sorted_cache.size() - 1;
740  cache_index->diff = 1;
741  } else {
742  const auto cache_str = getStringFromStorage(*cache_itr);
743  if (!string_eq(
744  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
745  cache_index->index = cache_itr - sorted_cache.begin() - 1;
746  cache_index->diff = 1;
747  } else {
748  cache_index->index = cache_itr - sorted_cache.begin();
749  cache_index->diff = 0;
750  }
751  }
752 
753  compare_cache_.put(pattern, cache_index);
754  }
755 
756  // since we have a cache in form of vector of ints which is sorted according to
757  // corresponding strings in the dictionary all we need is the index of the element
758  // which equal to the pattern that we are trying to match or the index of “biggest”
759  // element smaller than the pattern, to perform all the comparison operators over
760  // string. The search function guarantees we have such index so now it is just the
761  // matter to include all the elements in the result vector.
762 
763  // For < operator if the index that we have points to the element which is equal to
764  // the pattern that we are searching for we simply get all the elements less than the
765  // index. If the element pointed by the index is not equal to the pattern we are
766  // comparing with we also need to include that index in result vector, except when the
767  // index points to 0 and the pattern is lesser than the smallest value in the string
768  // dictionary.
769 
770  if (comp_operator == "<") {
771  size_t idx = cache_index->index;
772  if (cache_index->diff) {
773  idx = cache_index->index + 1;
774  if (cache_index->index == 0 && cache_index->diff > 0) {
775  idx = cache_index->index;
776  }
777  }
778  for (size_t i = 0; i < idx; i++) {
779  ret.push_back(sorted_cache[i]);
780  }
781 
782  // For <= operator if the index that we have points to the element which is equal to
783  // the pattern that we are searching for we want to include the element pointed by
784  // the index in the result set. If the element pointed by the index is not equal to
785  // the pattern we are comparing with we just want to include all the ids with index
786  // less than the index that is cached, except when pattern that we are searching for
787  // is smaller than the smallest string in the dictionary.
788 
789  } else if (comp_operator == "<=") {
790  size_t idx = cache_index->index + 1;
791  if (cache_index == 0 && cache_index->diff > 0) {
792  idx = cache_index->index;
793  }
794  for (size_t i = 0; i < idx; i++) {
795  ret.push_back(sorted_cache[i]);
796  }
797 
798  // For > operator we want to get all the elements with index greater than the index
799  // that we have except, when the pattern we are searching for is lesser than the
800  // smallest string in the dictionary we also want to include the id of the index
801  // that we have.
802 
803  } else if (comp_operator == ">") {
804  size_t idx = cache_index->index + 1;
805  if (cache_index->index == 0 && cache_index->diff > 0) {
806  idx = cache_index->index;
807  }
808  for (size_t i = idx; i < sorted_cache.size(); i++) {
809  ret.push_back(sorted_cache[i]);
810  }
811 
812  // For >= operator when the indexed element that we have points to element which is
813  // equal to the pattern we are searching for we want to include that in the result
814  // vector. If the index that we have does not point to the string which is equal to
815  // the pattern we are searching we don’t want to include that id into the result
816  // vector except when the index is 0.
817 
818  } else if (comp_operator == ">=") {
819  size_t idx = cache_index->index;
820  if (cache_index->diff) {
821  idx = cache_index->index + 1;
822  if (cache_index->index == 0 && cache_index->diff > 0) {
823  idx = cache_index->index;
824  }
825  }
826  for (size_t i = idx; i < sorted_cache.size(); i++) {
827  ret.push_back(sorted_cache[i]);
828  }
829  } else if (comp_operator == "=") {
830  if (!cache_index->diff) {
831  ret.push_back(sorted_cache[cache_index->index]);
832  }
833 
834  // For <> operator it is simple matter of not including id of string which is equal
835  // to pattern we are searching for.
836  } else if (comp_operator == "<>") {
837  if (!cache_index->diff) {
838  size_t idx = cache_index->index;
839  for (size_t i = 0; i < idx; i++) {
840  ret.push_back(sorted_cache[i]);
841  }
842  ++idx;
843  for (size_t i = idx; i < sorted_cache.size(); i++) {
844  ret.push_back(sorted_cache[i]);
845  }
846  } else {
847  for (size_t i = 0; i < sorted_cache.size(); i++) {
848  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
849  }
850  }
851 
852  } else {
853  std::runtime_error("Unsupported string comparison operator");
854  }
855  return ret;
856 }
857 
858 namespace {
859 
860 bool is_regexp_like(const std::string& str,
861  const std::string& pattern,
862  const char escape) {
863  return regexp_like(str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape);
864 }
865 
866 } // namespace
867 
868 std::vector<int32_t> StringDictionary::getRegexpLike(const std::string& pattern,
869  const char escape,
870  const size_t generation) const {
871  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
872  if (client_) {
873  return client_->get_regexp_like(pattern, escape, generation);
874  }
875  const auto cache_key = std::make_pair(pattern, escape);
876  const auto it = regex_cache_.find(cache_key);
877  if (it != regex_cache_.end()) {
878  return it->second;
879  }
880  std::vector<int32_t> result;
881  std::vector<std::thread> workers;
882  int worker_count = cpu_threads();
883  CHECK_GT(worker_count, 0);
884  std::vector<std::vector<int32_t>> worker_results(worker_count);
885  CHECK_LE(generation, str_count_);
886  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
887  workers.emplace_back([&worker_results,
888  &pattern,
889  generation,
890  escape,
891  worker_idx,
892  worker_count,
893  this]() {
894  for (size_t string_id = worker_idx; string_id < generation;
895  string_id += worker_count) {
896  const auto str = getStringUnlocked(string_id);
897  if (is_regexp_like(str, pattern, escape)) {
898  worker_results[worker_idx].push_back(string_id);
899  }
900  }
901  });
902  }
903  for (auto& worker : workers) {
904  worker.join();
905  }
906  for (const auto& worker_result : worker_results) {
907  result.insert(result.end(), worker_result.begin(), worker_result.end());
908  }
909  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
910  CHECK(it_ok.second);
911 
912  return result;
913 }
914 
915 std::shared_ptr<const std::vector<std::string>> StringDictionary::copyStrings() const {
916  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
917  if (client_) {
918  // TODO(miyu): support remote string dictionary
919  throw std::runtime_error(
920  "copying dictionaries from remote server is not supported yet.");
921  }
922 
923  if (strings_cache_) {
924  return strings_cache_;
925  }
926 
927  strings_cache_ = std::make_shared<std::vector<std::string>>();
928  strings_cache_->reserve(str_count_);
929  const bool multithreaded = str_count_ > 10000;
930  const auto worker_count =
931  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
932  CHECK_GT(worker_count, 0UL);
933  std::vector<std::vector<std::string>> worker_results(worker_count);
934  auto copy = [this](std::vector<std::string>& str_list,
935  const size_t start_id,
936  const size_t end_id) {
937  CHECK_LE(start_id, end_id);
938  str_list.reserve(end_id - start_id);
939  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
940  str_list.push_back(getStringUnlocked(string_id));
941  }
942  };
943  if (multithreaded) {
944  std::vector<std::future<void>> workers;
945  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
946  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
947  worker_idx < worker_count && start < str_count_;
948  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
949  workers.push_back(std::async(
950  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
951  }
952  for (auto& worker : workers) {
953  worker.get();
954  }
955  } else {
956  CHECK_EQ(worker_results.size(), size_t(1));
957  copy(worker_results[0], 0, str_count_);
958  }
959 
960  for (const auto& worker_result : worker_results) {
961  strings_cache_->insert(
962  strings_cache_->end(), worker_result.begin(), worker_result.end());
963  }
964  return strings_cache_;
965 }
966 
967 bool StringDictionary::fillRateIsHigh(const size_t num_strings) const noexcept {
968  return string_id_hash_table_.size() <= num_strings * 2;
969 }
970 
972  std::vector<int32_t> new_str_ids(string_id_hash_table_.size() * 2, INVALID_STR_ID);
973 
974  if (materialize_hashes_) {
975  for (size_t i = 0; i < string_id_hash_table_.size(); ++i) {
977  const uint32_t hash = rk_hashes_[string_id_hash_table_[i]];
978  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
979  new_str_ids[bucket] = string_id_hash_table_[i];
980  }
981  }
982  rk_hashes_.resize(rk_hashes_.size() * 2);
983  } else {
984  for (size_t i = 0; i < str_count_; ++i) {
985  const auto str = getStringChecked(i);
986  const uint32_t hash = rk_hash(str);
987  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
988  new_str_ids[bucket] = i;
989  }
990  }
991  string_id_hash_table_.swap(new_str_ids);
992 }
993 
994 template <class String>
996  const size_t storage_high_water_mark,
997  const std::vector<String>& input_strings,
998  const std::vector<size_t>& string_memory_ids,
999  const std::vector<uint32_t>& input_strings_rk_hashes) noexcept {
1000  std::vector<int32_t> new_str_ids(string_id_hash_table_.size() * 2, INVALID_STR_ID);
1001  if (materialize_hashes_) {
1002  for (size_t i = 0; i < string_id_hash_table_.size(); ++i) {
1003  if (string_id_hash_table_[i] != INVALID_STR_ID) {
1004  const uint32_t hash = rk_hashes_[string_id_hash_table_[i]];
1005  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1006  new_str_ids[bucket] = string_id_hash_table_[i];
1007  }
1008  }
1009  rk_hashes_.resize(rk_hashes_.size() * 2);
1010  } else {
1011  for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1012  const auto storage_string = getStringChecked(storage_idx);
1013  const uint32_t hash = rk_hash(storage_string);
1014  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1015  new_str_ids[bucket] = storage_idx;
1016  }
1017  for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1018  size_t string_memory_id = string_memory_ids[memory_idx];
1019  uint32_t bucket = computeUniqueBucketWithHash(
1020  input_strings_rk_hashes[string_memory_id], new_str_ids);
1021  new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1022  }
1023  }
1024  string_id_hash_table_.swap(new_str_ids);
1025 }
1026 
1027 int32_t StringDictionary::getOrAddImpl(const std::string& str) noexcept {
1028  // @TODO(wei) treat empty string as NULL for now
1029  if (str.size() == 0) {
1030  return inline_int_null_value<int32_t>();
1031  }
1032  CHECK(str.size() <= MAX_STRLEN);
1033  uint32_t bucket;
1034  const uint32_t hash = rk_hash(str);
1035  {
1036  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
1037  bucket = computeBucket(hash, str, string_id_hash_table_);
1038  if (string_id_hash_table_[bucket] != INVALID_STR_ID) {
1039  return string_id_hash_table_[bucket];
1040  }
1041  }
1042  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
1043  // need to recalculate the bucket in case it changed before
1044  // we got the lock
1045  bucket = computeBucket(hash, str, string_id_hash_table_);
1046  if (string_id_hash_table_[bucket] == INVALID_STR_ID) {
1047  CHECK_LT(str_count_, MAX_STRCOUNT)
1048  << "Maximum number (" << str_count_
1049  << ") of Dictionary encoded Strings reached for this column, offset path "
1050  "for column is "
1051  << offsets_path_;
1052  if (fillRateIsHigh(str_count_)) {
1053  // resize when more than 50% is full
1054  increaseCapacity();
1055  bucket = computeBucket(hash, str, string_id_hash_table_);
1056  }
1057  appendToStorage(str);
1058  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
1059  if (materialize_hashes_) {
1060  rk_hashes_[str_count_] = hash;
1061  }
1062  ++str_count_;
1063  invalidateInvertedIndex();
1064  }
1065  return string_id_hash_table_[bucket];
1066 }
1067 
1068 std::string StringDictionary::getStringChecked(const int string_id) const noexcept {
1069  const auto str_canary = getStringFromStorage(string_id);
1070  CHECK(!str_canary.canary);
1071  return std::string(str_canary.c_str_ptr, str_canary.size);
1072 }
1073 
1075  const int string_id) const noexcept {
1076  const auto str_canary = getStringFromStorage(string_id);
1077  CHECK(!str_canary.canary);
1078  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1079 }
1080 
1081 template <class String>
1082 uint32_t StringDictionary::computeBucket(const uint32_t hash,
1083  const String& str,
1084  const std::vector<int32_t>& data) const
1085  noexcept {
1086  auto bucket = hash & (data.size() - 1);
1087  while (true) {
1088  const int32_t candidate_string_id = data[bucket];
1089  if (candidate_string_id ==
1090  INVALID_STR_ID) { // In this case it means the slot is available for use
1091  break;
1092  }
1093  if (materialize_hashes_ && hash != rk_hashes_[candidate_string_id]) {
1094  continue;
1095  }
1096  const auto old_str = getStringFromStorageFast(candidate_string_id);
1097  if (str.size() == old_str.size() && !memcmp(str.data(), old_str.data(), str.size())) {
1098  // found the string
1099  break;
1100  }
1101  // wrap around
1102  if (++bucket == data.size()) {
1103  bucket = 0;
1104  }
1105  }
1106  return bucket;
1107 }
1108 
1109 template <class String>
1111  const uint32_t input_string_rk_hash,
1112  const String& input_string,
1113  const std::vector<int32_t>& string_id_hash_table,
1114  const size_t storage_high_water_mark,
1115  const std::vector<String>& input_strings,
1116  const std::vector<size_t>& string_memory_ids) const noexcept {
1117  auto bucket = input_string_rk_hash & (string_id_hash_table.size() - 1);
1118  while (true) {
1119  const int32_t candidate_string_id = string_id_hash_table[bucket];
1120  if (candidate_string_id ==
1121  INVALID_STR_ID) { // In this case it means the slot is available for use
1122  break;
1123  }
1124  if (!materialize_hashes_ ||
1125  (input_string_rk_hash == rk_hashes_[candidate_string_id])) {
1126  if (candidate_string_id > 0 &&
1127  static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1128  // The candidate string is not in storage yet but in our string_memory_ids temp
1129  // buffer
1130  size_t memory_offset =
1131  static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1132  const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1133  if (input_string.size() == candidate_string.size() &&
1134  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1135  // found the string in the temp memory buffer
1136  break;
1137  }
1138  } else {
1139  // The candidate string is in storage, need to fetch it for comparison
1140  const auto candidate_storage_string =
1141  getStringFromStorageFast(candidate_string_id);
1142  if (input_string.size() == candidate_storage_string.size() &&
1143  !memcmp(input_string.data(),
1144  candidate_storage_string.data(),
1145  input_string.size())) {
1148  // found the string in storage
1149  break;
1150  }
1151  }
1152  }
1153  if (++bucket == string_id_hash_table.size()) {
1154  bucket = 0;
1155  }
1156  }
1157  return bucket;
1158 }
1159 
1161  const uint32_t hash,
1162  const std::vector<int32_t>& data) const noexcept {
1163  auto bucket = hash & (data.size() - 1);
1164  while (true) {
1165  if (data[bucket] ==
1166  INVALID_STR_ID) { // In this case it means the slot is available for use
1167  break;
1168  }
1169  // wrap around
1170  if (++bucket == data.size()) {
1171  bucket = 0;
1172  }
1173  }
1174  return bucket;
1175 }
1176 
1178  const size_t write_length) {
1179  if (payload_file_off_ + write_length > payload_file_size_) {
1180  const size_t min_capacity_needed =
1181  write_length - (payload_file_size_ - payload_file_off_);
1182  if (!isTemp_) {
1183  CHECK_GE(payload_fd_, 0);
1185  addPayloadCapacity(min_capacity_needed);
1186  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1187  payload_map_ =
1188  reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
1189  } else {
1190  addPayloadCapacity(min_capacity_needed);
1191  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1192  }
1193  }
1194 }
1195 
1197  const size_t write_length) {
1198  size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
1199  if (offset_file_off + write_length >= offset_file_size_) {
1200  const size_t min_capacity_needed =
1201  write_length - (offset_file_size_ - offset_file_off);
1202  if (!isTemp_) {
1203  CHECK_GE(offset_fd_, 0);
1205  addOffsetCapacity(min_capacity_needed);
1206  CHECK(offset_file_off + write_length <= offset_file_size_);
1207  offset_map_ =
1209  } else {
1210  addOffsetCapacity(min_capacity_needed);
1211  CHECK(offset_file_off + write_length <= offset_file_size_);
1212  }
1213  }
1214 }
1215 
1216 template <class String>
1217 void StringDictionary::appendToStorage(String str) noexcept {
1218  // write the payload
1219  checkAndConditionallyIncreasePayloadCapacity(str.size());
1220  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1221 
1222  // write the offset and length
1223  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1224  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1225 
1226  checkAndConditionallyIncreaseOffsetCapacity(sizeof(str_meta));
1227  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1228 }
1229 
1230 template <class String>
1232  const std::vector<String>& input_strings,
1233  const std::vector<size_t>& string_memory_ids,
1234  const size_t sum_new_strings_lengths) noexcept {
1235  const size_t num_strings = string_memory_ids.size();
1236 
1237  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1238  checkAndConditionallyIncreaseOffsetCapacity(sizeof(StringIdxEntry) * num_strings);
1239 
1240  for (size_t i = 0; i < num_strings; ++i) {
1241  const size_t string_idx = string_memory_ids[i];
1242  const String str = input_strings[string_idx];
1243  const size_t str_size(str.size());
1244  memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1245  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
1246  payload_file_off_ += str_size; // Need to increment after we've defined str_meta
1247  memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
1248  }
1249 }
1250 
1251 std::string_view StringDictionary::getStringFromStorageFast(const int string_id) const
1252  noexcept {
1253  const StringIdxEntry* str_meta = offset_map_ + string_id;
1254  return {payload_map_ + str_meta->off, str_meta->size};
1255 }
1256 
1258  const int string_id) const noexcept {
1259  if (!isTemp_) {
1260  CHECK_GE(payload_fd_, 0);
1261  CHECK_GE(offset_fd_, 0);
1262  }
1263  CHECK_GE(string_id, 0);
1264  const StringIdxEntry* str_meta = offset_map_ + string_id;
1265  if (str_meta->size == 0xffff) {
1266  // hit the canary
1267  return {nullptr, 0, true};
1268  }
1269  return {payload_map_ + str_meta->off, str_meta->size, false};
1270 }
1271 
1272 void StringDictionary::addPayloadCapacity(const size_t min_capacity_requested) noexcept {
1273  if (!isTemp_) {
1274  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1275  } else {
1276  payload_map_ = static_cast<char*>(
1277  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1278  }
1279 }
1280 
1281 void StringDictionary::addOffsetCapacity(const size_t min_capacity_requested) noexcept {
1282  if (!isTemp_) {
1283  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1284  } else {
1285  offset_map_ = static_cast<StringIdxEntry*>(
1286  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1287  }
1288 }
1289 
1291  int fd,
1292  const size_t min_capacity_requested) noexcept {
1293  const size_t canary_buff_size_to_add =
1294  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1295  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1296 
1297  if (canary_buffer_size != canary_buff_size_to_add) {
1298  CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1299  canary_buffer_size = canary_buff_size_to_add;
1300  }
1301  CHECK(CANARY_BUFFER);
1302  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1303 
1304  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1305  ssize_t write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
1306  CHECK(write_return > 0 &&
1307  (static_cast<size_t>(write_return) == canary_buff_size_to_add));
1308  return canary_buff_size_to_add;
1309 }
1310 
1312  size_t& mem_size,
1313  const size_t min_capacity_requested) noexcept {
1314  const size_t canary_buff_size_to_add =
1315  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1316  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1317  if (canary_buffer_size != canary_buff_size_to_add) {
1318  CANARY_BUFFER =
1319  reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1320  canary_buffer_size = canary_buff_size_to_add;
1321  }
1322  CHECK(CANARY_BUFFER);
1323  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1324  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
1325  CHECK(new_addr);
1326  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
1327  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
1328  mem_size += canary_buff_size_to_add;
1329  return new_addr;
1330 }
1331 
1333  if (!like_cache_.empty()) {
1334  decltype(like_cache_)().swap(like_cache_);
1335  }
1336  if (!regex_cache_.empty()) {
1337  decltype(regex_cache_)().swap(regex_cache_);
1338  }
1339  if (!equal_cache_.empty()) {
1340  decltype(equal_cache_)().swap(equal_cache_);
1341  }
1342  compare_cache_.invalidateInvertedIndex();
1343 }
1344 
1346  if (client_) {
1347  try {
1348  return client_->checkpoint();
1349  } catch (...) {
1350  return false;
1351  }
1352  }
1353  CHECK(!isTemp_);
1354  bool ret = true;
1355  ret = ret && (msync((void*)offset_map_, offset_file_size_, MS_SYNC) == 0);
1356  ret = ret && (msync((void*)payload_map_, payload_file_size_, MS_SYNC) == 0);
1357  ret = ret && (fsync(offset_fd_) == 0);
1358  ret = ret && (fsync(payload_fd_) == 0);
1359  return ret;
1360 }
1361 
1363  // This method is not thread-safe.
1364  const auto cur_cache_size = sorted_cache.size();
1365  std::vector<int32_t> temp_sorted_cache;
1366  for (size_t i = cur_cache_size; i < str_count_; i++) {
1367  temp_sorted_cache.push_back(i);
1368  }
1369  sortCache(temp_sorted_cache);
1370  mergeSortedCache(temp_sorted_cache);
1371 }
1372 
1373 void StringDictionary::sortCache(std::vector<int32_t>& cache) {
1374  // This method is not thread-safe.
1375 
1376  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1377  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1378 
1379  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1380  auto a_str = this->getStringFromStorage(a);
1381  auto b_str = this->getStringFromStorage(b);
1382  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1383  });
1384 }
1385 
1386 void StringDictionary::mergeSortedCache(std::vector<int32_t>& temp_sorted_cache) {
1387  // this method is not thread safe
1388  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1389  size_t t_idx = 0, s_idx = 0, idx = 0;
1390  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1391  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1392  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1393  const auto insert_from_temp_cache =
1394  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1395  if (insert_from_temp_cache) {
1396  updated_cache[idx] = temp_sorted_cache[t_idx++];
1397  } else {
1398  updated_cache[idx] = sorted_cache[s_idx++];
1399  }
1400  }
1401  while (t_idx < temp_sorted_cache.size()) {
1402  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1403  }
1404  while (s_idx < sorted_cache.size()) {
1405  updated_cache[idx++] = sorted_cache[s_idx++];
1406  }
1407  sorted_cache.swap(updated_cache);
1408 }
1409 
1411  std::vector<int32_t>& dest_ids,
1412  StringDictionary* dest_dict,
1413  const std::vector<int32_t>& source_ids,
1414  const StringDictionary* source_dict,
1415  const std::map<int32_t, std::string> transient_mapping) {
1416  std::vector<std::string> strings;
1417 
1418  for (const int32_t source_id : source_ids) {
1419  if (source_id == std::numeric_limits<int32_t>::min()) {
1420  strings.emplace_back("");
1421  } else if (source_id < 0) {
1422  if (auto string_itr = transient_mapping.find(source_id);
1423  string_itr != transient_mapping.end()) {
1424  strings.emplace_back(string_itr->second);
1425  } else {
1426  throw std::runtime_error("Unexpected negative source ID");
1427  }
1428  } else {
1429  strings.push_back(source_dict->getString(source_id));
1430  }
1431  }
1432 
1433  dest_ids.resize(strings.size());
1434  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1435 }
1436 
1438  std::vector<std::vector<int32_t>>& dest_array_ids,
1439  StringDictionary* dest_dict,
1440  const std::vector<std::vector<int32_t>>& source_array_ids,
1441  const StringDictionary* source_dict) {
1442  dest_array_ids.resize(source_array_ids.size());
1443 
1444  std::atomic<size_t> row_idx{0};
1445  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1446  int thread_id) {
1447  for (;;) {
1448  auto row = row_idx.fetch_add(1);
1449 
1450  if (row >= dest_array_ids.size()) {
1451  return;
1452  }
1453  const auto& source_ids = source_array_ids[row];
1454  auto& dest_ids = dest_array_ids[row];
1455  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1456  }
1457  };
1458 
1459  const int num_worker_threads = std::thread::hardware_concurrency();
1460 
1461  if (source_array_ids.size() / num_worker_threads > 10) {
1462  std::vector<std::future<void>> worker_threads;
1463  for (int i = 0; i < num_worker_threads; ++i) {
1464  worker_threads.push_back(std::async(std::launch::async, processor, i));
1465  }
1466 
1467  for (auto& child : worker_threads) {
1468  child.wait();
1469  }
1470  for (auto& child : worker_threads) {
1471  child.get();
1472  }
1473  } else {
1474  processor(0);
1475  }
1476 }
1477 
1478 void translate_string_ids(std::vector<int32_t>& dest_ids,
1479  const LeafHostInfo& dict_server_host,
1480  const DictRef dest_dict_ref,
1481  const std::vector<int32_t>& source_ids,
1482  const DictRef source_dict_ref,
1483  const int32_t dest_generation) {
1484  DictRef temp_dict_ref(-1, -1);
1485  StringDictionaryClient string_client(dict_server_host, temp_dict_ref, false);
1486  string_client.translate_string_ids(
1487  dest_ids, dest_dict_ref, source_ids, source_dict_ref, dest_generation);
1488 }
StringIdxEntry * offset_map_
void translate_string_ids(std::vector< int32_t > &dest_ids, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
#define CHECK_EQ(x, y)
Definition: Logger.h:205
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
uint32_t rk_hash(const std::string_view &str)
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
std::vector< int32_t > getRegexpLike(const std::string &pattern, const char escape, const size_t generation) const
void hashStrings(const std::vector< String > &string_vec, std::vector< uint32_t > &hashes) const noexcept
uint64_t off
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
uint64_t size
size_t storageEntryCount() const
#define LOG(tag)
Definition: Logger.h:188
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
void checked_munmap(void *addr, size_t length)
std::string getStringChecked(const int string_id) const noexcept
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:210
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
std::string offsets_path_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
#define CHECK_GT(x, y)
Definition: Logger.h:209
int32_t getIdOfString(const std::string &str) const
int32_t getOrAddImpl(const std::string &str) noexcept
int32_t getOrAdd(const std::string &str) noexcept
int32_t getUnlocked(const std::string &str) const noexcept
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
void increaseCapacity() noexcept
void start()
Definition: Asio.cpp:33
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
StringDictionary(const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
static constexpr size_t MAX_STRCOUNT
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
void log_encoding_error(std::string_view str)
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
CHECK(cgen_state)
DEVICE bool regexp_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: Regexp.cpp:40
DEVICE bool string_ilike(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:257
std::vector< int32_t > getCompare(const std::string &pattern, const std::string &comp_operator, const size_t generation)
#define CHECK_NE(x, y)
Definition: Logger.h:206
mapd_shared_mutex rw_mutex_
DEVICE bool string_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:246
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
std::map< std::string, int32_t > equal_cache_
const int32_t groups_buffer_size return nullptr
void getOrAddBulkArray(const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
void translate_string_ids(std::vector< int32_t > &dest_ids, const LeafHostInfo &dict_server_host, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
DEVICE bool string_ilike_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:59
int checked_open(const char *path, const bool recover)
bool g_enable_smem_group_by true
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
std::pair< char *, size_t > getStringBytes(int32_t string_id) const noexcept
std::vector< int32_t > string_id_hash_table_
bool checkpoint() noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:207
std::vector< int32_t > getLike(const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
void * checked_mmap(const int fd, const size_t sz)
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
#define CHECK_LE(x, y)
Definition: Logger.h:208
void increaseCapacityFromStorageAndMemory(const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< uint32_t > &input_strings_rk_hashes) noexcept
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
std::string getString(int32_t string_id) const
std::unique_ptr< StringDictionaryClient > client_no_timeout_
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) const noexcept
ThreadId thread_id()
Definition: Logger.cpp:715
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:82
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)
static void populate_string_array_ids(std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
bool g_enable_watchdog false
Definition: Execute.cpp:71
void invalidateInvertedIndex() noexcept
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:120
uint32_t computeBucket(const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept
const uint64_t round_up_p2(const uint64_t num)
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:101
void sortCache(std::vector< int32_t > &cache)
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
bool g_enable_stringdict_parallel
PayloadString getStringFromStorage(const int string_id) const noexcept
int cpu_threads()
Definition: thread_count.h:25
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)
DEVICE bool string_like_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:43
uint32_t computeBucketFromStorageAndMemory(const uint32_t input_string_rk_hash, const String &input_string, const std::vector< int32_t > &string_id_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::vector< int32_t > sorted_cache
void appendToStorage(String str) noexcept
~StringDictionary() noexcept
std::shared_ptr< const std::vector< std::string > > copyStrings() const
std::vector< uint32_t > rk_hashes_