OmniSciDB  21ac014ffc
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
StringDictionary.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <tbb/parallel_for.h>
20 #include <boost/filesystem/operations.hpp>
21 #include <boost/filesystem/path.hpp>
22 #include <boost/sort/spreadsort/string_sort.hpp>
23 #include <future>
24 #include <iostream>
25 #include <string_view>
26 #include <thread>
27 
28 // TODO(adb): fixup
29 #ifdef _MSC_VER
30 #include <fcntl.h>
31 #else
32 #include <sys/fcntl.h>
33 #endif
34 
35 #include "Logger/Logger.h"
36 #include "OSDependent/omnisci_fs.h"
37 #include "Shared/sqltypes.h"
38 #include "Shared/thread_count.h"
39 #include "StringDictionaryClient.h"
40 #include "Utils/Regexp.h"
41 #include "Utils/StringLike.h"
42 
43 #include "LeafHostInfo.h"
44 
46 
47 namespace {
48 
50 
51 int checked_open(const char* path, const bool recover) {
52  auto fd = omnisci::open(path, O_RDWR | O_CREAT | (recover ? O_APPEND : O_TRUNC), 0644);
53  if (fd > 0) {
54  return fd;
55  }
56  auto err = std::string("Dictionary path ") + std::string(path) +
57  std::string(" does not exist.");
58  LOG(ERROR) << err;
59  throw DictPayloadUnavailable(err);
60 }
61 
/**
 * Rounds `num` up to the next power of two, saturating at UINT32_MAX.
 *
 * Uses the "smear the highest set bit rightwards, then add one" technique.
 * Any result that does not fit in 32 bits is clamped to UINT32_MAX. This also
 * covers num == 0, where num - 1 wraps around and the final increment
 * overflows to zero.
 *
 * TODO MAT: deal with the case where the file size has been increased but in
 * reality we are constrained to 2^31. Note that the clamp value UINT32_MAX is
 * itself not a power of two.
 *
 * @param num value to round up
 * @return smallest power of two >= num, or UINT32_MAX if that exceeds 32 bits
 */
uint64_t round_up_p2(const uint64_t num) {
  uint64_t in = num - 1;  // unsigned: wraps to UINT64_MAX for num == 0
  in |= in >> 1;
  in |= in >> 2;
  in |= in >> 4;
  in |= in >> 8;
  in |= in >> 16;
  in |= in >> 32;  // complete the smear for full 64-bit inputs
  ++in;
  if (in == 0 || in > UINT32_MAX) {
    in = UINT32_MAX;
  }
  return in;
}
79 
80 string_dict_hash_t hash_string(const std::string_view& str) {
81  string_dict_hash_t str_hash = 1;
82  // rely on fact that unsigned overflow is defined and wraps
83  for (size_t i = 0; i < str.size(); ++i) {
84  str_hash = str_hash * 997 + str[i];
85  }
86  return str_hash;
87 }
88 } // namespace
89 
// Out-of-class definitions of the static constexpr data members declared in the
// class. Since C++17, static constexpr data members are implicitly inline, so
// these definitions are redundant (deprecated) but harmless. Before C++17 they
// were required whenever the members were ODR-used.
91 constexpr int32_t StringDictionary::INVALID_STR_ID;
92 constexpr size_t StringDictionary::MAX_STRLEN;
93 constexpr size_t StringDictionary::MAX_STRCOUNT;
94 
95 StringDictionary::StringDictionary(const std::string& folder,
96  const bool isTemp,
97  const bool recover,
98  const bool materializeHashes,
99  size_t initial_capacity)
100  : folder_(folder)
101  , str_count_(0)
102  , string_id_string_dict_hash_table_(initial_capacity, INVALID_STR_ID)
103  , hash_cache_(initial_capacity)
104  , isTemp_(isTemp)
105  , materialize_hashes_(materializeHashes)
106  , payload_fd_(-1)
107  , offset_fd_(-1)
108  , offset_map_(nullptr)
109  , payload_map_(nullptr)
110  , offset_file_size_(0)
111  , payload_file_size_(0)
112  , payload_file_off_(0)
113  , strings_cache_(nullptr) {
114  if (!isTemp && folder.empty()) {
115  return;
116  }
117 
118  // initial capacity must be a power of two for efficient bucket computation
119  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
120  if (!isTemp_) {
121  boost::filesystem::path storage_path(folder);
122  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
123  const auto payload_path =
124  (storage_path / boost::filesystem::path("DictPayload")).string();
125  payload_fd_ = checked_open(payload_path.c_str(), recover);
126  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
129  }
130  bool storage_is_empty = false;
131  if (payload_file_size_ == 0) {
132  storage_is_empty = true;
134  }
135  if (offset_file_size_ == 0) {
137  }
138  if (!isTemp_) { // we never mmap or recover temp dictionaries
139  payload_map_ =
140  reinterpret_cast<char*>(omnisci::checked_mmap(payload_fd_, payload_file_size_));
141  offset_map_ = reinterpret_cast<StringIdxEntry*>(
143  if (recover) {
144  const size_t bytes = omnisci::file_size(offset_fd_);
145  if (bytes % sizeof(StringIdxEntry) != 0) {
146  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
147  }
148  const uint64_t str_count =
149  storage_is_empty ? 0 : getNumStringsFromStorage(bytes / sizeof(StringIdxEntry));
150  collisions_ = 0;
151  // at this point we know the size of the StringDict we need to load
152  // so lets reallocate the vector to the correct size
153  const uint64_t max_entries =
154  std::max(round_up_p2(str_count * 2 + 1),
155  round_up_p2(std::max(initial_capacity, static_cast<size_t>(1))));
156  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
157  string_id_string_dict_hash_table_.swap(new_str_ids);
158  if (materialize_hashes_) {
159  std::vector<string_dict_hash_t> new_hash_cache(max_entries / 2);
160  hash_cache_.swap(new_hash_cache);
161  }
162  // Bail early if we know we don't have strings to add (i.e. a new or empty
163  // dictionary)
164  if (str_count == 0) {
165  return;
166  }
167 
168  unsigned string_id = 0;
169  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
170 
171  uint32_t thread_inits = 0;
172  const auto thread_count = std::thread::hardware_concurrency();
173  const uint32_t items_per_thread = std::max<uint32_t>(
174  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
175  std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>
176  dictionary_futures;
177  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
178  dictionary_futures.emplace_back(std::async(
179  std::launch::async, [string_id, str_count, items_per_thread, this] {
180  std::vector<std::pair<string_dict_hash_t, unsigned int>> hashVec;
181  for (uint32_t curr_id = string_id;
182  curr_id < string_id + items_per_thread && curr_id < str_count;
183  curr_id++) {
184  const auto recovered = getStringFromStorage(curr_id);
185  if (recovered.canary) {
186  // hit the canary, recovery finished
187  break;
188  } else {
189  std::string_view temp(recovered.c_str_ptr, recovered.size);
190  hashVec.emplace_back(std::make_pair(hash_string(temp), temp.size()));
191  }
192  }
193  return hashVec;
194  }));
195  thread_inits++;
196  if (thread_inits % thread_count == 0) {
197  processDictionaryFutures(dictionary_futures);
198  }
199  }
200  // gather last few threads
201  if (dictionary_futures.size() != 0) {
202  processDictionaryFutures(dictionary_futures);
203  }
204  VLOG(1) << "Opened string dictionary " << folder << " # Strings: " << str_count_
205  << " Hash table size: " << string_id_string_dict_hash_table_.size()
206  << " Fill rate: "
207  << static_cast<double>(str_count_) * 100.0 /
209  << "% Collisions: " << collisions_;
210  }
211  }
212 }
213 
215  std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>&
216  dictionary_futures) {
217  for (auto& dictionary_future : dictionary_futures) {
218  dictionary_future.wait();
219  const auto hashVec = dictionary_future.get();
220  for (const auto& hash : hashVec) {
221  const uint32_t bucket =
223  payload_file_off_ += hash.second;
224  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
225  if (materialize_hashes_) {
226  hash_cache_[str_count_] = hash.first;
227  }
228  ++str_count_;
229  }
230  }
231  dictionary_futures.clear();
232 }
233 
241 size_t StringDictionary::getNumStringsFromStorage(const size_t storage_slots) const
242  noexcept {
243  if (storage_slots == 0) {
244  return 0;
245  }
246  // Must use signed integers since final binary search step can wrap to max size_t value
247  // if dictionary is empty
248  int64_t min_bound = 0;
249  int64_t max_bound = storage_slots - 1;
250  int64_t guess{0};
251  while (min_bound <= max_bound) {
252  guess = (max_bound + min_bound) / 2;
253  CHECK_GE(guess, 0);
254  if (getStringFromStorage(guess).canary) {
255  max_bound = guess - 1;
256  } else {
257  min_bound = guess + 1;
258  }
259  }
260  CHECK_GE(guess + (min_bound > guess ? 1 : 0), 0);
261  return guess + (min_bound > guess ? 1 : 0);
262 }
263 
265  : folder_("DB_" + std::to_string(dict_ref.dbId) + "_DICT_" +
266  std::to_string(dict_ref.dictId))
267  , strings_cache_(nullptr)
268  , client_(new StringDictionaryClient(host, dict_ref, true))
269  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
270 
272  free(CANARY_BUFFER);
273  if (client_) {
274  return;
275  }
276  if (payload_map_) {
277  if (!isTemp_) {
281  CHECK_GE(payload_fd_, 0);
283  CHECK_GE(offset_fd_, 0);
285  } else {
287  free(payload_map_);
288  free(offset_map_);
289  }
290  }
291 }
292 
293 int32_t StringDictionary::getOrAdd(const std::string& str) noexcept {
294  if (client_) {
295  std::vector<int32_t> string_ids;
296  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
297  CHECK_EQ(size_t(1), string_ids.size());
298  return string_ids.front();
299  }
300  return getOrAddImpl(str);
301 }
302 
303 namespace {
304 
305 template <class T>
306 void throw_encoding_error(std::string_view str, std::string_view folder) {
307  std::ostringstream oss;
308  oss << "The text encoded column stored at " << folder << ", has exceeded its limit of "
309  << sizeof(T) * 8 << " bits (" << static_cast<size_t>(max_valid_int_value<T>() + 1)
310  << " unique values)."
311  << " There was an attempt to add the new string '" << str
312  << "'. Table will need to be recreated with larger String Dictionary Capacity";
313  LOG(ERROR) << oss.str();
314  throw std::runtime_error(oss.str());
315 }
316 
317 } // namespace
318 
319 template <class String>
321  const std::vector<std::vector<String>>& string_array_vec,
322  std::vector<std::vector<int32_t>>& ids_array_vec) {
323  ids_array_vec.resize(string_array_vec.size());
324  for (size_t i = 0; i < string_array_vec.size(); i++) {
325  auto& strings = string_array_vec[i];
326  auto& ids = ids_array_vec[i];
327  ids.resize(strings.size());
328  getOrAddBulk(strings, &ids[0]);
329  }
330 }
331 
333  const std::vector<std::vector<std::string>>& string_array_vec,
334  std::vector<std::vector<int32_t>>& ids_array_vec);
335 
341 template <class String>
342 void StringDictionary::hashStrings(const std::vector<String>& string_vec,
343  std::vector<string_dict_hash_t>& hashes) const
344  noexcept {
345  CHECK_EQ(string_vec.size(), hashes.size());
346 
347  tbb::parallel_for(tbb::blocked_range<size_t>(0, string_vec.size()),
348  [&string_vec, &hashes](const tbb::blocked_range<size_t>& r) {
349  for (size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
350  if (string_vec[curr_id].empty()) {
351  continue;
352  }
353  hashes[curr_id] = hash_string(string_vec[curr_id]);
354  }
355  });
356 }
357 
358 template <class T, class String>
359 void StringDictionary::getOrAddBulk(const std::vector<String>& input_strings,
360  T* output_string_ids) {
362  getOrAddBulkParallel(input_strings, output_string_ids);
363  return;
364  }
365  // Single-thread path.
366  if (client_no_timeout_) {
367  getOrAddBulkRemote(input_strings, output_string_ids);
368  return;
369  }
370  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
371 
372  const size_t initial_str_count = str_count_;
373  size_t idx = 0;
374  for (const auto& input_string : input_strings) {
375  if (input_string.empty()) {
376  output_string_ids[idx++] = inline_int_null_value<T>();
377  continue;
378  }
379  CHECK(input_string.size() <= MAX_STRLEN);
380 
381  const string_dict_hash_t input_string_hash = hash_string(input_string);
382  uint32_t hash_bucket =
383  computeBucket(input_string_hash, input_string, string_id_string_dict_hash_table_);
385  output_string_ids[idx++] = string_id_string_dict_hash_table_[hash_bucket];
386  continue;
387  }
388  // need to add record to dictionary
389  // check there is room
390  if (str_count_ > static_cast<size_t>(max_valid_int_value<T>())) {
391  throw_encoding_error<T>(input_string, folder_);
392  }
394  << "Maximum number (" << str_count_
395  << ") of Dictionary encoded Strings reached for this column, offset path "
396  "for column is "
397  << offsets_path_;
398  if (fillRateIsHigh(str_count_)) {
399  // resize when more than 50% is full
401  hash_bucket = computeBucket(
402  input_string_hash, input_string, string_id_string_dict_hash_table_);
403  }
404  appendToStorage(input_string);
405 
406  if (materialize_hashes_) {
407  hash_cache_[str_count_] = input_string_hash;
408  }
409  const int32_t string_id = static_cast<int32_t>(str_count_);
410  string_id_string_dict_hash_table_[hash_bucket] = string_id;
411  output_string_ids[idx++] = string_id;
412  ++str_count_;
413  }
414  const size_t num_strings_added = str_count_ - initial_str_count;
415  if (num_strings_added > 0) {
417  }
418 }
419 
420 template <class T, class String>
421 void StringDictionary::getOrAddBulkParallel(const std::vector<String>& input_strings,
422  T* output_string_ids) {
423  if (client_no_timeout_) {
424  getOrAddBulkRemote(input_strings, output_string_ids);
425  return;
426  }
427  // Compute hashes of the input strings up front, and in parallel,
428  // as the string hashing does not need to be behind the subsequent write_lock
429  std::vector<string_dict_hash_t> input_strings_hashes(input_strings.size());
430  hashStrings(input_strings, input_strings_hashes);
431 
432  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
433  size_t shadow_str_count =
434  str_count_; // Need to shadow str_count_ now with bulk add methods
435  const size_t storage_high_water_mark = shadow_str_count;
436  std::vector<size_t> string_memory_ids;
437  size_t sum_new_string_lengths = 0;
438  string_memory_ids.reserve(input_strings.size());
439  size_t input_string_idx{0};
440  for (const auto& input_string : input_strings) {
441  // Currently we make empty strings null
442  if (input_string.empty()) {
443  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
444  continue;
445  }
446  // TODO: Recover gracefully if an input string is too long
447  CHECK(input_string.size() <= MAX_STRLEN);
448 
449  if (fillRateIsHigh(shadow_str_count)) {
450  // resize when more than 50% is full
452  storage_high_water_mark,
453  input_strings,
454  string_memory_ids,
455  input_strings_hashes);
456  }
457  // Compute the hash for this input_string
458  const string_dict_hash_t input_string_hash = input_strings_hashes[input_string_idx];
459 
460  const uint32_t hash_bucket =
461  computeBucketFromStorageAndMemory(input_string_hash,
462  input_string,
464  storage_high_water_mark,
465  input_strings,
466  string_memory_ids);
467 
468  // If the hash bucket is not empty, that is our string id
469  // (computeBucketFromStorageAndMemory) already checked to ensure the input string and
470  // bucket string are equal)
472  output_string_ids[input_string_idx++] =
474  continue;
475  }
476  // Did not find string, so need to add record to dictionary
477  // First check there is room
478  if (shadow_str_count > static_cast<size_t>(max_valid_int_value<T>())) {
479  throw_encoding_error<T>(input_string, folder_);
480  }
481  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
482  << "Maximum number (" << shadow_str_count
483  << ") of Dictionary encoded Strings reached for this column, offset path "
484  "for column is "
485  << offsets_path_;
486 
487  string_memory_ids.push_back(input_string_idx);
488  sum_new_string_lengths += input_string.size();
489  string_id_string_dict_hash_table_[hash_bucket] =
490  static_cast<int32_t>(shadow_str_count);
491  if (materialize_hashes_) {
492  hash_cache_[shadow_str_count] = input_string_hash;
493  }
494  output_string_ids[input_string_idx++] = shadow_str_count++;
495  }
496  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
497  const size_t num_strings_added = shadow_str_count - str_count_;
498  str_count_ = shadow_str_count;
499  if (num_strings_added > 0) {
501  }
502 }
503 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
504  uint8_t* encoded_vec);
505 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
506  uint16_t* encoded_vec);
507 template void StringDictionary::getOrAddBulk(const std::vector<std::string>& string_vec,
508  int32_t* encoded_vec);
509 
510 template void StringDictionary::getOrAddBulk(
511  const std::vector<std::string_view>& string_vec,
512  uint8_t* encoded_vec);
513 template void StringDictionary::getOrAddBulk(
514  const std::vector<std::string_view>& string_vec,
515  uint16_t* encoded_vec);
516 template void StringDictionary::getOrAddBulk(
517  const std::vector<std::string_view>& string_vec,
518  int32_t* encoded_vec);
519 
520 template <class T, class String>
521 void StringDictionary::getOrAddBulkRemote(const std::vector<String>& string_vec,
522  T* encoded_vec) {
524  std::vector<int32_t> string_ids;
525  client_no_timeout_->get_or_add_bulk(string_ids, string_vec);
526  size_t out_idx{0};
527  for (size_t i = 0; i < string_ids.size(); ++i) {
528  const auto string_id = string_ids[i];
529  const bool invalid = string_id > max_valid_int_value<T>();
530  if (invalid || string_id == inline_int_null_value<int32_t>()) {
531  if (invalid) {
532  throw_encoding_error<T>(string_vec[i], folder_);
533  }
534  encoded_vec[out_idx++] = inline_int_null_value<T>();
535  continue;
536  }
537  encoded_vec[out_idx++] = string_id;
538  }
539 }
540 
542  const std::vector<std::string>& string_vec,
543  uint8_t* encoded_vec);
545  const std::vector<std::string>& string_vec,
546  uint16_t* encoded_vec);
548  const std::vector<std::string>& string_vec,
549  int32_t* encoded_vec);
550 
552  const std::vector<std::string_view>& string_vec,
553  uint8_t* encoded_vec);
555  const std::vector<std::string_view>& string_vec,
556  uint16_t* encoded_vec);
558  const std::vector<std::string_view>& string_vec,
559  int32_t* encoded_vec);
560 
561 int32_t StringDictionary::getIdOfString(const std::string& str) const {
562  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
563  if (client_) {
564  return client_->get(str);
565  }
566  return getUnlocked(str);
567 }
568 
569 int32_t StringDictionary::getUnlocked(const std::string& str) const noexcept {
570  const string_dict_hash_t hash = hash_string(str);
571  auto str_id = string_id_string_dict_hash_table_[computeBucket(
572  hash, str, string_id_string_dict_hash_table_)];
573  return str_id;
574 }
575 
576 std::string StringDictionary::getString(int32_t string_id) const {
577  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
578  if (client_) {
579  std::string ret;
580  client_->get_string(ret, string_id);
581  return ret;
582  }
583  return getStringUnlocked(string_id);
584 }
585 
586 std::string StringDictionary::getStringUnlocked(int32_t string_id) const noexcept {
587  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
588  return getStringChecked(string_id);
589 }
590 
591 std::pair<char*, size_t> StringDictionary::getStringBytes(int32_t string_id) const
592  noexcept {
593  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
594  CHECK(!client_);
595  CHECK_LE(0, string_id);
596  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
597  return getStringBytesChecked(string_id);
598 }
599 
601  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
602  if (client_) {
603  return client_->storage_entry_count();
604  }
605  return str_count_;
606 }
607 
608 namespace {
609 
610 bool is_like(const std::string& str,
611  const std::string& pattern,
612  const bool icase,
613  const bool is_simple,
614  const char escape) {
615  return icase
616  ? (is_simple ? string_ilike_simple(
617  str.c_str(), str.size(), pattern.c_str(), pattern.size())
618  : string_ilike(str.c_str(),
619  str.size(),
620  pattern.c_str(),
621  pattern.size(),
622  escape))
623  : (is_simple ? string_like_simple(
624  str.c_str(), str.size(), pattern.c_str(), pattern.size())
625  : string_like(str.c_str(),
626  str.size(),
627  pattern.c_str(),
628  pattern.size(),
629  escape));
630 }
631 
632 } // namespace
633 
634 std::vector<int32_t> StringDictionary::getLike(const std::string& pattern,
635  const bool icase,
636  const bool is_simple,
637  const char escape,
638  const size_t generation) const {
639  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
640  if (client_) {
641  return client_->get_like(pattern, icase, is_simple, escape, generation);
642  }
643  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
644  const auto it = like_cache_.find(cache_key);
645  if (it != like_cache_.end()) {
646  return it->second;
647  }
648  std::vector<int32_t> result;
649  std::vector<std::thread> workers;
650  int worker_count = cpu_threads();
651  CHECK_GT(worker_count, 0);
652  std::vector<std::vector<int32_t>> worker_results(worker_count);
653  CHECK_LE(generation, str_count_);
654  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
655  workers.emplace_back([&worker_results,
656  &pattern,
657  generation,
658  icase,
659  is_simple,
660  escape,
661  worker_idx,
662  worker_count,
663  this]() {
664  for (size_t string_id = worker_idx; string_id < generation;
665  string_id += worker_count) {
666  const auto str = getStringUnlocked(string_id);
667  if (is_like(str, pattern, icase, is_simple, escape)) {
668  worker_results[worker_idx].push_back(string_id);
669  }
670  }
671  });
672  }
673  for (auto& worker : workers) {
674  worker.join();
675  }
676  for (const auto& worker_result : worker_results) {
677  result.insert(result.end(), worker_result.begin(), worker_result.end());
678  }
679  // place result into cache for reuse if similar query
680  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
681 
682  CHECK(it_ok.second);
683 
684  return result;
685 }
686 
687 std::vector<int32_t> StringDictionary::getEquals(std::string pattern,
688  std::string comp_operator,
689  size_t generation) {
690  std::vector<int32_t> result;
691  auto eq_id_itr = equal_cache_.find(pattern);
692  int32_t eq_id = MAX_STRLEN + 1;
693  int32_t cur_size = str_count_;
694  if (eq_id_itr != equal_cache_.end()) {
695  auto eq_id = eq_id_itr->second;
696  if (comp_operator == "=") {
697  result.push_back(eq_id);
698  } else {
699  for (int32_t idx = 0; idx <= cur_size; idx++) {
700  if (idx == eq_id) {
701  continue;
702  }
703  result.push_back(idx);
704  }
705  }
706  } else {
707  std::vector<std::thread> workers;
708  int worker_count = cpu_threads();
709  CHECK_GT(worker_count, 0);
710  std::vector<std::vector<int32_t>> worker_results(worker_count);
711  CHECK_LE(generation, str_count_);
712  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
713  workers.emplace_back(
714  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
715  for (size_t string_id = worker_idx; string_id < generation;
716  string_id += worker_count) {
717  const auto str = getStringUnlocked(string_id);
718  if (str == pattern) {
719  worker_results[worker_idx].push_back(string_id);
720  }
721  }
722  });
723  }
724  for (auto& worker : workers) {
725  worker.join();
726  }
727  for (const auto& worker_result : worker_results) {
728  result.insert(result.end(), worker_result.begin(), worker_result.end());
729  }
730  if (result.size() > 0) {
731  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
732  CHECK(it_ok.second);
733  eq_id = result[0];
734  }
735  if (comp_operator == "<>") {
736  for (int32_t idx = 0; idx <= cur_size; idx++) {
737  if (idx == eq_id) {
738  continue;
739  }
740  result.push_back(idx);
741  }
742  }
743  }
744  return result;
745 }
746 
747 std::vector<int32_t> StringDictionary::getCompare(const std::string& pattern,
748  const std::string& comp_operator,
749  const size_t generation) {
750  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
751  if (client_) {
752  return client_->get_compare(pattern, comp_operator, generation);
753  }
754  std::vector<int32_t> ret;
755  if (str_count_ == 0) {
756  return ret;
757  }
758  if (sorted_cache.size() < str_count_) {
759  if (comp_operator == "=" || comp_operator == "<>") {
760  return getEquals(pattern, comp_operator, generation);
761  }
762 
764  }
765  auto cache_index = compare_cache_.get(pattern);
766 
767  if (!cache_index) {
768  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
769  const auto cache_itr = std::lower_bound(
770  sorted_cache.begin(),
771  sorted_cache.end(),
772  pattern,
773  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
774  auto a_str = this->getStringFromStorage(a);
775  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
776  });
777 
778  if (cache_itr == sorted_cache.end()) {
779  cache_index->index = sorted_cache.size() - 1;
780  cache_index->diff = 1;
781  } else {
782  const auto cache_str = getStringFromStorage(*cache_itr);
783  if (!string_eq(
784  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
785  cache_index->index = cache_itr - sorted_cache.begin() - 1;
786  cache_index->diff = 1;
787  } else {
788  cache_index->index = cache_itr - sorted_cache.begin();
789  cache_index->diff = 0;
790  }
791  }
792 
793  compare_cache_.put(pattern, cache_index);
794  }
795 
796  // since we have a cache in form of vector of ints which is sorted according to
797  // corresponding strings in the dictionary all we need is the index of the element
798  // which equal to the pattern that we are trying to match or the index of “biggest”
799  // element smaller than the pattern, to perform all the comparison operators over
800  // string. The search function guarantees we have such index so now it is just the
801  // matter to include all the elements in the result vector.
802 
803  // For < operator if the index that we have points to the element which is equal to
804  // the pattern that we are searching for we simply get all the elements less than the
805  // index. If the element pointed by the index is not equal to the pattern we are
806  // comparing with we also need to include that index in result vector, except when the
807  // index points to 0 and the pattern is lesser than the smallest value in the string
808  // dictionary.
809 
810  if (comp_operator == "<") {
811  size_t idx = cache_index->index;
812  if (cache_index->diff) {
813  idx = cache_index->index + 1;
814  if (cache_index->index == 0 && cache_index->diff > 0) {
815  idx = cache_index->index;
816  }
817  }
818  for (size_t i = 0; i < idx; i++) {
819  ret.push_back(sorted_cache[i]);
820  }
821 
822  // For <= operator if the index that we have points to the element which is equal to
823  // the pattern that we are searching for we want to include the element pointed by
824  // the index in the result set. If the element pointed by the index is not equal to
825  // the pattern we are comparing with we just want to include all the ids with index
826  // less than the index that is cached, except when pattern that we are searching for
827  // is smaller than the smallest string in the dictionary.
828 
829  } else if (comp_operator == "<=") {
830  size_t idx = cache_index->index + 1;
831  if (cache_index == 0 && cache_index->diff > 0) {
832  idx = cache_index->index;
833  }
834  for (size_t i = 0; i < idx; i++) {
835  ret.push_back(sorted_cache[i]);
836  }
837 
838  // For > operator we want to get all the elements with index greater than the index
839  // that we have except, when the pattern we are searching for is lesser than the
840  // smallest string in the dictionary we also want to include the id of the index
841  // that we have.
842 
843  } else if (comp_operator == ">") {
844  size_t idx = cache_index->index + 1;
845  if (cache_index->index == 0 && cache_index->diff > 0) {
846  idx = cache_index->index;
847  }
848  for (size_t i = idx; i < sorted_cache.size(); i++) {
849  ret.push_back(sorted_cache[i]);
850  }
851 
852  // For >= operator when the indexed element that we have points to element which is
853  // equal to the pattern we are searching for we want to include that in the result
854  // vector. If the index that we have does not point to the string which is equal to
855  // the pattern we are searching we don’t want to include that id into the result
856  // vector except when the index is 0.
857 
858  } else if (comp_operator == ">=") {
859  size_t idx = cache_index->index;
860  if (cache_index->diff) {
861  idx = cache_index->index + 1;
862  if (cache_index->index == 0 && cache_index->diff > 0) {
863  idx = cache_index->index;
864  }
865  }
866  for (size_t i = idx; i < sorted_cache.size(); i++) {
867  ret.push_back(sorted_cache[i]);
868  }
869  } else if (comp_operator == "=") {
870  if (!cache_index->diff) {
871  ret.push_back(sorted_cache[cache_index->index]);
872  }
873 
874  // For <> operator it is simple matter of not including id of string which is equal
875  // to pattern we are searching for.
876  } else if (comp_operator == "<>") {
877  if (!cache_index->diff) {
878  size_t idx = cache_index->index;
879  for (size_t i = 0; i < idx; i++) {
880  ret.push_back(sorted_cache[i]);
881  }
882  ++idx;
883  for (size_t i = idx; i < sorted_cache.size(); i++) {
884  ret.push_back(sorted_cache[i]);
885  }
886  } else {
887  for (size_t i = 0; i < sorted_cache.size(); i++) {
888  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
889  }
890  }
891 
892  } else {
893  std::runtime_error("Unsupported string comparison operator");
894  }
895  return ret;
896 }
897 
898 namespace {
899 
900 bool is_regexp_like(const std::string& str,
901  const std::string& pattern,
902  const char escape) {
903  return regexp_like(str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape);
904 }
905 
906 } // namespace
907 
908 std::vector<int32_t> StringDictionary::getRegexpLike(const std::string& pattern,
909  const char escape,
910  const size_t generation) const {
911  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
912  if (client_) {
913  return client_->get_regexp_like(pattern, escape, generation);
914  }
915  const auto cache_key = std::make_pair(pattern, escape);
916  const auto it = regex_cache_.find(cache_key);
917  if (it != regex_cache_.end()) {
918  return it->second;
919  }
920  std::vector<int32_t> result;
921  std::vector<std::thread> workers;
922  int worker_count = cpu_threads();
923  CHECK_GT(worker_count, 0);
924  std::vector<std::vector<int32_t>> worker_results(worker_count);
925  CHECK_LE(generation, str_count_);
926  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
927  workers.emplace_back([&worker_results,
928  &pattern,
929  generation,
930  escape,
931  worker_idx,
932  worker_count,
933  this]() {
934  for (size_t string_id = worker_idx; string_id < generation;
935  string_id += worker_count) {
936  const auto str = getStringUnlocked(string_id);
937  if (is_regexp_like(str, pattern, escape)) {
938  worker_results[worker_idx].push_back(string_id);
939  }
940  }
941  });
942  }
943  for (auto& worker : workers) {
944  worker.join();
945  }
946  for (const auto& worker_result : worker_results) {
947  result.insert(result.end(), worker_result.begin(), worker_result.end());
948  }
949  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
950  CHECK(it_ok.second);
951 
952  return result;
953 }
954 
955 std::shared_ptr<const std::vector<std::string>> StringDictionary::copyStrings() const {
956  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
957  if (client_) {
958  // TODO(miyu): support remote string dictionary
959  throw std::runtime_error(
960  "copying dictionaries from remote server is not supported yet.");
961  }
962 
963  if (strings_cache_) {
964  return strings_cache_;
965  }
966 
967  strings_cache_ = std::make_shared<std::vector<std::string>>();
968  strings_cache_->reserve(str_count_);
969  const bool multithreaded = str_count_ > 10000;
970  const auto worker_count =
971  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
972  CHECK_GT(worker_count, 0UL);
973  std::vector<std::vector<std::string>> worker_results(worker_count);
974  auto copy = [this](std::vector<std::string>& str_list,
975  const size_t start_id,
976  const size_t end_id) {
977  CHECK_LE(start_id, end_id);
978  str_list.reserve(end_id - start_id);
979  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
980  str_list.push_back(getStringUnlocked(string_id));
981  }
982  };
983  if (multithreaded) {
984  std::vector<std::future<void>> workers;
985  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
986  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
987  worker_idx < worker_count && start < str_count_;
988  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
989  workers.push_back(std::async(
990  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
991  }
992  for (auto& worker : workers) {
993  worker.get();
994  }
995  } else {
996  CHECK_EQ(worker_results.size(), size_t(1));
997  copy(worker_results[0], 0, str_count_);
998  }
999 
1000  for (const auto& worker_result : worker_results) {
1001  strings_cache_->insert(
1002  strings_cache_->end(), worker_result.begin(), worker_result.end());
1003  }
1004  return strings_cache_;
1005 }
1006 
1007 bool StringDictionary::fillRateIsHigh(const size_t num_strings) const noexcept {
1008  return string_id_string_dict_hash_table_.size() <= num_strings * 2;
1009 }
1010 
  // Doubles the hash table and reinserts every persisted string id (0 .. str_count_-1).
  // With materialize_hashes_ the cached hashes are reused and hash_cache_ is doubled as
  // well; otherwise each string is read back from storage and re-hashed.
  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
                                   INVALID_STR_ID);

  if (materialize_hashes_) {
    for (size_t i = 0; i != str_count_; ++i) {
      const string_dict_hash_t hash = hash_cache_[i];
      // Ids are unique, so only a free slot has to be found (no string comparison).
      const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
      new_str_ids[bucket] = i;
    }
    hash_cache_.resize(hash_cache_.size() * 2);
  } else {
    for (size_t i = 0; i != str_count_; ++i) {
      const auto str = getStringChecked(i);
      const string_dict_hash_t hash = hash_string(str);
      const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
      new_str_ids[bucket] = i;
    }
  }
  // Publish the rebuilt table; the old one is released with new_str_ids.
  string_id_string_dict_hash_table_.swap(new_str_ids);
}
1032 
// Doubles the hash table in the middle of a bulk insert, when some new strings already
// have ids but are not yet written to storage.
// Without materialized hashes, ids [0, storage_high_water_mark) are re-read from storage
// and re-hashed. The in-flight ids storage_high_water_mark + k are reinserted using
// input_strings_hashes[string_memory_ids[k]].
// With materialized hashes, ids [0, str_count) are rehashed purely from hash_cache_, and
// hash_cache_ is doubled.
// NOTE(review): the materialized branch assumes hash_cache_ already holds hashes for the
// in-flight strings. input_strings is not read in either branch — confirm.
template <class String>
    const size_t str_count,  // str_count_ is only persisted strings, so need transient
                             // shadow count
    const size_t storage_high_water_mark,
    const std::vector<String>& input_strings,
    const std::vector<size_t>& string_memory_ids,
    const std::vector<string_dict_hash_t>& input_strings_hashes) noexcept {
  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
                                   INVALID_STR_ID);
  if (materialize_hashes_) {
    for (size_t i = 0; i != str_count; ++i) {
      const string_dict_hash_t hash = hash_cache_[i];
      const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
      new_str_ids[bucket] = i;
    }
    hash_cache_.resize(hash_cache_.size() * 2);
  } else {
    // Strings already persisted: fetch from storage and re-hash.
    for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
      const auto storage_string = getStringChecked(storage_idx);
      const string_dict_hash_t hash = hash_string(storage_string);
      const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
      new_str_ids[bucket] = storage_idx;
    }
    // Strings pending in this batch: their hashes were precomputed by the caller.
    for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
      const size_t string_memory_id = string_memory_ids[memory_idx];
      const uint32_t bucket = computeUniqueBucketWithHash(
          input_strings_hashes[string_memory_id], new_str_ids);
      new_str_ids[bucket] = storage_high_water_mark + memory_idx;
    }
  }
  string_id_string_dict_hash_table_.swap(new_str_ids);
}
1066 
// Returns the id of `str`, appending it to the dictionary first if it is not present.
// The empty string is never stored; it maps to the int32 NULL sentinel.
// The lookup is first tried under the shared (read) lock. On a miss, the exclusive lock
// is taken, the table is grown if it is at least half full, and the bucket is recomputed:
// another writer may have inserted the string, or resized the table, between the two locks.
int32_t StringDictionary::getOrAddImpl(const std::string_view& str) noexcept {
  // @TODO(wei) treat empty string as NULL for now
  if (str.size() == 0) {
    return inline_int_null_value<int32_t>();
  }
  CHECK(str.size() <= MAX_STRLEN);
  const string_dict_hash_t hash = hash_string(str);
  {
    // Fast path: string already present.
    mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
    const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
    if (string_id_string_dict_hash_table_[bucket] != INVALID_STR_ID) {
      return string_id_string_dict_hash_table_[bucket];
    }
  }
  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
  if (fillRateIsHigh(str_count_)) {
    // resize when more than 50% is full
    increaseHashTableCapacity();
  }
  // need to recalculate the bucket in case it changed before
  // we got the lock
  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
  if (string_id_string_dict_hash_table_[bucket] == INVALID_STR_ID) {
    CHECK_LT(str_count_, MAX_STRCOUNT)
        << "Maximum number (" << str_count_
        << ") of Dictionary encoded Strings reached for this column, offset path "
           "for column is "
        << offsets_path_;
    // New strings get the next sequential id, str_count_.
    appendToStorage(str);
    string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
    if (materialize_hashes_) {
      hash_cache_[str_count_] = hash;
    }
    ++str_count_;
    // Cached LIKE/REGEXP/equality results no longer cover every string.
    invalidateInvertedIndex();
  }
  return string_id_string_dict_hash_table_[bucket];
}
1105 
1106 std::string StringDictionary::getStringChecked(const int string_id) const noexcept {
1107  const auto str_canary = getStringFromStorage(string_id);
1108  CHECK(!str_canary.canary);
1109  return std::string(str_canary.c_str_ptr, str_canary.size);
1110 }
1111 
    const int string_id) const noexcept {
  // Non-owning variant of getStringChecked: returns (pointer into the payload region,
  // byte length) for `string_id`. Aborts (CHECK) if the slot holds the canary.
  // The pointer is only valid until the payload region is next grown or re-mapped.
  const auto str_canary = getStringFromStorage(string_id);
  CHECK(!str_canary.canary);
  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
}
1118 
// Linear-probing lookup of `input_string` (whose hash is `hash`) in
// string_id_string_dict_hash_table. Returns either the bucket that holds the string's id,
// or the first empty (INVALID_STR_ID) bucket reached, where the string could be inserted.
// NOTE(review): the probe loop only ends at a match or an empty slot, so a completely
// full table without the string would spin forever. Callers presumably keep the fill
// rate below 50% (see fillRateIsHigh) — confirm.
template <class String>
    const string_dict_hash_t hash,
    const String& input_string,
    const std::vector<int32_t>& string_id_string_dict_hash_table) const noexcept {
  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
  // The size is used as a bit mask, so it is assumed to be a power of two.
  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
  while (true) {
    const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
    if (candidate_string_id ==
        INVALID_STR_ID) {  // In this case it means the slot is available for use
      break;
    }
    // With materialized hashes, the cached hash filters out most candidates before the
    // byte comparison. Without them, every candidate's bytes are compared.
    if ((materialize_hashes_ && hash == hash_cache_[candidate_string_id]) ||
        !materialize_hashes_) {
      const auto candidate_string = getStringFromStorageFast(candidate_string_id);
      if (input_string.size() == candidate_string.size() &&
          !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
        // found the string
        break;
      }
    }
    // wrap around
    if (++bucket == string_dict_hash_table_size) {
      bucket = 0;
    }
  }
  return bucket;
}
1148 
// Variant of computeBucket used during bulk inserts. Ids >= storage_high_water_mark
// belong to strings that have been assigned ids in this batch but are not yet written
// to storage. Those candidates are compared against
// input_strings[string_memory_ids[id - storage_high_water_mark]]. All other candidates
// are read from storage.
// NOTE(review): the in-memory branch also requires candidate_string_id > 0. When
// storage_high_water_mark == 0, id 0 is itself an in-flight string but would be looked up
// in storage instead, so a duplicate of the first string in the batch might not be found.
// Confirm whether this test was meant to be >= 0.
template <class String>
    const string_dict_hash_t input_string_hash,
    const String& input_string,
    const std::vector<int32_t>& string_id_string_dict_hash_table,
    const size_t storage_high_water_mark,
    const std::vector<String>& input_strings,
    const std::vector<size_t>& string_memory_ids) const noexcept {
  uint32_t bucket = input_string_hash & (string_id_string_dict_hash_table.size() - 1);
  while (true) {
    const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
    if (candidate_string_id ==
        INVALID_STR_ID) {  // In this case it means the slot is available for use
      break;
    }
    if (!materialize_hashes_ || (input_string_hash == hash_cache_[candidate_string_id])) {
      if (candidate_string_id > 0 &&
          static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
        // The candidate string is not in storage yet but in our string_memory_ids temp
        // buffer
        size_t memory_offset =
            static_cast<size_t>(candidate_string_id - storage_high_water_mark);
        const String candidate_string = input_strings[string_memory_ids[memory_offset]];
        if (input_string.size() == candidate_string.size() &&
            !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
          // found the string in the temp memory buffer
          break;
        }
      } else {
        // The candidate string is in storage, need to fetch it for comparison
        const auto candidate_storage_string =
            getStringFromStorageFast(candidate_string_id);
        if (input_string.size() == candidate_storage_string.size() &&
            !memcmp(input_string.data(),
                    candidate_storage_string.data(),
                    input_string.size())) {
          // found the string in storage
          break;
        }
      }
    }
    // wrap around
    if (++bucket == string_id_string_dict_hash_table.size()) {
      bucket = 0;
    }
  }
  return bucket;
}
1198 
    const string_dict_hash_t hash,
    const std::vector<int32_t>& string_id_string_dict_hash_table) noexcept {
  // Returns the first empty slot on `hash`'s linear-probe sequence. No string comparison
  // is done, so this is only for inserting ids known to be absent (e.g. rehashing).
  // Each occupied slot probed is counted in collisions_.
  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
  while (true) {
    if (string_id_string_dict_hash_table[bucket] ==
        INVALID_STR_ID) {  // In this case it means the slot is available for use
      break;
    }
    collisions_++;
    // wrap around
    if (++bucket == string_dict_hash_table_size) {
      bucket = 0;
    }
  }
  return bucket;
}
1217 
    const size_t write_length) {
  // Ensures write_length more bytes fit past payload_file_off_. File-backed dictionaries
  // extend the payload file and then re-map payload_map_ at the new size. Temp
  // dictionaries grow their heap buffer through addPayloadCapacity.
  // NOTE(review): this uses '>' while the offset variant grows on '>='; confirm whether
  // the difference is intentional.
  if (payload_file_off_ + write_length > payload_file_size_) {
    const size_t min_capacity_needed =
        write_length - (payload_file_size_ - payload_file_off_);
    if (!isTemp_) {
      CHECK_GE(payload_fd_, 0);
      addPayloadCapacity(min_capacity_needed);
      CHECK(payload_file_off_ + write_length <= payload_file_size_);
      payload_map_ =
          reinterpret_cast<char*>(omnisci::checked_mmap(payload_fd_, payload_file_size_));
    } else {
      addPayloadCapacity(min_capacity_needed);
      CHECK(payload_file_off_ + write_length <= payload_file_size_);
    }
  }
}
1236 
    const size_t write_length) {
  // Ensures write_length more bytes of StringIdxEntry records fit after the
  // str_count_ existing entries. File-backed dictionaries extend the offsets file and
  // re-map offset_map_. Temp dictionaries grow their heap buffer through
  // addOffsetCapacity.
  const size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
  if (offset_file_off + write_length >= offset_file_size_) {
    const size_t min_capacity_needed =
        write_length - (offset_file_size_ - offset_file_off);
    if (!isTemp_) {
      CHECK_GE(offset_fd_, 0);
      addOffsetCapacity(min_capacity_needed);
      CHECK(offset_file_off + write_length <= offset_file_size_);
      // NOTE(review): the re-mapping call's argument is not shown in this listing.
      // Presumably it is omnisci::checked_mmap(offset_fd_, offset_file_size_), mirroring
      // the payload path — confirm.
      offset_map_ = reinterpret_cast<StringIdxEntry*>(
    } else {
      addOffsetCapacity(min_capacity_needed);
      CHECK(offset_file_off + write_length <= offset_file_size_);
    }
  }
}
1256 
1257 template <class String>
1258 void StringDictionary::appendToStorage(const String str) noexcept {
1259  // write the payload
1260  checkAndConditionallyIncreasePayloadCapacity(str.size());
1261  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1262 
1263  // write the offset and length
1264  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1265  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1266 
1267  checkAndConditionallyIncreaseOffsetCapacity(sizeof(str_meta));
1268  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1269 }
1270 
// Bulk counterpart of appendToStorage. For each i, writes
// input_strings[string_memory_ids[i]] to the payload region and its (offset, length)
// entry to offset slot str_count_ + i. Capacity is reserved once up front, so
// sum_new_strings_lengths is expected to be the total byte length of those strings.
// str_count_ itself is not advanced here.
// NOTE(review): `const String str = ...` copies each element; when String is std::string
// a const reference would avoid that copy.
template <class String>
    const std::vector<String>& input_strings,
    const std::vector<size_t>& string_memory_ids,
    const size_t sum_new_strings_lengths) noexcept {
  const size_t num_strings = string_memory_ids.size();

  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
  checkAndConditionallyIncreaseOffsetCapacity(sizeof(StringIdxEntry) * num_strings);

  for (size_t i = 0; i < num_strings; ++i) {
    const size_t string_idx = string_memory_ids[i];
    const String str = input_strings[string_idx];
    const size_t str_size(str.size());
    memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
    StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
    payload_file_off_ += str_size;  // Need to increment after we've defined str_meta
    memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
  }
}
1291 
1292 std::string_view StringDictionary::getStringFromStorageFast(const int string_id) const
1293  noexcept {
1294  const StringIdxEntry* str_meta = offset_map_ + string_id;
1295  return {payload_map_ + str_meta->off, str_meta->size};
1296 }
1297 
    const int string_id) const noexcept {
  // Checked variant of getStringFromStorageFast. Verifies the backing fds (file-backed
  // dictionaries only) and that string_id is non-negative, and reports whether the
  // offset entry is the canary. The canary comes from the 0xff fill written by
  // addStorageCapacity / addMemoryCapacity.
  // Note: string_id is not checked against str_count_.
  if (!isTemp_) {
    CHECK_GE(payload_fd_, 0);
    CHECK_GE(offset_fd_, 0);
  }
  CHECK_GE(string_id, 0);
  const StringIdxEntry* str_meta = offset_map_ + string_id;
  if (str_meta->size == 0xffff) {
    // hit the canary
    return {nullptr, 0, true};
  }
  return {payload_map_ + str_meta->off, str_meta->size, false};
}
1312 
1313 void StringDictionary::addPayloadCapacity(const size_t min_capacity_requested) noexcept {
1314  if (!isTemp_) {
1315  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1316  } else {
1317  payload_map_ = static_cast<char*>(
1318  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1319  }
1320 }
1321 
1322 void StringDictionary::addOffsetCapacity(const size_t min_capacity_requested) noexcept {
1323  if (!isTemp_) {
1324  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1325  } else {
1326  offset_map_ = static_cast<StringIdxEntry*>(
1327  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1328  }
1329 }
1330 
    int fd,
    const size_t min_capacity_requested) noexcept {
  // Appends 0xff "canary" bytes to the end of file `fd` and returns how many were added.
  // The amount is the larger of 1024 pages and enough whole pages to exceed
  // min_capacity_requested (an exact multiple still gets one extra page).
  const size_t canary_buff_size_to_add =
      std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
               (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);

  // CANARY_BUFFER is a reusable 0xff-filled scratch buffer, enlarged on demand.
  // NOTE(review): it is not visibly synchronized here — confirm callers hold a lock.
  if (canary_buffer_size < canary_buff_size_to_add) {
    CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
    canary_buffer_size = canary_buff_size_to_add;
    CHECK(CANARY_BUFFER);
    memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
  }

  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
  // A short write is treated as fatal (CHECK) rather than retried.
  const auto write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
  CHECK(write_return > 0 &&
        (static_cast<size_t>(write_return) == canary_buff_size_to_add));
  return canary_buff_size_to_add;
}
1351 
    size_t& mem_size,
    const size_t min_capacity_requested) noexcept {
  // Heap counterpart of addStorageCapacity. Reallocates the buffer to
  // mem_size + N bytes, fills the N new bytes with 0xff canary, adds N to mem_size and
  // returns the (possibly moved) buffer. N uses the same page-based rule as
  // addStorageCapacity.
  const size_t canary_buff_size_to_add =
      std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
               (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
  if (canary_buffer_size < canary_buff_size_to_add) {
    CANARY_BUFFER =
        reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
    canary_buffer_size = canary_buff_size_to_add;
    CHECK(CANARY_BUFFER);
    memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
  }
  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
  CHECK(new_addr);
  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
  // NOTE(review): memcpy returns its destination, so this CHECK only guards a null
  // pointer. The copy itself relies on CHECK always evaluating its argument.
  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
  mem_size += canary_buff_size_to_add;
  return new_addr;
}
1372 
  // Drops the memoized LIKE, REGEXP and equality results and the compare cache, since
  // they no longer reflect the dictionary's full contents.
  // Swapping with a default-constructed container releases all of its storage.
  if (!like_cache_.empty()) {
    decltype(like_cache_)().swap(like_cache_);
  }
  if (!regex_cache_.empty()) {
    decltype(regex_cache_)().swap(regex_cache_);
  }
  if (!equal_cache_.empty()) {
    decltype(equal_cache_)().swap(equal_cache_);
  }
  compare_cache_.invalidateInvertedIndex();
}
1385 
// TODO 5 Mar 2021: Nothing currently undoes writes to the dictionary after a failed
// load. The next write to the dictionary that does a checkpoint will cause the
// uncheckpointed data to be written to disk. The only recourse is a table truncate, and
// that assumes the dictionary is not replicated.
  // Remote dictionaries: delegate to the server; any exception is reported as failure.
  if (client_) {
    try {
      return client_->checkpoint();
    } catch (...) {
      return false;
    }
  }
  CHECK(!isTemp_);
  // Synchronously flush both memory mappings, then fsync both files. Because of the
  // short-circuiting &&, the first failing step skips the remaining ones.
  bool ret = true;
  ret = ret &&
        (omnisci::msync((void*)offset_map_, offset_file_size_, /*async=*/false) == 0);
  ret = ret &&
        (omnisci::msync((void*)payload_map_, payload_file_size_, /*async=*/false) == 0);
  ret = ret && (omnisci::fsync(offset_fd_) == 0);
  ret = ret && (omnisci::fsync(payload_fd_) == 0);
  return ret;
}
1408 
  // This method is not thread-safe.
  // Incrementally extends sorted_cache: the ids not yet covered
  // (sorted_cache.size() .. str_count_-1) are sorted by string value and merged in.
  // Assumes sorted_cache holds exactly ids [0, sorted_cache.size()) — TODO confirm.
  const auto cur_cache_size = sorted_cache.size();
  std::vector<int32_t> temp_sorted_cache;
  for (size_t i = cur_cache_size; i < str_count_; i++) {
    temp_sorted_cache.push_back(i);
  }
  sortCache(temp_sorted_cache);
  mergeSortedCache(temp_sorted_cache);
}
1419 
1420 void StringDictionary::sortCache(std::vector<int32_t>& cache) {
1421  // This method is not thread-safe.
1422 
1423  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1424  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1425 
1426  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1427  auto a_str = this->getStringFromStorage(a);
1428  auto b_str = this->getStringFromStorage(b);
1429  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1430  });
1431 }
1432 
1433 void StringDictionary::mergeSortedCache(std::vector<int32_t>& temp_sorted_cache) {
1434  // this method is not thread safe
1435  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1436  size_t t_idx = 0, s_idx = 0, idx = 0;
1437  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1438  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1439  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1440  const auto insert_from_temp_cache =
1441  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1442  if (insert_from_temp_cache) {
1443  updated_cache[idx] = temp_sorted_cache[t_idx++];
1444  } else {
1445  updated_cache[idx] = sorted_cache[s_idx++];
1446  }
1447  }
1448  while (t_idx < temp_sorted_cache.size()) {
1449  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1450  }
1451  while (s_idx < sorted_cache.size()) {
1452  updated_cache[idx++] = sorted_cache[s_idx++];
1453  }
1454  sorted_cache.swap(updated_cache);
1455 }
1456 
1458  std::vector<int32_t>& dest_ids,
1459  StringDictionary* dest_dict,
1460  const std::vector<int32_t>& source_ids,
1461  const StringDictionary* source_dict,
1462  const std::map<int32_t, std::string> transient_mapping) {
1463  std::vector<std::string> strings;
1464 
1465  for (const int32_t source_id : source_ids) {
1466  if (source_id == std::numeric_limits<int32_t>::min()) {
1467  strings.emplace_back("");
1468  } else if (source_id < 0) {
1469  if (auto string_itr = transient_mapping.find(source_id);
1470  string_itr != transient_mapping.end()) {
1471  strings.emplace_back(string_itr->second);
1472  } else {
1473  throw std::runtime_error("Unexpected negative source ID");
1474  }
1475  } else {
1476  strings.push_back(source_dict->getString(source_id));
1477  }
1478  }
1479 
1480  dest_ids.resize(strings.size());
1481  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1482 }
1483 
1485  std::vector<std::vector<int32_t>>& dest_array_ids,
1486  StringDictionary* dest_dict,
1487  const std::vector<std::vector<int32_t>>& source_array_ids,
1488  const StringDictionary* source_dict) {
1489  dest_array_ids.resize(source_array_ids.size());
1490 
1491  std::atomic<size_t> row_idx{0};
1492  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1493  int thread_id) {
1494  for (;;) {
1495  auto row = row_idx.fetch_add(1);
1496 
1497  if (row >= dest_array_ids.size()) {
1498  return;
1499  }
1500  const auto& source_ids = source_array_ids[row];
1501  auto& dest_ids = dest_array_ids[row];
1502  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1503  }
1504  };
1505 
1506  const int num_worker_threads = std::thread::hardware_concurrency();
1507 
1508  if (source_array_ids.size() / num_worker_threads > 10) {
1509  std::vector<std::future<void>> worker_threads;
1510  for (int i = 0; i < num_worker_threads; ++i) {
1511  worker_threads.push_back(std::async(std::launch::async, processor, i));
1512  }
1513 
1514  for (auto& child : worker_threads) {
1515  child.wait();
1516  }
1517  for (auto& child : worker_threads) {
1518  child.get();
1519  }
1520  } else {
1521  processor(0);
1522  }
1523 }
1524 
1525 void translate_string_ids(std::vector<int32_t>& dest_ids,
1526  const LeafHostInfo& dict_server_host,
1527  const DictRef dest_dict_ref,
1528  const std::vector<int32_t>& source_ids,
1529  const DictRef source_dict_ref,
1530  const int32_t dest_generation) {
1531  DictRef temp_dict_ref(-1, -1);
1532  StringDictionaryClient string_client(dict_server_host, temp_dict_ref, false);
1533  string_client.translate_string_ids(
1534  dest_ids, dest_dict_ref, source_ids, source_dict_ref, dest_generation);
1535 }
StringIdxEntry * offset_map_
void translate_string_ids(std::vector< int32_t > &dest_ids, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
#define CHECK_EQ(x, y)
Definition: Logger.h:214
void increaseHashTableCapacity() noexcept
int open(const char *path, int flags, int mode)
Definition: omnisci_fs.cpp:64
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38
string_dict_hash_t hash_string(const std::string_view &str)
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
std::vector< int32_t > getRegexpLike(const std::string &pattern, const char escape, const size_t generation) const
RUNTIME_EXPORT DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
uint64_t off
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
uint64_t size
size_t storageEntryCount() const
#define LOG(tag)
Definition: Logger.h:200
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
uint32_t computeBucketFromStorageAndMemory(const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
tuple r
Definition: test_fsi.py:16
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
bool fillRateIsHigh(const size_t num_strings) const noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:219
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
Constants for Builtin SQL Types supported by OmniSci.
std::string offsets_path_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
#define CHECK_GT(x, y)
Definition: Logger.h:218
int32_t getIdOfString(const std::string &str) const
int32_t getOrAdd(const std::string &str) noexcept
int32_t getUnlocked(const std::string &str) const noexcept
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::string to_string(char const *&&v)
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:141
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
StringDictionary(const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
static constexpr size_t MAX_STRCOUNT
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
void close(const int fd)
Definition: omnisci_fs.cpp:68
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
std::vector< int32_t > getCompare(const std::string &pattern, const std::string &comp_operator, const size_t generation)
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
#define CHECK_NE(x, y)
Definition: Logger.h:215
const std::string folder_
mapd_shared_mutex rw_mutex_
RUNTIME_EXPORT DEVICE bool string_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:246
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
RUNTIME_EXPORT DEVICE bool string_like_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:43
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
std::map< std::string, int32_t > equal_cache_
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
void getOrAddBulkArray(const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
void translate_string_ids(std::vector< int32_t > &dest_ids, const LeafHostInfo &dict_server_host, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
Functions to support the LIKE and ILIKE operator in SQL. Only single-byte character set is supported ...
void appendToStorage(const String str) noexcept
int fsync(int fd)
Definition: omnisci_fs.cpp:60
int checked_open(const char *path, const bool recover)
bool g_enable_smem_group_by true
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
std::pair< char *, size_t > getStringBytes(int32_t string_id) const noexcept
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
bool checkpoint() noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:216
std::vector< int32_t > getLike(const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
#define CHECK_LE(x, y)
Definition: Logger.h:217
RUNTIME_EXPORT DEVICE bool string_ilike_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:59
size_t getNumStringsFromStorage(const size_t storage_slots) const noexcept
int msync(void *addr, size_t length, bool async)
Definition: omnisci_fs.cpp:55
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
std::string getString(int32_t string_id) const
void hashStrings(const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
bool g_cache_string_hash
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void increaseHashTableCapacityFromStorageAndMemory(const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
ThreadId thread_id()
Definition: Logger.cpp:741
mapd_shared_lock< mapd_shared_mutex > read_lock
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)
static void populate_string_array_ids(std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
bool g_enable_watchdog false
Definition: Execute.cpp:75
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:206
const uint64_t round_up_p2(const uint64_t num)
std::vector< int32_t > string_id_string_dict_hash_table_
void sortCache(std::vector< int32_t > &cache)
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
mapd_unique_lock< mapd_shared_mutex > write_lock
int get_page_size()
Definition: omnisci_fs.cpp:27
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
bool g_enable_stringdict_parallel
void throw_encoding_error(std::string_view str, std::string_view folder)
PayloadString getStringFromStorage(const int string_id) const noexcept
int cpu_threads()
Definition: thread_count.h:24
RUNTIME_EXPORT DEVICE bool regexp_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: Regexp.cpp:41
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)
#define UINT32_MAX
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
std::vector< int32_t > sorted_cache
#define VLOG(n)
Definition: Logger.h:300
int32_t getOrAddImpl(const std::string_view &str) noexcept
~StringDictionary() noexcept
RUNTIME_EXPORT DEVICE bool string_ilike(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:257
std::shared_ptr< const std::vector< std::string > > copyStrings() const
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31