OmniSciDB  06b3bd477c
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
StringDictionary.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "StringDictionary.h"
18 #include "../Shared/sqltypes.h"
19 #include "../Utils/Regexp.h"
20 #include "../Utils/StringLike.h"
21 #include "LeafHostInfo.h"
22 #include "Shared/Logger.h"
23 #include "Shared/thread_count.h"
24 #include "StringDictionaryClient.h"
25 
26 #include <sys/fcntl.h>
27 #include <sys/mman.h>
28 #include <sys/stat.h>
29 
30 #include <boost/filesystem/operations.hpp>
31 #include <boost/filesystem/path.hpp>
32 #include <boost/sort/spreadsort/string_sort.hpp>
33 
34 #include <tbb/parallel_for.h>
35 
36 #include <future>
37 #include <iostream>
38 #include <string_view>
39 #include <thread>
40 
41 namespace {
// Memory page size as reported by getpagesize(); evaluated once, when this
// translation unit's namespace-scope variables are initialized.
42 const int SYSTEM_PAGE_SIZE = getpagesize();
43 
44 size_t file_size(const int fd) {
45  struct stat buf;
46  int err = fstat(fd, &buf);
47  CHECK_EQ(0, err);
48  return buf.st_size;
49 }
50 
51 int checked_open(const char* path, const bool recover) {
52  auto fd = open(path, O_RDWR | O_CREAT | (recover ? O_APPEND : O_TRUNC), 0644);
53  if (fd > 0) {
54  return fd;
55  }
56  auto err = std::string("Dictionary path ") + std::string(path) +
57  std::string(" does not exist.");
58  LOG(ERROR) << err;
59  throw DictPayloadUnavailable(err);
60 }
61 
62 void* checked_mmap(const int fd, const size_t sz) {
63  auto ptr = mmap(nullptr, sz, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
64  CHECK(ptr != reinterpret_cast<void*>(-1));
65 #ifdef __linux__
66 #ifdef MADV_HUGEPAGE
67  madvise(ptr, sz, MADV_RANDOM | MADV_WILLNEED | MADV_HUGEPAGE);
68 #else
69  madvise(ptr, sz, MADV_RANDOM | MADV_WILLNEED);
70 #endif
71 #endif
72  return ptr;
73 }
// Removes the mapping of `length` bytes starting at `addr`; CHECK_EQ fails if
// munmap() returns anything other than 0.
74 void checked_munmap(void* addr, size_t length) {
75  CHECK_EQ(0, munmap(addr, length));
76 }
77 
// Rounds `num` up to the next power of two (values that already are a power
// of two are returned unchanged). If the rounded value is 0 (wrap-around, as
// for num == 0) or exceeds UINT32_MAX, UINT32_MAX is returned instead.
const uint64_t round_up_p2(const uint64_t num) {
  uint64_t rounded = num - 1;
  // Smear the highest set bit into the lower positions (shifts 1, 2, 4, 8, 16).
  for (unsigned shift = 1; shift <= 16; shift <<= 1) {
    rounded |= rounded >> shift;
  }
  ++rounded;
  // TODO MAT deal with case where filesize has been increased but reality is
  // we are constrained to 2^31.
  // In that situation this calculation will wrap to zero
  const bool wrapped_or_too_large = rounded == 0 || rounded > (UINT32_MAX);
  return wrapped_or_too_large ? UINT32_MAX : rounded;
}
95 
// Polynomial rolling hash of `str`: starts at 1 and, for every character in
// order, multiplies the running value by 997 and adds the character.
uint32_t rk_hash(const std::string_view& str) {
  uint32_t hash_value = 1;
  // unsigned arithmetic wraps on overflow, which is what the hash relies on
  for (const char ch : str) {
    hash_value = hash_value * 997 + ch;
  }
  return hash_value;
}
104 } // namespace
105 
// Namespace-scope definitions of StringDictionary's constexpr static data
// members. Their initial values are not given here (presumably they live with
// the in-class declarations in StringDictionary.h -- confirm).
107 constexpr int32_t StringDictionary::INVALID_STR_ID;
108 constexpr size_t StringDictionary::MAX_STRLEN;
109 constexpr size_t StringDictionary::MAX_STRCOUNT;
110 
// Constructs a local dictionary.
//
// folder            directory holding the "DictOffsets" / "DictPayload" files
//                   (only used when isTemp is false).
// isTemp            when true no files are opened or mapped by the code below.
// recover           forwarded to checked_open(); when true the existing
//                   offsets file is scanned and the hash table rebuilt from it.
// materializeHashes when true the per-string hashes are kept in rk_hashes_.
// initial_capacity  initial hash-table size; must be a power of two.
//
// NOTE(review): this listing is missing original lines 142, 147, 150 and 155
// (the numbering jumps over them), so the statements that set
// payload_file_size_, fill the two empty `if` bodies and supply the right-hand
// side of `offset_map_ =` are not visible here. Confirm against the real file.
111 StringDictionary::StringDictionary(const std::string& folder,
112  const bool isTemp,
113  const bool recover,
114  const bool materializeHashes,
115  size_t initial_capacity)
116  : str_count_(0)
117  , string_id_hash_table_(initial_capacity, INVALID_STR_ID)
118  , rk_hashes_(initial_capacity)
119  , isTemp_(isTemp)
120  , materialize_hashes_(materializeHashes)
121  , payload_fd_(-1)
122  , offset_fd_(-1)
123  , offset_map_(nullptr)
124  , payload_map_(nullptr)
125  , offset_file_size_(0)
126  , payload_file_size_(0)
127  , payload_file_off_(0)
128  , strings_cache_(nullptr) {
// A persistent dictionary without a folder gets no storage set up at all.
129  if (!isTemp && folder.empty()) {
130  return;
131  }
132 
133  // initial capacity must be a power of two for efficient bucket computation
134  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
135  if (!isTemp_) {
136  boost::filesystem::path storage_path(folder);
137  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
138  const auto payload_path =
139  (storage_path / boost::filesystem::path("DictPayload")).string();
140  payload_fd_ = checked_open(payload_path.c_str(), recover);
141  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
143  offset_file_size_ = file_size(offset_fd_);
144  }
145 
146  if (payload_file_size_ == 0) {
148  }
149  if (offset_file_size_ == 0) {
151  }
152  if (!isTemp_) { // we never mmap or recover temp dictionaries
153  payload_map_ = reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
154  offset_map_ =
156  if (recover) {
157  const size_t bytes = file_size(offset_fd_);
158  if (bytes % sizeof(StringIdxEntry) != 0) {
159  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
160  }
161  const uint64_t str_count = bytes / sizeof(StringIdxEntry);
162  // at this point we know the size of the StringDict we need to load
163  // so lets reallocate the vector to the correct size
164  const uint64_t max_entries = round_up_p2(str_count * 2 + 1);
165  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
166  string_id_hash_table_.swap(new_str_ids);
167  if (materialize_hashes_) {
168  std::vector<uint32_t> new_rk_hashes(max_entries / 2);
169  rk_hashes_.swap(new_rk_hashes);
170  }
171  unsigned string_id = 0;
172  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
173 
174  uint32_t thread_inits = 0;
// NOTE(review): std::thread::hardware_concurrency() is allowed to return 0;
// in that case the division and the modulo below would divide by zero.
175  const auto thread_count = std::thread::hardware_concurrency();
// Batch size per async task, clamped to the range [2000, 200000].
176  const uint32_t items_per_thread = std::max<uint32_t>(
177  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
178  std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>
179  dictionary_futures;
// Each task reads one contiguous batch of stored strings and returns a
// (hash, length) pair per string; a batch stops early at the canary entry.
180  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
181  dictionary_futures.emplace_back(std::async(
182  std::launch::async, [string_id, str_count, items_per_thread, this] {
183  std::vector<std::pair<uint32_t, unsigned int>> hashVec;
184  for (uint32_t curr_id = string_id;
185  curr_id < string_id + items_per_thread && curr_id < str_count;
186  curr_id++) {
187  const auto recovered = getStringFromStorage(curr_id);
188  if (recovered.canary) {
189  // hit the canary, recovery finished
190  break;
191  } else {
192  std::string temp(recovered.c_str_ptr, recovered.size);
193  hashVec.emplace_back(std::make_pair(rk_hash(temp), temp.size()));
194  }
195  }
196  return hashVec;
197  }));
198  thread_inits++;
// Drain the outstanding tasks after every `thread_count` launches.
199  if (thread_inits % thread_count == 0) {
200  processDictionaryFutures(dictionary_futures);
201  }
202  }
203  // gather last few threads
204  if (dictionary_futures.size() != 0) {
205  processDictionaryFutures(dictionary_futures);
206  }
207  }
208  }
209 }
210 
// NOTE(review): the first line of this definition (original line 211) is
// missing from the listing; judging by the call sites in the constructor this
// is presumably StringDictionary::processDictionaryFutures -- confirm.
//
// Consumes the futures in order. Every (hash, length) pair they yield becomes
// the next sequential string id: a free bucket is claimed for the hash,
// payload_file_off_ is advanced by the length, and the hash is stored in
// rk_hashes_ when materialize_hashes_ is set. The vector is cleared at the end.
212  std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>&
213  dictionary_futures) {
214  for (auto& dictionary_future : dictionary_futures) {
215  dictionary_future.wait();
216  auto hashVec = dictionary_future.get();
217  for (auto& hash : hashVec) {
// hash.first is the string's hash, hash.second its length in bytes.
218  uint32_t bucket = computeUniqueBucketWithHash(hash.first, string_id_hash_table_);
219  payload_file_off_ += hash.second;
220  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
221  if (materialize_hashes_) {
222  rk_hashes_[str_count_] = hash.first;
223  }
224  ++str_count_;
225  }
226  }
227  dictionary_futures.clear();
228 }
229 
// NOTE(review): the signature line (original line 230) is missing from the
// listing; the initializer list suggests a constructor taking `host` and
// `dict_ref` -- confirm.
//
// Creates two StringDictionaryClient objects for the same host/dict_ref that
// differ only in their last argument (true for client_, false for
// client_no_timeout_; presumably a "use timeout" flag -- confirm).
231  : strings_cache_(nullptr)
232  , client_(new StringDictionaryClient(host, dict_ref, true))
233  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
234 
// NOTE(review): this looks like the body of the destructor, but its signature
// line and original lines 242-244, 246 and 250 are missing from the listing,
// so part of the cleanup for the non-temp case is not visible -- confirm.
//
// Visible behavior: CANARY_BUFFER is freed first; nothing else is released
// when a remote client is in use. Otherwise, if payload_map_ is set, non-temp
// dictionaries check both descriptors and close offset_fd_, while temp
// dictionaries free() payload_map_ and offset_map_.
236  free(CANARY_BUFFER);
237  if (client_) {
238  return;
239  }
240  if (payload_map_) {
241  if (!isTemp_) {
245  CHECK_GE(payload_fd_, 0);
247  CHECK_GE(offset_fd_, 0);
248  close(offset_fd_);
249  } else {
251  free(payload_map_);
252  free(offset_map_);
253  }
254  }
255 }
256 
257 int32_t StringDictionary::getOrAdd(const std::string& str) noexcept {
258  if (client_) {
259  std::vector<int32_t> string_ids;
260  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
261  CHECK_EQ(size_t(1), string_ids.size());
262  return string_ids.front();
263  }
264  return getOrAddImpl(str);
265 }
266 
267 namespace {
268 
269 template <class T>
270 void log_encoding_error(std::string_view str) {
271  LOG(ERROR) << "Could not encode string: " << str
272  << ", the encoded value doesn't fit in " << sizeof(T) * 8
273  << " bits. Will store NULL instead.";
274 }
275 
276 } // namespace
277 
// NOTE(review): the line carrying the function name (original line 279) is
// missing from the listing -- confirm the name against the real file.
//
// For every inner vector of strings, resizes the matching inner vector of ids
// and fills it through getOrAddBulk(). ids_array_vec is resized to the number
// of input arrays first.
278 template <class String>
280  const std::vector<std::vector<String>>& string_array_vec,
281  std::vector<std::vector<int32_t>>& ids_array_vec) {
282  ids_array_vec.resize(string_array_vec.size());
283  for (size_t i = 0; i < string_array_vec.size(); i++) {
284  auto& strings = string_array_vec[i];
285  auto& ids = ids_array_vec[i];
286  ids.resize(strings.size());
// NOTE(review): `&ids[0]` is evaluated even when `strings` (and so `ids`) is
// empty -- confirm that empty inner arrays cannot occur or are harmless here.
287  getOrAddBulk(strings, &ids[0]);
288  }
289 }
290 
// NOTE(review): remainder of a declaration whose first line (original line
// 291) is missing; presumably the explicit instantiation of the array bulk-add
// template for std::string -- confirm.
292  const std::vector<std::vector<std::string>>& string_array_vec,
293  std::vector<std::vector<int32_t>>& ids_array_vec);
294 
300 template <class String>
301 void StringDictionary::hashStrings(const std::vector<String>& string_vec,
302  std::vector<uint32_t>& hashes) const noexcept {
303  CHECK_EQ(string_vec.size(), hashes.size());
304 
305  tbb::parallel_for(tbb::blocked_range<size_t>(0, string_vec.size()),
306  [&string_vec, &hashes](const tbb::blocked_range<size_t>& r) {
307  for (size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
308  if (string_vec[curr_id].empty()) {
309  continue;
310  }
311  hashes[curr_id] = rk_hash(string_vec[curr_id]);
312  }
313  });
314 }
315 
// Looks up, and adds where necessary, every string of `input_strings`, writing
// one id of type T per input into `output_string_ids` (same order).
// Empty strings are encoded as the null value of T, and so is any string that
// would need an id the type T cannot hold (an error is logged in that case).
//
// NOTE(review): original lines 319, 352, 359 and 372 are missing from this
// listing. The opening `if` of the parallel path, the start of the
// "Maximum number" check, the statement executed when the fill rate is high
// and the last statement of the function are therefore not visible -- confirm
// against the real file before editing.
316 template <class T, class String>
317 void StringDictionary::getOrAddBulk(const std::vector<String>& input_strings,
318  T* output_string_ids) {
320  getOrAddBulkParallel(input_strings, output_string_ids);
321  return;
322  }
323  // Single-thread path.
324  if (client_no_timeout_) {
325  getOrAddBulkRemote(input_strings, output_string_ids);
326  return;
327  }
328  size_t out_idx{0};
329  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
330 
331  for (const auto& str : input_strings) {
332  if (str.empty()) {
333  output_string_ids[out_idx++] = inline_int_null_value<T>();
334  continue;
335  }
336  CHECK(str.size() <= MAX_STRLEN);
337  uint32_t bucket;
338  const uint32_t hash = rk_hash(str);
339  bucket = computeBucket(hash, str, string_id_hash_table_);
// A populated bucket means the string is already in the dictionary.
340  if (string_id_hash_table_[bucket] != INVALID_STR_ID) {
341  output_string_ids[out_idx++] = string_id_hash_table_[bucket];
342  continue;
343  }
344  // need to add record to dictionary
345  // check there is room
346  if (str_count_ == static_cast<size_t>(max_valid_int_value<T>())) {
347  log_encoding_error<T>(str);
348  output_string_ids[out_idx++] = inline_int_null_value<T>();
349  continue;
350  }
351  if (string_id_hash_table_[bucket] == INVALID_STR_ID) {
353  << "Maximum number (" << str_count_
354  << ") of Dictionary encoded Strings reached for this column, offset path "
355  "for column is "
356  << offsets_path_;
357  if (fillRateIsHigh(str_count_)) {
358  // resize when more than 50% is full
// The bucket is recomputed here, after the (not visible) resize statement.
360  bucket = computeBucket(hash, str, string_id_hash_table_);
361  }
362  appendToStorage(str);
363 
364  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
365  if (materialize_hashes_) {
366  rk_hashes_[str_count_] = hash;
367  }
368  ++str_count_;
369  }
370  output_string_ids[out_idx++] = string_id_hash_table_[bucket];
371  }
373 }
374 
// Bulk get-or-add that hashes all inputs in parallel before taking the write
// lock, then assigns ids sequentially and appends all new strings to storage
// in one appendToStorageBulk() call.
//
// While the loop runs, newly added strings exist only in `input_strings`;
// `string_memory_ids` records their input indexes and `shadow_str_count`
// tracks the would-be string count. str_count_ itself is only updated after
// the bulk append.
//
// NOTE(review): original lines 416 and 452 are missing from this listing (one
// argument of computeBucketFromStorageAndMemory() and the function's last
// statement) -- confirm against the real file before editing.
375 template <class T, class String>
376 void StringDictionary::getOrAddBulkParallel(const std::vector<String>& input_strings,
377  T* output_string_ids) {
378  if (client_no_timeout_) {
379  getOrAddBulkRemote(input_strings, output_string_ids);
380  return;
381  }
382  // Run rk_hash on the input strings up front, and in parallel,
383  // as the string hashing does not need to be behind the subsequent write_lock
384  std::vector<uint32_t> input_strings_rk_hashes(input_strings.size());
385  hashStrings(input_strings, input_strings_rk_hashes);
386 
387  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
388  size_t shadow_str_count =
389  str_count_; // Need to shadow str_count_ now with bulk add methods
// Ids below this mark are already in storage; ids at or above it refer to
// strings added by this call.
390  const size_t storage_high_water_mark = shadow_str_count;
391  std::vector<size_t> string_memory_ids;
392  size_t sum_new_string_lengths = 0;
393  string_memory_ids.reserve(input_strings.size());
394  size_t input_string_idx{0};
395  for (const auto& input_string : input_strings) {
396  // Currently we make empty strings null
397  if (input_string.empty()) {
398  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
399  continue;
400  }
401  // TODO: Recover gracefully if an input string is too long
402  CHECK(input_string.size() <= MAX_STRLEN);
403 
404  if (fillRateIsHigh(shadow_str_count)) {
405  // resize when more than 50% is full
406  increaseCapacityFromStorageAndMemory(storage_high_water_mark,
407  input_strings,
408  string_memory_ids,
409  input_strings_rk_hashes);
410  }
411  // Get the rk_hash for this input_string
412  const uint32_t input_string_rk_hash = input_strings_rk_hashes[input_string_idx];
413 
414  uint32_t hash_bucket = computeBucketFromStorageAndMemory(input_string_rk_hash,
415  input_string,
417  storage_high_water_mark,
418  input_strings,
419  string_memory_ids);
420 
421  // If the hash bucket is not empty, that is our string id
422  // (computeBucketFromStorageAndMemory) already checked to ensure the input string and
423  // bucket string are equal)
424  if (string_id_hash_table_[hash_bucket] != INVALID_STR_ID) {
425  output_string_ids[input_string_idx++] = string_id_hash_table_[hash_bucket];
426  continue;
427  }
428  // Did not find string, so need to add record to dictionary
429  // First check there is room
430  if (shadow_str_count == static_cast<size_t>(max_valid_int_value<T>())) {
431  log_encoding_error<T>(input_string);
432  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
433  continue;
434  }
435  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
436  << "Maximum number (" << shadow_str_count
437  << ") of Dictionary encoded Strings reached for this column, offset path "
438  "for column is "
439  << offsets_path_;
440 
// Remember which input supplies the new string and reserve its id.
441  string_memory_ids.push_back(input_string_idx);
442  sum_new_string_lengths += input_string.size();
443  string_id_hash_table_[hash_bucket] = static_cast<int32_t>(shadow_str_count);
444  if (materialize_hashes_) {
445  rk_hashes_[shadow_str_count] = input_string_rk_hash;
446  }
447  output_string_ids[input_string_idx++] = shadow_str_count++;
448  }
449  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
450  str_count_ = shadow_str_count;
451 
453 }
// Explicit instantiations of getOrAddBulk for std::string and
// std::string_view inputs, each with uint8_t, uint16_t and int32_t id outputs.
454 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
455  uint8_t* encoded_vec);
456 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
457  uint16_t* encoded_vec);
458 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
459  int32_t* encoded_vec);
460 
461 template void StringDictionary::getOrAddBulk(
462  const std::vector<std::string_view>& string_vec,
463  uint8_t* encoded_vec);
464 template void StringDictionary::getOrAddBulk(
465  const std::vector<std::string_view>& string_vec,
466  uint16_t* encoded_vec);
467 template void StringDictionary::getOrAddBulk(
468  const std::vector<std::string_view>& string_vec,
469  int32_t* encoded_vec);
470 
// Resolves `string_vec` through client_no_timeout_ and narrows the returned
// int32_t ids to T, writing them to `encoded_vec` in order. An id larger than
// max_valid_int_value<T>() is logged and stored as the null value of T; the
// int32_t null value is likewise mapped to the null value of T.
//
// NOTE(review): original line 474 (the first statement of the body) is missing
// from this listing -- confirm against the real file.
471 template <class T, class String>
472 void StringDictionary::getOrAddBulkRemote(const std::vector<String>& string_vec,
473  T* encoded_vec) {
475  std::vector<int32_t> string_ids;
476  client_no_timeout_->get_or_add_bulk(string_ids, string_vec);
477  size_t out_idx{0};
478  for (size_t i = 0; i < string_ids.size(); ++i) {
479  const auto string_id = string_ids[i];
480  const bool invalid = string_id > max_valid_int_value<T>();
481  if (invalid || string_id == inline_int_null_value<int32_t>()) {
// Only the "does not fit" case is logged; a null id is passed on silently.
482  if (invalid) {
483  log_encoding_error<T>(string_vec[i]);
484  }
485  encoded_vec[out_idx++] = inline_int_null_value<T>();
486  continue;
487  }
488  encoded_vec[out_idx++] = string_id;
489  }
490 }
491 
// NOTE(review): six declarations whose first lines (original lines 492, 495,
// 498, 502, 505 and 508) are missing from the listing. Given the parameter
// lists and their position they are presumably the explicit instantiations of
// getOrAddBulkRemote for std::string / std::string_view inputs with uint8_t,
// uint16_t and int32_t outputs -- confirm.
493  const std::vector<std::string>& string_vec,
494  uint8_t* encoded_vec);
496  const std::vector<std::string>& string_vec,
497  uint16_t* encoded_vec);
499  const std::vector<std::string>& string_vec,
500  int32_t* encoded_vec);
501 
503  const std::vector<std::string_view>& string_vec,
504  uint8_t* encoded_vec);
506  const std::vector<std::string_view>& string_vec,
507  uint16_t* encoded_vec);
509  const std::vector<std::string_view>& string_vec,
510  int32_t* encoded_vec);
511 
512 int32_t StringDictionary::getIdOfString(const std::string& str) const {
513  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
514  if (client_) {
515  return client_->get(str);
516  }
517  return getUnlocked(str);
518 }
519 
520 int32_t StringDictionary::getUnlocked(const std::string& str) const noexcept {
521  const uint32_t hash = rk_hash(str);
522  auto str_id = string_id_hash_table_[computeBucket(hash, str, string_id_hash_table_)];
523  return str_id;
524 }
525 
526 std::string StringDictionary::getString(int32_t string_id) const {
527  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
528  if (client_) {
529  std::string ret;
530  client_->get_string(ret, string_id);
531  return ret;
532  }
533  return getStringUnlocked(string_id);
534 }
535 
536 std::string StringDictionary::getStringUnlocked(int32_t string_id) const noexcept {
537  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
538  return getStringChecked(string_id);
539 }
540 
// Returns a (pointer, length) pair for the string stored under `string_id`,
// as produced by getStringBytesChecked(). Takes the shared (read) lock.
// Only valid for local dictionaries (CHECK(!client_)), and `string_id` must be
// in [0, str_count_); violations trip the CHECK macros.
541 std::pair<char*, size_t> StringDictionary::getStringBytes(int32_t string_id) const
542  noexcept {
543  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
544  CHECK(!client_);
545  CHECK_LE(0, string_id);
546  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
547  return getStringBytesChecked(string_id);
548 }
549 
// NOTE(review): the signature line (original line 550) is missing from the
// listing; the function name is not visible here -- confirm.
//
// Under the shared (read) lock, returns the remote client's
// storage_entry_count() when a client is present, otherwise str_count_.
551  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
552  if (client_) {
553  return client_->storage_entry_count();
554  }
555  return str_count_;
556 }
557 
558 namespace {
559 
560 bool is_like(const std::string& str,
561  const std::string& pattern,
562  const bool icase,
563  const bool is_simple,
564  const char escape) {
565  return icase
566  ? (is_simple ? string_ilike_simple(
567  str.c_str(), str.size(), pattern.c_str(), pattern.size())
568  : string_ilike(str.c_str(),
569  str.size(),
570  pattern.c_str(),
571  pattern.size(),
572  escape))
573  : (is_simple ? string_like_simple(
574  str.c_str(), str.size(), pattern.c_str(), pattern.size())
575  : string_like(str.c_str(),
576  str.size(),
577  pattern.c_str(),
578  pattern.size(),
579  escape));
580 }
581 
582 } // namespace
583 
584 std::vector<int32_t> StringDictionary::getLike(const std::string& pattern,
585  const bool icase,
586  const bool is_simple,
587  const char escape,
588  const size_t generation) const {
589  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
590  if (client_) {
591  return client_->get_like(pattern, icase, is_simple, escape, generation);
592  }
593  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
594  const auto it = like_cache_.find(cache_key);
595  if (it != like_cache_.end()) {
596  return it->second;
597  }
598  std::vector<int32_t> result;
599  std::vector<std::thread> workers;
600  int worker_count = cpu_threads();
601  CHECK_GT(worker_count, 0);
602  std::vector<std::vector<int32_t>> worker_results(worker_count);
603  CHECK_LE(generation, str_count_);
604  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
605  workers.emplace_back([&worker_results,
606  &pattern,
607  generation,
608  icase,
609  is_simple,
610  escape,
611  worker_idx,
612  worker_count,
613  this]() {
614  for (size_t string_id = worker_idx; string_id < generation;
615  string_id += worker_count) {
616  const auto str = getStringUnlocked(string_id);
617  if (is_like(str, pattern, icase, is_simple, escape)) {
618  worker_results[worker_idx].push_back(string_id);
619  }
620  }
621  });
622  }
623  for (auto& worker : workers) {
624  worker.join();
625  }
626  for (const auto& worker_result : worker_results) {
627  result.insert(result.end(), worker_result.begin(), worker_result.end());
628  }
629  // place result into cache for reuse if similar query
630  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
631 
632  CHECK(it_ok.second);
633 
634  return result;
635 }
636 
637 std::vector<int32_t> StringDictionary::getEquals(std::string pattern,
638  std::string comp_operator,
639  size_t generation) {
640  std::vector<int32_t> result;
641  auto eq_id_itr = equal_cache_.find(pattern);
642  int32_t eq_id = MAX_STRLEN + 1;
643  int32_t cur_size = str_count_;
644  if (eq_id_itr != equal_cache_.end()) {
645  auto eq_id = eq_id_itr->second;
646  if (comp_operator == "=") {
647  result.push_back(eq_id);
648  } else {
649  for (int32_t idx = 0; idx <= cur_size; idx++) {
650  if (idx == eq_id) {
651  continue;
652  }
653  result.push_back(idx);
654  }
655  }
656  } else {
657  std::vector<std::thread> workers;
658  int worker_count = cpu_threads();
659  CHECK_GT(worker_count, 0);
660  std::vector<std::vector<int32_t>> worker_results(worker_count);
661  CHECK_LE(generation, str_count_);
662  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
663  workers.emplace_back(
664  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
665  for (size_t string_id = worker_idx; string_id < generation;
666  string_id += worker_count) {
667  const auto str = getStringUnlocked(string_id);
668  if (str == pattern) {
669  worker_results[worker_idx].push_back(string_id);
670  }
671  }
672  });
673  }
674  for (auto& worker : workers) {
675  worker.join();
676  }
677  for (const auto& worker_result : worker_results) {
678  result.insert(result.end(), worker_result.begin(), worker_result.end());
679  }
680  if (result.size() > 0) {
681  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
682  CHECK(it_ok.second);
683  eq_id = result[0];
684  }
685  if (comp_operator == "<>") {
686  for (int32_t idx = 0; idx <= cur_size; idx++) {
687  if (idx == eq_id) {
688  continue;
689  }
690  result.push_back(idx);
691  }
692  }
693  }
694  return result;
695 }
696 
697 std::vector<int32_t> StringDictionary::getCompare(const std::string& pattern,
698  const std::string& comp_operator,
699  const size_t generation) {
700  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
701  if (client_) {
702  return client_->get_compare(pattern, comp_operator, generation);
703  }
704  std::vector<int32_t> ret;
705  if (str_count_ == 0) {
706  return ret;
707  }
708  if (sorted_cache.size() < str_count_) {
709  if (comp_operator == "=" || comp_operator == "<>") {
710  return getEquals(pattern, comp_operator, generation);
711  }
712 
714  }
715  auto cache_index = compare_cache_.get(pattern);
716 
717  if (!cache_index) {
718  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
719  const auto cache_itr = std::lower_bound(
720  sorted_cache.begin(),
721  sorted_cache.end(),
722  pattern,
723  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
724  auto a_str = this->getStringFromStorage(a);
725  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
726  });
727 
728  if (cache_itr == sorted_cache.end()) {
729  cache_index->index = sorted_cache.size() - 1;
730  cache_index->diff = 1;
731  } else {
732  const auto cache_str = getStringFromStorage(*cache_itr);
733  if (!string_eq(
734  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
735  cache_index->index = cache_itr - sorted_cache.begin() - 1;
736  cache_index->diff = 1;
737  } else {
738  cache_index->index = cache_itr - sorted_cache.begin();
739  cache_index->diff = 0;
740  }
741  }
742 
743  compare_cache_.put(pattern, cache_index);
744  }
745 
746  // since we have a cache in form of vector of ints which is sorted according to
747  // corresponding strings in the dictionary all we need is the index of the element
748  // which equal to the pattern that we are trying to match or the index of “biggest”
749  // element smaller than the pattern, to perform all the comparison operators over
750  // string. The search function guarantees we have such index so now it is just the
751  // matter to include all the elements in the result vector.
752 
753  // For < operator if the index that we have points to the element which is equal to
754  // the pattern that we are searching for we simply get all the elements less than the
755  // index. If the element pointed by the index is not equal to the pattern we are
756  // comparing with we also need to include that index in result vector, except when the
757  // index points to 0 and the pattern is lesser than the smallest value in the string
758  // dictionary.
759 
760  if (comp_operator == "<") {
761  size_t idx = cache_index->index;
762  if (cache_index->diff) {
763  idx = cache_index->index + 1;
764  if (cache_index->index == 0 && cache_index->diff > 0) {
765  idx = cache_index->index;
766  }
767  }
768  for (size_t i = 0; i < idx; i++) {
769  ret.push_back(sorted_cache[i]);
770  }
771 
772  // For <= operator if the index that we have points to the element which is equal to
773  // the pattern that we are searching for we want to include the element pointed by
774  // the index in the result set. If the element pointed by the index is not equal to
775  // the pattern we are comparing with we just want to include all the ids with index
776  // less than the index that is cached, except when pattern that we are searching for
777  // is smaller than the smallest string in the dictionary.
778 
779  } else if (comp_operator == "<=") {
780  size_t idx = cache_index->index + 1;
781  if (cache_index == 0 && cache_index->diff > 0) {
782  idx = cache_index->index;
783  }
784  for (size_t i = 0; i < idx; i++) {
785  ret.push_back(sorted_cache[i]);
786  }
787 
788  // For > operator we want to get all the elements with index greater than the index
789  // that we have except, when the pattern we are searching for is lesser than the
790  // smallest string in the dictionary we also want to include the id of the index
791  // that we have.
792 
793  } else if (comp_operator == ">") {
794  size_t idx = cache_index->index + 1;
795  if (cache_index->index == 0 && cache_index->diff > 0) {
796  idx = cache_index->index;
797  }
798  for (size_t i = idx; i < sorted_cache.size(); i++) {
799  ret.push_back(sorted_cache[i]);
800  }
801 
802  // For >= operator when the indexed element that we have points to element which is
803  // equal to the pattern we are searching for we want to include that in the result
804  // vector. If the index that we have does not point to the string which is equal to
805  // the pattern we are searching we don’t want to include that id into the result
806  // vector except when the index is 0.
807 
808  } else if (comp_operator == ">=") {
809  size_t idx = cache_index->index;
810  if (cache_index->diff) {
811  idx = cache_index->index + 1;
812  if (cache_index->index == 0 && cache_index->diff > 0) {
813  idx = cache_index->index;
814  }
815  }
816  for (size_t i = idx; i < sorted_cache.size(); i++) {
817  ret.push_back(sorted_cache[i]);
818  }
819  } else if (comp_operator == "=") {
820  if (!cache_index->diff) {
821  ret.push_back(sorted_cache[cache_index->index]);
822  }
823 
824  // For <> operator it is simple matter of not including id of string which is equal
825  // to pattern we are searching for.
826  } else if (comp_operator == "<>") {
827  if (!cache_index->diff) {
828  size_t idx = cache_index->index;
829  for (size_t i = 0; i < idx; i++) {
830  ret.push_back(sorted_cache[i]);
831  }
832  ++idx;
833  for (size_t i = idx; i < sorted_cache.size(); i++) {
834  ret.push_back(sorted_cache[i]);
835  }
836  } else {
837  for (size_t i = 0; i < sorted_cache.size(); i++) {
838  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
839  }
840  }
841 
842  } else {
843  std::runtime_error("Unsupported string comparison operator");
844  }
845  return ret;
846 }
847 
848 namespace {
849 
850 bool is_regexp_like(const std::string& str,
851  const std::string& pattern,
852  const char escape) {
853  return regexp_like(str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape);
854 }
855 
856 } // namespace
857 
858 std::vector<int32_t> StringDictionary::getRegexpLike(const std::string& pattern,
859  const char escape,
860  const size_t generation) const {
861  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
862  if (client_) {
863  return client_->get_regexp_like(pattern, escape, generation);
864  }
865  const auto cache_key = std::make_pair(pattern, escape);
866  const auto it = regex_cache_.find(cache_key);
867  if (it != regex_cache_.end()) {
868  return it->second;
869  }
870  std::vector<int32_t> result;
871  std::vector<std::thread> workers;
872  int worker_count = cpu_threads();
873  CHECK_GT(worker_count, 0);
874  std::vector<std::vector<int32_t>> worker_results(worker_count);
875  CHECK_LE(generation, str_count_);
876  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
877  workers.emplace_back([&worker_results,
878  &pattern,
879  generation,
880  escape,
881  worker_idx,
882  worker_count,
883  this]() {
884  for (size_t string_id = worker_idx; string_id < generation;
885  string_id += worker_count) {
886  const auto str = getStringUnlocked(string_id);
887  if (is_regexp_like(str, pattern, escape)) {
888  worker_results[worker_idx].push_back(string_id);
889  }
890  }
891  });
892  }
893  for (auto& worker : workers) {
894  worker.join();
895  }
896  for (const auto& worker_result : worker_results) {
897  result.insert(result.end(), worker_result.begin(), worker_result.end());
898  }
899  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
900  CHECK(it_ok.second);
901 
902  return result;
903 }
904 
905 std::shared_ptr<const std::vector<std::string>> StringDictionary::copyStrings() const {
906  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
907  if (client_) {
908  // TODO(miyu): support remote string dictionary
909  throw std::runtime_error(
910  "copying dictionaries from remote server is not supported yet.");
911  }
912 
913  if (strings_cache_) {
914  return strings_cache_;
915  }
916 
917  strings_cache_ = std::make_shared<std::vector<std::string>>();
918  strings_cache_->reserve(str_count_);
919  const bool multithreaded = str_count_ > 10000;
920  const auto worker_count =
921  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
922  CHECK_GT(worker_count, 0UL);
923  std::vector<std::vector<std::string>> worker_results(worker_count);
924  auto copy = [this](std::vector<std::string>& str_list,
925  const size_t start_id,
926  const size_t end_id) {
927  CHECK_LE(start_id, end_id);
928  str_list.reserve(end_id - start_id);
929  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
930  str_list.push_back(getStringUnlocked(string_id));
931  }
932  };
933  if (multithreaded) {
934  std::vector<std::future<void>> workers;
935  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
936  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
937  worker_idx < worker_count && start < str_count_;
938  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
939  workers.push_back(std::async(
940  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
941  }
942  for (auto& worker : workers) {
943  worker.get();
944  }
945  } else {
946  CHECK_EQ(worker_results.size(), size_t(1));
947  copy(worker_results[0], 0, str_count_);
948  }
949 
950  for (const auto& worker_result : worker_results) {
951  strings_cache_->insert(
952  strings_cache_->end(), worker_result.begin(), worker_result.end());
953  }
954  return strings_cache_;
955 }
956 
957 bool StringDictionary::fillRateIsHigh(const size_t num_strings) const noexcept {
958  return string_id_hash_table_.size() <= num_strings * 2;
959 }
960 
962  std::vector<int32_t> new_str_ids(string_id_hash_table_.size() * 2, INVALID_STR_ID);
963 
964  if (materialize_hashes_) {
965  for (size_t i = 0; i < string_id_hash_table_.size(); ++i) {
967  const uint32_t hash = rk_hashes_[string_id_hash_table_[i]];
968  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
969  new_str_ids[bucket] = string_id_hash_table_[i];
970  }
971  }
972  rk_hashes_.resize(rk_hashes_.size() * 2);
973  } else {
974  for (size_t i = 0; i < str_count_; ++i) {
975  const auto str = getStringChecked(i);
976  const uint32_t hash = rk_hash(str);
977  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
978  new_str_ids[bucket] = i;
979  }
980  }
981  string_id_hash_table_.swap(new_str_ids);
982 }
983 
984 template <class String>
986  const size_t storage_high_water_mark,
987  const std::vector<String>& input_strings,
988  const std::vector<size_t>& string_memory_ids,
989  const std::vector<uint32_t>& input_strings_rk_hashes) noexcept {
990  std::vector<int32_t> new_str_ids(string_id_hash_table_.size() * 2, INVALID_STR_ID);
991  if (materialize_hashes_) {
992  for (size_t i = 0; i < string_id_hash_table_.size(); ++i) {
993  if (string_id_hash_table_[i] != INVALID_STR_ID) {
994  const uint32_t hash = rk_hashes_[string_id_hash_table_[i]];
995  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
996  new_str_ids[bucket] = string_id_hash_table_[i];
997  }
998  }
999  rk_hashes_.resize(rk_hashes_.size() * 2);
1000  } else {
1001  for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1002  const auto storage_string = getStringChecked(storage_idx);
1003  const uint32_t hash = rk_hash(storage_string);
1004  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1005  new_str_ids[bucket] = storage_idx;
1006  }
1007  for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1008  size_t string_memory_id = string_memory_ids[memory_idx];
1009  uint32_t bucket = computeUniqueBucketWithHash(
1010  input_strings_rk_hashes[string_memory_id], new_str_ids);
1011  new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1012  }
1013  }
1014  string_id_hash_table_.swap(new_str_ids);
1015 }
1016 
1017 int32_t StringDictionary::getOrAddImpl(const std::string& str) noexcept {
1018  // @TODO(wei) treat empty string as NULL for now
1019  if (str.size() == 0) {
1020  return inline_int_null_value<int32_t>();
1021  }
1022  CHECK(str.size() <= MAX_STRLEN);
1023  uint32_t bucket;
1024  const uint32_t hash = rk_hash(str);
1025  {
1026  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
1027  bucket = computeBucket(hash, str, string_id_hash_table_);
1028  if (string_id_hash_table_[bucket] != INVALID_STR_ID) {
1029  return string_id_hash_table_[bucket];
1030  }
1031  }
1032  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
1033  // need to recalculate the bucket in case it changed before
1034  // we got the lock
1035  bucket = computeBucket(hash, str, string_id_hash_table_);
1036  if (string_id_hash_table_[bucket] == INVALID_STR_ID) {
1037  CHECK_LT(str_count_, MAX_STRCOUNT)
1038  << "Maximum number (" << str_count_
1039  << ") of Dictionary encoded Strings reached for this column, offset path "
1040  "for column is "
1041  << offsets_path_;
1042  if (fillRateIsHigh(str_count_)) {
1043  // resize when more than 50% is full
1044  increaseCapacity();
1045  bucket = computeBucket(hash, str, string_id_hash_table_);
1046  }
1047  appendToStorage(str);
1048  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
1049  if (materialize_hashes_) {
1050  rk_hashes_[str_count_] = hash;
1051  }
1052  ++str_count_;
1053  invalidateInvertedIndex();
1054  }
1055  return string_id_hash_table_[bucket];
1056 }
1057 
1058 std::string StringDictionary::getStringChecked(const int string_id) const noexcept {
1059  const auto str_canary = getStringFromStorage(string_id);
1060  CHECK(!str_canary.canary);
1061  return std::string(str_canary.c_str_ptr, str_canary.size);
1062 }
1063 
1065  const int string_id) const noexcept {
1066  const auto str_canary = getStringFromStorage(string_id);
1067  CHECK(!str_canary.canary);
1068  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1069 }
1070 
1071 template <class String>
1072 uint32_t StringDictionary::computeBucket(const uint32_t hash,
1073  const String& str,
1074  const std::vector<int32_t>& data) const
1075  noexcept {
1076  auto bucket = hash & (data.size() - 1);
1077  while (true) {
1078  const int32_t candidate_string_id = data[bucket];
1079  if (candidate_string_id ==
1080  INVALID_STR_ID) { // In this case it means the slot is available for use
1081  break;
1082  }
1083  if (!materialize_hashes_ ||
1084  (materialize_hashes_ && hash == rk_hashes_[candidate_string_id])) {
1085  const auto old_str = getStringFromStorageFast(candidate_string_id);
1086  if (str.size() == old_str.size() &&
1087  !memcmp(str.data(), old_str.data(), str.size())) {
1088  // found the string
1089  break;
1090  }
1091  }
1092  // wrap around
1093  if (++bucket == data.size()) {
1094  bucket = 0;
1095  }
1096  }
1097  return bucket;
1098 }
1099 
1100 template <class String>
1102  const uint32_t input_string_rk_hash,
1103  const String& input_string,
1104  const std::vector<int32_t>& string_id_hash_table,
1105  const size_t storage_high_water_mark,
1106  const std::vector<String>& input_strings,
1107  const std::vector<size_t>& string_memory_ids) const noexcept {
1108  auto bucket = input_string_rk_hash & (string_id_hash_table.size() - 1);
1109  while (true) {
1110  const int32_t candidate_string_id = string_id_hash_table[bucket];
1111  if (candidate_string_id ==
1112  INVALID_STR_ID) { // In this case it means the slot is available for use
1113  break;
1114  }
1115  if (!materialize_hashes_ ||
1116  (input_string_rk_hash == rk_hashes_[candidate_string_id])) {
1117  if (candidate_string_id > 0 &&
1118  static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1119  // The candidate string is not in storage yet but in our string_memory_ids temp
1120  // buffer
1121  size_t memory_offset =
1122  static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1123  const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1124  if (input_string.size() == candidate_string.size() &&
1125  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1126  // found the string in the temp memory buffer
1127  break;
1128  }
1129  } else {
1130  // The candidate string is in storage, need to fetch it for comparison
1131  const auto candidate_storage_string =
1132  getStringFromStorageFast(candidate_string_id);
1133  if (input_string.size() == candidate_storage_string.size() &&
1134  !memcmp(input_string.data(),
1135  candidate_storage_string.data(),
1136  input_string.size())) {
1139  // found the string in storage
1140  break;
1141  }
1142  }
1143  }
1144  if (++bucket == string_id_hash_table.size()) {
1145  bucket = 0;
1146  }
1147  }
1148  return bucket;
1149 }
1150 
1152  const uint32_t hash,
1153  const std::vector<int32_t>& data) const noexcept {
1154  auto bucket = hash & (data.size() - 1);
1155  while (true) {
1156  if (data[bucket] ==
1157  INVALID_STR_ID) { // In this case it means the slot is available for use
1158  break;
1159  }
1160  // wrap around
1161  if (++bucket == data.size()) {
1162  bucket = 0;
1163  }
1164  }
1165  return bucket;
1166 }
1167 
1169  const size_t write_length) {
1170  if (payload_file_off_ + write_length > payload_file_size_) {
1171  const size_t min_capacity_needed =
1172  write_length - (payload_file_size_ - payload_file_off_);
1173  if (!isTemp_) {
1174  CHECK_GE(payload_fd_, 0);
1176  addPayloadCapacity(min_capacity_needed);
1177  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1178  payload_map_ =
1179  reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
1180  } else {
1181  addPayloadCapacity(min_capacity_needed);
1182  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1183  }
1184  }
1185 }
1186 
1188  const size_t write_length) {
1189  size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
1190  if (offset_file_off + write_length >= offset_file_size_) {
1191  const size_t min_capacity_needed =
1192  write_length - (offset_file_size_ - offset_file_off);
1193  if (!isTemp_) {
1194  CHECK_GE(offset_fd_, 0);
1196  addOffsetCapacity(min_capacity_needed);
1197  CHECK(offset_file_off + write_length <= offset_file_size_);
1198  offset_map_ =
1200  } else {
1201  addOffsetCapacity(min_capacity_needed);
1202  CHECK(offset_file_off + write_length <= offset_file_size_);
1203  }
1204  }
1205 }
1206 
1207 template <class String>
1208 void StringDictionary::appendToStorage(String str) noexcept {
1209  // write the payload
1210  checkAndConditionallyIncreasePayloadCapacity(str.size());
1211  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1212 
1213  // write the offset and length
1214  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1215  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1216 
1217  checkAndConditionallyIncreaseOffsetCapacity(sizeof(str_meta));
1218  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1219 }
1220 
1221 template <class String>
1223  const std::vector<String>& input_strings,
1224  const std::vector<size_t>& string_memory_ids,
1225  const size_t sum_new_strings_lengths) noexcept {
1226  const size_t num_strings = string_memory_ids.size();
1227 
1228  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1229  checkAndConditionallyIncreaseOffsetCapacity(sizeof(StringIdxEntry) * num_strings);
1230 
1231  for (size_t i = 0; i < num_strings; ++i) {
1232  const size_t string_idx = string_memory_ids[i];
1233  const String str = input_strings[string_idx];
1234  const size_t str_size(str.size());
1235  memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1236  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
1237  payload_file_off_ += str_size; // Need to increment after we've defined str_meta
1238  memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
1239  }
1240 }
1241 
1242 std::string_view StringDictionary::getStringFromStorageFast(const int string_id) const
1243  noexcept {
1244  const StringIdxEntry* str_meta = offset_map_ + string_id;
1245  return {payload_map_ + str_meta->off, str_meta->size};
1246 }
1247 
1249  const int string_id) const noexcept {
1250  if (!isTemp_) {
1251  CHECK_GE(payload_fd_, 0);
1252  CHECK_GE(offset_fd_, 0);
1253  }
1254  CHECK_GE(string_id, 0);
1255  const StringIdxEntry* str_meta = offset_map_ + string_id;
1256  if (str_meta->size == 0xffff) {
1257  // hit the canary
1258  return {nullptr, 0, true};
1259  }
1260  return {payload_map_ + str_meta->off, str_meta->size, false};
1261 }
1262 
1263 void StringDictionary::addPayloadCapacity(const size_t min_capacity_requested) noexcept {
1264  if (!isTemp_) {
1265  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1266  } else {
1267  payload_map_ = static_cast<char*>(
1268  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1269  }
1270 }
1271 
1272 void StringDictionary::addOffsetCapacity(const size_t min_capacity_requested) noexcept {
1273  if (!isTemp_) {
1274  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1275  } else {
1276  offset_map_ = static_cast<StringIdxEntry*>(
1277  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1278  }
1279 }
1280 
1282  int fd,
1283  const size_t min_capacity_requested) noexcept {
1284  const size_t canary_buff_size_to_add =
1285  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1286  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1287 
1288  if (canary_buffer_size != canary_buff_size_to_add) {
1289  CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1290  canary_buffer_size = canary_buff_size_to_add;
1291  }
1292  CHECK(CANARY_BUFFER);
1293  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1294 
1295  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1296  ssize_t write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
1297  CHECK(write_return > 0 &&
1298  (static_cast<size_t>(write_return) == canary_buff_size_to_add));
1299  return canary_buff_size_to_add;
1300 }
1301 
1303  size_t& mem_size,
1304  const size_t min_capacity_requested) noexcept {
1305  const size_t canary_buff_size_to_add =
1306  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1307  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1308  if (canary_buffer_size != canary_buff_size_to_add) {
1309  CANARY_BUFFER =
1310  reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1311  canary_buffer_size = canary_buff_size_to_add;
1312  }
1313  CHECK(CANARY_BUFFER);
1314  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1315  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
1316  CHECK(new_addr);
1317  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
1318  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
1319  mem_size += canary_buff_size_to_add;
1320  return new_addr;
1321 }
1322 
1324  if (!like_cache_.empty()) {
1325  decltype(like_cache_)().swap(like_cache_);
1326  }
1327  if (!regex_cache_.empty()) {
1328  decltype(regex_cache_)().swap(regex_cache_);
1329  }
1330  if (!equal_cache_.empty()) {
1331  decltype(equal_cache_)().swap(equal_cache_);
1332  }
1333  compare_cache_.invalidateInvertedIndex();
1334 }
1335 
1337  if (client_) {
1338  try {
1339  return client_->checkpoint();
1340  } catch (...) {
1341  return false;
1342  }
1343  }
1344  CHECK(!isTemp_);
1345  bool ret = true;
1346  ret = ret && (msync((void*)offset_map_, offset_file_size_, MS_SYNC) == 0);
1347  ret = ret && (msync((void*)payload_map_, payload_file_size_, MS_SYNC) == 0);
1348  ret = ret && (fsync(offset_fd_) == 0);
1349  ret = ret && (fsync(payload_fd_) == 0);
1350  return ret;
1351 }
1352 
1354  // This method is not thread-safe.
1355  const auto cur_cache_size = sorted_cache.size();
1356  std::vector<int32_t> temp_sorted_cache;
1357  for (size_t i = cur_cache_size; i < str_count_; i++) {
1358  temp_sorted_cache.push_back(i);
1359  }
1360  sortCache(temp_sorted_cache);
1361  mergeSortedCache(temp_sorted_cache);
1362 }
1363 
1364 void StringDictionary::sortCache(std::vector<int32_t>& cache) {
1365  // This method is not thread-safe.
1366 
1367  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1368  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1369 
1370  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1371  auto a_str = this->getStringFromStorage(a);
1372  auto b_str = this->getStringFromStorage(b);
1373  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1374  });
1375 }
1376 
1377 void StringDictionary::mergeSortedCache(std::vector<int32_t>& temp_sorted_cache) {
1378  // this method is not thread safe
1379  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1380  size_t t_idx = 0, s_idx = 0, idx = 0;
1381  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1382  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1383  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1384  const auto insert_from_temp_cache =
1385  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1386  if (insert_from_temp_cache) {
1387  updated_cache[idx] = temp_sorted_cache[t_idx++];
1388  } else {
1389  updated_cache[idx] = sorted_cache[s_idx++];
1390  }
1391  }
1392  while (t_idx < temp_sorted_cache.size()) {
1393  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1394  }
1395  while (s_idx < sorted_cache.size()) {
1396  updated_cache[idx++] = sorted_cache[s_idx++];
1397  }
1398  sorted_cache.swap(updated_cache);
1399 }
1400 
1402  std::vector<int32_t>& dest_ids,
1403  StringDictionary* dest_dict,
1404  const std::vector<int32_t>& source_ids,
1405  const StringDictionary* source_dict,
1406  const std::map<int32_t, std::string> transient_mapping) {
1407  std::vector<std::string> strings;
1408 
1409  for (const int32_t source_id : source_ids) {
1410  if (source_id == std::numeric_limits<int32_t>::min()) {
1411  strings.emplace_back("");
1412  } else if (source_id < 0) {
1413  if (auto string_itr = transient_mapping.find(source_id);
1414  string_itr != transient_mapping.end()) {
1415  strings.emplace_back(string_itr->second);
1416  } else {
1417  throw std::runtime_error("Unexpected negative source ID");
1418  }
1419  } else {
1420  strings.push_back(source_dict->getString(source_id));
1421  }
1422  }
1423 
1424  dest_ids.resize(strings.size());
1425  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1426 }
1427 
1429  std::vector<std::vector<int32_t>>& dest_array_ids,
1430  StringDictionary* dest_dict,
1431  const std::vector<std::vector<int32_t>>& source_array_ids,
1432  const StringDictionary* source_dict) {
1433  dest_array_ids.resize(source_array_ids.size());
1434 
1435  std::atomic<size_t> row_idx{0};
1436  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1437  int thread_id) {
1438  for (;;) {
1439  auto row = row_idx.fetch_add(1);
1440 
1441  if (row >= dest_array_ids.size()) {
1442  return;
1443  }
1444  const auto& source_ids = source_array_ids[row];
1445  auto& dest_ids = dest_array_ids[row];
1446  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1447  }
1448  };
1449 
1450  const int num_worker_threads = std::thread::hardware_concurrency();
1451 
1452  if (source_array_ids.size() / num_worker_threads > 10) {
1453  std::vector<std::future<void>> worker_threads;
1454  for (int i = 0; i < num_worker_threads; ++i) {
1455  worker_threads.push_back(std::async(std::launch::async, processor, i));
1456  }
1457 
1458  for (auto& child : worker_threads) {
1459  child.wait();
1460  }
1461  for (auto& child : worker_threads) {
1462  child.get();
1463  }
1464  } else {
1465  processor(0);
1466  }
1467 }
1468 
1469 void translate_string_ids(std::vector<int32_t>& dest_ids,
1470  const LeafHostInfo& dict_server_host,
1471  const DictRef dest_dict_ref,
1472  const std::vector<int32_t>& source_ids,
1473  const DictRef source_dict_ref,
1474  const int32_t dest_generation) {
1475  DictRef temp_dict_ref(-1, -1);
1476  StringDictionaryClient string_client(dict_server_host, temp_dict_ref, false);
1477  string_client.translate_string_ids(
1478  dest_ids, dest_dict_ref, source_ids, source_dict_ref, dest_generation);
1479 }
StringIdxEntry * offset_map_
void translate_string_ids(std::vector< int32_t > &dest_ids, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
#define CHECK_EQ(x, y)
Definition: Logger.h:205
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
uint32_t rk_hash(const std::string_view &str)
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
std::vector< int32_t > getRegexpLike(const std::string &pattern, const char escape, const size_t generation) const
void hashStrings(const std::vector< String > &string_vec, std::vector< uint32_t > &hashes) const noexcept
uint64_t off
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
uint64_t size
size_t storageEntryCount() const
#define LOG(tag)
Definition: Logger.h:188
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
void checked_munmap(void *addr, size_t length)
std::string getStringChecked(const int string_id) const noexcept
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:210
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
std::string offsets_path_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
#define CHECK_GT(x, y)
Definition: Logger.h:209
int32_t getIdOfString(const std::string &str) const
int32_t getOrAddImpl(const std::string &str) noexcept
int32_t getOrAdd(const std::string &str) noexcept
int32_t getUnlocked(const std::string &str) const noexcept
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
void increaseCapacity() noexcept
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
StringDictionary(const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
static constexpr size_t MAX_STRCOUNT
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
void log_encoding_error(std::string_view str)
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
CHECK(cgen_state)
DEVICE bool regexp_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: Regexp.cpp:40
DEVICE bool string_ilike(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:257
std::vector< int32_t > getCompare(const std::string &pattern, const std::string &comp_operator, const size_t generation)
#define CHECK_NE(x, y)
Definition: Logger.h:206
mapd_shared_mutex rw_mutex_
DEVICE bool string_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:246
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
std::map< std::string, int32_t > equal_cache_
void getOrAddBulkArray(const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
void translate_string_ids(std::vector< int32_t > &dest_ids, const LeafHostInfo &dict_server_host, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
DEVICE bool string_ilike_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:59
int checked_open(const char *path, const bool recover)
bool g_enable_smem_group_by true
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
std::pair< char *, size_t > getStringBytes(int32_t string_id) const noexcept
std::vector< int32_t > string_id_hash_table_
bool checkpoint() noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:207
std::vector< int32_t > getLike(const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
void * checked_mmap(const int fd, const size_t sz)
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
#define CHECK_LE(x, y)
Definition: Logger.h:208
void increaseCapacityFromStorageAndMemory(const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< uint32_t > &input_strings_rk_hashes) noexcept
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
std::string getString(int32_t string_id) const
std::unique_ptr< StringDictionaryClient > client_no_timeout_
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) const noexcept
ThreadId thread_id()
Definition: Logger.cpp:715
mapd_shared_lock< mapd_shared_mutex > read_lock
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:82
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)
static void populate_string_array_ids(std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
bool g_enable_watchdog false
Definition: Execute.cpp:74
void invalidateInvertedIndex() noexcept
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:120
uint32_t computeBucket(const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept
const uint64_t round_up_p2(const uint64_t num)
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:101
void sortCache(std::vector< int32_t > &cache)
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
mapd_unique_lock< mapd_shared_mutex > write_lock
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
bool g_enable_stringdict_parallel
PayloadString getStringFromStorage(const int string_id) const noexcept
int cpu_threads()
Definition: thread_count.h:25
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)
DEVICE bool string_like_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:43
uint32_t computeBucketFromStorageAndMemory(const uint32_t input_string_rk_hash, const String &input_string, const std::vector< int32_t > &string_id_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::vector< int32_t > sorted_cache
void appendToStorage(String str) noexcept
~StringDictionary() noexcept
std::shared_ptr< const std::vector< std::string > > copyStrings() const
std::vector< uint32_t > rk_hashes_