OmniSciDB  72180abbfe
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
StringDictionary Class Reference

#include <StringDictionary.h>

+ Collaboration diagram for StringDictionary:

Classes

struct  compare_cache_value_t
 
struct  PayloadString
 
struct  StringIdxEntry
 

Public Member Functions

 StringDictionary (const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
 
 StringDictionary (const LeafHostInfo &host, const DictRef dict_ref)
 
 ~StringDictionary () noexcept
 
int32_t getOrAdd (const std::string &str) noexcept
 
template<class T , class String >
void getOrAddBulk (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class T , class String >
void getOrAddBulkParallel (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class String >
void getOrAddBulkArray (const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
 
int32_t getIdOfString (const std::string &str) const
 
std::string getString (int32_t string_id) const
 
std::pair< char *, size_t > getStringBytes (int32_t string_id) const noexcept
 
size_t storageEntryCount () const
 
std::vector< int32_t > getLike (const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
 
std::vector< int32_t > getCompare (const std::string &pattern, const std::string &comp_operator, const size_t generation)
 
std::vector< int32_t > getRegexpLike (const std::string &pattern, const char escape, const size_t generation) const
 
std::shared_ptr< const
std::vector< std::string > > 
copyStrings () const
 
bool checkpoint () noexcept
 

Static Public Member Functions

static void populate_string_ids (std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
 Populates provided dest_ids vector with string ids corresponding to given source strings. More...
 
static void populate_string_array_ids (std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
 

Static Public Attributes

static constexpr int32_t INVALID_STR_ID = -1
 
static constexpr size_t MAX_STRLEN = (1 << 15) - 1
 
static constexpr size_t MAX_STRCOUNT = (1U << 31) - 1
 

Private Member Functions

void processDictionaryFutures (std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
 
bool fillRateIsHigh (const size_t num_strings) const noexcept
 
void increaseCapacity () noexcept
 
template<class String >
void increaseCapacityFromStorageAndMemory (const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< uint32_t > &input_strings_rk_hashes) noexcept
 
int32_t getOrAddImpl (const std::string &str) noexcept
 
template<class String >
void hashStrings (const std::vector< String > &string_vec, std::vector< uint32_t > &hashes) const noexcept
 
template<class T , class String >
void getOrAddBulkRemote (const std::vector< String > &string_vec, T *encoded_vec)
 
int32_t getUnlocked (const std::string &str) const noexcept
 
std::string getStringUnlocked (int32_t string_id) const noexcept
 
std::string getStringChecked (const int string_id) const noexcept
 
std::pair< char *, size_t > getStringBytesChecked (const int string_id) const noexcept
 
template<class String >
uint32_t computeBucket (const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept
 
template<class String >
uint32_t computeBucketFromStorageAndMemory (const uint32_t input_string_rk_hash, const String &input_string, const std::vector< int32_t > &string_id_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
 
uint32_t computeUniqueBucketWithHash (const uint32_t hash, const std::vector< int32_t > &data) const noexcept
 
void checkAndConditionallyIncreasePayloadCapacity (const size_t write_length)
 
void checkAndConditionallyIncreaseOffsetCapacity (const size_t write_length)
 
template<class String >
void appendToStorage (String str) noexcept
 
template<class String >
void appendToStorageBulk (const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
 
PayloadString getStringFromStorage (const int string_id) const noexcept
 
std::string_view getStringFromStorageFast (const int string_id) const noexcept
 
void addPayloadCapacity (const size_t min_capacity_requested=0) noexcept
 
void addOffsetCapacity (const size_t min_capacity_requested=0) noexcept
 
size_t addStorageCapacity (int fd, const size_t min_capacity_requested=0) noexcept
 
void * addMemoryCapacity (void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
 
void invalidateInvertedIndex () noexcept
 
std::vector< int32_t > getEquals (std::string pattern, std::string comp_operator, size_t generation)
 
void buildSortedCache ()
 
void insertInSortedCache (std::string str, int32_t str_id)
 
void sortCache (std::vector< int32_t > &cache)
 
void mergeSortedCache (std::vector< int32_t > &temp_sorted_cache)
 
compare_cache_value_t * binary_search_cache (const std::string &pattern) const
 

Private Attributes

size_t str_count_
 
std::vector< int32_t > string_id_hash_table_
 
std::vector< uint32_t > rk_hashes_
 
std::vector< int32_t > sorted_cache
 
bool isTemp_
 
bool materialize_hashes_
 
std::string offsets_path_
 
int payload_fd_
 
int offset_fd_
 
StringIdxEntry * offset_map_
 
char * payload_map_
 
size_t offset_file_size_
 
size_t payload_file_size_
 
size_t payload_file_off_
 
mapd_shared_mutex rw_mutex_
 
std::map< std::tuple
< std::string, bool, bool,
char >, std::vector< int32_t > > 
like_cache_
 
std::map< std::pair
< std::string, char >
, std::vector< int32_t > > 
regex_cache_
 
std::map< std::string, int32_t > equal_cache_
 
DictionaryCache< std::string,
compare_cache_value_t > 
compare_cache_
 
std::shared_ptr< std::vector
< std::string > > 
strings_cache_
 
std::unique_ptr
< StringDictionaryClient > 
client_
 
std::unique_ptr
< StringDictionaryClient > 
client_no_timeout_
 
char * CANARY_BUFFER {nullptr}
 
size_t canary_buffer_size = 0
 

Detailed Description

Definition at line 42 of file StringDictionary.h.

Constructor & Destructor Documentation

StringDictionary::StringDictionary ( const std::string &  folder,
const bool  isTemp,
const bool  recover,
const bool  materializeHashes = false,
size_t  initial_capacity = 256 
)

Definition at line 111 of file StringDictionary.cpp.

References addOffsetCapacity(), addPayloadCapacity(), CHECK_EQ, anonymous_namespace{StringDictionary.cpp}::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_open(), anonymous_namespace{StringDictionary.cpp}::file_size(), getStringFromStorage(), INVALID_STR_ID, isTemp_, LOG, materialize_hashes_, offset_fd_, offset_file_size_, offset_map_, offsets_path_, payload_fd_, payload_file_size_, payload_map_, processDictionaryFutures(), anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, anonymous_namespace{StringDictionary.cpp}::round_up_p2(), rw_mutex_, string_id_hash_table_, and logger::WARNING.

116  : str_count_(0)
117  , string_id_hash_table_(initial_capacity, INVALID_STR_ID)
118  , rk_hashes_(initial_capacity)
119  , isTemp_(isTemp)
120  , materialize_hashes_(materializeHashes)
121  , payload_fd_(-1)
122  , offset_fd_(-1)
123  , offset_map_(nullptr)
124  , payload_map_(nullptr)
125  , offset_file_size_(0)
126  , payload_file_size_(0)
127  , payload_file_off_(0)
128  , strings_cache_(nullptr) {
129  if (!isTemp && folder.empty()) {
130  return;
131  }
132 
133  // initial capacity must be a power of two for efficient bucket computation
134  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
135  if (!isTemp_) {
136  boost::filesystem::path storage_path(folder);
137  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
138  const auto payload_path =
139  (storage_path / boost::filesystem::path("DictPayload")).string();
140  payload_fd_ = checked_open(payload_path.c_str(), recover);
141  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
143  offset_file_size_ = file_size(offset_fd_);
144  }
145 
146  if (payload_file_size_ == 0) {
148  }
149  if (offset_file_size_ == 0) {
151  }
152  if (!isTemp_) { // we never mmap or recover temp dictionaries
153  payload_map_ = reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
154  offset_map_ =
155  reinterpret_cast<StringIdxEntry*>(checked_mmap(offset_fd_, offset_file_size_));
156  if (recover) {
157  const size_t bytes = file_size(offset_fd_);
158  if (bytes % sizeof(StringIdxEntry) != 0) {
159  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
160  }
161  const uint64_t str_count = bytes / sizeof(StringIdxEntry);
162  // at this point we know the size of the StringDict we need to load
163  // so lets reallocate the vector to the correct size
164  const uint64_t max_entries = round_up_p2(str_count * 2 + 1);
165  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
166  string_id_hash_table_.swap(new_str_ids);
167  if (materialize_hashes_) {
168  std::vector<uint32_t> new_rk_hashes(max_entries / 2);
169  rk_hashes_.swap(new_rk_hashes);
170  }
171  unsigned string_id = 0;
172  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
173 
174  uint32_t thread_inits = 0;
175  const auto thread_count = std::thread::hardware_concurrency();
176  const uint32_t items_per_thread = std::max<uint32_t>(
177  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
178  std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>
179  dictionary_futures;
180  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
181  dictionary_futures.emplace_back(std::async(
182  std::launch::async, [string_id, str_count, items_per_thread, this] {
183  std::vector<std::pair<uint32_t, unsigned int>> hashVec;
184  for (uint32_t curr_id = string_id;
185  curr_id < string_id + items_per_thread && curr_id < str_count;
186  curr_id++) {
187  const auto recovered = getStringFromStorage(curr_id);
188  if (recovered.canary) {
189  // hit the canary, recovery finished
190  break;
191  } else {
192  std::string temp(recovered.c_str_ptr, recovered.size);
193  hashVec.emplace_back(std::make_pair(rk_hash(temp), temp.size()));
194  }
195  }
196  return hashVec;
197  }));
198  thread_inits++;
199  if (thread_inits % thread_count == 0) {
200  processDictionaryFutures(dictionary_futures);
201  }
202  }
203  // gather last few threads
204  if (dictionary_futures.size() != 0) {
205  processDictionaryFutures(dictionary_futures);
206  }
207  }
208  }
209 }
StringIdxEntry * offset_map_
#define CHECK_EQ(x, y)
Definition: Logger.h:205
uint32_t rk_hash(const std::string_view &str)
#define LOG(tag)
Definition: Logger.h:188
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
std::string offsets_path_
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
mapd_shared_mutex rw_mutex_
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
int checked_open(const char *path, const bool recover)
std::vector< int32_t > string_id_hash_table_
void * checked_mmap(const int fd, const size_t sz)
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
const uint64_t round_up_p2(const uint64_t num)
mapd_unique_lock< mapd_shared_mutex > write_lock
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

StringDictionary::StringDictionary ( const LeafHostInfo host,
const DictRef  dict_ref 
)

Definition at line 230 of file StringDictionary.cpp.

231  : strings_cache_(nullptr)
232  , client_(new StringDictionaryClient(host, dict_ref, true))
233  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
std::unique_ptr< StringDictionaryClient > client_
std::shared_ptr< std::vector< std::string > > strings_cache_
std::unique_ptr< StringDictionaryClient > client_no_timeout_
StringDictionary::~StringDictionary ( )
noexcept

Definition at line 235 of file StringDictionary.cpp.

References CANARY_BUFFER, CHECK(), CHECK_GE, anonymous_namespace{StringDictionary.cpp}::checked_munmap(), client_, File_Namespace::close(), isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

235  {
236  free(CANARY_BUFFER);
237  if (client_) {
238  return;
239  }
240  if (payload_map_) {
241  if (!isTemp_) {
245  CHECK_GE(payload_fd_, 0);
247  CHECK_GE(offset_fd_, 0);
248  close(offset_fd_);
249  } else {
251  free(payload_map_);
252  free(offset_map_);
253  }
254  }
255 }
StringIdxEntry * offset_map_
void checked_munmap(void *addr, size_t length)
#define CHECK_GE(x, y)
Definition: Logger.h:210
std::unique_ptr< StringDictionaryClient > client_
CHECK(cgen_state)
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:101

+ Here is the call graph for this function:

Member Function Documentation

void * StringDictionary::addMemoryCapacity ( void *  addr,
size_t &  mem_size,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1302 of file StringDictionary.cpp.

References CHECK(), and anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE.

1304  {
1305  const size_t canary_buff_size_to_add =
1306  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1307  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1308  if (canary_buffer_size != canary_buff_size_to_add) {
1309  CANARY_BUFFER =
1310  reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1311  canary_buffer_size = canary_buff_size_to_add;
1312  }
1314  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1315  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
1316  CHECK(new_addr);
1317  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
1318  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
1319  mem_size += canary_buff_size_to_add;
1320  return new_addr;
1321 }
CHECK(cgen_state)

+ Here is the call graph for this function:

void StringDictionary::addOffsetCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1272 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreaseOffsetCapacity(), and StringDictionary().

1272  {
1273  if (!isTemp_) {
1274  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1275  } else {
1276  offset_map_ = static_cast<StringIdxEntry*>(
1277  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1278  }
1279 }
StringIdxEntry * offset_map_
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

void StringDictionary::addPayloadCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1263 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreasePayloadCapacity(), and StringDictionary().

1263  {
1264  if (!isTemp_) {
1265  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1266  } else {
1267  payload_map_ = static_cast<char*>(
1268  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1269  }
1270 }
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

size_t StringDictionary::addStorageCapacity ( int  fd,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1281 of file StringDictionary.cpp.

References CHECK(), CHECK_NE, anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE, and File_Namespace::write().

1283  {
1284  const size_t canary_buff_size_to_add =
1285  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1286  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1287 
1288  if (canary_buffer_size != canary_buff_size_to_add) {
1289  CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1290  canary_buffer_size = canary_buff_size_to_add;
1291  }
1293  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1294 
1295  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1296  ssize_t write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
1297  CHECK(write_return > 0 &&
1298  (static_cast<size_t>(write_return) == canary_buff_size_to_add));
1299  return canary_buff_size_to_add;
1300 }
CHECK(cgen_state)
#define CHECK_NE(x, y)
Definition: Logger.h:206
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:120

+ Here is the call graph for this function:

template<class String >
void StringDictionary::appendToStorage ( String  str)
private noexcept

Definition at line 1208 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::size.

Referenced by getOrAddBulk().

1208  {
1209  // write the payload
1211  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1212 
1213  // write the offset and length
1214  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1215  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1216 
1218  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1219 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::appendToStorageBulk ( const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const size_t  sum_new_strings_lengths 
)
private noexcept

Definition at line 1222 of file StringDictionary.cpp.

Referenced by getOrAddBulkParallel().

1225  {
1226  const size_t num_strings = string_memory_ids.size();
1227 
1228  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1229  checkAndConditionallyIncreaseOffsetCapacity(sizeof(StringIdxEntry) * num_strings);
1230 
1231  for (size_t i = 0; i < num_strings; ++i) {
1232  const size_t string_idx = string_memory_ids[i];
1233  const String str = input_strings[string_idx];
1234  const size_t str_size(str.size());
1235  memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1236  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
1237  payload_file_off_ += str_size; // Need to increment after we've defined str_meta
1238  memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
1239  }
1240 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

compare_cache_value_t* StringDictionary::binary_search_cache ( const std::string &  pattern) const
private
void StringDictionary::buildSortedCache ( )
private

Definition at line 1353 of file StringDictionary.cpp.

References mergeSortedCache(), sortCache(), sorted_cache, and str_count_.

Referenced by getCompare().

1353  {
1354  // This method is not thread-safe.
1355  const auto cur_cache_size = sorted_cache.size();
1356  std::vector<int32_t> temp_sorted_cache;
1357  for (size_t i = cur_cache_size; i < str_count_; i++) {
1358  temp_sorted_cache.push_back(i);
1359  }
1360  sortCache(temp_sorted_cache);
1361  mergeSortedCache(temp_sorted_cache);
1362 }
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
void sortCache(std::vector< int32_t > &cache)
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::checkAndConditionallyIncreaseOffsetCapacity ( const size_t  write_length)
private

Definition at line 1187 of file StringDictionary.cpp.

References addOffsetCapacity(), CHECK(), CHECK_GE, anonymous_namespace{StringDictionary.cpp}::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_munmap(), isTemp_, offset_fd_, offset_file_size_, offset_map_, and str_count_.

1188  {
1189  size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
1190  if (offset_file_off + write_length >= offset_file_size_) {
1191  const size_t min_capacity_needed =
1192  write_length - (offset_file_size_ - offset_file_off);
1193  if (!isTemp_) {
1194  CHECK_GE(offset_fd_, 0);
1196  addOffsetCapacity(min_capacity_needed);
1197  CHECK(offset_file_off + write_length <= offset_file_size_);
1198  offset_map_ =
1199  reinterpret_cast<StringIdxEntry*>(checked_mmap(offset_fd_, offset_file_size_));
1200  } else {
1201  addOffsetCapacity(min_capacity_needed);
1202  CHECK(offset_file_off + write_length <= offset_file_size_);
1203  }
1204  }
1205 }
StringIdxEntry * offset_map_
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
void checked_munmap(void *addr, size_t length)
#define CHECK_GE(x, y)
Definition: Logger.h:210
CHECK(cgen_state)
void * checked_mmap(const int fd, const size_t sz)

+ Here is the call graph for this function:

void StringDictionary::checkAndConditionallyIncreasePayloadCapacity ( const size_t  write_length)
private

Definition at line 1168 of file StringDictionary.cpp.

References addPayloadCapacity(), CHECK(), CHECK_GE, anonymous_namespace{StringDictionary.cpp}::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_munmap(), isTemp_, payload_fd_, payload_file_off_, payload_file_size_, and payload_map_.

1169  {
1170  if (payload_file_off_ + write_length > payload_file_size_) {
1171  const size_t min_capacity_needed =
1172  write_length - (payload_file_size_ - payload_file_off_);
1173  if (!isTemp_) {
1174  CHECK_GE(payload_fd_, 0);
1176  addPayloadCapacity(min_capacity_needed);
1177  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1178  payload_map_ =
1179  reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
1180  } else {
1181  addPayloadCapacity(min_capacity_needed);
1182  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1183  }
1184  }
1185 }
void checked_munmap(void *addr, size_t length)
#define CHECK_GE(x, y)
Definition: Logger.h:210
CHECK(cgen_state)
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
void * checked_mmap(const int fd, const size_t sz)

+ Here is the call graph for this function:

bool StringDictionary::checkpoint ( )
noexcept

Definition at line 1336 of file StringDictionary.cpp.

References CHECK(), client_, isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

Referenced by import_export::TypedImportBuffer::stringDictCheckpoint().

1336  {
1337  if (client_) {
1338  try {
1339  return client_->checkpoint();
1340  } catch (...) {
1341  return false;
1342  }
1343  }
1344  CHECK(!isTemp_);
1345  bool ret = true;
1346  ret = ret && (msync((void*)offset_map_, offset_file_size_, MS_SYNC) == 0);
1347  ret = ret && (msync((void*)payload_map_, payload_file_size_, MS_SYNC) == 0);
1348  ret = ret && (fsync(offset_fd_) == 0);
1349  ret = ret && (fsync(payload_fd_) == 0);
1350  return ret;
1351 }
StringIdxEntry * offset_map_
std::unique_ptr< StringDictionaryClient > client_
CHECK(cgen_state)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucket ( const uint32_t  hash,
const String &  str,
const std::vector< int32_t > &  data 
) const
private noexcept

Definition at line 1072 of file StringDictionary.cpp.

Referenced by getOrAddBulk().

1075  {
1076  auto bucket = hash & (data.size() - 1);
1077  while (true) {
1078  const int32_t candidate_string_id = data[bucket];
1079  if (candidate_string_id ==
1080  INVALID_STR_ID) { // In this case it means the slot is available for use
1081  break;
1082  }
1083  if (!materialize_hashes_ ||
1084  (materialize_hashes_ && hash == rk_hashes_[candidate_string_id])) {
1085  const auto old_str = getStringFromStorageFast(candidate_string_id);
1086  if (str.size() == old_str.size() &&
1087  !memcmp(str.data(), old_str.data(), str.size())) {
1088  // found the string
1089  break;
1090  }
1091  }
1092  // wrap around
1093  if (++bucket == data.size()) {
1094  bucket = 0;
1095  }
1096  }
1097  return bucket;
1098 }
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
std::vector< uint32_t > rk_hashes_

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucketFromStorageAndMemory ( const uint32_t  input_string_rk_hash,
const String &  input_string,
const std::vector< int32_t > &  string_id_hash_table,
const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids 
) const
private noexcept

memcmp(input_string.data(), candidate_storage_string.c_str_ptr, input_string.size())) {

Definition at line 1101 of file StringDictionary.cpp.

Referenced by getOrAddBulkParallel().

1107  {
1108  auto bucket = input_string_rk_hash & (string_id_hash_table.size() - 1);
1109  while (true) {
1110  const int32_t candidate_string_id = string_id_hash_table[bucket];
1111  if (candidate_string_id ==
1112  INVALID_STR_ID) { // In this case it means the slot is available for use
1113  break;
1114  }
1115  if (!materialize_hashes_ ||
1116  (input_string_rk_hash == rk_hashes_[candidate_string_id])) {
1117  if (candidate_string_id > 0 &&
1118  static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1119  // The candidate string is not in storage yet but in our string_memory_ids temp
1120  // buffer
1121  size_t memory_offset =
1122  static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1123  const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1124  if (input_string.size() == candidate_string.size() &&
1125  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1126  // found the string in the temp memory buffer
1127  break;
1128  }
1129  } else {
1130  // The candidate string is in storage, need to fetch it for comparison
1131  const auto candidate_storage_string =
1132  getStringFromStorageFast(candidate_string_id);
1133  if (input_string.size() == candidate_storage_string.size() &&
1134  !memcmp(input_string.data(),
1135  candidate_storage_string.data(),
1136  input_string.size())) {
1139  // found the string in storage
1140  break;
1141  }
1142  }
1143  }
1144  if (++bucket == string_id_hash_table.size()) {
1145  bucket = 0;
1146  }
1147  }
1148  return bucket;
1149 }
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
std::vector< uint32_t > rk_hashes_

+ Here is the caller graph for this function:

uint32_t StringDictionary::computeUniqueBucketWithHash ( const uint32_t  hash,
const std::vector< int32_t > &  data 
) const
private noexcept

Definition at line 1151 of file StringDictionary.cpp.

Referenced by increaseCapacity(), and processDictionaryFutures().

1153  {
1154  auto bucket = hash & (data.size() - 1);
1155  while (true) {
1156  if (data[bucket] ==
1157  INVALID_STR_ID) { // In this case it means the slot is available for use
1158  break;
1159  }
1160  // wrap around
1161  if (++bucket == data.size()) {
1162  bucket = 0;
1163  }
1164  }
1165  return bucket;
1166 }
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

std::shared_ptr< const std::vector< std::string > > StringDictionary::copyStrings ( ) const

Definition at line 905 of file StringDictionary.cpp.

References CHECK_EQ, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), rw_mutex_, str_count_, and strings_cache_.

905  {
906  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
907  if (client_) {
908  // TODO(miyu): support remote string dictionary
909  throw std::runtime_error(
910  "copying dictionaries from remote server is not supported yet.");
911  }
912 
913  if (strings_cache_) {
914  return strings_cache_;
915  }
916 
917  strings_cache_ = std::make_shared<std::vector<std::string>>();
918  strings_cache_->reserve(str_count_);
919  const bool multithreaded = str_count_ > 10000;
920  const auto worker_count =
921  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
922  CHECK_GT(worker_count, 0UL);
923  std::vector<std::vector<std::string>> worker_results(worker_count);
924  auto copy = [this](std::vector<std::string>& str_list,
925  const size_t start_id,
926  const size_t end_id) {
927  CHECK_LE(start_id, end_id);
928  str_list.reserve(end_id - start_id);
929  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
930  str_list.push_back(getStringUnlocked(string_id));
931  }
932  };
933  if (multithreaded) {
934  std::vector<std::future<void>> workers;
935  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
936  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
937  worker_idx < worker_count && start < str_count_;
938  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
939  workers.push_back(std::async(
940  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
941  }
942  for (auto& worker : workers) {
943  worker.get();
944  }
945  } else {
946  CHECK_EQ(worker_results.size(), size_t(1));
947  copy(worker_results[0], 0, str_count_);
948  }
949 
950  for (const auto& worker_result : worker_results) {
951  strings_cache_->insert(
952  strings_cache_->end(), worker_result.begin(), worker_result.end());
953  }
954  return strings_cache_;
955 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
std::shared_ptr< std::vector< std::string > > strings_cache_
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:208
mapd_unique_lock< mapd_shared_mutex > write_lock
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

bool StringDictionary::fillRateIsHigh ( const size_t  num_strings) const
private noexcept

Definition at line 957 of file StringDictionary.cpp.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

957  {
958  return string_id_hash_table_.size() <= num_strings * 2;
959 }
std::vector< int32_t > string_id_hash_table_

+ Here is the caller graph for this function:

std::vector< int32_t > StringDictionary::getCompare ( const std::string &  pattern,
const std::string &  comp_operator,
const size_t  generation 
)

Definition at line 697 of file StringDictionary.cpp.

References buildSortedCache(), client_, compare_cache_, getEquals(), getStringFromStorage(), rw_mutex_, sorted_cache, str_count_, string_eq(), and string_lt().

699  {
700  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
701  if (client_) {
702  return client_->get_compare(pattern, comp_operator, generation);
703  }
704  std::vector<int32_t> ret;
705  if (str_count_ == 0) {
706  return ret;
707  }
708  if (sorted_cache.size() < str_count_) {
709  if (comp_operator == "=" || comp_operator == "<>") {
710  return getEquals(pattern, comp_operator, generation);
711  }
712 
714  }
715  auto cache_index = compare_cache_.get(pattern);
716 
717  if (!cache_index) {
718  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
719  const auto cache_itr = std::lower_bound(
720  sorted_cache.begin(),
721  sorted_cache.end(),
722  pattern,
723  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
724  auto a_str = this->getStringFromStorage(a);
725  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
726  });
727 
728  if (cache_itr == sorted_cache.end()) {
729  cache_index->index = sorted_cache.size() - 1;
730  cache_index->diff = 1;
731  } else {
732  const auto cache_str = getStringFromStorage(*cache_itr);
733  if (!string_eq(
734  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
735  cache_index->index = cache_itr - sorted_cache.begin() - 1;
736  cache_index->diff = 1;
737  } else {
738  cache_index->index = cache_itr - sorted_cache.begin();
739  cache_index->diff = 0;
740  }
741  }
742 
743  compare_cache_.put(pattern, cache_index);
744  }
745 
746  // since we have a cache in form of vector of ints which is sorted according to
747  // corresponding strings in the dictionary all we need is the index of the element
748  // which equal to the pattern that we are trying to match or the index of “biggest”
749  // element smaller than the pattern, to perform all the comparison operators over
750  // string. The search function guarantees we have such index so now it is just the
751  // matter to include all the elements in the result vector.
752 
753  // For < operator if the index that we have points to the element which is equal to
754  // the pattern that we are searching for we simply get all the elements less than the
755  // index. If the element pointed by the index is not equal to the pattern we are
756  // comparing with we also need to include that index in result vector, except when the
757  // index points to 0 and the pattern is lesser than the smallest value in the string
758  // dictionary.
759 
760  if (comp_operator == "<") {
761  size_t idx = cache_index->index;
762  if (cache_index->diff) {
763  idx = cache_index->index + 1;
764  if (cache_index->index == 0 && cache_index->diff > 0) {
765  idx = cache_index->index;
766  }
767  }
768  for (size_t i = 0; i < idx; i++) {
769  ret.push_back(sorted_cache[i]);
770  }
771 
772  // For <= operator if the index that we have points to the element which is equal to
773  // the pattern that we are searching for we want to include the element pointed by
774  // the index in the result set. If the element pointed by the index is not equal to
775  // the pattern we are comparing with we just want to include all the ids with index
776  // less than the index that is cached, except when pattern that we are searching for
777  // is smaller than the smallest string in the dictionary.
778 
779  } else if (comp_operator == "<=") {
780  size_t idx = cache_index->index + 1;
781  if (cache_index == 0 && cache_index->diff > 0) {
782  idx = cache_index->index;
783  }
784  for (size_t i = 0; i < idx; i++) {
785  ret.push_back(sorted_cache[i]);
786  }
787 
788  // For > operator we want to get all the elements with index greater than the index
789  // that we have except, when the pattern we are searching for is lesser than the
790  // smallest string in the dictionary we also want to include the id of the index
791  // that we have.
792 
793  } else if (comp_operator == ">") {
794  size_t idx = cache_index->index + 1;
795  if (cache_index->index == 0 && cache_index->diff > 0) {
796  idx = cache_index->index;
797  }
798  for (size_t i = idx; i < sorted_cache.size(); i++) {
799  ret.push_back(sorted_cache[i]);
800  }
801 
802  // For >= operator when the indexed element that we have points to element which is
803  // equal to the pattern we are searching for we want to include that in the result
804  // vector. If the index that we have does not point to the string which is equal to
805  // the pattern we are searching we don’t want to include that id into the result
806  // vector except when the index is 0.
807 
808  } else if (comp_operator == ">=") {
809  size_t idx = cache_index->index;
810  if (cache_index->diff) {
811  idx = cache_index->index + 1;
812  if (cache_index->index == 0 && cache_index->diff > 0) {
813  idx = cache_index->index;
814  }
815  }
816  for (size_t i = idx; i < sorted_cache.size(); i++) {
817  ret.push_back(sorted_cache[i]);
818  }
819  } else if (comp_operator == "=") {
820  if (!cache_index->diff) {
821  ret.push_back(sorted_cache[cache_index->index]);
822  }
823 
824  // For <> operator it is simple matter of not including id of string which is equal
825  // to pattern we are searching for.
826  } else if (comp_operator == "<>") {
827  if (!cache_index->diff) {
828  size_t idx = cache_index->index;
829  for (size_t i = 0; i < idx; i++) {
830  ret.push_back(sorted_cache[i]);
831  }
832  ++idx;
833  for (size_t i = idx; i < sorted_cache.size(); i++) {
834  ret.push_back(sorted_cache[i]);
835  }
836  } else {
837  for (size_t i = 0; i < sorted_cache.size(); i++) {
838  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
839  }
840  }
841 
842  } else {
843  std::runtime_error("Unsupported string comparison operator");
844  }
845  return ret;
846 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
std::unique_ptr< StringDictionaryClient > client_
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
mapd_shared_mutex rw_mutex_
mapd_unique_lock< mapd_shared_mutex > write_lock
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getEquals ( std::string  pattern,
std::string  comp_operator,
size_t  generation 
)
private

Definition at line 637 of file StringDictionary.cpp.

References CHECK(), CHECK_GT, CHECK_LE, cpu_threads(), equal_cache_, getStringUnlocked(), MAX_STRLEN, run_benchmark_import::result, and str_count_.

Referenced by getCompare().

639  {
640  std::vector<int32_t> result;
641  auto eq_id_itr = equal_cache_.find(pattern);
642  int32_t eq_id = MAX_STRLEN + 1;
643  int32_t cur_size = str_count_;
644  if (eq_id_itr != equal_cache_.end()) {
645  auto eq_id = eq_id_itr->second;
646  if (comp_operator == "=") {
647  result.push_back(eq_id);
648  } else {
649  for (int32_t idx = 0; idx <= cur_size; idx++) {
650  if (idx == eq_id) {
651  continue;
652  }
653  result.push_back(idx);
654  }
655  }
656  } else {
657  std::vector<std::thread> workers;
658  int worker_count = cpu_threads();
659  CHECK_GT(worker_count, 0);
660  std::vector<std::vector<int32_t>> worker_results(worker_count);
661  CHECK_LE(generation, str_count_);
662  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
663  workers.emplace_back(
664  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
665  for (size_t string_id = worker_idx; string_id < generation;
666  string_id += worker_count) {
667  const auto str = getStringUnlocked(string_id);
668  if (str == pattern) {
669  worker_results[worker_idx].push_back(string_id);
670  }
671  }
672  });
673  }
674  for (auto& worker : workers) {
675  worker.join();
676  }
677  for (const auto& worker_result : worker_results) {
678  result.insert(result.end(), worker_result.begin(), worker_result.end());
679  }
680  if (result.size() > 0) {
681  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
682  CHECK(it_ok.second);
683  eq_id = result[0];
684  }
685  if (comp_operator == "<>") {
686  for (int32_t idx = 0; idx <= cur_size; idx++) {
687  if (idx == eq_id) {
688  continue;
689  }
690  result.push_back(idx);
691  }
692  }
693  }
694  return result;
695 }
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::string getStringUnlocked(int32_t string_id) const noexcept
CHECK(cgen_state)
std::map< std::string, int32_t > equal_cache_
#define CHECK_LE(x, y)
Definition: Logger.h:208
static constexpr size_t MAX_STRLEN
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int32_t StringDictionary::getIdOfString ( const std::string &  str) const

Definition at line 512 of file StringDictionary.cpp.

References client_, getUnlocked(), and rw_mutex_.

512  {
513  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
514  if (client_) {
515  return client_->get(str);
516  }
517  return getUnlocked(str);
518 }
int32_t getUnlocked(const std::string &str) const noexcept
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getLike ( const std::string &  pattern,
const bool  icase,
const bool  is_simple,
const char  escape,
const size_t  generation 
) const

Definition at line 584 of file StringDictionary.cpp.

References CHECK(), CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_like(), like_cache_, run_benchmark_import::result, rw_mutex_, and str_count_.

588  {
589  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
590  if (client_) {
591  return client_->get_like(pattern, icase, is_simple, escape, generation);
592  }
593  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
594  const auto it = like_cache_.find(cache_key);
595  if (it != like_cache_.end()) {
596  return it->second;
597  }
598  std::vector<int32_t> result;
599  std::vector<std::thread> workers;
600  int worker_count = cpu_threads();
601  CHECK_GT(worker_count, 0);
602  std::vector<std::vector<int32_t>> worker_results(worker_count);
603  CHECK_LE(generation, str_count_);
604  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
605  workers.emplace_back([&worker_results,
606  &pattern,
607  generation,
608  icase,
609  is_simple,
610  escape,
611  worker_idx,
612  worker_count,
613  this]() {
614  for (size_t string_id = worker_idx; string_id < generation;
615  string_id += worker_count) {
616  const auto str = getStringUnlocked(string_id);
617  if (is_like(str, pattern, icase, is_simple, escape)) {
618  worker_results[worker_idx].push_back(string_id);
619  }
620  }
621  });
622  }
623  for (auto& worker : workers) {
624  worker.join();
625  }
626  for (const auto& worker_result : worker_results) {
627  result.insert(result.end(), worker_result.begin(), worker_result.end());
628  }
629  // place result into cache for reuse if similar query
630  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
631 
632  CHECK(it_ok.second);
633 
634  return result;
635 }
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
CHECK(cgen_state)
mapd_shared_mutex rw_mutex_
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
#define CHECK_LE(x, y)
Definition: Logger.h:208
mapd_unique_lock< mapd_shared_mutex > write_lock
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

int32_t StringDictionary::getOrAdd ( const std::string &  str)
noexcept

Definition at line 257 of file StringDictionary.cpp.

References CHECK_EQ.

257  {
258  if (client_) {
259  std::vector<int32_t> string_ids;
260  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
261  CHECK_EQ(size_t(1), string_ids.size());
262  return string_ids.front();
263  }
264  return getOrAddImpl(str);
265 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
int32_t getOrAddImpl(const std::string &str) noexcept
std::unique_ptr< StringDictionaryClient > client_
template<class T , class String >
void StringDictionary::getOrAddBulk ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 317 of file StringDictionary.cpp.

References appendToStorage(), CHECK(), CHECK_LT, client_no_timeout_, computeBucket(), fillRateIsHigh(), g_enable_stringdict_parallel, getOrAddBulkParallel(), getOrAddBulkRemote(), increaseCapacity(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, rw_mutex_, str_count_, and string_id_hash_table_.

Referenced by import_export::TypedImportBuffer::addDictEncodedString(), ArrowCsvForeignStorage::createDictionaryEncodedColumn(), getOrAddBulkArray(), and populate_string_ids().

// Single-threaded bulk encoder: writes one id per input string into
// output_string_ids, in input order. Empty strings become
// inline_int_null_value<T>(). When the next id would not fit in T, the string
// is logged via log_encoding_error<T>() and also becomes null.
// NOTE(review): the documentation extractor dropped listing lines 319, 352, 359
// and 372. Judging by the References list and surrounding code, they are
// presumably:
//   - the `if (g_enable_stringdict_parallel) {` guard;
//   - the CHECK_LT(str_count_, MAX_STRCOUNT) that owns the streamed message below;
//   - increaseCapacity();
//   - invalidateInvertedIndex().
// Confirm against StringDictionary.cpp.
318  {
320  getOrAddBulkParallel(input_strings, output_string_ids);
321  return;
322  }
323  // Single-thread path.
324  if (client_no_timeout_) {
325  getOrAddBulkRemote(input_strings, output_string_ids);
326  return;
327  }
328  size_t out_idx{0};
// The exclusive lock is held for the whole batch: the loop mutates the hash
// table, storage and str_count_.
329  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
330
331  for (const auto& str : input_strings) {
332  if (str.empty()) {
333  output_string_ids[out_idx++] = inline_int_null_value<T>();
334  continue;
335  }
336  CHECK(str.size() <= MAX_STRLEN);
337  uint32_t bucket;
338  const uint32_t hash = rk_hash(str);
339  bucket = computeBucket(hash, str, string_id_hash_table_);
// Occupied bucket: the string is already known, so reuse its id.
340  if (string_id_hash_table_[bucket] != INVALID_STR_ID) {
341  output_string_ids[out_idx++] = string_id_hash_table_[bucket];
342  continue;
343  }
344  // need to add record to dictionary
345  // check there is room
346  if (str_count_ == static_cast<size_t>(max_valid_int_value<T>())) {
347  log_encoding_error<T>(str);
348  output_string_ids[out_idx++] = inline_int_null_value<T>();
349  continue;
350  }
// NOTE(review): this condition is always true here, because the occupied-bucket
// case `continue`d above.
351  if (string_id_hash_table_[bucket] == INVALID_STR_ID) {
353  << "Maximum number (" << str_count_
354  << ") of Dictionary encoded Strings reached for this column, offset path "
355  "for column is "
356  << offsets_path_;
357  if (fillRateIsHigh(str_count_)) {
358  // resize when more than 50% is full
360  bucket = computeBucket(hash, str, string_id_hash_table_);
361  }
// New string: append it to storage, then point its bucket at the next id
// (str_count_), optionally recording its hash.
362  appendToStorage(str);
363
364  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
365  if (materialize_hashes_) {
366  rk_hashes_[str_count_] = hash;
367  }
368  ++str_count_;
369  }
370  output_string_ids[out_idx++] = string_id_hash_table_[bucket];
371  }
373 }
uint32_t rk_hash(const std::string_view &str)
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
void increaseCapacity() noexcept
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
CHECK(cgen_state)
mapd_shared_mutex rw_mutex_
std::vector< int32_t > string_id_hash_table_
#define CHECK_LT(x, y)
Definition: Logger.h:207
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void invalidateInvertedIndex() noexcept
uint32_t computeBucket(const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
mapd_unique_lock< mapd_shared_mutex > write_lock
bool g_enable_stringdict_parallel
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)
void appendToStorage(String str) noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::getOrAddBulkArray ( const std::vector< std::vector< String >> &  string_array_vec,
std::vector< std::vector< int32_t >> &  ids_array_vec 
)

Definition at line 279 of file StringDictionary.cpp.

References getOrAddBulk().

Referenced by import_export::TypedImportBuffer::addDictEncodedStringArray().

281  {
282  ids_array_vec.resize(string_array_vec.size());
283  for (size_t i = 0; i < string_array_vec.size(); i++) {
284  auto& strings = string_array_vec[i];
285  auto& ids = ids_array_vec[i];
286  ids.resize(strings.size());
287  getOrAddBulk(strings, &ids[0]);
288  }
289 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class T , class String >
void StringDictionary::getOrAddBulkParallel ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 376 of file StringDictionary.cpp.

References appendToStorageBulk(), CHECK(), CHECK_LT, client_no_timeout_, computeBucketFromStorageAndMemory(), fillRateIsHigh(), getOrAddBulkRemote(), hashStrings(), increaseCapacityFromStorageAndMemory(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, rk_hashes_, rw_mutex_, str_count_, and string_id_hash_table_.

Referenced by getOrAddBulk().

// Bulk encoder whose hashing runs outside the lock. The flow is:
//   1. rk_hashes for all inputs are computed (hashStrings) before the exclusive
//      lock is taken.
//   2. New strings are numbered from a local counter, shadow_str_count.
//   3. New strings are appended to storage in one appendToStorageBulk() call
//      after the loop, and only then is str_count_ updated.
// Output semantics match getOrAddBulk(): empty strings and ids that do not fit
// in T become inline_int_null_value<T>().
// NOTE(review): listing lines 416 and 452 were dropped by the documentation
// extractor. They are presumably the `string_id_hash_table_,` argument (see the
// computeBucketFromStorageAndMemory signature) and invalidateInvertedIndex().
377  {
378  if (client_no_timeout_) {
379  getOrAddBulkRemote(input_strings, output_string_ids);
380  return;
381  }
382  // Run rk_hash on the input strings up front, and in parallel,
383  // as the string hashing does not need to be behind the subsequent write_lock
384  std::vector<uint32_t> input_strings_rk_hashes(input_strings.size());
385  hashStrings(input_strings, input_strings_rk_hashes);
386
387  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
388  size_t shadow_str_count =
389  str_count_; // Need to shadow str_count_ now with bulk add methods
// Ids >= this mark are not in storage yet during the loop. The helpers receive
// input_strings + string_memory_ids, presumably to read those pending strings.
// Their bodies are not shown here.
390  const size_t storage_high_water_mark = shadow_str_count;
// Indices into input_strings of the strings newly added in this batch, in id order.
391  std::vector<size_t> string_memory_ids;
392  size_t sum_new_string_lengths = 0;
393  string_memory_ids.reserve(input_strings.size());
394  size_t input_string_idx{0};
395  for (const auto& input_string : input_strings) {
396  // Currently we make empty strings null
397  if (input_string.empty()) {
398  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
399  continue;
400  }
401  // TODO: Recover gracefully if an input string is too long
402  CHECK(input_string.size() <= MAX_STRLEN);
403
404  if (fillRateIsHigh(shadow_str_count)) {
405  // resize when more than 50% is full
406  increaseCapacityFromStorageAndMemory(storage_high_water_mark,
407  input_strings,
408  string_memory_ids,
409  input_strings_rk_hashes);
410  }
411  // Get the rk_hash for this input_string
412  const uint32_t input_string_rk_hash = input_strings_rk_hashes[input_string_idx];
413
414  uint32_t hash_bucket = computeBucketFromStorageAndMemory(input_string_rk_hash,
415  input_string,
417  storage_high_water_mark,
418  input_strings,
419  string_memory_ids);
420
421  // If the hash bucket is not empty, that is our string id
422  // (computeBucketFromStorageAndMemory) already checked to ensure the input string and
423  // bucket string are equal)
424  if (string_id_hash_table_[hash_bucket] != INVALID_STR_ID) {
425  output_string_ids[input_string_idx++] = string_id_hash_table_[hash_bucket];
426  continue;
427  }
428  // Did not find string, so need to add record to dictionary
429  // First check there is room
430  if (shadow_str_count == static_cast<size_t>(max_valid_int_value<T>())) {
431  log_encoding_error<T>(input_string);
432  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
433  continue;
434  }
435  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
436  << "Maximum number (" << shadow_str_count
437  << ") of Dictionary encoded Strings reached for this column, offset path "
438  "for column is "
439  << offsets_path_;
440
// Record the new string (by input index) for the bulk append after the loop,
// and publish its id in the hash table right away.
441  string_memory_ids.push_back(input_string_idx);
442  sum_new_string_lengths += input_string.size();
443  string_id_hash_table_[hash_bucket] = static_cast<int32_t>(shadow_str_count);
444  if (materialize_hashes_) {
445  rk_hashes_[shadow_str_count] = input_string_rk_hash;
446  }
447  output_string_ids[input_string_idx++] = shadow_str_count++;
448  }
// Persist every new string of the batch at once, then publish the new count.
449  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
450  str_count_ = shadow_str_count;
451
453 }
void hashStrings(const std::vector< String > &string_vec, std::vector< uint32_t > &hashes) const noexcept
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
CHECK(cgen_state)
mapd_shared_mutex rw_mutex_
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
std::vector< int32_t > string_id_hash_table_
#define CHECK_LT(x, y)
Definition: Logger.h:207
void increaseCapacityFromStorageAndMemory(const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< uint32_t > &input_strings_rk_hashes) noexcept
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void invalidateInvertedIndex() noexcept
static constexpr size_t MAX_STRLEN
mapd_unique_lock< mapd_shared_mutex > write_lock
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)
uint32_t computeBucketFromStorageAndMemory(const uint32_t input_string_rk_hash, const String &input_string, const std::vector< int32_t > &string_id_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class T , class String >
void StringDictionary::getOrAddBulkRemote ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)
private

Definition at line 472 of file StringDictionary.cpp.

References CHECK(), and client_no_timeout_.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

// Remote path: fetch ids for the whole batch from client_no_timeout_ and narrow
// them into T.
//   - Ids above max_valid_int_value<T>() are logged and stored as T's null.
//   - An int32 null id returned by the server is also stored as T's null.
// NOTE(review): listing line 474 was dropped by the documentation extractor.
// Per the References list it is presumably CHECK(client_no_timeout_).
473  {
475  std::vector<int32_t> string_ids;
476  client_no_timeout_->get_or_add_bulk(string_ids, string_vec);
// out_idx advances once per returned id, so it always equals i.
477  size_t out_idx{0};
478  for (size_t i = 0; i < string_ids.size(); ++i) {
479  const auto string_id = string_ids[i];
480  const bool invalid = string_id > max_valid_int_value<T>();
481  if (invalid || string_id == inline_int_null_value<int32_t>()) {
482  if (invalid) {
483  log_encoding_error<T>(string_vec[i]);
484  }
485  encoded_vec[out_idx++] = inline_int_null_value<T>();
486  continue;
487  }
488  encoded_vec[out_idx++] = string_id;
489  }
490 }
CHECK(cgen_state)
std::unique_ptr< StringDictionaryClient > client_no_timeout_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int32_t StringDictionary::getOrAddImpl ( const std::string &  str)
private noexcept

Definition at line 1017 of file StringDictionary.cpp.

References CHECK(), CHECK_LT, and anonymous_namespace{StringDictionary.cpp}::rk_hash().

// Returns the id of `str`, adding it to the dictionary if absent. Empty strings
// map to inline_int_null_value<int32_t>().
// The lookup is first tried under a shared lock. Only on a miss is the
// exclusive lock taken. The bucket is then recomputed, because another writer
// may have added the string or grown the table between the two locks.
// NOTE(review): listing lines 1037 and 1053 were dropped by the documentation
// extractor. They are presumably CHECK_LT(str_count_, MAX_STRCOUNT) (which owns
// the streamed message below) and invalidateInvertedIndex().
1017  {
1018  // @TODO(wei) treat empty string as NULL for now
1019  if (str.size() == 0) {
1020  return inline_int_null_value<int32_t>();
1021  }
1022  CHECK(str.size() <= MAX_STRLEN);
1023  uint32_t bucket;
1024  const uint32_t hash = rk_hash(str);
// Fast path: read-only lookup under the shared lock.
1025  {
1026  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
1027  bucket = computeBucket(hash, str, string_id_hash_table_);
1028  if (string_id_hash_table_[bucket] != INVALID_STR_ID) {
1029  return string_id_hash_table_[bucket];
1030  }
1031  }
1032  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
1033  // need to recalculate the bucket in case it changed before
1034  // we got the lock
1035  bucket = computeBucket(hash, str, string_id_hash_table_);
// Still absent after re-checking under the exclusive lock: add it with id str_count_.
1036  if (string_id_hash_table_[bucket] == INVALID_STR_ID) {
1038  << "Maximum number (" << str_count_
1039  << ") of Dictionary encoded Strings reached for this column, offset path "
1040  "for column is "
1041  << offsets_path_;
1042  if (fillRateIsHigh(str_count_)) {
1043  // resize when more than 50% is full
1044  increaseCapacity();
1045  bucket = computeBucket(hash, str, string_id_hash_table_);
1046  }
1047  appendToStorage(str);
1048  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
1049  if (materialize_hashes_) {
1050  rk_hashes_[str_count_] = hash;
1051  }
1052  ++str_count_;
1054  }
1055  return string_id_hash_table_[bucket];
1056 }
uint32_t rk_hash(const std::string_view &str)
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
void increaseCapacity() noexcept
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
CHECK(cgen_state)
mapd_shared_mutex rw_mutex_
std::vector< int32_t > string_id_hash_table_
#define CHECK_LT(x, y)
Definition: Logger.h:207
mapd_shared_lock< mapd_shared_mutex > read_lock
void invalidateInvertedIndex() noexcept
uint32_t computeBucket(const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept
static constexpr size_t MAX_STRLEN
mapd_unique_lock< mapd_shared_mutex > write_lock
void appendToStorage(String str) noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getRegexpLike ( const std::string &  pattern,
const char  escape,
const size_t  generation 
) const

Definition at line 858 of file StringDictionary.cpp.

References CHECK(), CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_regexp_like(), regex_cache_, run_benchmark_import::result, rw_mutex_, and str_count_.

860  {
861  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
862  if (client_) {
863  return client_->get_regexp_like(pattern, escape, generation);
864  }
865  const auto cache_key = std::make_pair(pattern, escape);
866  const auto it = regex_cache_.find(cache_key);
867  if (it != regex_cache_.end()) {
868  return it->second;
869  }
870  std::vector<int32_t> result;
871  std::vector<std::thread> workers;
872  int worker_count = cpu_threads();
873  CHECK_GT(worker_count, 0);
874  std::vector<std::vector<int32_t>> worker_results(worker_count);
875  CHECK_LE(generation, str_count_);
876  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
877  workers.emplace_back([&worker_results,
878  &pattern,
879  generation,
880  escape,
881  worker_idx,
882  worker_count,
883  this]() {
884  for (size_t string_id = worker_idx; string_id < generation;
885  string_id += worker_count) {
886  const auto str = getStringUnlocked(string_id);
887  if (is_regexp_like(str, pattern, escape)) {
888  worker_results[worker_idx].push_back(string_id);
889  }
890  }
891  });
892  }
893  for (auto& worker : workers) {
894  worker.join();
895  }
896  for (const auto& worker_result : worker_results) {
897  result.insert(result.end(), worker_result.begin(), worker_result.end());
898  }
899  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
900  CHECK(it_ok.second);
901 
902  return result;
903 }
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
CHECK(cgen_state)
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:208
mapd_unique_lock< mapd_shared_mutex > write_lock
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

std::string StringDictionary::getString ( int32_t  string_id) const

Definition at line 526 of file StringDictionary.cpp.

References client_, getStringUnlocked(), and rw_mutex_.

Referenced by StringValueConverter::convertToColumnarFormatFromDict(), and populate_string_ids().

526  {
527  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
528  if (client_) {
529  std::string ret;
530  client_->get_string(ret, string_id);
531  return ret;
532  }
533  return getStringUnlocked(string_id);
534 }
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::pair< char *, size_t > StringDictionary::getStringBytes ( int32_t  string_id) const
noexcept

Definition at line 541 of file StringDictionary.cpp.

References CHECK(), CHECK_LE, and CHECK_LT.

542  {
543  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
544  CHECK(!client_);
545  CHECK_LE(0, string_id);
546  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
547  return getStringBytesChecked(string_id);
548 }
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
std::unique_ptr< StringDictionaryClient > client_
CHECK(cgen_state)
mapd_shared_mutex rw_mutex_
#define CHECK_LT(x, y)
Definition: Logger.h:207
#define CHECK_LE(x, y)
Definition: Logger.h:208
mapd_shared_lock< mapd_shared_mutex > read_lock

+ Here is the call graph for this function:

std::pair< char *, size_t > StringDictionary::getStringBytesChecked ( const int  string_id) const
private noexcept

Definition at line 1064 of file StringDictionary.cpp.

References CHECK().

1065  {
1066  const auto str_canary = getStringFromStorage(string_id);
1067  CHECK(!str_canary.canary);
1068  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1069 }
CHECK(cgen_state)
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the call graph for this function:

std::string StringDictionary::getStringChecked ( const int  string_id) const
private noexcept

Definition at line 1058 of file StringDictionary.cpp.

References CHECK().

Referenced by increaseCapacity().

1058  {
1059  const auto str_canary = getStringFromStorage(string_id);
1060  CHECK(!str_canary.canary);
1061  return std::string(str_canary.c_str_ptr, str_canary.size);
1062 }
CHECK(cgen_state)
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

StringDictionary::PayloadString StringDictionary::getStringFromStorage ( const int  string_id) const
private noexcept

Definition at line 1248 of file StringDictionary.cpp.

References CHECK_GE, StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

Referenced by getCompare(), mergeSortedCache(), sortCache(), and StringDictionary().

1249  {
1250  if (!isTemp_) {
1251  CHECK_GE(payload_fd_, 0);
1252  CHECK_GE(offset_fd_, 0);
1253  }
1254  CHECK_GE(string_id, 0);
1255  const StringIdxEntry* str_meta = offset_map_ + string_id;
1256  if (str_meta->size == 0xffff) {
1257  // hit the canary
1258  return {nullptr, 0, true};
1259  }
1260  return {payload_map_ + str_meta->off, str_meta->size, false};
1261 }
StringIdxEntry * offset_map_
#define CHECK_GE(x, y)
Definition: Logger.h:210

+ Here is the caller graph for this function:

std::string_view StringDictionary::getStringFromStorageFast ( const int  string_id) const
private, noexcept

Definition at line 1242 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

1243  {
1244  const StringIdxEntry* str_meta = offset_map_ + string_id;
1245  return {payload_map_ + str_meta->off, str_meta->size};
1246 }
StringIdxEntry * offset_map_
std::string StringDictionary::getStringUnlocked ( int32_t  string_id) const
private, noexcept

Definition at line 536 of file StringDictionary.cpp.

References CHECK_LT.

Referenced by copyStrings(), getEquals(), getLike(), getRegexpLike(), and getString().

536  {
537  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
538  return getStringChecked(string_id);
539 }
std::string getStringChecked(const int string_id) const noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:207

+ Here is the caller graph for this function:

int32_t StringDictionary::getUnlocked ( const std::string &  str) const
private, noexcept

Definition at line 520 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::rk_hash().

Referenced by getIdOfString().

520  {
521  const uint32_t hash = rk_hash(str);
522  auto str_id = string_id_hash_table_[computeBucket(hash, str, string_id_hash_table_)];
523  return str_id;
524 }
uint32_t rk_hash(const std::string_view &str)
std::vector< int32_t > string_id_hash_table_
uint32_t computeBucket(const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::hashStrings ( const std::vector< String > &  string_vec,
std::vector< uint32_t > &  hashes 
) const
private, noexcept

Method to rk_hash a vector of strings in parallel. Empty strings are skipped, and their slots in hashes are left unchanged.

Parameters
string_vec - input vector of strings to be hashed
hashes - space for the output; must be pre-sized to match the size of string_vec

Definition at line 301 of file StringDictionary.cpp.

References CHECK_EQ, and anonymous_namespace{StringDictionary.cpp}::rk_hash().

Referenced by getOrAddBulkParallel().

302  {
303  CHECK_EQ(string_vec.size(), hashes.size());
304 
305  tbb::parallel_for(tbb::blocked_range<size_t>(0, string_vec.size()),
306  [&string_vec, &hashes](const tbb::blocked_range<size_t>& r) {
307  for (size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
308  if (string_vec[curr_id].empty()) {
309  continue;
310  }
311  hashes[curr_id] = rk_hash(string_vec[curr_id]);
312  }
313  });
314 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
uint32_t rk_hash(const std::string_view &str)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::increaseCapacity ( )
private, noexcept

Definition at line 961 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), getStringChecked(), INVALID_STR_ID, materialize_hashes_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, str_count_, and string_id_hash_table_.

Referenced by getOrAddBulk().

961  {
962  std::vector<int32_t> new_str_ids(string_id_hash_table_.size() * 2, INVALID_STR_ID);
963 
964  if (materialize_hashes_) {
965  for (size_t i = 0; i < string_id_hash_table_.size(); ++i) {
966  if (string_id_hash_table_[i] != INVALID_STR_ID) {
967  const uint32_t hash = rk_hashes_[string_id_hash_table_[i]];
968  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
969  new_str_ids[bucket] = string_id_hash_table_[i];
970  }
971  }
972  rk_hashes_.resize(rk_hashes_.size() * 2);
973  } else {
974  for (size_t i = 0; i < str_count_; ++i) {
975  const auto str = getStringChecked(i);
976  const uint32_t hash = rk_hash(str);
977  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
978  new_str_ids[bucket] = i;
979  }
980  }
981  string_id_hash_table_.swap(new_str_ids);
982 }
uint32_t rk_hash(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
std::vector< int32_t > string_id_hash_table_
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) const noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::increaseCapacityFromStorageAndMemory ( const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const std::vector< uint32_t > &  input_strings_rk_hashes 
)
private, noexcept

Definition at line 985 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::rk_hash().

Referenced by getOrAddBulkParallel().

989  {
990  std::vector<int32_t> new_str_ids(string_id_hash_table_.size() * 2, INVALID_STR_ID);
991  if (materialize_hashes_) {
992  for (size_t i = 0; i < string_id_hash_table_.size(); ++i) {
993  if (string_id_hash_table_[i] != INVALID_STR_ID) {
994  const uint32_t hash = rk_hashes_[string_id_hash_table_[i]];
995  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
996  new_str_ids[bucket] = string_id_hash_table_[i];
997  }
998  }
999  rk_hashes_.resize(rk_hashes_.size() * 2);
1000  } else {
1001  for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1002  const auto storage_string = getStringChecked(storage_idx);
1003  const uint32_t hash = rk_hash(storage_string);
1004  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1005  new_str_ids[bucket] = storage_idx;
1006  }
1007  for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1008  size_t string_memory_id = string_memory_ids[memory_idx];
1009  uint32_t bucket = computeUniqueBucketWithHash(
1010  input_strings_rk_hashes[string_memory_id], new_str_ids);
1011  new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1012  }
1013  }
1014  string_id_hash_table_.swap(new_str_ids);
1015 }
uint32_t rk_hash(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
std::vector< int32_t > string_id_hash_table_
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) const noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::insertInSortedCache ( std::string  str,
int32_t  str_id 
)
private
void StringDictionary::invalidateInvertedIndex ( )
private, noexcept

Definition at line 1323 of file StringDictionary.cpp.

References compare_cache_, equal_cache_, like_cache_, and regex_cache_.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

1323  {
1324  if (!like_cache_.empty()) {
1325  decltype(like_cache_)().swap(like_cache_);
1326  }
1327  if (!regex_cache_.empty()) {
1328  decltype(regex_cache_)().swap(regex_cache_);
1329  }
1330  if (!equal_cache_.empty()) {
1331  decltype(equal_cache_)().swap(equal_cache_);
1332  }
1333  compare_cache_.invalidateInvertedIndex();
1334 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::map< std::string, int32_t > equal_cache_
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_

+ Here is the caller graph for this function:

void StringDictionary::mergeSortedCache ( std::vector< int32_t > &  temp_sorted_cache)
private

Definition at line 1377 of file StringDictionary.cpp.

References getStringFromStorage(), sorted_cache, and string_lt().

Referenced by buildSortedCache().

1377  {
1378  // this method is not thread safe
1379  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1380  size_t t_idx = 0, s_idx = 0, idx = 0;
1381  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1382  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1383  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1384  const auto insert_from_temp_cache =
1385  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1386  if (insert_from_temp_cache) {
1387  updated_cache[idx] = temp_sorted_cache[t_idx++];
1388  } else {
1389  updated_cache[idx] = sorted_cache[s_idx++];
1390  }
1391  }
1392  while (t_idx < temp_sorted_cache.size()) {
1393  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1394  }
1395  while (s_idx < sorted_cache.size()) {
1396  updated_cache[idx++] = sorted_cache[s_idx++];
1397  }
1398  sorted_cache.swap(updated_cache);
1399 }
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_array_ids ( std::vector< std::vector< int32_t >> &  dest_array_ids,
StringDictionary dest_dict,
const std::vector< std::vector< int32_t >> &  source_array_ids,
const StringDictionary source_dict 
)
static

Definition at line 1428 of file StringDictionary.cpp.

References populate_string_ids(), and logger::thread_id().

Referenced by DictionaryValueConverter< TARGET_TYPE >::processArrayBuffer().

1432  {
1433  dest_array_ids.resize(source_array_ids.size());
1434 
1435  std::atomic<size_t> row_idx{0};
1436  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1437  int thread_id) {
1438  for (;;) {
1439  auto row = row_idx.fetch_add(1);
1440 
1441  if (row >= dest_array_ids.size()) {
1442  return;
1443  }
1444  const auto& source_ids = source_array_ids[row];
1445  auto& dest_ids = dest_array_ids[row];
1446  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1447  }
1448  };
1449 
1450  const int num_worker_threads = std::thread::hardware_concurrency();
1451 
1452  if (source_array_ids.size() / num_worker_threads > 10) {
1453  std::vector<std::future<void>> worker_threads;
1454  for (int i = 0; i < num_worker_threads; ++i) {
1455  worker_threads.push_back(std::async(std::launch::async, processor, i));
1456  }
1457 
1458  for (auto& child : worker_threads) {
1459  child.wait();
1460  }
1461  for (auto& child : worker_threads) {
1462  child.get();
1463  }
1464  } else {
1465  processor(0);
1466  }
1467 }
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
ThreadId thread_id()
Definition: Logger.cpp:715

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_ids ( std::vector< int32_t > &  dest_ids,
StringDictionary dest_dict,
const std::vector< int32_t > &  source_ids,
const StringDictionary source_dict,
const std::map< int32_t, std::string >  transient_mapping = {} 
)
static

Populates provided dest_ids vector with string ids corresponding to given source strings.

Given a vector of source string ids and corresponding source dictionary, this method populates a vector of destination string ids by either returning the string id of matching strings in the destination dictionary or creating new entries in the dictionary. Source string ids can also be transient if they were created by a function (e.g. LOWER/UPPER functions). A map of transient string ids to string values is provided in order to handle this use case.

Parameters
dest_ids - vector of destination string ids to be populated
dest_dict - destination dictionary
source_ids - vector of source string ids for which destination ids are needed
source_dict - source dictionary
transient_mapping - map of transient source string ids to string values

Definition at line 1401 of file StringDictionary.cpp.

References getOrAddBulk(), and getString().

Referenced by populate_string_array_ids(), and DictionaryValueConverter< TARGET_TYPE >::processBuffer().

1406  {
1407  std::vector<std::string> strings;
1408 
1409  for (const int32_t source_id : source_ids) {
1410  if (source_id == std::numeric_limits<int32_t>::min()) {
1411  strings.emplace_back("");
1412  } else if (source_id < 0) {
1413  if (auto string_itr = transient_mapping.find(source_id);
1414  string_itr != transient_mapping.end()) {
1415  strings.emplace_back(string_itr->second);
1416  } else {
1417  throw std::runtime_error("Unexpected negative source ID");
1418  }
1419  } else {
1420  strings.push_back(source_dict->getString(source_id));
1421  }
1422  }
1423 
1424  dest_ids.resize(strings.size());
1425  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1426 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
std::string getString(int32_t string_id) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::processDictionaryFutures ( std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &  dictionary_futures)
private

Definition at line 211 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), materialize_hashes_, payload_file_off_, rk_hashes_, str_count_, and string_id_hash_table_.

Referenced by StringDictionary().

213  {
214  for (auto& dictionary_future : dictionary_futures) {
215  dictionary_future.wait();
216  auto hashVec = dictionary_future.get();
217  for (auto& hash : hashVec) {
218  uint32_t bucket = computeUniqueBucketWithHash(hash.first, string_id_hash_table_);
219  payload_file_off_ += hash.second;
220  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
221  if (materialize_hashes_) {
222  rk_hashes_[str_count_] = hash.first;
223  }
224  ++str_count_;
225  }
226  }
227  dictionary_futures.clear();
228 }
std::vector< int32_t > string_id_hash_table_
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) const noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::sortCache ( std::vector< int32_t > &  cache)
private

Definition at line 1364 of file StringDictionary.cpp.

References getStringFromStorage(), and string_lt().

Referenced by buildSortedCache().

1364  {
1365  // This method is not thread-safe.
1366 
1367  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1368  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1369 
1370  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1371  auto a_str = this->getStringFromStorage(a);
1372  auto b_str = this->getStringFromStorage(b);
1373  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1374  });
1375 }
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t StringDictionary::storageEntryCount ( ) const

Definition at line 550 of file StringDictionary.cpp.

References client_, rw_mutex_, and str_count_.

Referenced by ArrowCsvForeignStorage::createDictionaryEncodedColumn().

550  {
551  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
552  if (client_) {
553  return client_->storage_entry_count();
554  }
555  return str_count_;
556 }
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock

+ Here is the caller graph for this function:

Member Data Documentation

char* StringDictionary::CANARY_BUFFER {nullptr}
private

Definition at line 224 of file StringDictionary.h.

Referenced by ~StringDictionary().

size_t StringDictionary::canary_buffer_size = 0
private

Definition at line 225 of file StringDictionary.h.

std::unique_ptr<StringDictionaryClient> StringDictionary::client_
private
std::unique_ptr<StringDictionaryClient> StringDictionary::client_no_timeout_
private

Definition at line 222 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and getOrAddBulkRemote().

DictionaryCache<std::string, compare_cache_value_t> StringDictionary::compare_cache_
mutable, private

Definition at line 219 of file StringDictionary.h.

Referenced by getCompare(), and invalidateInvertedIndex().

std::map<std::string, int32_t> StringDictionary::equal_cache_
mutable, private

Definition at line 218 of file StringDictionary.h.

Referenced by getEquals(), and invalidateInvertedIndex().

std::map<std::tuple<std::string, bool, bool, char>, std::vector<int32_t> > StringDictionary::like_cache_
mutable, private

Definition at line 216 of file StringDictionary.h.

Referenced by getLike(), and invalidateInvertedIndex().

bool StringDictionary::materialize_hashes_
private
constexpr size_t StringDictionary::MAX_STRCOUNT = (1U << 31) - 1
static

Definition at line 115 of file StringDictionary.h.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

int StringDictionary::offset_fd_
private
size_t StringDictionary::offset_file_size_
private
StringIdxEntry* StringDictionary::offset_map_
private
std::string StringDictionary::offsets_path_
private

Definition at line 206 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and StringDictionary().

int StringDictionary::payload_fd_
private
size_t StringDictionary::payload_file_off_
private
size_t StringDictionary::payload_file_size_
private
char* StringDictionary::payload_map_
private
std::map<std::pair<std::string, char>, std::vector<int32_t> > StringDictionary::regex_cache_
mutable, private

Definition at line 217 of file StringDictionary.h.

Referenced by getRegexpLike(), and invalidateInvertedIndex().

std::vector<uint32_t> StringDictionary::rk_hashes_
private
std::vector<int32_t> StringDictionary::sorted_cache
private

Definition at line 203 of file StringDictionary.h.

Referenced by buildSortedCache(), getCompare(), and mergeSortedCache().

std::vector<int32_t> StringDictionary::string_id_hash_table_
private
std::shared_ptr<std::vector<std::string> > StringDictionary::strings_cache_
mutable, private

Definition at line 220 of file StringDictionary.h.

Referenced by copyStrings().


The documentation for this class was generated from the following files: