19 #include <tbb/parallel_for.h>
20 #include <boost/filesystem/operations.hpp>
21 #include <boost/filesystem/path.hpp>
22 #include <boost/sort/spreadsort/string_sort.hpp>
25 #include <string_view>
32 #include <sys/fcntl.h>
52 auto fd =
omnisci::open(path, O_RDWR | O_CREAT | (recover ? O_APPEND : O_TRUNC), 0644);
56 auto err = std::string(
"Dictionary path ") + std::string(path) +
57 std::string(
" does not exist.");
83 for (
size_t i = 0;
i < str.size(); ++
i) {
84 str_hash = str_hash * 997 + str[
i];
98 const bool materializeHashes,
99 size_t initial_capacity)
102 , string_id_string_dict_hash_table_(initial_capacity, INVALID_STR_ID)
103 , hash_cache_(initial_capacity)
105 , materialize_hashes_(materializeHashes)
108 , offset_map_(nullptr)
109 , payload_map_(nullptr)
110 , offset_file_size_(0)
111 , payload_file_size_(0)
112 , payload_file_off_(0)
113 , strings_cache_(nullptr) {
114 if (!isTemp && folder.empty()) {
119 CHECK_EQ(
size_t(0), (initial_capacity & (initial_capacity - 1)));
121 boost::filesystem::path storage_path(folder);
122 offsets_path_ = (storage_path / boost::filesystem::path(
"DictOffsets")).
string();
123 const auto payload_path =
124 (storage_path / boost::filesystem::path(
"DictPayload")).
string();
130 bool storage_is_empty =
false;
132 storage_is_empty =
true;
148 const uint64_t str_count =
153 const uint64_t max_entries =
155 round_up_p2(std::max(initial_capacity, static_cast<size_t>(1))));
159 std::vector<string_dict_hash_t> new_hash_cache(max_entries / 2);
164 if (str_count == 0) {
168 unsigned string_id = 0;
171 uint32_t thread_inits = 0;
172 const auto thread_count = std::thread::hardware_concurrency();
173 const uint32_t items_per_thread = std::max<uint32_t>(
174 2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
175 std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>
177 for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
178 dictionary_futures.emplace_back(std::async(
179 std::launch::async, [string_id, str_count, items_per_thread,
this] {
180 std::vector<std::pair<string_dict_hash_t, unsigned int>> hashVec;
181 for (uint32_t curr_id = string_id;
182 curr_id < string_id + items_per_thread && curr_id < str_count;
185 if (recovered.canary) {
189 std::string_view temp(recovered.c_str_ptr, recovered.size);
190 hashVec.emplace_back(std::make_pair(
hash_string(temp), temp.size()));
196 if (thread_inits % thread_count == 0) {
201 if (dictionary_futures.size() != 0) {
204 VLOG(1) <<
"Opened string dictionary " << folder <<
" # Strings: " <<
str_count_
215 std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>&
216 dictionary_futures) {
217 for (
auto& dictionary_future : dictionary_futures) {
218 dictionary_future.wait();
219 const auto hashVec = dictionary_future.get();
220 for (
const auto& hash : hashVec) {
221 const uint32_t bucket =
231 dictionary_futures.clear();
243 if (storage_slots == 0) {
248 int64_t min_bound = 0;
249 int64_t max_bound = storage_slots - 1;
251 while (min_bound <= max_bound) {
252 guess = (max_bound + min_bound) / 2;
254 if (getStringFromStorage(guess).canary) {
255 max_bound = guess - 1;
257 min_bound = guess + 1;
260 CHECK_GE(guess + (min_bound > guess ? 1 : 0), 0);
261 return guess + (min_bound > guess ? 1 : 0);
265 : folder_(
"DB_" + std::
to_string(dict_ref.dbId) +
"_DICT_" +
267 , strings_cache_(nullptr)
295 std::vector<int32_t> string_ids;
296 client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
297 CHECK_EQ(
size_t(1), string_ids.size());
298 return string_ids.front();
300 return getOrAddImpl(str);
307 std::ostringstream oss;
308 oss <<
"The text encoded column stored at " << folder <<
", has exceeded its limit of "
309 <<
sizeof(
T) * 8 <<
" bits (" << static_cast<size_t>(max_valid_int_value<T>() + 1)
310 <<
" unique values)."
311 <<
" There was an attempt to add the new string '" << str
312 <<
"'. Table will need to be recreated with larger String Dictionary Capacity";
314 throw std::runtime_error(oss.str());
319 template <
class String>
321 const std::vector<std::vector<String>>& string_array_vec,
322 std::vector<std::vector<int32_t>>& ids_array_vec) {
323 ids_array_vec.resize(string_array_vec.size());
324 for (
size_t i = 0;
i < string_array_vec.size();
i++) {
325 auto& strings = string_array_vec[
i];
326 auto& ids = ids_array_vec[
i];
327 ids.resize(strings.size());
333 const std::vector<std::vector<std::string>>& string_array_vec,
334 std::vector<std::vector<int32_t>>& ids_array_vec);
341 template <
class String>
343 std::vector<string_dict_hash_t>& hashes)
const
345 CHECK_EQ(string_vec.size(), hashes.size());
347 tbb::parallel_for(tbb::blocked_range<size_t>(0, string_vec.size()),
348 [&string_vec, &hashes](
const tbb::blocked_range<size_t>&
r) {
349 for (
size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
350 if (string_vec[curr_id].empty()) {
353 hashes[curr_id] =
hash_string(string_vec[curr_id]);
358 template <
class T,
class String>
360 T* output_string_ids) {
374 for (
const auto& input_string : input_strings) {
375 if (input_string.empty()) {
376 output_string_ids[idx++] = inline_int_null_value<T>();
382 uint32_t hash_bucket =
390 if (
str_count_ > static_cast<size_t>(max_valid_int_value<T>())) {
391 throw_encoding_error<T>(input_string,
folder_);
395 <<
") of Dictionary encoded Strings reached for this column, offset path "
409 const int32_t string_id =
static_cast<int32_t
>(
str_count_);
411 output_string_ids[idx++] = string_id;
414 const size_t num_strings_added =
str_count_ - initial_str_count;
415 if (num_strings_added > 0) {
420 template <
class T,
class String>
422 T* output_string_ids) {
429 std::vector<string_dict_hash_t> input_strings_hashes(input_strings.size());
433 size_t shadow_str_count =
435 const size_t storage_high_water_mark = shadow_str_count;
436 std::vector<size_t> string_memory_ids;
437 size_t sum_new_string_lengths = 0;
438 string_memory_ids.reserve(input_strings.size());
439 size_t input_string_idx{0};
440 for (
const auto& input_string : input_strings) {
442 if (input_string.empty()) {
443 output_string_ids[input_string_idx++] = inline_int_null_value<T>();
452 storage_high_water_mark,
455 input_strings_hashes);
460 const uint32_t hash_bucket =
464 storage_high_water_mark,
472 output_string_ids[input_string_idx++] =
478 if (shadow_str_count > static_cast<size_t>(max_valid_int_value<T>())) {
479 throw_encoding_error<T>(input_string,
folder_);
482 <<
"Maximum number (" << shadow_str_count
483 <<
") of Dictionary encoded Strings reached for this column, offset path "
487 string_memory_ids.push_back(input_string_idx);
488 sum_new_string_lengths += input_string.size();
490 static_cast<int32_t
>(shadow_str_count);
494 output_string_ids[input_string_idx++] = shadow_str_count++;
497 const size_t num_strings_added = shadow_str_count -
str_count_;
498 str_count_ = shadow_str_count;
499 if (num_strings_added > 0) {
504 uint8_t* encoded_vec);
506 uint16_t* encoded_vec);
508 int32_t* encoded_vec);
511 const std::vector<std::string_view>& string_vec,
512 uint8_t* encoded_vec);
514 const std::vector<std::string_view>& string_vec,
515 uint16_t* encoded_vec);
517 const std::vector<std::string_view>& string_vec,
518 int32_t* encoded_vec);
520 template <
class T,
class String>
524 std::vector<int32_t> string_ids;
527 for (
size_t i = 0;
i < string_ids.size(); ++
i) {
528 const auto string_id = string_ids[
i];
529 const bool invalid = string_id > max_valid_int_value<T>();
530 if (invalid || string_id == inline_int_null_value<int32_t>()) {
532 throw_encoding_error<T>(string_vec[
i],
folder_);
534 encoded_vec[out_idx++] = inline_int_null_value<T>();
537 encoded_vec[out_idx++] = string_id;
542 const std::vector<std::string>& string_vec,
543 uint8_t* encoded_vec);
545 const std::vector<std::string>& string_vec,
546 uint16_t* encoded_vec);
548 const std::vector<std::string>& string_vec,
549 int32_t* encoded_vec);
552 const std::vector<std::string_view>& string_vec,
553 uint8_t* encoded_vec);
555 const std::vector<std::string_view>& string_vec,
556 uint16_t* encoded_vec);
558 const std::vector<std::string_view>& string_vec,
559 int32_t* encoded_vec);
571 auto str_id = string_id_string_dict_hash_table_[computeBucket(
572 hash, str, string_id_string_dict_hash_table_)];
580 client_->get_string(ret, string_id);
587 CHECK_LT(string_id, static_cast<int32_t>(str_count_));
588 return getStringChecked(string_id);
593 mapd_shared_lock<mapd_shared_mutex>
read_lock(rw_mutex_);
596 CHECK_LT(string_id, static_cast<int32_t>(str_count_));
597 return getStringBytesChecked(string_id);
603 return client_->storage_entry_count();
611 const std::string& pattern,
613 const bool is_simple,
617 str.c_str(), str.size(), pattern.c_str(), pattern.size())
624 str.c_str(), str.size(), pattern.c_str(), pattern.size())
636 const bool is_simple,
638 const size_t generation)
const {
641 return client_->get_like(pattern, icase, is_simple, escape, generation);
643 const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
648 std::vector<int32_t>
result;
649 std::vector<std::thread> workers;
652 std::vector<std::vector<int32_t>> worker_results(worker_count);
654 for (
int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
655 workers.emplace_back([&worker_results,
664 for (
size_t string_id = worker_idx; string_id < generation;
665 string_id += worker_count) {
667 if (
is_like(str, pattern, icase, is_simple, escape)) {
668 worker_results[worker_idx].push_back(string_id);
673 for (
auto& worker : workers) {
676 for (
const auto& worker_result : worker_results) {
677 result.insert(result.end(), worker_result.begin(), worker_result.end());
680 const auto it_ok =
like_cache_.insert(std::make_pair(cache_key, result));
688 std::string comp_operator,
690 std::vector<int32_t>
result;
695 auto eq_id = eq_id_itr->second;
696 if (comp_operator ==
"=") {
697 result.push_back(eq_id);
699 for (int32_t idx = 0; idx <= cur_size; idx++) {
703 result.push_back(idx);
707 std::vector<std::thread> workers;
710 std::vector<std::vector<int32_t>> worker_results(worker_count);
712 for (
int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
713 workers.emplace_back(
714 [&worker_results, &pattern, generation, worker_idx, worker_count,
this]() {
715 for (
size_t string_id = worker_idx; string_id < generation;
716 string_id += worker_count) {
718 if (str == pattern) {
719 worker_results[worker_idx].push_back(string_id);
724 for (
auto& worker : workers) {
727 for (
const auto& worker_result : worker_results) {
728 result.insert(result.end(), worker_result.begin(), worker_result.end());
730 if (result.size() > 0) {
731 const auto it_ok =
equal_cache_.insert(std::make_pair(pattern, result[0]));
735 if (comp_operator ==
"<>") {
736 for (int32_t idx = 0; idx <= cur_size; idx++) {
740 result.push_back(idx);
748 const std::string& comp_operator,
749 const size_t generation) {
752 return client_->get_compare(pattern, comp_operator, generation);
754 std::vector<int32_t> ret;
759 if (comp_operator ==
"=" || comp_operator ==
"<>") {
760 return getEquals(pattern, comp_operator, generation);
768 cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
773 [
this](decltype(
sorted_cache)::value_type
const& a, decltype(pattern)& b) {
775 return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
780 cache_index->diff = 1;
784 cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
785 cache_index->index = cache_itr -
sorted_cache.begin() - 1;
786 cache_index->diff = 1;
789 cache_index->diff = 0;
810 if (comp_operator ==
"<") {
811 size_t idx = cache_index->index;
812 if (cache_index->diff) {
813 idx = cache_index->index + 1;
814 if (cache_index->index == 0 && cache_index->diff > 0) {
815 idx = cache_index->index;
818 for (
size_t i = 0;
i < idx;
i++) {
829 }
else if (comp_operator ==
"<=") {
830 size_t idx = cache_index->index + 1;
831 if (cache_index == 0 && cache_index->diff > 0) {
832 idx = cache_index->index;
834 for (
size_t i = 0;
i < idx;
i++) {
843 }
else if (comp_operator ==
">") {
844 size_t idx = cache_index->index + 1;
845 if (cache_index->index == 0 && cache_index->diff > 0) {
846 idx = cache_index->index;
858 }
else if (comp_operator ==
">=") {
859 size_t idx = cache_index->index;
860 if (cache_index->diff) {
861 idx = cache_index->index + 1;
862 if (cache_index->index == 0 && cache_index->diff > 0) {
863 idx = cache_index->index;
869 }
else if (comp_operator ==
"=") {
870 if (!cache_index->diff) {
876 }
else if (comp_operator ==
"<>") {
877 if (!cache_index->diff) {
878 size_t idx = cache_index->index;
879 for (
size_t i = 0;
i < idx;
i++) {
893 std::runtime_error(
"Unsupported string comparison operator");
901 const std::string& pattern,
903 return regexp_like(str.c_str(), str.size(), pattern.c_str(), pattern.size(), escape);
910 const size_t generation)
const {
913 return client_->get_regexp_like(pattern, escape, generation);
915 const auto cache_key = std::make_pair(pattern, escape);
920 std::vector<int32_t>
result;
921 std::vector<std::thread> workers;
924 std::vector<std::vector<int32_t>> worker_results(worker_count);
926 for (
int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
927 workers.emplace_back([&worker_results,
934 for (
size_t string_id = worker_idx; string_id < generation;
935 string_id += worker_count) {
938 worker_results[worker_idx].push_back(string_id);
943 for (
auto& worker : workers) {
946 for (
const auto& worker_result : worker_results) {
947 result.insert(result.end(), worker_result.begin(), worker_result.end());
949 const auto it_ok =
regex_cache_.insert(std::make_pair(cache_key, result));
959 throw std::runtime_error(
960 "copying dictionaries from remote server is not supported yet.");
969 const bool multithreaded =
str_count_ > 10000;
970 const auto worker_count =
971 multithreaded ?
static_cast<size_t>(
cpu_threads()) :
size_t(1);
973 std::vector<std::vector<std::string>> worker_results(worker_count);
974 auto copy = [
this](std::vector<std::string>& str_list,
975 const size_t start_id,
976 const size_t end_id) {
978 str_list.reserve(end_id - start_id);
979 for (
size_t string_id = start_id; string_id < end_id; ++string_id) {
984 std::vector<std::future<void>> workers;
985 const auto stride = (
str_count_ + (worker_count - 1)) / worker_count;
986 for (
size_t worker_idx = 0, start = 0, end = std::min(start + stride,
str_count_);
987 worker_idx < worker_count && start <
str_count_;
988 ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
989 workers.push_back(std::async(
990 std::launch::async,
copy, std::ref(worker_results[worker_idx]), start, end));
992 for (
auto& worker : workers) {
996 CHECK_EQ(worker_results.size(), size_t(1));
1000 for (
const auto& worker_result : worker_results) {
1002 strings_cache_->end(), worker_result.begin(), worker_result.end());
1008 return string_id_string_dict_hash_table_.size() <= num_strings * 2;
1019 new_str_ids[bucket] =
i;
1027 new_str_ids[bucket] =
i;
1033 template <
class String>
1035 const size_t str_count,
1037 const size_t storage_high_water_mark,
1038 const std::vector<String>& input_strings,
1039 const std::vector<size_t>& string_memory_ids,
1040 const std::vector<string_dict_hash_t>& input_strings_hashes) noexcept {
1041 std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1043 if (materialize_hashes_) {
1044 for (
size_t i = 0;
i != str_count; ++
i) {
1046 const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1047 new_str_ids[bucket] =
i;
1049 hash_cache_.resize(hash_cache_.size() * 2);
1051 for (
size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1052 const auto storage_string = getStringChecked(storage_idx);
1054 const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1055 new_str_ids[bucket] = storage_idx;
1057 for (
size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1058 const size_t string_memory_id = string_memory_ids[memory_idx];
1059 const uint32_t bucket = computeUniqueBucketWithHash(
1060 input_strings_hashes[string_memory_id], new_str_ids);
1061 new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1064 string_id_string_dict_hash_table_.swap(new_str_ids);
1069 if (str.size() == 0) {
1070 return inline_int_null_value<int32_t>();
1072 CHECK(str.size() <= MAX_STRLEN);
1075 mapd_shared_lock<mapd_shared_mutex>
read_lock(rw_mutex_);
1076 const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1077 if (string_id_string_dict_hash_table_[bucket] != INVALID_STR_ID) {
1078 return string_id_string_dict_hash_table_[bucket];
1081 mapd_lock_guard<mapd_shared_mutex>
write_lock(rw_mutex_);
1082 if (fillRateIsHigh(str_count_)) {
1084 increaseHashTableCapacity();
1088 const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1089 if (string_id_string_dict_hash_table_[bucket] == INVALID_STR_ID) {
1091 <<
"Maximum number (" << str_count_
1092 <<
") of Dictionary encoded Strings reached for this column, offset path "
1095 appendToStorage(str);
1096 string_id_string_dict_hash_table_[bucket] =
static_cast<int32_t
>(str_count_);
1097 if (materialize_hashes_) {
1098 hash_cache_[str_count_] = hash;
1101 invalidateInvertedIndex();
1103 return string_id_string_dict_hash_table_[bucket];
1107 const auto str_canary = getStringFromStorage(string_id);
1108 CHECK(!str_canary.canary);
1109 return std::string(str_canary.c_str_ptr, str_canary.size);
1113 const int string_id)
const noexcept {
1114 const auto str_canary = getStringFromStorage(string_id);
1115 CHECK(!str_canary.canary);
1116 return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1119 template <
class String>
1122 const String& input_string,
1123 const std::vector<int32_t>& string_id_string_dict_hash_table)
const noexcept {
1124 const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1125 uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1127 const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1128 if (candidate_string_id ==
1132 if ((materialize_hashes_ && hash == hash_cache_[candidate_string_id]) ||
1133 !materialize_hashes_) {
1134 const auto candidate_string = getStringFromStorageFast(candidate_string_id);
1135 if (input_string.size() == candidate_string.size() &&
1136 !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1142 if (++bucket == string_dict_hash_table_size) {
1149 template <
class String>
1152 const String& input_string,
1153 const std::vector<int32_t>& string_id_string_dict_hash_table,
1154 const size_t storage_high_water_mark,
1155 const std::vector<String>& input_strings,
1156 const std::vector<size_t>& string_memory_ids)
const noexcept {
1157 uint32_t bucket = input_string_hash & (string_id_string_dict_hash_table.size() - 1);
1159 const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1160 if (candidate_string_id ==
1164 if (!materialize_hashes_ || (input_string_hash == hash_cache_[candidate_string_id])) {
1165 if (candidate_string_id > 0 &&
1166 static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1169 size_t memory_offset =
1170 static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1171 const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1172 if (input_string.size() == candidate_string.size() &&
1173 !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1179 const auto candidate_storage_string =
1180 getStringFromStorageFast(candidate_string_id);
1181 if (input_string.size() == candidate_storage_string.size() &&
1182 !memcmp(input_string.data(),
1183 candidate_storage_string.data(),
1184 input_string.size())) {
1192 if (++bucket == string_id_string_dict_hash_table.size()) {
1201 const std::vector<int32_t>& string_id_string_dict_hash_table) noexcept {
1202 const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1203 uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1205 if (string_id_string_dict_hash_table[bucket] ==
1211 if (++bucket == string_dict_hash_table_size) {
1219 const size_t write_length) {
1221 const size_t min_capacity_needed =
1238 const size_t write_length) {
1241 const size_t min_capacity_needed =
1257 template <
class String>
1260 checkAndConditionallyIncreasePayloadCapacity(str.size());
1261 memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1265 payload_file_off_ += str.size();
1267 checkAndConditionallyIncreaseOffsetCapacity(
sizeof(str_meta));
1268 memcpy(offset_map_ + str_count_, &str_meta,
sizeof(str_meta));
1271 template <
class String>
1273 const std::vector<String>& input_strings,
1274 const std::vector<size_t>& string_memory_ids,
1275 const size_t sum_new_strings_lengths) noexcept {
1276 const size_t num_strings = string_memory_ids.size();
1278 checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1279 checkAndConditionallyIncreaseOffsetCapacity(
sizeof(
StringIdxEntry) * num_strings);
1281 for (
size_t i = 0;
i < num_strings; ++
i) {
1282 const size_t string_idx = string_memory_ids[
i];
1283 const String str = input_strings[string_idx];
1284 const size_t str_size(str.size());
1285 memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1286 StringIdxEntry str_meta{
static_cast<uint64_t
>(payload_file_off_), str_size};
1287 payload_file_off_ += str_size;
1288 memcpy(offset_map_ + str_count_ +
i, &str_meta,
sizeof(str_meta));
1295 return {payload_map_ + str_meta->
off, str_meta->
size};
1299 const int string_id)
const noexcept {
1306 if (str_meta->
size == 0xffff) {
1308 return {
nullptr, 0,
true};
1310 return {payload_map_ + str_meta->
off, str_meta->
size,
false};
1315 payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1317 payload_map_ =
static_cast<char*
>(
1318 addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1324 offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1327 addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1333 const size_t min_capacity_requested) noexcept {
1334 const size_t canary_buff_size_to_add =
1338 if (canary_buffer_size < canary_buff_size_to_add) {
1339 CANARY_BUFFER =
static_cast<char*
>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1340 canary_buffer_size = canary_buff_size_to_add;
1341 CHECK(CANARY_BUFFER);
1342 memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1345 CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1346 const auto write_return =
write(fd, CANARY_BUFFER, canary_buff_size_to_add);
1347 CHECK(write_return > 0 &&
1348 (static_cast<size_t>(write_return) == canary_buff_size_to_add));
1349 return canary_buff_size_to_add;
1354 const size_t min_capacity_requested) noexcept {
1355 const size_t canary_buff_size_to_add =
1358 if (canary_buffer_size < canary_buff_size_to_add) {
1360 reinterpret_cast<char*
>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1361 canary_buffer_size = canary_buff_size_to_add;
1362 CHECK(CANARY_BUFFER);
1363 memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1365 void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
1367 void* write_addr =
reinterpret_cast<void*
>(
static_cast<char*
>(new_addr) + mem_size);
1368 CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
1369 mem_size += canary_buff_size_to_add;
1412 std::vector<int32_t> temp_sorted_cache;
1414 temp_sorted_cache.push_back(
i);
1426 std::sort(cache.begin(), cache.end(), [
this](int32_t a, int32_t b) {
1429 return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1435 std::vector<int32_t> updated_cache(temp_sorted_cache.size() +
sorted_cache.size());
1436 size_t t_idx = 0, s_idx = 0, idx = 0;
1437 for (; t_idx < temp_sorted_cache.size() && s_idx <
sorted_cache.size(); idx++) {
1440 const auto insert_from_temp_cache =
1441 string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1442 if (insert_from_temp_cache) {
1443 updated_cache[idx] = temp_sorted_cache[t_idx++];
1448 while (t_idx < temp_sorted_cache.size()) {
1449 updated_cache[idx++] = temp_sorted_cache[t_idx++];
1458 std::vector<int32_t>& dest_ids,
1460 const std::vector<int32_t>& source_ids,
1462 const std::map<int32_t, std::string> transient_mapping) {
1463 std::vector<std::string> strings;
1465 for (
const int32_t source_id : source_ids) {
1466 if (source_id == std::numeric_limits<int32_t>::min()) {
1467 strings.emplace_back(
"");
1468 }
else if (source_id < 0) {
1469 if (
auto string_itr = transient_mapping.find(source_id);
1470 string_itr != transient_mapping.end()) {
1471 strings.emplace_back(string_itr->second);
1473 throw std::runtime_error(
"Unexpected negative source ID");
1476 strings.push_back(source_dict->
getString(source_id));
1480 dest_ids.resize(strings.size());
1485 std::vector<std::vector<int32_t>>& dest_array_ids,
1487 const std::vector<std::vector<int32_t>>& source_array_ids,
1489 dest_array_ids.resize(source_array_ids.size());
1491 std::atomic<size_t> row_idx{0};
1492 auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1495 auto row = row_idx.fetch_add(1);
1497 if (row >= dest_array_ids.size()) {
1500 const auto& source_ids = source_array_ids[row];
1501 auto& dest_ids = dest_array_ids[row];
1506 const int num_worker_threads = std::thread::hardware_concurrency();
1508 if (source_array_ids.size() / num_worker_threads > 10) {
1509 std::vector<std::future<void>> worker_threads;
1510 for (
int i = 0;
i < num_worker_threads; ++
i) {
1511 worker_threads.push_back(std::async(std::launch::async, processor,
i));
1514 for (
auto& child : worker_threads) {
1517 for (
auto& child : worker_threads) {
1528 const std::vector<int32_t>& source_ids,
1529 const DictRef source_dict_ref,
1530 const int32_t dest_generation) {
1531 DictRef temp_dict_ref(-1, -1);
1534 dest_ids, dest_dict_ref, source_ids, source_dict_ref, dest_generation);
StringIdxEntry * offset_map_
void translate_string_ids(std::vector< int32_t > &dest_ids, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
size_t payload_file_size_
void increaseHashTableCapacity() noexcept
int open(const char *path, int flags, int mode)
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void * checked_mmap(const int fd, const size_t sz)
string_dict_hash_t hash_string(const std::string_view &str)
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
std::vector< int32_t > getRegexpLike(const std::string &pattern, const char escape, const size_t generation) const
RUNTIME_EXPORT DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
size_t storageEntryCount() const
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
uint32_t computeBucketFromStorageAndMemory(const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
DEVICE void sort(ARGS &&...args)
bool fillRateIsHigh(const size_t num_strings) const noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
Constants for Builtin SQL Types supported by OmniSci.
std::string offsets_path_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
int32_t getIdOfString(const std::string &str) const
int32_t getOrAdd(const std::string &str) noexcept
int32_t getUnlocked(const std::string &str) const noexcept
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
StringDictionary(const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
static constexpr size_t MAX_STRCOUNT
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
std::vector< int32_t > getCompare(const std::string &pattern, const std::string &comp_operator, const size_t generation)
DEVICE auto copy(ARGS &&...args)
const std::string folder_
mapd_shared_mutex rw_mutex_
RUNTIME_EXPORT DEVICE bool string_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
RUNTIME_EXPORT DEVICE bool string_like_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
std::map< std::string, int32_t > equal_cache_
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
void getOrAddBulkArray(const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
void translate_string_ids(std::vector< int32_t > &dest_ids, const LeafHostInfo &dict_server_host, const DictRef dest_dict_ref, const std::vector< int32_t > &source_ids, const DictRef source_dict_ref, const int32_t dest_generation)
Functions to support the LIKE and ILIKE operator in SQL. Only single-byte character set is supported ...
void appendToStorage(const String str) noexcept
int checked_open(const char *path, const bool recover)
bool g_enable_smem_group_by true
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
std::pair< char *, size_t > getStringBytes(int32_t string_id) const noexcept
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
const int SYSTEM_PAGE_SIZE
bool checkpoint() noexcept
std::vector< int32_t > getLike(const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
DEVICE auto lower_bound(ARGS &&...args)
RUNTIME_EXPORT DEVICE bool string_ilike_simple(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len)
size_t getNumStringsFromStorage(const size_t storage_slots) const noexcept
int msync(void *addr, size_t length, bool async)
void checked_munmap(void *addr, size_t length)
std::string getString(int32_t string_id) const
void hashStrings(const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void increaseHashTableCapacityFromStorageAndMemory(const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
mapd_shared_lock< mapd_shared_mutex > read_lock
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)
static void populate_string_array_ids(std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
bool g_enable_watchdog false
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
const uint64_t round_up_p2(const uint64_t num)
std::vector< int32_t > string_id_string_dict_hash_table_
void sortCache(std::vector< int32_t > &cache)
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
mapd_unique_lock< mapd_shared_mutex > write_lock
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
bool g_enable_stringdict_parallel
void throw_encoding_error(std::string_view str, std::string_view folder)
PayloadString getStringFromStorage(const int string_id) const noexcept
RUNTIME_EXPORT DEVICE bool regexp_like(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)
DEVICE void swap(ARGS &&...args)
std::vector< int32_t > sorted_cache
int32_t getOrAddImpl(const std::string_view &str) noexcept
~StringDictionary() noexcept
RUNTIME_EXPORT DEVICE bool string_ilike(const char *str, const int32_t str_len, const char *pattern, const int32_t pat_len, const char escape_char)
std::shared_ptr< const std::vector< std::string > > copyStrings() const
size_t file_size(const int fd)