OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
StringDictionaryProxy.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include "Logger/Logger.h"
20 #include "Shared/ThreadInfo.h"
21 #include "Shared/misc.h"
22 #include "Shared/sqltypes.h"
23 #include "Shared/thread_count.h"
25 #include "StringOps/StringOps.h"
26 #include "Utils/Regexp.h"
27 #include "Utils/StringLike.h"
28 
29 #include <tbb/parallel_for.h>
30 #include <tbb/task_arena.h>
31 
32 #include <algorithm>
33 #include <iomanip>
34 #include <iostream>
35 #include <string>
36 #include <string_view>
37 #include <thread>
38 
39 StringDictionaryProxy::StringDictionaryProxy(std::shared_ptr<StringDictionary> sd,
40  const int32_t string_dict_id,
41  const int64_t generation)
42  : string_dict_(sd), string_dict_id_(string_dict_id), generation_(generation) {}
43 
// Restricts a dictionary string id to a generation: an id that is >= `generation`
// is reported as StringDictionary::INVALID_STR_ID, otherwise the id is returned as-is.
// NOTE(review): this listing jumps from source line 44 to 46, so the condition that
// guards the early `return id;` is not visible here -- presumably it lets
// StringDictionary::INVALID_STR_ID pass through before the CHECK_GE; confirm against
// the original file and restore the missing line.
44 int32_t truncate_to_generation(const int32_t id, const size_t generation) {
46  return id;
47  }
48  CHECK_GE(id, 0);
49  return static_cast<size_t>(id) >= generation ? StringDictionary::INVALID_STR_ID : id;
50 }
51 
// Read-only bulk lookup: allocates one id slot per input string, fills the slots via
// getTransientBulkImpl (called with take_read_lock = true) and returns the id vector.
// NOTE(review): source lines 52 and 54 are absent from this listing. Line 52 is
// presumably the `std::vector<int32_t> StringDictionaryProxy::getTransientBulk(`
// header (see the member index at the end of the page); the content of line 54 cannot
// be determined from here -- restore both from the original file.
53  const std::vector<std::string>& strings) const {
55  std::vector<int32_t> string_ids(strings.size());
56  getTransientBulkImpl(strings, string_ids.data(), true);
57  return string_ids;
58 }
59 
// Bulk get-or-add: resolves every input string to an id, first through the underlying
// dictionary's getBulk and then, for any string still marked INVALID_STR_ID, through
// getOrAddTransientUnlocked under the write lock. Returns one id per input string.
// NOTE(review): source lines 60 and 62 are absent from this listing. Line 60 is
// presumably the `std::vector<int32_t> StringDictionaryProxy::getOrAddTransientBulk(`
// header (see the member index at the end of the page); line 62 is unknown -- restore
// both from the original file.
61  const std::vector<std::string>& strings) {
63  const size_t num_strings = strings.size();
64  std::vector<int32_t> string_ids(num_strings);
65  if (num_strings == 0) {
66  return string_ids;
67  }
68  // Since new strings added to a StringDictionaryProxy are not materialized in the
69  // proxy's underlying StringDictionary, we can use the fast parallel
70  // StringDictionary::getBulk method to fetch ids from the underlying dictionary (which
71  // will return StringDictionary::INVALID_STR_ID for strings that don't exist)
72 
73  // Don't need to be under lock here as the string ids for strings in the underlying
74  // materialized dictionary are immutable
75  const size_t num_strings_not_found =
76  string_dict_->getBulk(strings, string_ids.data(), generation_);
77  if (num_strings_not_found > 0) {
78  // Only the strings the dictionary could not resolve are added as transients.
78  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
79  for (size_t string_idx = 0; string_idx < num_strings; ++string_idx) {
80  if (string_ids[string_idx] == StringDictionary::INVALID_STR_ID) {
81  string_ids[string_idx] = getOrAddTransientUnlocked(strings[string_idx]);
82  }
83  }
84  }
85  return string_ids;
86 }
87 
88 template <typename String>
90  unsigned const new_index = transient_str_to_int_.size();
91  auto transient_id = transientIndexToId(new_index);
92  auto const emplaced = transient_str_to_int_.emplace(str, transient_id);
93  if (emplaced.second) { // (str, transient_id) was added to transient_str_to_int_.
94  transient_string_vec_.push_back(&emplaced.first->first);
95  } else { // str already exists in transient_str_to_int_. Return existing transient_id.
96  transient_id = emplaced.first->second;
97  }
98  return transient_id;
99 }
100 
101 int32_t StringDictionaryProxy::getOrAddTransient(const std::string& str) {
102  auto const string_id = getIdOfStringFromClient(str);
103  if (string_id != StringDictionary::INVALID_STR_ID) {
104  return string_id;
105  }
106  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
107  return getOrAddTransientUnlocked(str);
108 }
109 
110 int32_t StringDictionaryProxy::getIdOfString(const std::string& str) const {
111  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
112  auto const str_id = getIdOfStringFromClient(str);
113  if (str_id != StringDictionary::INVALID_STR_ID || transient_str_to_int_.empty()) {
114  return str_id;
115  }
116  auto it = transient_str_to_int_.find(str);
117  return it != transient_str_to_int_.end() ? it->second
119 }
120 
121 template <typename String>
122 int32_t StringDictionaryProxy::getIdOfStringFromClient(const String& str) const {
123  CHECK_GE(generation_, 0);
124  return truncate_to_generation(string_dict_->getIdOfString(str), generation_);
125 }
126 
127 int32_t StringDictionaryProxy::getIdOfStringNoGeneration(const std::string& str) const {
128  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
129  auto str_id = string_dict_->getIdOfString(str);
130  if (str_id != StringDictionary::INVALID_STR_ID || transient_str_to_int_.empty()) {
131  return str_id;
132  }
133  auto it = transient_str_to_int_.find(str);
134  return it != transient_str_to_int_.end() ? it->second
136 }
137 
139  int8_t* proxy_ptr,
140  int32_t string_id) {
141  CHECK(proxy_ptr != nullptr);
142  auto proxy = reinterpret_cast<StringDictionaryProxy*>(proxy_ptr);
143  auto [c_str, len] = proxy->getStringBytes(string_id);
144  return c_str;
145 }
146 
147 extern "C" DEVICE RUNTIME_EXPORT size_t
148 StringDictionaryProxy_getStringLength(int8_t* proxy_ptr, int32_t string_id) {
149  CHECK(proxy_ptr != nullptr);
150  auto proxy = reinterpret_cast<StringDictionaryProxy*>(proxy_ptr);
151  auto [c_str, len] = proxy->getStringBytes(string_id);
152  return len;
153 }
154 
155 extern "C" DEVICE RUNTIME_EXPORT int32_t
156 StringDictionaryProxy_getStringId(int8_t* proxy_ptr, char* c_str_ptr) {
157  CHECK(proxy_ptr != nullptr);
158  auto proxy = reinterpret_cast<StringDictionaryProxy*>(proxy_ptr);
159  std::string str(c_str_ptr);
160  return proxy->getOrAddTransient(str);
161 }
162 
163 std::string StringDictionaryProxy::getString(int32_t string_id) const {
164  if (inline_int_null_value<int32_t>() == string_id) {
165  return "";
166  }
167  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
168  return getStringUnlocked(string_id);
169 }
170 
171 std::string StringDictionaryProxy::getStringUnlocked(const int32_t string_id) const {
172  if (string_id >= 0 && storageEntryCount() > 0) {
173  return string_dict_->getString(string_id);
174  }
175  unsigned const string_index = transientIdToIndex(string_id);
176  CHECK_LT(string_index, transient_string_vec_.size());
177  return *transient_string_vec_[string_index];
178 }
179 
180 std::vector<std::string> StringDictionaryProxy::getStrings(
181  const std::vector<int32_t>& string_ids) const {
182  std::vector<std::string> strings;
183  if (!string_ids.empty()) {
184  strings.reserve(string_ids.size());
185  for (const auto string_id : string_ids) {
186  if (string_id >= 0) {
187  strings.emplace_back(string_dict_->getString(string_id));
188  } else if (inline_int_null_value<int32_t>() == string_id) {
189  strings.emplace_back("");
190  } else {
191  unsigned const string_index = transientIdToIndex(string_id);
192  strings.emplace_back(*transient_string_vec_[string_index]);
193  }
194  }
195  }
196  return strings;
197 }
198 
199 template <typename String>
201  const String& lookup_string) const {
202  const auto it = transient_str_to_int_.find(lookup_string);
204  : it->second;
205 }
206 
// Builds an id translation map from this proxy to `dest_proxy` without taking locks.
// Transient strings are translated first (after applying `string_op_infos`, if any),
// then the strings stored in the underlying dictionary. The number of strings that
// could not be translated is recorded on the map, and the map's range is set
// conservatively from the size of the destination proxy.
// NOTE(review): several source lines are absent from this listing -- the function
// header (lines 207-208), lines 226 and 231 (presumably the opening of the two
// transform calls over transient_string_vec_), and line 262 (presumably the
// INVALID_STR_ID right-hand side of the comparison in the callback). Restore them from
// the original file before building.
209  const StringDictionaryProxy* dest_proxy,
210  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) const {
211  auto timer = DEBUG_TIMER(__func__);
212  IdMap id_map = initIdMap();
213 
214  if (id_map.empty()) {
215  return id_map;
216  }
217 
218  const StringOps_Namespace::StringOps string_ops(string_op_infos);
219 
220  // First map transient strings, store at front of vector map
221  const size_t num_transient_entries = id_map.numTransients();
222  size_t num_transient_strings_not_translated = 0UL;
223  if (num_transient_entries) {
224  std::vector<std::string> transient_lookup_strings(num_transient_entries);
225  if (string_ops.size()) {
227  transient_string_vec_.cend(),
228  transient_lookup_strings.rbegin(),
229  [&](std::string const* ptr) { return string_ops(*ptr); });
230  } else {
232  transient_string_vec_.cend(),
233  transient_lookup_strings.rbegin(),
234  [](std::string const* ptr) { return *ptr; });
235  }
236 
237  // This lookup may have a different snapshot of
238  // dest_proxy transients and dictionary than what happens under
239  // the below dest_proxy_read_lock. We may need an unlocked version of
240  // getTransientBulk to ensure consistency (I don't believe
241  // current behavior would cause crashes/races, verify this though)
242 
243  // Todo(mattp): Consider variant of getTransientBulkImpl that can take
244  // a vector of pointer-to-strings so we don't have to materialize
245  // transient_string_vec_ into transient_lookup_strings.
246 
247  num_transient_strings_not_translated =
248  dest_proxy->getTransientBulkImpl(transient_lookup_strings, id_map.data(), false);
249  }
250 
251  // Now map strings in dictionary
252  // We place non-transient strings after the transient strings
253  // if they exist, otherwise at index 0
254  int32_t* translation_map_stored_entries_ptr = id_map.storageData();
255 
255  // Callback handed to buildDictionaryTranslationMap: looks a source string up among
255  // the destination's transients and stores the result at the source id's slot.
256  auto dest_transient_lookup_callback = [dest_proxy, translation_map_stored_entries_ptr](
257  const std::string_view& source_string,
258  const int32_t source_string_id) {
259  translation_map_stored_entries_ptr[source_string_id] =
260  dest_proxy->lookupTransientStringUnlocked(source_string);
261  return translation_map_stored_entries_ptr[source_string_id] ==
263  };
264 
265  const size_t num_dest_transients = dest_proxy->transientEntryCountUnlocked();
265  // With generation_ <= 0 the dictionary translation is skipped entirely.
266  const size_t num_persisted_strings_not_translated =
267  generation_ > 0 ? string_dict_->buildDictionaryTranslationMap(
268  dest_proxy->string_dict_.get(),
269  translation_map_stored_entries_ptr,
270  generation_,
271  dest_proxy->generation_,
272  num_dest_transients > 0UL,
273  dest_transient_lookup_callback,
274  string_op_infos)
275  : 0UL;
276 
277  const size_t num_dest_entries = dest_proxy->entryCountUnlocked();
278  const size_t num_total_entries =
279  id_map.getVectorMap().size() - 1UL /* account for skipped entry -1 */;
280  CHECK_GT(num_total_entries, 0UL);
281  const size_t num_strings_not_translated =
282  num_transient_strings_not_translated + num_persisted_strings_not_translated;
283  CHECK_LE(num_strings_not_translated, num_total_entries);
284  id_map.setNumUntranslatedStrings(num_strings_not_translated);
285 
286  // Below is a conservative setting of range based on the size of the destination proxy,
287  // but probably not worth a scan over the data (or inline computation as we translate)
288  // to compute the actual ranges
289 
290  id_map.setRangeStart(
291  num_dest_transients > 0 ? -1 - static_cast<int32_t>(num_dest_transients) : 0);
292  id_map.setRangeEnd(dest_proxy->storageEntryCount());
293 
293  // The remainder only produces a verbose log line summarising the translation.
294  const size_t num_entries_translated = num_total_entries - num_strings_not_translated;
295  const float match_pct =
296  100.0 * static_cast<float>(num_entries_translated) / num_total_entries;
297  VLOG(1) << std::fixed << std::setprecision(2) << match_pct << "% ("
298  << num_entries_translated << " entries) from dictionary ("
299  << string_dict_->getDbId() << ", " << string_dict_->getDictId() << ") with "
300  << num_total_entries << " total entries ( " << num_transient_entries
301  << " literals)"
302  << " translated to dictionary (" << dest_proxy->string_dict_->getDbId() << ", "
303  << dest_proxy->string_dict_->getDictId() << ") with " << num_dest_entries
304  << " total entries (" << dest_proxy->transientEntryCountUnlocked()
305  << " literals).";
306 
307  return id_map;
308 }
309 
// Acquires the locks needed for a source -> dest translation in a globally consistent
// order so two concurrent translations in opposite directions cannot deadlock.
//
// Both lock objects must have been constructed with std::defer_lock.
//  * If both locks guard the same mutex, only the write lock is taken (locking the same
//    shared_mutex twice from one thread would be undefined behavior).
//  * Otherwise both locks are taken, the proxy with the smaller dictionary id first.
//    When the ids are equal but the mutexes differ -- two distinct proxies of the same
//    dictionary -- the mutex address breaks the tie. Previously this case took only the
//    write lock and left the source proxy unlocked while it was being read.
void order_translation_locks(const int32_t source_dict_id,
                             const int32_t dest_dict_id,
                             std::shared_lock<std::shared_mutex>& source_proxy_read_lock,
                             std::unique_lock<std::shared_mutex>& dest_proxy_write_lock) {
  const std::shared_mutex* const source_mutex = source_proxy_read_lock.mutex();
  const std::shared_mutex* const dest_mutex = dest_proxy_write_lock.mutex();
  if (source_mutex == dest_mutex) {
    // proxies are same, only take one write lock
    dest_proxy_write_lock.lock();
    return;
  }
  // Primary key: dictionary id. Secondary key: mutex address (std::less gives a total
  // order over pointers to unrelated objects).
  const bool lock_source_first =
      source_dict_id != dest_dict_id
          ? source_dict_id < dest_dict_id
          : std::less<const std::shared_mutex*>{}(source_mutex, dest_mutex);
  if (lock_source_first) {
    source_proxy_read_lock.lock();
    dest_proxy_write_lock.lock();
  } else {
    dest_proxy_write_lock.lock();
    source_proxy_read_lock.lock();
  }
}
325 
328  const StringDictionaryProxy* dest_proxy,
329  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) const {
330  const auto source_dict_id = getDictId();
331  const auto dest_dict_id = dest_proxy->getDictId();
332 
333  std::shared_lock<std::shared_mutex> source_proxy_read_lock(rw_mutex_, std::defer_lock);
334  std::unique_lock<std::shared_mutex> dest_proxy_write_lock(dest_proxy->rw_mutex_,
335  std::defer_lock);
337  source_dict_id, dest_dict_id, source_proxy_read_lock, dest_proxy_write_lock);
338  return buildIntersectionTranslationMapToOtherProxyUnlocked(dest_proxy, string_op_infos);
339 }
340 
342  StringDictionaryProxy* dest_proxy,
343  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) const {
344  auto timer = DEBUG_TIMER(__func__);
345 
346  const auto source_dict_id = getDictId();
347  const auto dest_dict_id = dest_proxy->getDictId();
348  std::shared_lock<std::shared_mutex> source_proxy_read_lock(rw_mutex_, std::defer_lock);
349  std::unique_lock<std::shared_mutex> dest_proxy_write_lock(dest_proxy->rw_mutex_,
350  std::defer_lock);
352  source_dict_id, dest_dict_id, source_proxy_read_lock, dest_proxy_write_lock);
353 
354  auto id_map =
355  buildIntersectionTranslationMapToOtherProxyUnlocked(dest_proxy, string_op_infos);
356  if (id_map.empty()) {
357  return id_map;
358  }
359  const auto num_untranslated_strings = id_map.numUntranslatedStrings();
360  if (num_untranslated_strings > 0) {
361  const size_t total_post_translation_dest_transients =
362  num_untranslated_strings + dest_proxy->transientEntryCountUnlocked();
363  constexpr size_t max_allowed_transients =
364  static_cast<size_t>(std::numeric_limits<int32_t>::max() -
365  2); /* -2 accounts for INVALID_STR_ID and NULL value */
366  if (total_post_translation_dest_transients > max_allowed_transients) {
367  throw std::runtime_error("Union translation to dictionary" +
368  std::to_string(getDictId()) + " would result in " +
369  std::to_string(total_post_translation_dest_transients) +
370  " transient entries, which is more than limit of " +
371  std::to_string(max_allowed_transients) + " transients.");
372  }
373  const int32_t map_domain_start = id_map.domainStart();
374  const int32_t map_domain_end = id_map.domainEnd();
375 
376  const StringOps_Namespace::StringOps string_ops(string_op_infos);
377  const bool has_string_ops = string_ops.size();
378 
379  // First iterate over transient strings and add to dest map
380  // Todo (todd): Add call to fetch string_views (local) or strings (distributed)
381  // for all non-translated ids to avoid string-by-string fetch
382 
383  for (int32_t source_string_id = map_domain_start; source_string_id < -1;
384  ++source_string_id) {
385  if (id_map[source_string_id] == StringDictionary::INVALID_STR_ID) {
386  const auto source_string = getStringUnlocked(source_string_id);
387  const auto dest_string_id = dest_proxy->getOrAddTransientUnlocked(
388  has_string_ops ? string_ops(source_string) : source_string);
389  id_map[source_string_id] = dest_string_id;
390  }
391  }
392  // Now iterate over stored strings
393  for (int32_t source_string_id = 0; source_string_id < map_domain_end;
394  ++source_string_id) {
395  if (id_map[source_string_id] == StringDictionary::INVALID_STR_ID) {
396  const auto source_string = string_dict_->getString(source_string_id);
397  const auto dest_string_id = dest_proxy->getOrAddTransientUnlocked(
398  has_string_ops ? string_ops(source_string) : source_string);
399  id_map[source_string_id] = dest_string_id;
400  }
401  }
402  }
403  // We may have added transients to the destination proxy, use this to update
404  // our id map range (used downstream for ExpressionRange)
405 
406  const size_t num_dest_transients = dest_proxy->transientEntryCountUnlocked();
407  id_map.setRangeStart(
408  num_dest_transients > 0 ? -1 - static_cast<int32_t>(num_dest_transients) : 0);
409  return id_map;
410 }
411 
412 namespace {
413 
414 bool is_like(const std::string& str,
415  const std::string& pattern,
416  const bool icase,
417  const bool is_simple,
418  const char escape) {
419  return icase
420  ? (is_simple ? string_ilike_simple(
421  str.c_str(), str.size(), pattern.c_str(), pattern.size())
422  : string_ilike(str.c_str(),
423  str.size(),
424  pattern.c_str(),
425  pattern.size(),
426  escape))
427  : (is_simple ? string_like_simple(
428  str.c_str(), str.size(), pattern.c_str(), pattern.size())
429  : string_like(str.c_str(),
430  str.size(),
431  pattern.c_str(),
432  pattern.size(),
433  escape));
434 }
435 
436 } // namespace
437 
438 std::vector<int32_t> StringDictionaryProxy::getLike(const std::string& pattern,
439  const bool icase,
440  const bool is_simple,
441  const char escape) const {
442  CHECK_GE(generation_, 0);
443  auto result = string_dict_->getLike(pattern, icase, is_simple, escape, generation_);
444  for (unsigned index = 0; index < transient_string_vec_.size(); ++index) {
445  if (is_like(*transient_string_vec_[index], pattern, icase, is_simple, escape)) {
446  result.push_back(transientIndexToId(index));
447  }
448  }
449  return result;
450 }
451 
namespace {

// Evaluates `str <comp_operator> pattern` using std::string::compare ordering.
// Supported operators: "<", "<=", "=", ">", ">=", "<>".
// Throws std::runtime_error for any other operator.
bool do_compare(const std::string& str,
                const std::string& pattern,
                const std::string& comp_operator) {
  const int ordering = str.compare(pattern);
  if (comp_operator == "<") {
    return ordering < 0;
  }
  if (comp_operator == "<=") {
    return ordering <= 0;
  }
  if (comp_operator == "=") {
    return ordering == 0;
  }
  if (comp_operator == ">") {
    return ordering > 0;
  }
  if (comp_operator == ">=") {
    return ordering >= 0;
  }
  if (comp_operator == "<>") {
    return ordering != 0;
  }
  throw std::runtime_error("unsupported string compare operator");
}

}  // namespace
475 
477  const std::string& pattern,
478  const std::string& comp_operator) const {
479  CHECK_GE(generation_, 0);
480  auto result = string_dict_->getCompare(pattern, comp_operator, generation_);
481  for (unsigned index = 0; index < transient_string_vec_.size(); ++index) {
482  if (do_compare(*transient_string_vec_[index], pattern, comp_operator)) {
483  result.push_back(transientIndexToId(index));
484  }
485  }
486  return result;
487 }
488 
489 namespace {
490 
491 bool is_regexp_like(const std::string& str,
492  const std::string& pattern,
493  const char escape) {
494  return regexp_like(str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape);
495 }
496 
497 } // namespace
498 
499 std::vector<int32_t> StringDictionaryProxy::getRegexpLike(const std::string& pattern,
500  const char escape) const {
501  CHECK_GE(generation_, 0);
502  auto result = string_dict_->getRegexpLike(pattern, escape, generation_);
503  for (unsigned index = 0; index < transient_string_vec_.size(); ++index) {
504  if (is_regexp_like(*transient_string_vec_[index], pattern, escape)) {
505  result.push_back(transientIndexToId(index));
506  }
507  }
508  return result;
509 }
510 
511 int32_t StringDictionaryProxy::getOrAdd(const std::string& str) noexcept {
512  return string_dict_->getOrAdd(str);
513 }
514 
515 std::pair<const char*, size_t> StringDictionaryProxy::getStringBytes(
516  int32_t string_id) const noexcept {
517  if (string_id >= 0) {
518  return string_dict_.get()->getStringBytes(string_id);
519  }
520  unsigned const string_index = transientIdToIndex(string_id);
521  CHECK_LT(string_index, transient_string_vec_.size());
522  std::string const* const str_ptr = transient_string_vec_[string_index];
523  return {str_ptr->c_str(), str_ptr->size()};
524 }
525 
527  const size_t num_storage_entries{generation_ == -1 ? string_dict_->storageEntryCount()
528  : generation_};
529  CHECK_LE(num_storage_entries, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
530  return num_storage_entries;
531 }
532 
534  // CHECK_LE(num_storage_entries,
535  // static_cast<size_t>(std::numeric_limits<int32_t>::max()));
536  const size_t num_transient_entries{transient_str_to_int_.size()};
537  CHECK_LE(num_transient_entries,
538  static_cast<size_t>(std::numeric_limits<int32_t>::max()) - 1);
539  return num_transient_entries;
540 }
541 
// NOTE(review): source lines 542 and 544 are absent from this listing. Going by the
// member index and by entryCount() further down, this is presumably
// `size_t StringDictionaryProxy::transientEntryCount() const`, which takes the read
// lock and returns the unlocked count -- restore the header and the return statement
// from the original file.
543  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
545 }
546 
// NOTE(review): only the closing brace of this definition survives in the listing
// (source lines 547-548 are absent). Given its position before entryCount(), it is
// presumably the body of entryCountUnlocked() -- restore it from the original file.
549 }
550 
552  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
553  return entryCountUnlocked();
554 }
555 
556 // Iterate over transient strings, then non-transients.
558  StringDictionary::StringCallback& serial_callback) const {
559  constexpr int32_t max_transient_id = -2;
560  // Iterate over transient strings.
561  for (unsigned index = 0; index < transient_string_vec_.size(); ++index) {
562  std::string const& str = *transient_string_vec_[index];
563  int32_t const string_id = max_transient_id - index;
564  serial_callback(str, string_id);
565  }
566  // Iterate over non-transient strings.
567  string_dict_->eachStringSerially(generation_, serial_callback);
568 }
569 
570 // For each (string/_view,old_id) pair passed in:
571 // * Get the new_id based on sdp_'s dictionary, or add it as a transient.
572 // * The StringDictionary is local, so call the faster getUnlocked() method.
573 // * Store the old_id -> new_id translation into the id_map_.
// NOTE(review): source lines 574-576, 579 and 587 are absent from this listing.
// Going by the member index they are presumably the class header, the `sdp_` and
// `id_map_` members, the `StringLocalCallback(StringDictionaryProxy*, IdMap&)`
// constructor head, and the "add as transient" branch of the conditional in the
// string_view overload -- restore them from the original file.
577 
578  public:
580  : sdp_(sdp), id_map_(id_map) {}
// The std::string overload simply forwards to the string_view overload.
581  void operator()(std::string const& str, int32_t const string_id) override {
582  operator()(std::string_view(str), string_id);
583  }
584  void operator()(std::string_view const sv, int32_t const old_id) override {
585  int32_t const new_id = sdp_->string_dict_->getUnlocked(sv);
586  id_map_[old_id] = new_id == StringDictionary::INVALID_STR_ID
588  : new_id;
589  }
590 };
591 
592 // For each (string,old_id) pair passed in:
593 // * Get the new_id based on sdp_'s dictionary, or add it as a transient.
594 // * The StringDictionary is not local, so call string_dict_->makeLambdaStringToId()
595 // to make a lookup hash.
596 // * Store the old_id -> new_id translation into the id_map_.
// NOTE(review): source lines 597-599, 601, 604 and 611 are absent from this listing.
// Going by the member index they are presumably the class header, the `sdp_`,
// `id_map_` and `string_to_id_` members, the
// `StringNetworkCallback(StringDictionaryProxy*, IdMap&)` constructor head, and the
// "add as transient" branch of the conditional in the std::string overload -- restore
// them from the original file.
600  using Lambda = std::function<int32_t(std::string const&)>;
602 
603  public:
605  : sdp_(sdp)
606  , id_map_(id_map)
607  , string_to_id_(sdp->string_dict_->makeLambdaStringToId()) {}
608  void operator()(std::string const& str, int32_t const old_id) override {
609  int32_t const new_id = string_to_id_(str);
610  id_map_[old_id] = new_id == StringDictionary::INVALID_STR_ID
612  : new_id;
613  }
// The string_view overload is never expected to be called for this callback.
614  void operator()(std::string_view const, int32_t const string_id) override {
615  UNREACHABLE() << "StringNetworkCallback requires a std::string.";
616  }
617 };
618 
619 // Union strings from both StringDictionaryProxies into *this as transients.
620 // Return id_map: sdp_rhs:string_id -> this:string_id for each string in sdp_rhs.
622  StringDictionaryProxy const& sdp_rhs) {
623  IdMap id_map = sdp_rhs.initIdMap();
624  // serial_callback cannot be parallelized due to calling getOrAddTransientUnlocked().
625  std::unique_ptr<StringDictionary::StringCallback> serial_callback;
626  if (string_dict_->isClient()) {
627  serial_callback = std::make_unique<StringNetworkCallback>(this, id_map);
628  } else {
629  serial_callback = std::make_unique<StringLocalCallback>(this, id_map);
630  }
631  // Import all non-duplicate strings (transient and non-transient) and add to id_map.
632  sdp_rhs.eachStringSerially(*serial_callback);
633  return id_map;
634 }
635 
636 std::ostream& operator<<(std::ostream& os, StringDictionaryProxy::IdMap const& id_map) {
637  return os << "IdMap(offset_(" << id_map.offset_ << ") vector_map_"
638  << shared::printContainer(id_map.vector_map_) << ')';
639 }
640 
641 void StringDictionaryProxy::updateGeneration(const int64_t generation) noexcept {
642  if (generation == -1) {
643  return;
644  }
645  if (generation_ != -1) {
646  CHECK_EQ(generation_, generation);
647  return;
648  }
649  generation_ = generation;
650 }
651 
653  const std::vector<std::string>& strings,
654  int32_t* string_ids,
655  const bool take_read_lock) const {
656  const size_t num_strings = strings.size();
657  if (num_strings == 0) {
658  return 0UL;
659  }
660  // StringDictionary::getBulk returns the number of strings not found
661  if (string_dict_->getBulk(strings, string_ids, generation_) == 0UL) {
662  return 0UL;
663  }
664 
665  // If here, dictionary could not find at least 1 target string,
666  // now look these up in the transient dictionary
667  // transientLookupBulk returns the number of strings not found
668  return transientLookupBulk(strings, string_ids, take_read_lock);
669 }
670 
671 template <typename String>
673  const std::vector<String>& lookup_strings,
674  int32_t* string_ids,
675  const bool take_read_lock) const {
676  const size_t num_strings = lookup_strings.size();
677  auto read_lock = take_read_lock ? std::shared_lock<std::shared_mutex>(rw_mutex_)
678  : std::shared_lock<std::shared_mutex>();
679 
680  if (num_strings == static_cast<size_t>(0) || transient_str_to_int_.empty()) {
681  return 0UL;
682  }
683  constexpr size_t tbb_parallel_threshold{20000};
684  if (num_strings < tbb_parallel_threshold) {
685  return transientLookupBulkUnlocked(lookup_strings, string_ids);
686  } else {
687  return transientLookupBulkParallelUnlocked(lookup_strings, string_ids);
688  }
689 }
690 
691 template <typename String>
693  const std::vector<String>& lookup_strings,
694  int32_t* string_ids) const {
695  const size_t num_strings = lookup_strings.size();
696  size_t num_strings_not_found = 0;
697  for (size_t string_idx = 0; string_idx < num_strings; ++string_idx) {
698  if (string_ids[string_idx] != StringDictionary::INVALID_STR_ID) {
699  continue;
700  }
701  // If we're here it means we need to look up this string as we don't
702  // have a valid id for it
703  string_ids[string_idx] = lookupTransientStringUnlocked(lookup_strings[string_idx]);
704  if (string_ids[string_idx] == StringDictionary::INVALID_STR_ID) {
705  num_strings_not_found++;
706  }
707  }
708  return num_strings_not_found;
709 }
710 
711 template <typename String>
713  const std::vector<String>& lookup_strings,
714  int32_t* string_ids) const {
715  const size_t num_lookup_strings = lookup_strings.size();
716  const size_t target_inputs_per_thread = 20000L;
717  ThreadInfo thread_info(
718  std::thread::hardware_concurrency(), num_lookup_strings, target_inputs_per_thread);
719  CHECK_GE(thread_info.num_threads, 1L);
720  CHECK_GE(thread_info.num_elems_per_thread, 1L);
721 
722  std::vector<size_t> num_strings_not_found_per_thread(thread_info.num_threads, 0UL);
723 
724  tbb::task_arena limited_arena(thread_info.num_threads);
725  limited_arena.execute([&] {
727  tbb::blocked_range<size_t>(
728  0, num_lookup_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
729  [&](const tbb::blocked_range<size_t>& r) {
730  const size_t start_idx = r.begin();
731  const size_t end_idx = r.end();
732  size_t num_local_strings_not_found = 0;
733  for (size_t string_idx = start_idx; string_idx < end_idx; ++string_idx) {
734  if (string_ids[string_idx] != StringDictionary::INVALID_STR_ID) {
735  continue;
736  }
737  string_ids[string_idx] =
738  lookupTransientStringUnlocked(lookup_strings[string_idx]);
739  if (string_ids[string_idx] == StringDictionary::INVALID_STR_ID) {
740  num_local_strings_not_found++;
741  }
742  }
743  const size_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
744  num_strings_not_found_per_thread[tbb_thread_idx] = num_local_strings_not_found;
745  },
746  tbb::simple_partitioner());
747  });
748  size_t num_strings_not_found = 0;
749  for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
750  num_strings_not_found += num_strings_not_found_per_thread[thread_idx];
751  }
752  return num_strings_not_found;
753 }
754 
756  return string_dict_.get();
757 }
758 
759 int64_t StringDictionaryProxy::getGeneration() const noexcept {
760  return generation_;
761 }
762 
// Equality starts by comparing the dictionary ids of the two proxies.
// NOTE(review): source lines 763 and 765 are absent from this listing -- the
// `bool StringDictionaryProxy::operator==(StringDictionaryProxy const& rhs) const`
// header (per the member index) and the second operand of the `&&`. The second
// operand cannot be determined from here; restore both from the original file.
764  return string_dict_id_ == rhs.string_dict_id_ &&
766 }
767 
769  return !operator==(rhs);
770 }
void eachStringSerially(StringDictionary::StringCallback &) const
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::pair< const char *, size_t > getStringBytes(int32_t string_id) const noexcept
std::vector< int32_t > getLike(const std::string &pattern, const bool icase, const bool is_simple, const char escape) const
size_t transientEntryCountUnlocked() const
StringLocalCallback(StringDictionaryProxy *sdp, StringDictionaryProxy::IdMap &id_map)
int64_t num_elems_per_thread
Definition: ThreadInfo.h:23
StringDictionaryProxy::IdMap & id_map_
heavyai::shared_lock< heavyai::shared_mutex > read_lock
size_t entryCount() const
Returns the number of total string entries for this proxy, both stored in the underlying dictionary a...
int32_t getIdOfStringNoGeneration(const std::string &str) const
std::ostream & operator<<(std::ostream &os, const SessionInfo &session_info)
Definition: SessionInfo.cpp:57
std::vector< int32_t > vector_map_
std::function< int32_t(std::string const &)> Lambda
std::string getStringUnlocked(const int32_t string_id) const
size_t storageEntryCount() const
Returns the number of string entries in the underlying string dictionary, at this proxy's generation_...
#define UNREACHABLE()
Definition: Logger.h:266
StringDictionary * getDictionary() const noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:235
size_t transientLookupBulkUnlocked(const std::vector< String > &lookup_strings, int32_t *string_ids) const
StringDictionaryProxy * sdp_
void operator()(std::string const &str, int32_t const string_id) override
size_t transientLookupBulk(const std::vector< String > &lookup_strings, int32_t *string_ids, const bool take_read_lock) const
std::string getString(int32_t string_id) const
void setNumUntranslatedStrings(const size_t num_untranslated_strings)
Constants for Builtin SQL Types supported by HEAVY.AI.
heavyai::unique_lock< heavyai::shared_mutex > write_lock
IdMap buildIntersectionTranslationMapToOtherProxyUnlocked(const StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
size_t transientLookupBulkParallelUnlocked(const std::vector< String > &lookup_strings, int32_t *string_ids) const
#define CHECK_GT(x, y)
Definition: Logger.h:234
int32_t getIdOfStringFromClient(String const &) const
std::vector< int32_t > getTransientBulk(const std::vector< std::string > &strings) const
Executes read-only lookup of a vector of strings and returns a vector of their integer ids...
std::string to_string(char const *&&v)
std::vector< int32_t > getCompare(const std::string &pattern, const std::string &comp_operator) const
#define DEVICE
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
StringNetworkCallback(StringDictionaryProxy *sdp, StringDictionaryProxy::IdMap &id_map)
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< StringDictionary > string_dict_
int64_t num_threads
Definition: ThreadInfo.h:22
IdMap transientUnion(StringDictionaryProxy const &)
std::vector< std::string const * > transient_string_vec_
void order_translation_locks(const int32_t source_db_id, const int32_t source_dict_id, const int32_t dest_db_id, const int32_t dest_dict_id, std::shared_lock< std::shared_mutex > &source_read_lock, std::shared_lock< std::shared_mutex > &dest_read_lock)
RUNTIME_EXPORT DEVICE bool string_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:244
void operator()(std::string const &str, int32_t const old_id) override
int32_t lookupTransientStringUnlocked(const String &lookup_string) const
void setRangeEnd(const int32_t range_end)
std::vector< std::string > getStrings(const std::vector< int32_t > &string_ids) const
size_t getTransientBulkImpl(const std::vector< std::string > &strings, int32_t *string_ids, const bool take_read_lock) const
RUNTIME_EXPORT DEVICE bool string_like_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:41
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
void operator()(std::string_view const sv, int32_t const old_id) override
static int32_t transientIndexToId(unsigned const index)
void updateGeneration(const int64_t generation) noexcept
size_t transientEntryCount() const
Returns the number of transient string entries for this proxy,.
OUTPUT transform(INPUT const &input, FUNC const &func)
Definition: misc.h:296
Functions to support the LIKE and ILIKE operator in SQL. Only single-byte character set is supported ...
IdMap buildUnionTranslationMapToOtherProxy(StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_types) const
StringDictionaryProxy(StringDictionaryProxy const &)=delete
#define RUNTIME_EXPORT
std::vector< int32_t > const & getVectorMap() const
#define CHECK_LT(x, y)
Definition: Logger.h:232
void operator()(std::string_view const, int32_t const string_id) override
bool do_compare(const std::string &str, const std::string &pattern, const std::string &comp_operator)
#define CHECK_LE(x, y)
Definition: Logger.h:233
StringDictionaryProxy * sdp_
int32_t getOrAddTransientUnlocked(String const &)
bool operator!=(StringDictionaryProxy const &) const
std::vector< int32_t > getRegexpLike(const std::string &pattern, const char escape) const
int32_t getOrAdd(const std::string &str) noexcept
RUNTIME_EXPORT DEVICE bool string_ilike_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
Definition: StringLike.cpp:57
bool operator==(StringDictionaryProxy const &) const
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
int32_t getDictId() const noexcept
std::vector< int32_t > getOrAddTransientBulk(const std::vector< std::string > &strings)
IdMap buildIntersectionTranslationMapToOtherProxy(const StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
Builds a vectorized string_id translation map from this proxy to dest_proxy.
#define CHECK(condition)
Definition: Logger.h:222
DEVICE RUNTIME_EXPORT int32_t StringDictionaryProxy_getStringId(int8_t *proxy_ptr, char *c_str_ptr)
#define DEBUG_TIMER(name)
Definition: Logger.h:371
int32_t getOrAddTransient(const std::string &str)
void setRangeStart(const int32_t range_start)
DEVICE RUNTIME_EXPORT size_t StringDictionaryProxy_getStringLength(int8_t *proxy_ptr, int32_t string_id)
PrintContainer< CONTAINER > printContainer(CONTAINER &container)
Definition: misc.h:107
RUNTIME_EXPORT DEVICE bool regexp_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: Regexp.cpp:39
int32_t getIdOfString(const std::string &str) const
static unsigned transientIdToIndex(int32_t const id)
int64_t getGeneration() const noexcept
#define VLOG(n)
Definition: Logger.h:316
int32_t truncate_to_generation(const int32_t id, const size_t generation)
DEVICE RUNTIME_EXPORT const char * StringDictionaryProxy_getStringBytes(int8_t *proxy_ptr, int32_t string_id)
StringDictionaryProxy::IdMap & id_map_
RUNTIME_EXPORT DEVICE bool string_ilike(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
Definition: StringLike.cpp:255