OmniSciDB  8a228a1076
StringDictionary Class Reference

#include <StringDictionary.h>

+ Collaboration diagram for StringDictionary:

Classes

struct  compare_cache_value_t
 
struct  PayloadString
 
struct  StringIdxEntry
 

Public Member Functions

 StringDictionary (const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
 
 StringDictionary (const LeafHostInfo &host, const DictRef dict_ref)
 
 ~StringDictionary () noexcept
 
int32_t getOrAdd (const std::string &str) noexcept
 
template<class T , class String >
void getOrAddBulk (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class T , class String >
void getOrAddBulkParallel (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class String >
void getOrAddBulkArray (const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
 
int32_t getIdOfString (const std::string &str) const
 
std::string getString (int32_t string_id) const
 
std::pair< char *, size_t > getStringBytes (int32_t string_id) const noexcept
 
size_t storageEntryCount () const
 
std::vector< int32_t > getLike (const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
 
std::vector< int32_t > getCompare (const std::string &pattern, const std::string &comp_operator, const size_t generation)
 
std::vector< int32_t > getRegexpLike (const std::string &pattern, const char escape, const size_t generation) const
 
std::shared_ptr< const std::vector< std::string > > copyStrings () const
 
bool checkpoint () noexcept
 

Static Public Member Functions

static void populate_string_ids (std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
 Populates provided dest_ids vector with string ids corresponding to given source strings. More...
 
static void populate_string_array_ids (std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
 

Static Public Attributes

static constexpr int32_t INVALID_STR_ID = -1
 
static constexpr size_t MAX_STRLEN = (1 << 15) - 1
 
static constexpr size_t MAX_STRCOUNT = (1U << 31) - 1
 

Private Member Functions

void processDictionaryFutures (std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
 
size_t getNumStringsFromStorage (const size_t storage_slots) const noexcept
 
bool fillRateIsHigh (const size_t num_strings) const noexcept
 
void increaseCapacity () noexcept
 
template<class String >
void increaseCapacityFromStorageAndMemory (const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< uint32_t > &input_strings_rk_hashes) noexcept
 
int32_t getOrAddImpl (const std::string &str) noexcept
 
template<class String >
void hashStrings (const std::vector< String > &string_vec, std::vector< uint32_t > &hashes) const noexcept
 
template<class T , class String >
void getOrAddBulkRemote (const std::vector< String > &string_vec, T *encoded_vec)
 
int32_t getUnlocked (const std::string &str) const noexcept
 
std::string getStringUnlocked (int32_t string_id) const noexcept
 
std::string getStringChecked (const int string_id) const noexcept
 
std::pair< char *, size_t > getStringBytesChecked (const int string_id) const noexcept
 
template<class String >
uint32_t computeBucket (const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept
 
template<class String >
uint32_t computeBucketFromStorageAndMemory (const uint32_t input_string_rk_hash, const String &input_string, const std::vector< int32_t > &string_id_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
 
uint32_t computeUniqueBucketWithHash (const uint32_t hash, const std::vector< int32_t > &data) noexcept
 
void checkAndConditionallyIncreasePayloadCapacity (const size_t write_length)
 
void checkAndConditionallyIncreaseOffsetCapacity (const size_t write_length)
 
template<class String >
void appendToStorage (String str) noexcept
 
template<class String >
void appendToStorageBulk (const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
 
PayloadString getStringFromStorage (const int string_id) const noexcept
 
std::string_view getStringFromStorageFast (const int string_id) const noexcept
 
void addPayloadCapacity (const size_t min_capacity_requested=0) noexcept
 
void addOffsetCapacity (const size_t min_capacity_requested=0) noexcept
 
size_t addStorageCapacity (int fd, const size_t min_capacity_requested=0) noexcept
 
void * addMemoryCapacity (void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
 
void invalidateInvertedIndex () noexcept
 
std::vector< int32_t > getEquals (std::string pattern, std::string comp_operator, size_t generation)
 
void buildSortedCache ()
 
void insertInSortedCache (std::string str, int32_t str_id)
 
void sortCache (std::vector< int32_t > &cache)
 
void mergeSortedCache (std::vector< int32_t > &temp_sorted_cache)
 
compare_cache_value_t * binary_search_cache (const std::string &pattern) const
 

Private Attributes

size_t str_count_
 
size_t collisions_
 
std::vector< int32_t > string_id_hash_table_
 
std::vector< uint32_t > rk_hashes_
 
std::vector< int32_t > sorted_cache
 
bool isTemp_
 
bool materialize_hashes_
 
std::string offsets_path_
 
int payload_fd_
 
int offset_fd_
 
StringIdxEntry * offset_map_
 
char * payload_map_
 
size_t offset_file_size_
 
size_t payload_file_size_
 
size_t payload_file_off_
 
mapd_shared_mutex rw_mutex_
 
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
 
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
 
std::map< std::string, int32_t > equal_cache_
 
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
 
std::shared_ptr< std::vector< std::string > > strings_cache_
 
std::unique_ptr< StringDictionaryClient > client_
 
std::unique_ptr< StringDictionaryClient > client_no_timeout_
 
char * CANARY_BUFFER {nullptr}
 
size_t canary_buffer_size = 0
 

Detailed Description

Definition at line 42 of file StringDictionary.h.

Constructor & Destructor Documentation

◆ StringDictionary() [1/2]

StringDictionary::StringDictionary ( const std::string &  folder,
const bool  isTemp,
const bool  recover,
const bool  materializeHashes = false,
size_t  initial_capacity = 256 
)

Definition at line 111 of file StringDictionary.cpp.

References addOffsetCapacity(), addPayloadCapacity(), CHECK_EQ, anonymous_namespace{StringDictionary.cpp}::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_open(), collisions_, anonymous_namespace{StringDictionary.cpp}::file_size(), getNumStringsFromStorage(), getStringFromStorage(), INVALID_STR_ID, isTemp_, LOG, materialize_hashes_, offset_fd_, offset_file_size_, offset_map_, offsets_path_, payload_fd_, payload_file_size_, payload_map_, processDictionaryFutures(), anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, anonymous_namespace{StringDictionary.cpp}::round_up_p2(), rw_mutex_, str_count_, string_id_hash_table_, VLOG, and logger::WARNING.

116  : str_count_(0)
117  , string_id_hash_table_(initial_capacity, INVALID_STR_ID)
118  , rk_hashes_(initial_capacity)
119  , isTemp_(isTemp)
120  , materialize_hashes_(materializeHashes)
121  , payload_fd_(-1)
122  , offset_fd_(-1)
123  , offset_map_(nullptr)
124  , payload_map_(nullptr)
125  , offset_file_size_(0)
126  , payload_file_size_(0)
127  , payload_file_off_(0)
128  , strings_cache_(nullptr) {
129  if (!isTemp && folder.empty()) {
130  return;
131  }
132 
133  // initial capacity must be a power of two for efficient bucket computation
134  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
135  if (!isTemp_) {
136  boost::filesystem::path storage_path(folder);
137  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
138  const auto payload_path =
139  (storage_path / boost::filesystem::path("DictPayload")).string();
140  payload_fd_ = checked_open(payload_path.c_str(), recover);
141  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
143  offset_file_size_ = file_size(offset_fd_);
144  }
145  bool storage_is_empty = false;
146  if (payload_file_size_ == 0) {
147  storage_is_empty = true;
149  }
150  if (offset_file_size_ == 0) {
152  }
153  if (!isTemp_) { // we never mmap or recover temp dictionaries
154  payload_map_ = reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
155  offset_map_ =
156  reinterpret_cast<StringIdxEntry*>(checked_mmap(offset_fd_, offset_file_size_));
157  if (recover) {
158  const size_t bytes = file_size(offset_fd_);
159  if (bytes % sizeof(StringIdxEntry) != 0) {
160  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
161  }
162  const uint64_t str_count =
163  storage_is_empty ? 0 : getNumStringsFromStorage(bytes / sizeof(StringIdxEntry));
164  collisions_ = 0;
165  // at this point we know the size of the StringDict we need to load
166  // so lets reallocate the vector to the correct size
167  const uint64_t max_entries =
168  std::max(round_up_p2(str_count * 2 + 1),
169  round_up_p2(std::max(initial_capacity, static_cast<size_t>(1))));
170  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
171  string_id_hash_table_.swap(new_str_ids);
172  if (materialize_hashes_) {
173  std::vector<uint32_t> new_rk_hashes(max_entries / 2);
174  rk_hashes_.swap(new_rk_hashes);
175  }
176  // Bail early if we know we don't have strings to add (i.e. a new or empty
177  // dictionary)
178  if (str_count == 0) {
179  return;
180  }
181 
182  unsigned string_id = 0;
183  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
184 
185  uint32_t thread_inits = 0;
186  const auto thread_count = std::thread::hardware_concurrency();
187  const uint32_t items_per_thread = std::max<uint32_t>(
188  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
189  std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>
190  dictionary_futures;
191  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
192  dictionary_futures.emplace_back(std::async(
193  std::launch::async, [string_id, str_count, items_per_thread, this] {
194  std::vector<std::pair<uint32_t, unsigned int>> hashVec;
195  for (uint32_t curr_id = string_id;
196  curr_id < string_id + items_per_thread && curr_id < str_count;
197  curr_id++) {
198  const auto recovered = getStringFromStorage(curr_id);
199  if (recovered.canary) {
200  // hit the canary, recovery finished
201  break;
202  } else {
203  std::string temp(recovered.c_str_ptr, recovered.size);
204  hashVec.emplace_back(std::make_pair(rk_hash(temp), temp.size()));
205  }
206  }
207  return hashVec;
208  }));
209  thread_inits++;
210  if (thread_inits % thread_count == 0) {
211  processDictionaryFutures(dictionary_futures);
212  }
213  }
214  // gather last few threads
215  if (dictionary_futures.size() != 0) {
216  processDictionaryFutures(dictionary_futures);
217  }
218  VLOG(1) << "Opened string dictionary " << folder << " # Strings: " << str_count_
219  << " Hash table size: " << string_id_hash_table_.size() << " Fill rate: "
220  << static_cast<double>(str_count_) * 100.0 / string_id_hash_table_.size()
221  << "% Collisions: " << collisions_;
222  }
223  }
224 }
StringIdxEntry * offset_map_
#define CHECK_EQ(x, y)
Definition: Logger.h:205
uint32_t rk_hash(const std::string_view &str)
#define LOG(tag)
Definition: Logger.h:188
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
std::string offsets_path_
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
mapd_shared_mutex rw_mutex_
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
int checked_open(const char *path, const bool recover)
std::vector< int32_t > string_id_hash_table_
void * checked_mmap(const int fd, const size_t sz)
size_t getNumStringsFromStorage(const size_t storage_slots) const noexcept
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
const uint64_t round_up_p2(const uint64_t num)
mapd_unique_lock< mapd_shared_mutex > write_lock
PayloadString getStringFromStorage(const int string_id) const noexcept
#define VLOG(n)
Definition: Logger.h:291
std::vector< uint32_t > rk_hashes_
+ Here is the call graph for this function:

◆ StringDictionary() [2/2]

StringDictionary::StringDictionary ( const LeafHostInfo host,
const DictRef  dict_ref 
)

Definition at line 275 of file StringDictionary.cpp.

276  : strings_cache_(nullptr)
277  , client_(new StringDictionaryClient(host, dict_ref, true))
278  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
std::unique_ptr< StringDictionaryClient > client_
std::shared_ptr< std::vector< std::string > > strings_cache_
std::unique_ptr< StringDictionaryClient > client_no_timeout_

◆ ~StringDictionary()

StringDictionary::~StringDictionary ( )
noexcept

Definition at line 280 of file StringDictionary.cpp.

References CANARY_BUFFER, CHECK, CHECK_GE, anonymous_namespace{StringDictionary.cpp}::checked_munmap(), client_, File_Namespace::close(), isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

280  {
281  free(CANARY_BUFFER);
282  if (client_) {
283  return;
284  }
285  if (payload_map_) {
286  if (!isTemp_) {
290  CHECK_GE(payload_fd_, 0);
292  CHECK_GE(offset_fd_, 0);
293  close(offset_fd_);
294  } else {
296  free(payload_map_);
297  free(offset_map_);
298  }
299  }
300 }
StringIdxEntry * offset_map_
void checked_munmap(void *addr, size_t length)
#define CHECK_GE(x, y)
Definition: Logger.h:210
std::unique_ptr< StringDictionaryClient > client_
#define CHECK(condition)
Definition: Logger.h:197
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:106
+ Here is the call graph for this function:

Member Function Documentation

◆ addMemoryCapacity()

void * StringDictionary::addMemoryCapacity ( void *  addr,
size_t &  mem_size,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1348 of file StringDictionary.cpp.

References CANARY_BUFFER, canary_buffer_size, CHECK, and anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE.

Referenced by addOffsetCapacity(), and addPayloadCapacity().

1350  {
1351  const size_t canary_buff_size_to_add =
1352  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1353  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1354  if (canary_buffer_size != canary_buff_size_to_add) {
1355  CANARY_BUFFER =
1356  reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1357  canary_buffer_size = canary_buff_size_to_add;
1358  }
1360  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1361  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
1362  CHECK(new_addr);
1363  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
1364  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
1365  mem_size += canary_buff_size_to_add;
1366  return new_addr;
1367 }
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the caller graph for this function:

◆ addOffsetCapacity()

void StringDictionary::addOffsetCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1318 of file StringDictionary.cpp.

References addMemoryCapacity(), addStorageCapacity(), isTemp_, offset_fd_, offset_file_size_, and offset_map_.

Referenced by checkAndConditionallyIncreaseOffsetCapacity(), and StringDictionary().

1318  {
1319  if (!isTemp_) {
1320  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1321  } else {
1322  offset_map_ = static_cast<StringIdxEntry*>(
1323  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1324  }
1325 }
StringIdxEntry * offset_map_
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ addPayloadCapacity()

void StringDictionary::addPayloadCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1309 of file StringDictionary.cpp.

References addMemoryCapacity(), addStorageCapacity(), isTemp_, payload_fd_, payload_file_size_, and payload_map_.

Referenced by checkAndConditionallyIncreasePayloadCapacity(), and StringDictionary().

1309  {
1310  if (!isTemp_) {
1311  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1312  } else {
1313  payload_map_ = static_cast<char*>(
1314  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1315  }
1316 }
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ addStorageCapacity()

size_t StringDictionary::addStorageCapacity ( int  fd,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1327 of file StringDictionary.cpp.

References CANARY_BUFFER, canary_buffer_size, CHECK, CHECK_NE, anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE, and File_Namespace::write().

Referenced by addOffsetCapacity(), and addPayloadCapacity().

1329  {
1330  const size_t canary_buff_size_to_add =
1331  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1332  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1333 
1334  if (canary_buffer_size != canary_buff_size_to_add) {
1335  CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1336  canary_buffer_size = canary_buff_size_to_add;
1337  }
1339  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1340 
1341  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1342  ssize_t write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
1343  CHECK(write_return > 0 &&
1344  (static_cast<size_t>(write_return) == canary_buff_size_to_add));
1345  return canary_buff_size_to_add;
1346 }
#define CHECK_NE(x, y)
Definition: Logger.h:206
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:125
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ appendToStorage()

template<class String >
void StringDictionary::appendToStorage ( String  str)
private noexcept

Definition at line 1254 of file StringDictionary.cpp.

References checkAndConditionallyIncreaseOffsetCapacity(), checkAndConditionallyIncreasePayloadCapacity(), offset_map_, payload_file_off_, payload_map_, StringDictionary::StringIdxEntry::size, and str_count_.

Referenced by getOrAddBulk(), and getOrAddImpl().

1254  {
1255  // write the payload
1257  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1258 
1259  // write the offset and length
1260  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1261  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1262 
1264  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1265 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ appendToStorageBulk()

template<class String >
void StringDictionary::appendToStorageBulk ( const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const size_t  sum_new_strings_lengths 
)
private noexcept

Definition at line 1268 of file StringDictionary.cpp.

References checkAndConditionallyIncreaseOffsetCapacity(), checkAndConditionallyIncreasePayloadCapacity(), offset_map_, payload_file_off_, payload_map_, and str_count_.

Referenced by getOrAddBulkParallel().

1271  {
1272  const size_t num_strings = string_memory_ids.size();
1273 
1274  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1275  checkAndConditionallyIncreaseOffsetCapacity(sizeof(StringIdxEntry) * num_strings);
1276 
1277  for (size_t i = 0; i < num_strings; ++i) {
1278  const size_t string_idx = string_memory_ids[i];
1279  const String str = input_strings[string_idx];
1280  const size_t str_size(str.size());
1281  memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1282  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
1283  payload_file_off_ += str_size; // Need to increment after we've defined str_meta
1284  memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
1285  }
1286 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ binary_search_cache()

compare_cache_value_t* StringDictionary::binary_search_cache ( const std::string &  pattern) const
private

◆ buildSortedCache()

void StringDictionary::buildSortedCache ( )
private

Definition at line 1399 of file StringDictionary.cpp.

References mergeSortedCache(), sortCache(), sorted_cache, and str_count_.

Referenced by getCompare().

1399  {
1400  // This method is not thread-safe.
1401  const auto cur_cache_size = sorted_cache.size();
1402  std::vector<int32_t> temp_sorted_cache;
1403  for (size_t i = cur_cache_size; i < str_count_; i++) {
1404  temp_sorted_cache.push_back(i);
1405  }
1406  sortCache(temp_sorted_cache);
1407  mergeSortedCache(temp_sorted_cache);
1408 }
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
void sortCache(std::vector< int32_t > &cache)
std::vector< int32_t > sorted_cache
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ checkAndConditionallyIncreaseOffsetCapacity()

void StringDictionary::checkAndConditionallyIncreaseOffsetCapacity ( const size_t  write_length)
private

Definition at line 1233 of file StringDictionary.cpp.

References addOffsetCapacity(), CHECK, CHECK_GE, anonymous_namespace{StringDictionary.cpp}::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_munmap(), isTemp_, offset_fd_, offset_file_size_, offset_map_, and str_count_.

Referenced by appendToStorage(), and appendToStorageBulk().

1234  {
1235  size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
1236  if (offset_file_off + write_length >= offset_file_size_) {
1237  const size_t min_capacity_needed =
1238  write_length - (offset_file_size_ - offset_file_off);
1239  if (!isTemp_) {
1240  CHECK_GE(offset_fd_, 0);
1242  addOffsetCapacity(min_capacity_needed);
1243  CHECK(offset_file_off + write_length <= offset_file_size_);
1244  offset_map_ =
1245  reinterpret_cast<StringIdxEntry*>(checked_mmap(offset_fd_, offset_file_size_));
1246  } else {
1247  addOffsetCapacity(min_capacity_needed);
1248  CHECK(offset_file_off + write_length <= offset_file_size_);
1249  }
1250  }
1251 }
StringIdxEntry * offset_map_
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
void checked_munmap(void *addr, size_t length)
#define CHECK_GE(x, y)
Definition: Logger.h:210
void * checked_mmap(const int fd, const size_t sz)
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ checkAndConditionallyIncreasePayloadCapacity()

void StringDictionary::checkAndConditionallyIncreasePayloadCapacity ( const size_t  write_length)
private

Definition at line 1214 of file StringDictionary.cpp.

References addPayloadCapacity(), CHECK, CHECK_GE, anonymous_namespace{StringDictionary.cpp}::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_munmap(), isTemp_, payload_fd_, payload_file_off_, payload_file_size_, and payload_map_.

Referenced by appendToStorage(), and appendToStorageBulk().

1215  {
1216  if (payload_file_off_ + write_length > payload_file_size_) {
1217  const size_t min_capacity_needed =
1218  write_length - (payload_file_size_ - payload_file_off_);
1219  if (!isTemp_) {
1220  CHECK_GE(payload_fd_, 0);
1222  addPayloadCapacity(min_capacity_needed);
1223  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1224  payload_map_ =
1225  reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
1226  } else {
1227  addPayloadCapacity(min_capacity_needed);
1228  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1229  }
1230  }
1231 }
void checked_munmap(void *addr, size_t length)
#define CHECK_GE(x, y)
Definition: Logger.h:210
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
void * checked_mmap(const int fd, const size_t sz)
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ checkpoint()

bool StringDictionary::checkpoint ( )
noexcept

Definition at line 1382 of file StringDictionary.cpp.

References CHECK, client_, isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

1382  {
1383  if (client_) {
1384  try {
1385  return client_->checkpoint();
1386  } catch (...) {
1387  return false;
1388  }
1389  }
1390  CHECK(!isTemp_);
1391  bool ret = true;
1392  ret = ret && (msync((void*)offset_map_, offset_file_size_, MS_SYNC) == 0);
1393  ret = ret && (msync((void*)payload_map_, payload_file_size_, MS_SYNC) == 0);
1394  ret = ret && (fsync(offset_fd_) == 0);
1395  ret = ret && (fsync(payload_fd_) == 0);
1396  return ret;
1397 }
StringIdxEntry * offset_map_
std::unique_ptr< StringDictionaryClient > client_
#define CHECK(condition)
Definition: Logger.h:197

◆ computeBucket()

template<class String >
uint32_t StringDictionary::computeBucket ( const uint32_t  hash,
const String &  str,
const std::vector< int32_t > &  data 
) const
private noexcept

Definition at line 1117 of file StringDictionary.cpp.

References getStringFromStorageFast(), INVALID_STR_ID, materialize_hashes_, and rk_hashes_.

Referenced by getOrAddBulk(), getOrAddImpl(), and getUnlocked().

1120  {
1121  auto bucket = hash & (data.size() - 1);
1122  while (true) {
1123  const int32_t candidate_string_id = data[bucket];
1124  if (candidate_string_id ==
1125  INVALID_STR_ID) { // In this case it means the slot is available for use
1126  break;
1127  }
1128  if (!materialize_hashes_ ||
1129  (materialize_hashes_ && hash == rk_hashes_[candidate_string_id])) {
1130  const auto old_str = getStringFromStorageFast(candidate_string_id);
1131  if (str.size() == old_str.size() &&
1132  !memcmp(str.data(), old_str.data(), str.size())) {
1133  // found the string
1134  break;
1135  }
1136  }
1137  // wrap around
1138  if (++bucket == data.size()) {
1139  bucket = 0;
1140  }
1141  }
1142  return bucket;
1143 }
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
std::vector< uint32_t > rk_hashes_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ computeBucketFromStorageAndMemory()

template<class String >
uint32_t StringDictionary::computeBucketFromStorageAndMemory ( const uint32_t  input_string_rk_hash,
const String &  input_string,
const std::vector< int32_t > &  string_id_hash_table,
const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids 
) const
private noexcept

memcmp(input_string.data(), candidate_storage_string.c_str_ptr, input_string.size())) {

Definition at line 1146 of file StringDictionary.cpp.

References getStringFromStorageFast(), INVALID_STR_ID, materialize_hashes_, and rk_hashes_.

Referenced by getOrAddBulkParallel().

1152  {
1153  auto bucket = input_string_rk_hash & (string_id_hash_table.size() - 1);
1154  while (true) {
1155  const int32_t candidate_string_id = string_id_hash_table[bucket];
1156  if (candidate_string_id ==
1157  INVALID_STR_ID) { // In this case it means the slot is available for use
1158  break;
1159  }
1160  if (!materialize_hashes_ ||
1161  (input_string_rk_hash == rk_hashes_[candidate_string_id])) {
1162  if (candidate_string_id > 0 &&
1163  static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1164  // The candidate string is not in storage yet but in our string_memory_ids temp
1165  // buffer
1166  size_t memory_offset =
1167  static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1168  const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1169  if (input_string.size() == candidate_string.size() &&
1170  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1171  // found the string in the temp memory buffer
1172  break;
1173  }
1174  } else {
1175  // The candidate string is in storage, need to fetch it for comparison
1176  const auto candidate_storage_string =
1177  getStringFromStorageFast(candidate_string_id);
1178  if (input_string.size() == candidate_storage_string.size() &&
1179  !memcmp(input_string.data(),
1180  candidate_storage_string.data(),
1181  input_string.size())) {
1184  // found the string in storage
1185  break;
1186  }
1187  }
1188  }
1189  if (++bucket == string_id_hash_table.size()) {
1190  bucket = 0;
1191  }
1192  }
1193  return bucket;
1194 }
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
std::vector< uint32_t > rk_hashes_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ computeUniqueBucketWithHash()

uint32_t StringDictionary::computeUniqueBucketWithHash ( const uint32_t  hash,
const std::vector< int32_t > &  data 
)
private noexcept

Definition at line 1196 of file StringDictionary.cpp.

References collisions_, and INVALID_STR_ID.

Referenced by increaseCapacity(), increaseCapacityFromStorageAndMemory(), and processDictionaryFutures().

1198  {
1199  auto bucket = hash & (data.size() - 1);
1200  while (true) {
1201  if (data[bucket] ==
1202  INVALID_STR_ID) { // In this case it means the slot is available for use
1203  break;
1204  }
1205  collisions_++;
1206  // wrap around
1207  if (++bucket == data.size()) {
1208  bucket = 0;
1209  }
1210  }
1211  return bucket;
1212 }
static constexpr int32_t INVALID_STR_ID
+ Here is the caller graph for this function:

◆ copyStrings()

std::shared_ptr< const std::vector< std::string > > StringDictionary::copyStrings ( ) const

Definition at line 950 of file StringDictionary.cpp.

References CHECK_EQ, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), rw_mutex_, str_count_, and strings_cache_.

950  {
951  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
952  if (client_) {
953  // TODO(miyu): support remote string dictionary
954  throw std::runtime_error(
955  "copying dictionaries from remote server is not supported yet.");
956  }
957 
958  if (strings_cache_) {
959  return strings_cache_;
960  }
961 
962  strings_cache_ = std::make_shared<std::vector<std::string>>();
963  strings_cache_->reserve(str_count_);
964  const bool multithreaded = str_count_ > 10000;
965  const auto worker_count =
966  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
967  CHECK_GT(worker_count, 0UL);
968  std::vector<std::vector<std::string>> worker_results(worker_count);
969  auto copy = [this](std::vector<std::string>& str_list,
970  const size_t start_id,
971  const size_t end_id) {
972  CHECK_LE(start_id, end_id);
973  str_list.reserve(end_id - start_id);
974  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
975  str_list.push_back(getStringUnlocked(string_id));
976  }
977  };
978  if (multithreaded) {
979  std::vector<std::future<void>> workers;
980  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
981  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
982  worker_idx < worker_count && start < str_count_;
983  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
984  workers.push_back(std::async(
985  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
986  }
987  for (auto& worker : workers) {
988  worker.get();
989  }
990  } else {
991  CHECK_EQ(worker_results.size(), size_t(1));
992  copy(worker_results[0], 0, str_count_);
993  }
994 
995  for (const auto& worker_result : worker_results) {
996  strings_cache_->insert(
997  strings_cache_->end(), worker_result.begin(), worker_result.end());
998  }
999  return strings_cache_;
1000 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
std::shared_ptr< std::vector< std::string > > strings_cache_
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:208
mapd_unique_lock< mapd_shared_mutex > write_lock
int cpu_threads()
Definition: thread_count.h:24
+ Here is the call graph for this function:

◆ fillRateIsHigh()

bool StringDictionary::fillRateIsHigh ( const size_t  num_strings) const
private noexcept

Definition at line 1002 of file StringDictionary.cpp.

References string_id_hash_table_.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and getOrAddImpl().

1002  {
1003  return string_id_hash_table_.size() <= num_strings * 2;
1004 }
std::vector< int32_t > string_id_hash_table_
+ Here is the caller graph for this function:

◆ getCompare()

std::vector< int32_t > StringDictionary::getCompare ( const std::string &  pattern,
const std::string &  comp_operator,
const size_t  generation 
)

Definition at line 742 of file StringDictionary.cpp.

References buildSortedCache(), client_, compare_cache_, getEquals(), getStringFromStorage(), rw_mutex_, sorted_cache, str_count_, string_eq(), and string_lt().

744  {
745  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
746  if (client_) {
747  return client_->get_compare(pattern, comp_operator, generation);
748  }
749  std::vector<int32_t> ret;
750  if (str_count_ == 0) {
751  return ret;
752  }
753  if (sorted_cache.size() < str_count_) {
754  if (comp_operator == "=" || comp_operator == "<>") {
755  return getEquals(pattern, comp_operator, generation);
756  }
757 
759  }
760  auto cache_index = compare_cache_.get(pattern);
761 
762  if (!cache_index) {
763  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
764  const auto cache_itr = std::lower_bound(
765  sorted_cache.begin(),
766  sorted_cache.end(),
767  pattern,
768  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
769  auto a_str = this->getStringFromStorage(a);
770  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
771  });
772 
773  if (cache_itr == sorted_cache.end()) {
774  cache_index->index = sorted_cache.size() - 1;
775  cache_index->diff = 1;
776  } else {
777  const auto cache_str = getStringFromStorage(*cache_itr);
778  if (!string_eq(
779  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
780  cache_index->index = cache_itr - sorted_cache.begin() - 1;
781  cache_index->diff = 1;
782  } else {
783  cache_index->index = cache_itr - sorted_cache.begin();
784  cache_index->diff = 0;
785  }
786  }
787 
788  compare_cache_.put(pattern, cache_index);
789  }
790 
791  // since we have a cache in form of vector of ints which is sorted according to
792  // corresponding strings in the dictionary all we need is the index of the element
793  // which equal to the pattern that we are trying to match or the index of “biggest”
794  // element smaller than the pattern, to perform all the comparison operators over
795  // string. The search function guarantees we have such index so now it is just the
796  // matter to include all the elements in the result vector.
797 
798  // For < operator if the index that we have points to the element which is equal to
799  // the pattern that we are searching for we simply get all the elements less than the
800  // index. If the element pointed by the index is not equal to the pattern we are
801  // comparing with we also need to include that index in result vector, except when the
802  // index points to 0 and the pattern is lesser than the smallest value in the string
803  // dictionary.
804 
805  if (comp_operator == "<") {
806  size_t idx = cache_index->index;
807  if (cache_index->diff) {
808  idx = cache_index->index + 1;
809  if (cache_index->index == 0 && cache_index->diff > 0) {
810  idx = cache_index->index;
811  }
812  }
813  for (size_t i = 0; i < idx; i++) {
814  ret.push_back(sorted_cache[i]);
815  }
816 
817  // For <= operator if the index that we have points to the element which is equal to
818  // the pattern that we are searching for we want to include the element pointed by
819  // the index in the result set. If the element pointed by the index is not equal to
820  // the pattern we are comparing with we just want to include all the ids with index
821  // less than the index that is cached, except when pattern that we are searching for
822  // is smaller than the smallest string in the dictionary.
823 
824  } else if (comp_operator == "<=") {
825  size_t idx = cache_index->index + 1;
826  if (cache_index == 0 && cache_index->diff > 0) {
827  idx = cache_index->index;
828  }
829  for (size_t i = 0; i < idx; i++) {
830  ret.push_back(sorted_cache[i]);
831  }
832 
833  // For > operator we want to get all the elements with index greater than the index
834  // that we have except, when the pattern we are searching for is lesser than the
835  // smallest string in the dictionary we also want to include the id of the index
836  // that we have.
837 
838  } else if (comp_operator == ">") {
839  size_t idx = cache_index->index + 1;
840  if (cache_index->index == 0 && cache_index->diff > 0) {
841  idx = cache_index->index;
842  }
843  for (size_t i = idx; i < sorted_cache.size(); i++) {
844  ret.push_back(sorted_cache[i]);
845  }
846 
847  // For >= operator when the indexed element that we have points to element which is
848  // equal to the pattern we are searching for we want to include that in the result
849  // vector. If the index that we have does not point to the string which is equal to
850  // the pattern we are searching we don’t want to include that id into the result
851  // vector except when the index is 0.
852 
853  } else if (comp_operator == ">=") {
854  size_t idx = cache_index->index;
855  if (cache_index->diff) {
856  idx = cache_index->index + 1;
857  if (cache_index->index == 0 && cache_index->diff > 0) {
858  idx = cache_index->index;
859  }
860  }
861  for (size_t i = idx; i < sorted_cache.size(); i++) {
862  ret.push_back(sorted_cache[i]);
863  }
864  } else if (comp_operator == "=") {
865  if (!cache_index->diff) {
866  ret.push_back(sorted_cache[cache_index->index]);
867  }
868 
869  // For <> operator it is simple matter of not including id of string which is equal
870  // to pattern we are searching for.
871  } else if (comp_operator == "<>") {
872  if (!cache_index->diff) {
873  size_t idx = cache_index->index;
874  for (size_t i = 0; i < idx; i++) {
875  ret.push_back(sorted_cache[i]);
876  }
877  ++idx;
878  for (size_t i = idx; i < sorted_cache.size(); i++) {
879  ret.push_back(sorted_cache[i]);
880  }
881  } else {
882  for (size_t i = 0; i < sorted_cache.size(); i++) {
883  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
884  }
885  }
886 
887  } else {
888  std::runtime_error("Unsupported string comparison operator");
889  }
890  return ret;
891 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
std::unique_ptr< StringDictionaryClient > client_
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
mapd_shared_mutex rw_mutex_
mapd_unique_lock< mapd_shared_mutex > write_lock
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache
+ Here is the call graph for this function:

◆ getEquals()

std::vector< int32_t > StringDictionary::getEquals ( std::string  pattern,
std::string  comp_operator,
size_t  generation 
)
private

Definition at line 682 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, cpu_threads(), equal_cache_, getStringUnlocked(), MAX_STRLEN, run_benchmark_import::result, and str_count_.

Referenced by getCompare().

684  {
685  std::vector<int32_t> result;
686  auto eq_id_itr = equal_cache_.find(pattern);
687  int32_t eq_id = MAX_STRLEN + 1;
688  int32_t cur_size = str_count_;
689  if (eq_id_itr != equal_cache_.end()) {
690  auto eq_id = eq_id_itr->second;
691  if (comp_operator == "=") {
692  result.push_back(eq_id);
693  } else {
694  for (int32_t idx = 0; idx <= cur_size; idx++) {
695  if (idx == eq_id) {
696  continue;
697  }
698  result.push_back(idx);
699  }
700  }
701  } else {
702  std::vector<std::thread> workers;
703  int worker_count = cpu_threads();
704  CHECK_GT(worker_count, 0);
705  std::vector<std::vector<int32_t>> worker_results(worker_count);
706  CHECK_LE(generation, str_count_);
707  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
708  workers.emplace_back(
709  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
710  for (size_t string_id = worker_idx; string_id < generation;
711  string_id += worker_count) {
712  const auto str = getStringUnlocked(string_id);
713  if (str == pattern) {
714  worker_results[worker_idx].push_back(string_id);
715  }
716  }
717  });
718  }
719  for (auto& worker : workers) {
720  worker.join();
721  }
722  for (const auto& worker_result : worker_results) {
723  result.insert(result.end(), worker_result.begin(), worker_result.end());
724  }
725  if (result.size() > 0) {
726  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
727  CHECK(it_ok.second);
728  eq_id = result[0];
729  }
730  if (comp_operator == "<>") {
731  for (int32_t idx = 0; idx <= cur_size; idx++) {
732  if (idx == eq_id) {
733  continue;
734  }
735  result.push_back(idx);
736  }
737  }
738  }
739  return result;
740 }
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::string getStringUnlocked(int32_t string_id) const noexcept
std::map< std::string, int32_t > equal_cache_
#define CHECK_LE(x, y)
Definition: Logger.h:208
#define CHECK(condition)
Definition: Logger.h:197
static constexpr size_t MAX_STRLEN
int cpu_threads()
Definition: thread_count.h:24
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getIdOfString()

int32_t StringDictionary::getIdOfString ( const std::string &  str) const

Definition at line 557 of file StringDictionary.cpp.

References client_, getUnlocked(), and rw_mutex_.

557  {
558  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
559  if (client_) {
560  return client_->get(str);
561  }
562  return getUnlocked(str);
563 }
int32_t getUnlocked(const std::string &str) const noexcept
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock
+ Here is the call graph for this function:

◆ getLike()

std::vector< int32_t > StringDictionary::getLike ( const std::string &  pattern,
const bool  icase,
const bool  is_simple,
const char  escape,
const size_t  generation 
) const

Definition at line 629 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_like(), like_cache_, run_benchmark_import::result, rw_mutex_, and str_count_.

633  {
634  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
635  if (client_) {
636  return client_->get_like(pattern, icase, is_simple, escape, generation);
637  }
638  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
639  const auto it = like_cache_.find(cache_key);
640  if (it != like_cache_.end()) {
641  return it->second;
642  }
643  std::vector<int32_t> result;
644  std::vector<std::thread> workers;
645  int worker_count = cpu_threads();
646  CHECK_GT(worker_count, 0);
647  std::vector<std::vector<int32_t>> worker_results(worker_count);
648  CHECK_LE(generation, str_count_);
649  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
650  workers.emplace_back([&worker_results,
651  &pattern,
652  generation,
653  icase,
654  is_simple,
655  escape,
656  worker_idx,
657  worker_count,
658  this]() {
659  for (size_t string_id = worker_idx; string_id < generation;
660  string_id += worker_count) {
661  const auto str = getStringUnlocked(string_id);
662  if (is_like(str, pattern, icase, is_simple, escape)) {
663  worker_results[worker_idx].push_back(string_id);
664  }
665  }
666  });
667  }
668  for (auto& worker : workers) {
669  worker.join();
670  }
671  for (const auto& worker_result : worker_results) {
672  result.insert(result.end(), worker_result.begin(), worker_result.end());
673  }
674  // place result into cache for reuse if similar query
675  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
676 
677  CHECK(it_ok.second);
678 
679  return result;
680 }
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_mutex rw_mutex_
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
#define CHECK_LE(x, y)
Definition: Logger.h:208
#define CHECK(condition)
Definition: Logger.h:197
mapd_unique_lock< mapd_shared_mutex > write_lock
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
int cpu_threads()
Definition: thread_count.h:24
+ Here is the call graph for this function:

◆ getNumStringsFromStorage()

size_t StringDictionary::getNumStringsFromStorage ( const size_t  storage_slots) const
private noexcept

Method to retrieve number of strings in storage via a binary search for the first canary

Parameters
storage_slots: number of storage entries to search when looking for the first (lowest-index) canary
Returns
number of strings in storage

Definition at line 252 of file StringDictionary.cpp.

References CHECK_GE, and getStringFromStorage().

Referenced by StringDictionary().

253  {
254  if (storage_slots == 0) {
255  return 0;
256  }
257  // Must use signed integers since final binary search step can wrap to max size_t value
258  // if dictionary is empty
259  int64_t min_bound = 0;
260  int64_t max_bound = storage_slots - 1;
261  int64_t guess{0};
262  while (min_bound <= max_bound) {
263  guess = (max_bound + min_bound) / 2;
264  CHECK_GE(guess, 0);
265  if (getStringFromStorage(guess).canary) {
266  max_bound = guess - 1;
267  } else {
268  min_bound = guess + 1;
269  }
270  }
271  CHECK_GE(guess + (min_bound > guess ? 1 : 0), 0);
272  return guess + (min_bound > guess ? 1 : 0);
273 }
#define CHECK_GE(x, y)
Definition: Logger.h:210
PayloadString getStringFromStorage(const int string_id) const noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getOrAdd()

int32_t StringDictionary::getOrAdd ( const std::string &  str)
noexcept

Definition at line 302 of file StringDictionary.cpp.

References CHECK_EQ, client_, and getOrAddImpl().

302  {
303  if (client_) {
304  std::vector<int32_t> string_ids;
305  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
306  CHECK_EQ(size_t(1), string_ids.size());
307  return string_ids.front();
308  }
309  return getOrAddImpl(str);
310 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
int32_t getOrAddImpl(const std::string &str) noexcept
std::unique_ptr< StringDictionaryClient > client_
+ Here is the call graph for this function:

◆ getOrAddBulk()

template<class T , class String >
template void StringDictionary::getOrAddBulk ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 362 of file StringDictionary.cpp.

References appendToStorage(), CHECK, CHECK_LT, client_no_timeout_, computeBucket(), fillRateIsHigh(), g_enable_stringdict_parallel, getOrAddBulkParallel(), getOrAddBulkRemote(), increaseCapacity(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, rw_mutex_, str_count_, and string_id_hash_table_.

Referenced by foreign_storage::ParquetStringEncoder< V >::appendData(), ArrowCsvForeignStorage::createDictionaryEncodedColumn(), getOrAddBulkArray(), getOrAddBulkParallel(), and populate_string_ids().

363  {
365  getOrAddBulkParallel(input_strings, output_string_ids);
366  return;
367  }
368  // Single-thread path.
369  if (client_no_timeout_) {
370  getOrAddBulkRemote(input_strings, output_string_ids);
371  return;
372  }
373  size_t out_idx{0};
374  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
375 
376  for (const auto& str : input_strings) {
377  if (str.empty()) {
378  output_string_ids[out_idx++] = inline_int_null_value<T>();
379  continue;
380  }
381  CHECK(str.size() <= MAX_STRLEN);
382  uint32_t bucket;
383  const uint32_t hash = rk_hash(str);
384  bucket = computeBucket(hash, str, string_id_hash_table_);
385  if (string_id_hash_table_[bucket] != INVALID_STR_ID) {
386  output_string_ids[out_idx++] = string_id_hash_table_[bucket];
387  continue;
388  }
389  // need to add record to dictionary
390  // check there is room
391  if (str_count_ == static_cast<size_t>(max_valid_int_value<T>())) {
392  log_encoding_error<T>(str);
393  output_string_ids[out_idx++] = inline_int_null_value<T>();
394  continue;
395  }
396  if (string_id_hash_table_[bucket] == INVALID_STR_ID) {
398  << "Maximum number (" << str_count_
399  << ") of Dictionary encoded Strings reached for this column, offset path "
400  "for column is "
401  << offsets_path_;
402  if (fillRateIsHigh(str_count_)) {
403  // resize when more than 50% is full
405  bucket = computeBucket(hash, str, string_id_hash_table_);
406  }
407  appendToStorage(str);
408 
409  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
410  if (materialize_hashes_) {
411  rk_hashes_[str_count_] = hash;
412  }
413  ++str_count_;
414  }
415  output_string_ids[out_idx++] = string_id_hash_table_[bucket];
416  }
418 }
uint32_t rk_hash(const std::string_view &str)
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
void increaseCapacity() noexcept
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
mapd_shared_mutex rw_mutex_
std::vector< int32_t > string_id_hash_table_
#define CHECK_LT(x, y)
Definition: Logger.h:207
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void invalidateInvertedIndex() noexcept
#define CHECK(condition)
Definition: Logger.h:197
uint32_t computeBucket(const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
mapd_unique_lock< mapd_shared_mutex > write_lock
bool g_enable_stringdict_parallel
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)
void appendToStorage(String str) noexcept
std::vector< uint32_t > rk_hashes_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getOrAddBulkArray()

template<class String >
template void StringDictionary::getOrAddBulkArray ( const std::vector< std::vector< String >> &  string_array_vec,
std::vector< std::vector< int32_t >> &  ids_array_vec 
)

Definition at line 324 of file StringDictionary.cpp.

References getOrAddBulk().

326  {
327  ids_array_vec.resize(string_array_vec.size());
328  for (size_t i = 0; i < string_array_vec.size(); i++) {
329  auto& strings = string_array_vec[i];
330  auto& ids = ids_array_vec[i];
331  ids.resize(strings.size());
332  getOrAddBulk(strings, &ids[0]);
333  }
334 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
+ Here is the call graph for this function:

◆ getOrAddBulkParallel()

template<class T , class String >
void StringDictionary::getOrAddBulkParallel ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 421 of file StringDictionary.cpp.

References appendToStorageBulk(), CHECK, CHECK_LT, client_no_timeout_, computeBucketFromStorageAndMemory(), fillRateIsHigh(), getOrAddBulk(), getOrAddBulkRemote(), hashStrings(), increaseCapacityFromStorageAndMemory(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, rk_hashes_, rw_mutex_, str_count_, and string_id_hash_table_.

Referenced by getOrAddBulk().

422  {
423  if (client_no_timeout_) {
424  getOrAddBulkRemote(input_strings, output_string_ids);
425  return;
426  }
427  // Run rk_hash on the input strings up front, and in parallel,
428  // as the string hashing does not need to be behind the subsequent write_lock
429  std::vector<uint32_t> input_strings_rk_hashes(input_strings.size());
430  hashStrings(input_strings, input_strings_rk_hashes);
431 
432  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
433  size_t shadow_str_count =
434  str_count_; // Need to shadow str_count_ now with bulk add methods
435  const size_t storage_high_water_mark = shadow_str_count;
436  std::vector<size_t> string_memory_ids;
437  size_t sum_new_string_lengths = 0;
438  string_memory_ids.reserve(input_strings.size());
439  size_t input_string_idx{0};
440  for (const auto& input_string : input_strings) {
441  // Currently we make empty strings null
442  if (input_string.empty()) {
443  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
444  continue;
445  }
446  // TODO: Recover gracefully if an input string is too long
447  CHECK(input_string.size() <= MAX_STRLEN);
448 
449  if (fillRateIsHigh(shadow_str_count)) {
450  // resize when more than 50% is full
451  increaseCapacityFromStorageAndMemory(storage_high_water_mark,
452  input_strings,
453  string_memory_ids,
454  input_strings_rk_hashes);
455  }
456  // Get the rk_hash for this input_string
457  const uint32_t input_string_rk_hash = input_strings_rk_hashes[input_string_idx];
458 
459  uint32_t hash_bucket = computeBucketFromStorageAndMemory(input_string_rk_hash,
460  input_string,
462  storage_high_water_mark,
463  input_strings,
464  string_memory_ids);
465 
466  // If the hash bucket is not empty, that is our string id
467  // (computeBucketFromStorageAndMemory) already checked to ensure the input string and
468  // bucket string are equal)
469  if (string_id_hash_table_[hash_bucket] != INVALID_STR_ID) {
470  output_string_ids[input_string_idx++] = string_id_hash_table_[hash_bucket];
471  continue;
472  }
473  // Did not find string, so need to add record to dictionary
474  // First check there is room
475  if (shadow_str_count == static_cast<size_t>(max_valid_int_value<T>())) {
476  log_encoding_error<T>(input_string);
477  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
478  continue;
479  }
480  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
481  << "Maximum number (" << shadow_str_count
482  << ") of Dictionary encoded Strings reached for this column, offset path "
483  "for column is "
484  << offsets_path_;
485 
486  string_memory_ids.push_back(input_string_idx);
487  sum_new_string_lengths += input_string.size();
488  string_id_hash_table_[hash_bucket] = static_cast<int32_t>(shadow_str_count);
489  if (materialize_hashes_) {
490  rk_hashes_[shadow_str_count] = input_string_rk_hash;
491  }
492  output_string_ids[input_string_idx++] = shadow_str_count++;
493  }
494  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
495  str_count_ = shadow_str_count;
496 
498 }
void hashStrings(const std::vector< String > &string_vec, std::vector< uint32_t > &hashes) const noexcept
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
mapd_shared_mutex rw_mutex_
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
std::vector< int32_t > string_id_hash_table_
#define CHECK_LT(x, y)
Definition: Logger.h:207
void increaseCapacityFromStorageAndMemory(const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< uint32_t > &input_strings_rk_hashes) noexcept
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void invalidateInvertedIndex() noexcept
#define CHECK(condition)
Definition: Logger.h:197
static constexpr size_t MAX_STRLEN
mapd_unique_lock< mapd_shared_mutex > write_lock
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)
uint32_t computeBucketFromStorageAndMemory(const uint32_t input_string_rk_hash, const String &input_string, const std::vector< int32_t > &string_id_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::vector< uint32_t > rk_hashes_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getOrAddBulkRemote()

template<class T , class String >
template void StringDictionary::getOrAddBulkRemote ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)
private

Definition at line 517 of file StringDictionary.cpp.

References CHECK, and client_no_timeout_.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

518  {
520  std::vector<int32_t> string_ids;
521  client_no_timeout_->get_or_add_bulk(string_ids, string_vec);
522  size_t out_idx{0};
523  for (size_t i = 0; i < string_ids.size(); ++i) {
524  const auto string_id = string_ids[i];
525  const bool invalid = string_id > max_valid_int_value<T>();
526  if (invalid || string_id == inline_int_null_value<int32_t>()) {
527  if (invalid) {
528  log_encoding_error<T>(string_vec[i]);
529  }
530  encoded_vec[out_idx++] = inline_int_null_value<T>();
531  continue;
532  }
533  encoded_vec[out_idx++] = string_id;
534  }
535 }
std::unique_ptr< StringDictionaryClient > client_no_timeout_
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the caller graph for this function:

◆ getOrAddImpl()

int32_t StringDictionary::getOrAddImpl ( const std::string &  str)
private noexcept

Definition at line 1062 of file StringDictionary.cpp.

References appendToStorage(), CHECK, CHECK_LT, computeBucket(), fillRateIsHigh(), increaseCapacity(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, rw_mutex_, str_count_, and string_id_hash_table_.

Referenced by getOrAdd().

1062  {
1063  // @TODO(wei) treat empty string as NULL for now
1064  if (str.size() == 0) {
1065  return inline_int_null_value<int32_t>();
1066  }
1067  CHECK(str.size() <= MAX_STRLEN);
1068  uint32_t bucket;
1069  const uint32_t hash = rk_hash(str);
1070  {
1071  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
1072  bucket = computeBucket(hash, str, string_id_hash_table_);
1073  if (string_id_hash_table_[bucket] != INVALID_STR_ID) {
1074  return string_id_hash_table_[bucket];
1075  }
1076  }
1077  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
1078  // need to recalculate the bucket in case it changed before
1079  // we got the lock
1080  bucket = computeBucket(hash, str, string_id_hash_table_);
1081  if (string_id_hash_table_[bucket] == INVALID_STR_ID) {
1083  << "Maximum number (" << str_count_
1084  << ") of Dictionary encoded Strings reached for this column, offset path "
1085  "for column is "
1086  << offsets_path_;
1087  if (fillRateIsHigh(str_count_)) {
1088  // resize when more than 50% is full
1089  increaseCapacity();
1090  bucket = computeBucket(hash, str, string_id_hash_table_);
1091  }
1092  appendToStorage(str);
1093  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
1094  if (materialize_hashes_) {
1095  rk_hashes_[str_count_] = hash;
1096  }
1097  ++str_count_;
1099  }
1100  return string_id_hash_table_[bucket];
1101 }
uint32_t rk_hash(const std::string_view &str)
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
void increaseCapacity() noexcept
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
mapd_shared_mutex rw_mutex_
std::vector< int32_t > string_id_hash_table_
#define CHECK_LT(x, y)
Definition: Logger.h:207
mapd_shared_lock< mapd_shared_mutex > read_lock
void invalidateInvertedIndex() noexcept
#define CHECK(condition)
Definition: Logger.h:197
uint32_t computeBucket(const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept
static constexpr size_t MAX_STRLEN
mapd_unique_lock< mapd_shared_mutex > write_lock
void appendToStorage(String str) noexcept
std::vector< uint32_t > rk_hashes_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getRegexpLike()

std::vector< int32_t > StringDictionary::getRegexpLike ( const std::string &  pattern,
const char  escape,
const size_t  generation 
) const

Definition at line 903 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_regexp_like(), regex_cache_, rw_mutex_, and str_count_.

905  {
906  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
907  if (client_) {
908  return client_->get_regexp_like(pattern, escape, generation);
909  }
910  const auto cache_key = std::make_pair(pattern, escape);
911  const auto it = regex_cache_.find(cache_key);
912  if (it != regex_cache_.end()) {
913  return it->second;
914  }
915  std::vector<int32_t> result;
916  std::vector<std::thread> workers;
917  int worker_count = cpu_threads();
918  CHECK_GT(worker_count, 0);
919  std::vector<std::vector<int32_t>> worker_results(worker_count);
920  CHECK_LE(generation, str_count_);
921  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
922  workers.emplace_back([&worker_results,
923  &pattern,
924  generation,
925  escape,
926  worker_idx,
927  worker_count,
928  this]() {
929  for (size_t string_id = worker_idx; string_id < generation;
930  string_id += worker_count) {
931  const auto str = getStringUnlocked(string_id);
932  if (is_regexp_like(str, pattern, escape)) {
933  worker_results[worker_idx].push_back(string_id);
934  }
935  }
936  });
937  }
938  for (auto& worker : workers) {
939  worker.join();
940  }
941  for (const auto& worker_result : worker_results) {
942  result.insert(result.end(), worker_result.begin(), worker_result.end());
943  }
944  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
945  CHECK(it_ok.second);
946 
947  return result;
948 }
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:208
#define CHECK(condition)
Definition: Logger.h:197
mapd_unique_lock< mapd_shared_mutex > write_lock
int cpu_threads()
Definition: thread_count.h:24
+ Here is the call graph for this function:

◆ getString()

std::string StringDictionary::getString ( int32_t  string_id) const

Definition at line 571 of file StringDictionary.cpp.

References client_, getStringUnlocked(), and rw_mutex_.

Referenced by StringValueConverter::convertToColumnarFormatFromDict(), and populate_string_ids().

571  {
572  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
573  if (client_) {
574  std::string ret;
575  client_->get_string(ret, string_id);
576  return ret;
577  }
578  return getStringUnlocked(string_id);
579 }
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getStringBytes()

std::pair< char *, size_t > StringDictionary::getStringBytes ( int32_t  string_id) const
noexcept

Definition at line 586 of file StringDictionary.cpp.

References CHECK, CHECK_LE, CHECK_LT, client_, getStringBytesChecked(), rw_mutex_, and str_count_.

587  {
588  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
589  CHECK(!client_);
590  CHECK_LE(0, string_id);
591  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
592  return getStringBytesChecked(string_id);
593 }
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
#define CHECK_LT(x, y)
Definition: Logger.h:207
#define CHECK_LE(x, y)
Definition: Logger.h:208
mapd_shared_lock< mapd_shared_mutex > read_lock
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the call graph for this function:

◆ getStringBytesChecked()

std::pair< char *, size_t > StringDictionary::getStringBytesChecked ( const int  string_id) const
private noexcept

Definition at line 1109 of file StringDictionary.cpp.

References CHECK, and getStringFromStorage().

Referenced by getStringBytes().

1110  {
1111  const auto str_canary = getStringFromStorage(string_id);
1112  CHECK(!str_canary.canary);
1113  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1114 }
#define CHECK(condition)
Definition: Logger.h:197
PayloadString getStringFromStorage(const int string_id) const noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getStringChecked()

std::string StringDictionary::getStringChecked ( const int  string_id) const
private noexcept

Definition at line 1103 of file StringDictionary.cpp.

References CHECK, and getStringFromStorage().

Referenced by getStringUnlocked(), increaseCapacity(), and increaseCapacityFromStorageAndMemory().

1103  {
1104  const auto str_canary = getStringFromStorage(string_id);
1105  CHECK(!str_canary.canary);
1106  return std::string(str_canary.c_str_ptr, str_canary.size);
1107 }
#define CHECK(condition)
Definition: Logger.h:197
PayloadString getStringFromStorage(const int string_id) const noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getStringFromStorage()

StringDictionary::PayloadString StringDictionary::getStringFromStorage ( const int  string_id) const
private noexcept

Definition at line 1294 of file StringDictionary.cpp.

References CHECK_GE, isTemp_, StringDictionary::StringIdxEntry::off, offset_fd_, offset_map_, payload_fd_, payload_map_, and StringDictionary::StringIdxEntry::size.

Referenced by getCompare(), getNumStringsFromStorage(), getStringBytesChecked(), getStringChecked(), mergeSortedCache(), sortCache(), and StringDictionary().

1295  {
1296  if (!isTemp_) {
1297  CHECK_GE(payload_fd_, 0);
1298  CHECK_GE(offset_fd_, 0);
1299  }
1300  CHECK_GE(string_id, 0);
1301  const StringIdxEntry* str_meta = offset_map_ + string_id;
1302  if (str_meta->size == 0xffff) {
1303  // hit the canary
1304  return {nullptr, 0, true};
1305  }
1306  return {payload_map_ + str_meta->off, str_meta->size, false};
1307 }
StringIdxEntry * offset_map_
#define CHECK_GE(x, y)
Definition: Logger.h:210
+ Here is the caller graph for this function:

◆ getStringFromStorageFast()

std::string_view StringDictionary::getStringFromStorageFast ( const int  string_id) const
private noexcept

Definition at line 1288 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::off, offset_map_, payload_map_, and StringDictionary::StringIdxEntry::size.

Referenced by computeBucket(), and computeBucketFromStorageAndMemory().

1289  {
1290  const StringIdxEntry* str_meta = offset_map_ + string_id;
1291  return {payload_map_ + str_meta->off, str_meta->size};
1292 }
StringIdxEntry * offset_map_
+ Here is the caller graph for this function:

◆ getStringUnlocked()

std::string StringDictionary::getStringUnlocked ( int32_t  string_id) const
private noexcept

Definition at line 581 of file StringDictionary.cpp.

References CHECK_LT, getStringChecked(), and str_count_.

Referenced by copyStrings(), getEquals(), getLike(), getRegexpLike(), and getString().

581  {
582  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
583  return getStringChecked(string_id);
584 }
std::string getStringChecked(const int string_id) const noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:207
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getUnlocked()

int32_t StringDictionary::getUnlocked ( const std::string &  str) const
private noexcept

Definition at line 565 of file StringDictionary.cpp.

References computeBucket(), anonymous_namespace{StringDictionary.cpp}::rk_hash(), and string_id_hash_table_.

Referenced by getIdOfString().

565  {
566  const uint32_t hash = rk_hash(str);
567  auto str_id = string_id_hash_table_[computeBucket(hash, str, string_id_hash_table_)];
568  return str_id;
569 }
uint32_t rk_hash(const std::string_view &str)
std::vector< int32_t > string_id_hash_table_
uint32_t computeBucket(const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ hashStrings()

template<class String >
void StringDictionary::hashStrings ( const std::vector< String > &  string_vec,
std::vector< uint32_t > &  hashes 
) const
private noexcept

Method to rk_hash a vector of strings in parallel.

Parameters
string_vec - input vector of strings to be hashed
hashes - space for the output; should be pre-sized to match the size of string_vec

Definition at line 346 of file StringDictionary.cpp.

References CHECK_EQ, and anonymous_namespace{StringDictionary.cpp}::rk_hash().

Referenced by getOrAddBulkParallel().

347  {
348  CHECK_EQ(string_vec.size(), hashes.size());
349 
350  tbb::parallel_for(tbb::blocked_range<size_t>(0, string_vec.size()),
351  [&string_vec, &hashes](const tbb::blocked_range<size_t>& r) {
352  for (size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
353  if (string_vec[curr_id].empty()) {
354  continue;
355  }
356  hashes[curr_id] = rk_hash(string_vec[curr_id]);
357  }
358  });
359 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
uint32_t rk_hash(const std::string_view &str)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ increaseCapacity()

void StringDictionary::increaseCapacity ( )
private noexcept

Definition at line 1006 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), getStringChecked(), INVALID_STR_ID, materialize_hashes_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, str_count_, and string_id_hash_table_.

Referenced by getOrAddBulk(), and getOrAddImpl().

1006  {
1007  std::vector<int32_t> new_str_ids(string_id_hash_table_.size() * 2, INVALID_STR_ID);
1008 
1009  if (materialize_hashes_) {
1010  for (size_t i = 0; i < string_id_hash_table_.size(); ++i) {
1011  if (string_id_hash_table_[i] != INVALID_STR_ID) {
1012  const uint32_t hash = rk_hashes_[string_id_hash_table_[i]];
1013  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1014  new_str_ids[bucket] = string_id_hash_table_[i];
1015  }
1016  }
1017  rk_hashes_.resize(rk_hashes_.size() * 2);
1018  } else {
1019  for (size_t i = 0; i < str_count_; ++i) {
1020  const auto str = getStringChecked(i);
1021  const uint32_t hash = rk_hash(str);
1022  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1023  new_str_ids[bucket] = i;
1024  }
1025  }
1026  string_id_hash_table_.swap(new_str_ids);
1027 }
uint32_t rk_hash(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) noexcept
std::vector< int32_t > string_id_hash_table_
std::vector< uint32_t > rk_hashes_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ increaseCapacityFromStorageAndMemory()

template<class String >
void StringDictionary::increaseCapacityFromStorageAndMemory ( const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const std::vector< uint32_t > &  input_strings_rk_hashes 
)
private noexcept

Definition at line 1030 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), getStringChecked(), INVALID_STR_ID, materialize_hashes_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, and string_id_hash_table_.

Referenced by getOrAddBulkParallel().

1034  {
1035  std::vector<int32_t> new_str_ids(string_id_hash_table_.size() * 2, INVALID_STR_ID);
1036  if (materialize_hashes_) {
1037  for (size_t i = 0; i < string_id_hash_table_.size(); ++i) {
1038  if (string_id_hash_table_[i] != INVALID_STR_ID) {
1039  const uint32_t hash = rk_hashes_[string_id_hash_table_[i]];
1040  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1041  new_str_ids[bucket] = string_id_hash_table_[i];
1042  }
1043  }
1044  rk_hashes_.resize(rk_hashes_.size() * 2);
1045  } else {
1046  for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1047  const auto storage_string = getStringChecked(storage_idx);
1048  const uint32_t hash = rk_hash(storage_string);
1049  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1050  new_str_ids[bucket] = storage_idx;
1051  }
1052  for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1053  size_t string_memory_id = string_memory_ids[memory_idx];
1054  uint32_t bucket = computeUniqueBucketWithHash(
1055  input_strings_rk_hashes[string_memory_id], new_str_ids);
1056  new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1057  }
1058  }
1059  string_id_hash_table_.swap(new_str_ids);
1060 }
uint32_t rk_hash(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) noexcept
std::vector< int32_t > string_id_hash_table_
std::vector< uint32_t > rk_hashes_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ insertInSortedCache()

void StringDictionary::insertInSortedCache ( std::string  str,
int32_t  str_id 
)
private

◆ invalidateInvertedIndex()

void StringDictionary::invalidateInvertedIndex ( )
private noexcept

Definition at line 1369 of file StringDictionary.cpp.

References compare_cache_, equal_cache_, like_cache_, and regex_cache_.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and getOrAddImpl().

1369  {
1370  if (!like_cache_.empty()) {
1371  decltype(like_cache_)().swap(like_cache_);
1372  }
1373  if (!regex_cache_.empty()) {
1374  decltype(regex_cache_)().swap(regex_cache_);
1375  }
1376  if (!equal_cache_.empty()) {
1377  decltype(equal_cache_)().swap(equal_cache_);
1378  }
1379  compare_cache_.invalidateInvertedIndex();
1380 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::map< std::string, int32_t > equal_cache_
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
+ Here is the caller graph for this function:

◆ mergeSortedCache()

void StringDictionary::mergeSortedCache ( std::vector< int32_t > &  temp_sorted_cache)
private

Definition at line 1423 of file StringDictionary.cpp.

References getStringFromStorage(), sorted_cache, and string_lt().

Referenced by buildSortedCache().

1423  {
1424  // this method is not thread safe
1425  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1426  size_t t_idx = 0, s_idx = 0, idx = 0;
1427  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1428  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1429  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1430  const auto insert_from_temp_cache =
1431  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1432  if (insert_from_temp_cache) {
1433  updated_cache[idx] = temp_sorted_cache[t_idx++];
1434  } else {
1435  updated_cache[idx] = sorted_cache[s_idx++];
1436  }
1437  }
1438  while (t_idx < temp_sorted_cache.size()) {
1439  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1440  }
1441  while (s_idx < sorted_cache.size()) {
1442  updated_cache[idx++] = sorted_cache[s_idx++];
1443  }
1444  sorted_cache.swap(updated_cache);
1445 }
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ populate_string_array_ids()

void StringDictionary::populate_string_array_ids ( std::vector< std::vector< int32_t >> &  dest_array_ids,
StringDictionary *  dest_dict,
const std::vector< std::vector< int32_t >> &  source_array_ids,
const StringDictionary *  source_dict 
)
static

Definition at line 1474 of file StringDictionary.cpp.

References populate_string_ids().

Referenced by DictionaryValueConverter< TARGET_TYPE >::processArrayBuffer().

1478  {
1479  dest_array_ids.resize(source_array_ids.size());
1480 
1481  std::atomic<size_t> row_idx{0};
1482  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1483  int thread_id) {
1484  for (;;) {
1485  auto row = row_idx.fetch_add(1);
1486 
1487  if (row >= dest_array_ids.size()) {
1488  return;
1489  }
1490  const auto& source_ids = source_array_ids[row];
1491  auto& dest_ids = dest_array_ids[row];
1492  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1493  }
1494  };
1495 
1496  const int num_worker_threads = std::thread::hardware_concurrency();
1497 
1498  if (source_array_ids.size() / num_worker_threads > 10) {
1499  std::vector<std::future<void>> worker_threads;
1500  for (int i = 0; i < num_worker_threads; ++i) {
1501  worker_threads.push_back(std::async(std::launch::async, processor, i));
1502  }
1503 
1504  for (auto& child : worker_threads) {
1505  child.wait();
1506  }
1507  for (auto& child : worker_threads) {
1508  child.get();
1509  }
1510  } else {
1511  processor(0);
1512  }
1513 }
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
ThreadId thread_id()
Definition: Logger.cpp:731
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ populate_string_ids()

void StringDictionary::populate_string_ids ( std::vector< int32_t > &  dest_ids,
StringDictionary *  dest_dict,
const std::vector< int32_t > &  source_ids,
const StringDictionary *  source_dict,
const std::map< int32_t, std::string >  transient_mapping = {} 
)
static

Populates provided dest_ids vector with string ids corresponding to given source strings.

Given a vector of source string ids and the corresponding source dictionary, this method populates a vector of destination string ids. For each source string it either returns the id of the matching string in the destination dictionary or creates a new entry in that dictionary. Source string ids can also be transient if they were created by a function (e.g. LOWER/UPPER). A map of transient string ids to string values is provided to handle this case. A source id equal to std::numeric_limits<int32_t>::min() is treated as the empty string. Any other negative source id that is not found in the transient mapping causes a std::runtime_error.

Parameters
dest_ids - vector of destination string ids to be populated
dest_dict - destination dictionary
source_ids - vector of source string ids for which destination ids are needed
source_dict - source dictionary
transient_mapping - map of transient source string ids to string values

Definition at line 1447 of file StringDictionary.cpp.

References getOrAddBulk(), and getString().

Referenced by populate_string_array_ids(), and DictionaryValueConverter< TARGET_TYPE >::processBuffer().

1452  {
1453  std::vector<std::string> strings;
1454 
1455  for (const int32_t source_id : source_ids) {
1456  if (source_id == std::numeric_limits<int32_t>::min()) {
1457  strings.emplace_back("");
1458  } else if (source_id < 0) {
1459  if (auto string_itr = transient_mapping.find(source_id);
1460  string_itr != transient_mapping.end()) {
1461  strings.emplace_back(string_itr->second);
1462  } else {
1463  throw std::runtime_error("Unexpected negative source ID");
1464  }
1465  } else {
1466  strings.push_back(source_dict->getString(source_id));
1467  }
1468  }
1469 
1470  dest_ids.resize(strings.size());
1471  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1472 }
std::string getString(int32_t string_id) const
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ processDictionaryFutures()

void StringDictionary::processDictionaryFutures ( std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &  dictionary_futures)
private

Definition at line 226 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), materialize_hashes_, payload_file_off_, rk_hashes_, str_count_, and string_id_hash_table_.

Referenced by StringDictionary().

228  {
229  for (auto& dictionary_future : dictionary_futures) {
230  dictionary_future.wait();
231  auto hashVec = dictionary_future.get();
232  for (auto& hash : hashVec) {
233  uint32_t bucket = computeUniqueBucketWithHash(hash.first, string_id_hash_table_);
234  payload_file_off_ += hash.second;
235  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
236  if (materialize_hashes_) {
237  rk_hashes_[str_count_] = hash.first;
238  }
239  ++str_count_;
240  }
241  }
242  dictionary_futures.clear();
243 }
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) noexcept
std::vector< int32_t > string_id_hash_table_
std::vector< uint32_t > rk_hashes_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ sortCache()

void StringDictionary::sortCache ( std::vector< int32_t > &  cache)
private

Definition at line 1410 of file StringDictionary.cpp.

References getStringFromStorage(), and string_lt().

Referenced by buildSortedCache().

1410  {
1411  // This method is not thread-safe.
1412 
1413  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1414  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1415 
1416  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1417  auto a_str = this->getStringFromStorage(a);
1418  auto b_str = this->getStringFromStorage(b);
1419  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1420  });
1421 }
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ storageEntryCount()

size_t StringDictionary::storageEntryCount ( ) const

Definition at line 595 of file StringDictionary.cpp.

References client_, rw_mutex_, and str_count_.

Referenced by ArrowCsvForeignStorage::createDictionaryEncodedColumn().

595  {
596  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
597  if (client_) {
598  return client_->storage_entry_count();
599  }
600  return str_count_;
601 }
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock
+ Here is the caller graph for this function:

Member Data Documentation

◆ CANARY_BUFFER

char* StringDictionary::CANARY_BUFFER {nullptr}
private

Definition at line 226 of file StringDictionary.h.

Referenced by addMemoryCapacity(), addStorageCapacity(), and ~StringDictionary().

◆ canary_buffer_size

size_t StringDictionary::canary_buffer_size = 0
private

Definition at line 227 of file StringDictionary.h.

Referenced by addMemoryCapacity(), and addStorageCapacity().

◆ client_

◆ client_no_timeout_

std::unique_ptr<StringDictionaryClient> StringDictionary::client_no_timeout_
private

Definition at line 224 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and getOrAddBulkRemote().

◆ collisions_

size_t StringDictionary::collisions_
private

Definition at line 202 of file StringDictionary.h.

Referenced by computeUniqueBucketWithHash(), and StringDictionary().

◆ compare_cache_

DictionaryCache<std::string, compare_cache_value_t> StringDictionary::compare_cache_
mutable private

Definition at line 221 of file StringDictionary.h.

Referenced by getCompare(), and invalidateInvertedIndex().

◆ equal_cache_

std::map<std::string, int32_t> StringDictionary::equal_cache_
mutable private

Definition at line 220 of file StringDictionary.h.

Referenced by getEquals(), and invalidateInvertedIndex().

◆ INVALID_STR_ID

◆ isTemp_

◆ like_cache_

std::map<std::tuple<std::string, bool, bool, char>, std::vector<int32_t> > StringDictionary::like_cache_
mutable private

Definition at line 218 of file StringDictionary.h.

Referenced by getLike(), and invalidateInvertedIndex().

◆ materialize_hashes_

◆ MAX_STRCOUNT

constexpr size_t StringDictionary::MAX_STRCOUNT = (1U << 31) - 1
static

Definition at line 115 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and getOrAddImpl().

◆ MAX_STRLEN

◆ offset_fd_

◆ offset_file_size_

size_t StringDictionary::offset_file_size_
private

◆ offset_map_

◆ offsets_path_

std::string StringDictionary::offsets_path_
private

◆ payload_fd_

◆ payload_file_off_

size_t StringDictionary::payload_file_off_
private

◆ payload_file_size_

size_t StringDictionary::payload_file_size_
private

◆ payload_map_

◆ regex_cache_

std::map<std::pair<std::string, char>, std::vector<int32_t> > StringDictionary::regex_cache_
mutable private

Definition at line 219 of file StringDictionary.h.

Referenced by getRegexpLike(), and invalidateInvertedIndex().

◆ rk_hashes_

◆ rw_mutex_

◆ sorted_cache

std::vector<int32_t> StringDictionary::sorted_cache
private

Definition at line 205 of file StringDictionary.h.

Referenced by buildSortedCache(), getCompare(), and mergeSortedCache().

◆ str_count_

◆ string_id_hash_table_

std::vector<int32_t> StringDictionary::string_id_hash_table_
private

◆ strings_cache_

std::shared_ptr<std::vector<std::string> > StringDictionary::strings_cache_
mutable private

Definition at line 222 of file StringDictionary.h.

Referenced by copyStrings().


The documentation for this class was generated from the following files: