OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
StringDictionary Class Reference

#include <StringDictionary.h>

+ Collaboration diagram for StringDictionary:

Classes

struct  compare_cache_value_t
 
struct  PayloadString
 
class  StringCallback
 
struct  StringIdxEntry
 

Public Member Functions

 StringDictionary (const shared::StringDictKey &dict_key, const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
 
 StringDictionary (const LeafHostInfo &host, const shared::StringDictKey &dict_key)
 
 ~StringDictionary () noexcept
 
const shared::StringDictKey & getDictKey () const noexcept
 
void eachStringSerially (int64_t const generation, StringCallback &) const
 
std::function< int32_t(std::string
const &)> 
makeLambdaStringToId () const
 
int32_t getOrAdd (const std::string &str) noexcept
 
template<class T , class String >
size_t getBulk (const std::vector< String > &string_vec, T *encoded_vec) const
 
template<class T , class String >
size_t getBulk (const std::vector< String > &string_vec, T *encoded_vec, const int64_t generation) const
 
template<class T , class String >
void getOrAddBulk (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class T , class String >
void getOrAddBulkParallel (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class String >
void getOrAddBulkArray (const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
 
template<class String >
int32_t getIdOfString (const String &) const
 
std::string getString (int32_t string_id) const
 
std::pair< char *, size_t > getStringBytes (int32_t string_id) const noexcept
 
size_t storageEntryCount () const
 
std::vector< int32_t > getLike (const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
 
std::vector< int32_t > getCompare (const std::string &pattern, const std::string &comp_operator, const size_t generation)
 
std::vector< int32_t > getRegexpLike (const std::string &pattern, const char escape, const size_t generation) const
 
std::vector< std::string > copyStrings () const
 
std::vector< std::string_view > getStringViews () const
 
std::vector< std::string_view > getStringViews (const size_t generation) const
 
std::vector< int32_t > buildDictionaryTranslationMap (const std::shared_ptr< StringDictionary > dest_dict, StringLookupCallback const &dest_transient_lookup_callback) const
 
size_t buildDictionaryTranslationMap (const StringDictionary *dest_dict, int32_t *translated_ids, const int64_t source_generation, const int64_t dest_generation, const bool dest_has_transients, StringLookupCallback const &dest_transient_lookup_callback, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
 
void buildDictionaryNumericTranslationMap (Datum *translated_ids, const int64_t source_generation, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
 
bool checkpoint () noexcept
 
bool isClient () const noexcept
 
void update_leaf (const LeafHostInfo &host_info)
 

Static Public Member Functions

static void populate_string_ids (std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::vector< std::string const * > &transient_string_vec={})
 Populates provided dest_ids vector with string ids corresponding to given source strings. More...
 
static void populate_string_array_ids (std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
 

Static Public Attributes

static constexpr int32_t INVALID_STR_ID = -1
 
static constexpr size_t MAX_STRLEN = (1 << 15) - 1
 
static constexpr size_t MAX_STRCOUNT = (1U << 31) - 1
 

Private Member Functions

void processDictionaryFutures (std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
 
size_t getNumStringsFromStorage (const size_t storage_slots) const noexcept
 
bool fillRateIsHigh (const size_t num_strings) const noexcept
 
void increaseHashTableCapacity () noexcept
 
template<class String >
void increaseHashTableCapacityFromStorageAndMemory (const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
 
int32_t getOrAddImpl (const std::string_view &str) noexcept
 
template<class String >
void hashStrings (const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
 
int32_t getUnlocked (const std::string_view sv) const noexcept
 
std::string getStringUnlocked (int32_t string_id) const noexcept
 
std::string getStringChecked (const int string_id) const noexcept
 
std::pair< char *, size_t > getStringBytesChecked (const int string_id) const noexcept
 
template<class String >
uint32_t computeBucket (const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
 
template<class String >
uint32_t computeBucketFromStorageAndMemory (const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
 
uint32_t computeUniqueBucketWithHash (const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
 
void checkAndConditionallyIncreasePayloadCapacity (const size_t write_length)
 
void checkAndConditionallyIncreaseOffsetCapacity (const size_t write_length)
 
template<class String >
void appendToStorage (const String str) noexcept
 
template<class String >
void appendToStorageBulk (const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
 
PayloadString getStringFromStorage (const int string_id) const noexcept
 
std::string_view getStringFromStorageFast (const int string_id) const noexcept
 
void addPayloadCapacity (const size_t min_capacity_requested=0) noexcept
 
void addOffsetCapacity (const size_t min_capacity_requested=0) noexcept
 
size_t addStorageCapacity (int fd, const size_t min_capacity_requested=0) noexcept
 
void * addMemoryCapacity (void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
 
void invalidateInvertedIndex () noexcept
 
std::vector< int32_t > getEquals (std::string pattern, std::string comp_operator, size_t generation)
 
void buildSortedCache ()
 
void insertInSortedCache (std::string str, int32_t str_id)
 
void sortCache (std::vector< int32_t > &cache)
 
void mergeSortedCache (std::vector< int32_t > &temp_sorted_cache)
 
compare_cache_value_t * binary_search_cache (const std::string &pattern) const
 

Private Attributes

const shared::StringDictKey dict_key_
 
const std::string folder_
 
size_t str_count_
 
size_t collisions_
 
std::vector< int32_t > string_id_string_dict_hash_table_
 
std::vector< string_dict_hash_t > hash_cache_
 
std::vector< int32_t > sorted_cache
 
bool isTemp_
 
bool materialize_hashes_
 
std::string offsets_path_
 
int payload_fd_
 
int offset_fd_
 
StringIdxEntry * offset_map_
 
char * payload_map_
 
size_t offset_file_size_
 
size_t payload_file_size_
 
size_t payload_file_off_
 
std::shared_mutex rw_mutex_
 
std::map< std::tuple
< std::string, bool, bool,
char >, std::vector< int32_t > > 
like_cache_
 
std::map< std::pair
< std::string, char >
, std::vector< int32_t > > 
regex_cache_
 
std::map< std::string, int32_t > equal_cache_
 
DictionaryCache< std::string,
compare_cache_value_t >
compare_cache_
 
std::shared_ptr< std::vector
< std::string > > 
strings_cache_
 
std::unique_ptr
< StringDictionaryClient >
client_
 
std::unique_ptr
< StringDictionaryClient >
client_no_timeout_
 
char * CANARY_BUFFER {nullptr}
 
size_t canary_buffer_size = 0
 

Friends

class StringLocalCallback
 

Detailed Description

Definition at line 54 of file StringDictionary.h.

Constructor & Destructor Documentation

StringDictionary::StringDictionary ( const shared::StringDictKey &  dict_key,
const std::string &  folder,
const bool  isTemp,
const bool  recover,
const bool  materializeHashes = false,
size_t  initial_capacity = 256 
)

Definition at line 118 of file StringDictionary.cpp.

References addOffsetCapacity(), addPayloadCapacity(), threading_serial::async(), CHECK_EQ, heavyai::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_open(), collisions_, heavyai::file_size(), getNumStringsFromStorage(), getStringFromStorage(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), INVALID_STR_ID, isTemp_, LOG, materialize_hashes_, offset_fd_, offset_file_size_, offset_map_, offsets_path_, payload_fd_, payload_file_size_, payload_map_, processDictionaryFutures(), anonymous_namespace{StringDictionary.cpp}::round_up_p2(), rw_mutex_, str_count_, string_id_string_dict_hash_table_, VLOG, and logger::WARNING.

124  : dict_key_(dict_key)
125  , folder_(folder)
126  , str_count_(0)
128  , hash_cache_(initial_capacity)
129  , isTemp_(isTemp)
130  , materialize_hashes_(materializeHashes)
131  , payload_fd_(-1)
132  , offset_fd_(-1)
133  , offset_map_(nullptr)
134  , payload_map_(nullptr)
135  , offset_file_size_(0)
136  , payload_file_size_(0)
137  , payload_file_off_(0)
138  , strings_cache_(nullptr) {
139  if (!isTemp && folder.empty()) {
140  return;
141  }
142 
143  // initial capacity must be a power of two for efficient bucket computation
144  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
145  if (!isTemp_) {
146  boost::filesystem::path storage_path(folder);
147  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
148  const auto payload_path =
149  (storage_path / boost::filesystem::path("DictPayload")).string();
150  payload_fd_ = checked_open(payload_path.c_str(), recover);
151  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
154  }
155  bool storage_is_empty = false;
156  if (payload_file_size_ == 0) {
157  storage_is_empty = true;
159  }
160  if (offset_file_size_ == 0) {
162  }
163  if (!isTemp_) { // we never mmap or recover temp dictionaries
164  payload_map_ =
165  reinterpret_cast<char*>(heavyai::checked_mmap(payload_fd_, payload_file_size_));
166  offset_map_ = reinterpret_cast<StringIdxEntry*>(
168  if (recover) {
169  const size_t bytes = heavyai::file_size(offset_fd_);
170  if (bytes % sizeof(StringIdxEntry) != 0) {
171  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
172  }
173  const uint64_t str_count =
174  storage_is_empty ? 0 : getNumStringsFromStorage(bytes / sizeof(StringIdxEntry));
175  collisions_ = 0;
176  // at this point we know the size of the StringDict we need to load
177  // so lets reallocate the vector to the correct size
178  const uint64_t max_entries =
179  std::max(round_up_p2(str_count * 2 + 1),
180  round_up_p2(std::max(initial_capacity, static_cast<size_t>(1))));
181  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
182  string_id_string_dict_hash_table_.swap(new_str_ids);
183  if (materialize_hashes_) {
184  std::vector<string_dict_hash_t> new_hash_cache(max_entries / 2);
185  hash_cache_.swap(new_hash_cache);
186  }
187  // Bail early if we know we don't have strings to add (i.e. a new or empty
188  // dictionary)
189  if (str_count == 0) {
190  return;
191  }
192 
193  unsigned string_id = 0;
194  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
195 
196  uint32_t thread_inits = 0;
197  const auto thread_count = std::thread::hardware_concurrency();
198  const uint32_t items_per_thread = std::max<uint32_t>(
199  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
200  std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>
201  dictionary_futures;
202  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
203  dictionary_futures.emplace_back(std::async(
204  std::launch::async, [string_id, str_count, items_per_thread, this] {
205  std::vector<std::pair<string_dict_hash_t, unsigned int>> hashVec;
206  for (uint32_t curr_id = string_id;
207  curr_id < string_id + items_per_thread && curr_id < str_count;
208  curr_id++) {
209  const auto recovered = getStringFromStorage(curr_id);
210  if (recovered.canary) {
211  // hit the canary, recovery finished
212  break;
213  } else {
214  std::string_view temp(recovered.c_str_ptr, recovered.size);
215  hashVec.emplace_back(std::make_pair(hash_string(temp), temp.size()));
216  }
217  }
218  return hashVec;
219  }));
220  thread_inits++;
221  if (thread_inits % thread_count == 0) {
222  processDictionaryFutures(dictionary_futures);
223  }
224  }
225  // gather last few threads
226  if (dictionary_futures.size() != 0) {
227  processDictionaryFutures(dictionary_futures);
228  }
229  VLOG(1) << "Opened string dictionary " << folder << " # Strings: " << str_count_
230  << " Hash table size: " << string_id_string_dict_hash_table_.size()
231  << " Fill rate: "
232  << static_cast<double>(str_count_) * 100.0 /
234  << "% Collisions: " << collisions_;
235  }
236  }
237 }
StringIdxEntry * offset_map_
#define CHECK_EQ(x, y)
Definition: Logger.h:301
string_dict_hash_t hash_string(const std::string_view &str)
#define LOG(tag)
Definition: Logger.h:285
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
std::vector< string_dict_hash_t > hash_cache_
std::string offsets_path_
heavyai::unique_lock< heavyai::shared_mutex > write_lock
std::shared_mutex rw_mutex_
future< Result > async(Fn &&fn, Args &&...args)
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
const std::string folder_
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
int checked_open(const char *path, const bool recover)
const shared::StringDictKey dict_key_
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
size_t getNumStringsFromStorage(const size_t storage_slots) const noexcept
const uint64_t round_up_p2(const uint64_t num)
std::vector< int32_t > string_id_string_dict_hash_table_
PayloadString getStringFromStorage(const int string_id) const noexcept
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33
#define VLOG(n)
Definition: Logger.h:387
void * checked_mmap(const int fd, const size_t sz)
Definition: heavyai_fs.cpp:40

+ Here is the call graph for this function:

StringDictionary::StringDictionary ( const LeafHostInfo &  host,
const shared::StringDictKey &  dict_key 
)

Definition at line 346 of file StringDictionary.cpp.

348  : dict_key_(dict_key)
349  , folder_("DB_" + std::to_string(dict_key.db_id) + "_DICT_" +
350  std::to_string(dict_key.dict_id))
351  , strings_cache_(nullptr)
352  , client_(new StringDictionaryClient(host, {dict_key.db_id, dict_key.dict_id}, true))
std::string to_string(char const *&&v)
std::unique_ptr< StringDictionaryClient > client_
std::shared_ptr< std::vector< std::string > > strings_cache_
const std::string folder_
const shared::StringDictKey dict_key_
StringDictionary::~StringDictionary ( )
noexcept

Definition at line 356 of file StringDictionary.cpp.

References CANARY_BUFFER, CHECK, CHECK_GE, heavyai::checked_munmap(), heavyai::close(), isClient(), isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

356  {
357  free(CANARY_BUFFER);
358  if (isClient()) {
359  return;
360  }
361  if (payload_map_) {
362  if (!isTemp_) {
366  CHECK_GE(payload_fd_, 0);
368  CHECK_GE(offset_fd_, 0);
370  } else {
372  free(payload_map_);
373  free(offset_map_);
374  }
375  }
376 }
StringIdxEntry * offset_map_
bool isClient() const noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:306
#define CHECK(condition)
Definition: Logger.h:291
void checked_munmap(void *addr, size_t length)
Definition: heavyai_fs.cpp:53
void close(const int fd)
Definition: heavyai_fs.cpp:70

+ Here is the call graph for this function:

Member Function Documentation

void * StringDictionary::addMemoryCapacity ( void *  addr,
size_t &  mem_size,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1542 of file StringDictionary.cpp.

References CHECK, and anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE.

1544  {
1545  const size_t canary_buff_size_to_add =
1546  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1547  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1548  if (canary_buffer_size < canary_buff_size_to_add) {
1549  CANARY_BUFFER =
1550  reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1551  canary_buffer_size = canary_buff_size_to_add;
1553  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1554  }
1555  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
1556  CHECK(new_addr);
1557  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
1558  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
1559  mem_size += canary_buff_size_to_add;
1560  return new_addr;
1561 }
#define CHECK(condition)
Definition: Logger.h:291
void StringDictionary::addOffsetCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1512 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreaseOffsetCapacity(), and StringDictionary().

1512  {
1513  if (!isTemp_) {
1514  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1515  } else {
1516  offset_map_ = static_cast<StringIdxEntry*>(
1517  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1518  }
1519 }
StringIdxEntry * offset_map_
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

void StringDictionary::addPayloadCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1503 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreasePayloadCapacity(), and StringDictionary().

1503  {
1504  if (!isTemp_) {
1505  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1506  } else {
1507  payload_map_ = static_cast<char*>(
1508  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1509  }
1510 }
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

size_t StringDictionary::addStorageCapacity ( int  fd,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1521 of file StringDictionary.cpp.

References CHECK, CHECK_NE, anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE, and File_Namespace::write().

1523  {
1524  const size_t canary_buff_size_to_add =
1525  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1526  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1527 
1528  if (canary_buffer_size < canary_buff_size_to_add) {
1529  CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1530  canary_buffer_size = canary_buff_size_to_add;
1532  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1533  }
1534 
1535  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1536  const auto write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
1537  CHECK(write_return > 0 &&
1538  (static_cast<size_t>(write_return) == canary_buff_size_to_add));
1539  return canary_buff_size_to_add;
1540 }
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:160
#define CHECK_NE(x, y)
Definition: Logger.h:302
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

template<class String >
void StringDictionary::appendToStorage ( const String  str)
private noexcept

Definition at line 1448 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::size.

Referenced by getOrAddBulk().

1448  {
1449  // write the payload
1451  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1452 
1453  // write the offset and length
1454  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1455  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1456 
1458  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1459 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::appendToStorageBulk ( const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const size_t  sum_new_strings_lengths 
)
private noexcept

Definition at line 1462 of file StringDictionary.cpp.

Referenced by getOrAddBulkParallel().

1465  {
1466  const size_t num_strings = string_memory_ids.size();
1467 
1468  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1469  checkAndConditionallyIncreaseOffsetCapacity(sizeof(StringIdxEntry) * num_strings);
1470 
1471  for (size_t i = 0; i < num_strings; ++i) {
1472  const size_t string_idx = string_memory_ids[i];
1473  const String str = input_strings[string_idx];
1474  const size_t str_size(str.size());
1475  memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1476  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
1477  payload_file_off_ += str_size; // Need to increment after we've defined str_meta
1478  memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
1479  }
1480 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

compare_cache_value_t* StringDictionary::binary_search_cache ( const std::string &  pattern) const
private
void StringDictionary::buildDictionaryNumericTranslationMap ( Datum *  translated_ids,
const int64_t  source_generation,
const std::vector< StringOps_Namespace::StringOpInfo > &  string_op_infos 
) const

Definition at line 1959 of file StringDictionary.cpp.

References CHECK, CHECK_GE, CHECK_GT, CHECK_LE, DEBUG_TIMER, getStringFromStorageFast(), ThreadInfo::num_elems_per_thread, ThreadInfo::num_threads, threading_serial::parallel_for(), rw_mutex_, and str_count_.

1962  {
1963  auto timer = DEBUG_TIMER(__func__);
1964  CHECK_GE(source_generation, 0L);
1965  CHECK_GT(string_op_infos.size(), 0UL);
1966  CHECK(!string_op_infos.back().getReturnType().is_string());
1967  const int64_t num_source_strings = source_generation;
1968 
1969  // We can bail early if there are no source strings to translate
1970  if (num_source_strings == 0L) {
1971  return;
1972  }
1973 
1974  // If here we should should have a local dictionary
1975  // Note case of transient source dictionaries that aren't
1976  // seen as remote (they have no client_no_timeout_) is covered
1977  // by early bail above on num_source_strings == 0
1978 
1979  std::shared_lock<std::shared_mutex> source_read_lock(rw_mutex_);
1980 
1981  // For source dictionary we cap the number of entries
1982  // to be translated/translated to at the supplied
1983  // generation arguments, if valid (i.e. >= 0), otherwise
1984  // just the size of each dictionary
1985 
1986  CHECK_LE(num_source_strings, static_cast<int64_t>(str_count_));
1987 
1988  constexpr int64_t target_strings_per_thread{1000};
1989  const ThreadInfo thread_info(
1990  std::thread::hardware_concurrency(), num_source_strings, target_strings_per_thread);
1991  CHECK_GE(thread_info.num_threads, 1L);
1992  CHECK_GE(thread_info.num_elems_per_thread, 1L);
1993 
1994  // We use a tbb::task_arena to cap the number of threads, has been
1995  // in other contexts been shown to exhibit better performance when low
1996  // numbers of threads are needed than just letting tbb figure the number of threads,
1997  // but should benchmark in this specific context
1998 
1999  const StringOps_Namespace::StringOps string_ops(string_op_infos);
2000  CHECK_GT(string_ops.size(), 0UL);
2001 
2002  tbb::task_arena limited_arena(thread_info.num_threads);
2003  // The below logic, by executing low-level private variable accesses on both
2004  // dictionaries, is less clean than a previous variant that simply called
2005  // `getStringViews` from the source dictionary and then called `getBulk` on the
2006  // destination dictionary, but this version gets significantly better performance
2007  // (~2X), likely due to eliminating the overhead of writing out the string views and
2008  // then reading them back in (along with the associated cache misses)
2010  tbb::blocked_range<int32_t>(
2011  0, num_source_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
2012  [&](const tbb::blocked_range<int32_t>& r) {
2013  const int32_t start_idx = r.begin();
2014  const int32_t end_idx = r.end();
2015  for (int32_t source_string_id = start_idx; source_string_id != end_idx;
2016  ++source_string_id) {
2017  const std::string source_str =
2018  std::string(getStringFromStorageFast(source_string_id));
2019  translated_ids[source_string_id] = string_ops.numericEval(source_str);
2020  }
2021  });
2022 }
#define CHECK_GE(x, y)
Definition: Logger.h:306
std::shared_mutex rw_mutex_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
#define CHECK_GT(x, y)
Definition: Logger.h:305
#define CHECK_LE(x, y)
Definition: Logger.h:304
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
#define CHECK(condition)
Definition: Logger.h:291
#define DEBUG_TIMER(name)
Definition: Logger.h:411

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::buildDictionaryTranslationMap ( const std::shared_ptr< StringDictionary > dest_dict,
StringLookupCallback const &  dest_transient_lookup_callback 
) const

Definition at line 1774 of file StringDictionary.cpp.

References DEBUG_TIMER, and storageEntryCount().

1776  {
1777  auto timer = DEBUG_TIMER(__func__);
1778  const size_t num_source_strings = storageEntryCount();
1779  const size_t num_dest_strings = dest_dict->storageEntryCount();
1780  std::vector<int32_t> translated_ids(num_source_strings);
1781 
1782  buildDictionaryTranslationMap(dest_dict.get(),
1783  translated_ids.data(),
1784  num_source_strings,
1785  num_dest_strings,
1786  true, // Just assume true for dest_has_transients as this
1787  // function is only used for testing currently
1788  dest_transient_lookup_callback,
1789  {});
1790  return translated_ids;
1791 }
std::vector< int32_t > buildDictionaryTranslationMap(const std::shared_ptr< StringDictionary > dest_dict, StringLookupCallback const &dest_transient_lookup_callback) const
size_t storageEntryCount() const
#define DEBUG_TIMER(name)
Definition: Logger.h:411

+ Here is the call graph for this function:

size_t StringDictionary::buildDictionaryTranslationMap ( const StringDictionary *  dest_dict,
int32_t *  translated_ids,
const int64_t  source_generation,
const int64_t  dest_generation,
const bool  dest_has_transients,
StringLookupCallback const &  dest_transient_lookup_callback,
const std::vector< StringOps_Namespace::StringOpInfo > &  string_op_infos 
) const

Definition at line 1811 of file StringDictionary.cpp.

References CHECK_GE, CHECK_LE, client_no_timeout_, computeBucket(), DEBUG_TIMER, getDictKey(), getStringFromStorageFast(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), INVALID_STR_ID, materialize_hashes_, ThreadInfo::num_elems_per_thread, ThreadInfo::num_threads, order_translation_locks(), threading_serial::parallel_for(), rw_mutex_, str_count_, and string_id_string_dict_hash_table_.

1818  {
1819  auto timer = DEBUG_TIMER(__func__);
1820  CHECK_GE(source_generation, 0L);
1821  CHECK_GE(dest_generation, 0L);
1822  const int64_t num_source_strings = source_generation;
1823  const int64_t num_dest_strings = dest_generation;
1824 
1825  // We can bail early if there are no source strings to translate
1826  if (num_source_strings == 0L) {
1827  return 0;
1828  }
1829 
1830  // If here we should should have local dictionaries.
1831  // Note case of transient source dictionaries that aren't
1832  // seen as remote (they have no client_no_timeout_) is covered
1833  // by early bail above on num_source_strings == 0
1834  if (dest_dict->client_no_timeout_) {
1835  throw std::runtime_error(
1836  "Cannot translate between a local source and remote destination dictionary.");
1837  }
1838 
1839  // Sort this/source dict and dest dict on folder_ so we can enforce
1840  // lock ordering and avoid deadlocks
1841  std::shared_lock<std::shared_mutex> source_read_lock(rw_mutex_, std::defer_lock);
1842  std::shared_lock<std::shared_mutex> dest_read_lock(dest_dict->rw_mutex_,
1843  std::defer_lock);
1845  getDictKey(), dest_dict->getDictKey(), source_read_lock, dest_read_lock);
1846 
1847  // For both source and destination dictionaries we cap the max
1848  // entries to be translated/translated to at the supplied
1849  // generation arguments, if valid (i.e. >= 0), otherwise just the
1850  // size of each dictionary
1851 
1852  CHECK_LE(num_source_strings, static_cast<int64_t>(str_count_));
1853  CHECK_LE(num_dest_strings, static_cast<int64_t>(dest_dict->str_count_));
1854  const bool dest_dictionary_is_empty = (num_dest_strings == 0);
1855 
1856  constexpr int64_t target_strings_per_thread{1000};
1857  const ThreadInfo thread_info(
1858  std::thread::hardware_concurrency(), num_source_strings, target_strings_per_thread);
1859  CHECK_GE(thread_info.num_threads, 1L);
1860  CHECK_GE(thread_info.num_elems_per_thread, 1L);
1861 
1862  // We use a tbb::task_arena to cap the number of threads, has been
1863  // in other contexts been shown to exhibit better performance when low
1864  // numbers of threads are needed than just letting tbb figure the number of threads,
1865  // but should benchmark in this specific context
1866 
1867  const StringOps_Namespace::StringOps string_ops(string_op_infos);
1868  const bool has_string_ops = string_ops.size();
1869 
1870  tbb::task_arena limited_arena(thread_info.num_threads);
1871  std::vector<size_t> num_strings_not_translated_per_thread(thread_info.num_threads, 0UL);
1872  constexpr bool short_circuit_empty_dictionary_translations{false};
1873  limited_arena.execute([&] {
1874  CHECK_LE(tbb::this_task_arena::max_concurrency(), thread_info.num_threads);
1875  if (short_circuit_empty_dictionary_translations && dest_dictionary_is_empty) {
1877  tbb::blocked_range<int32_t>(
1878  0,
1879  num_source_strings,
1880  thread_info.num_elems_per_thread /* tbb grain_size */),
1881  [&](const tbb::blocked_range<int32_t>& r) {
1882  const int32_t start_idx = r.begin();
1883  const int32_t end_idx = r.end();
1884  for (int32_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
1885  translated_ids[string_idx] = INVALID_STR_ID;
1886  }
1887  },
1888  tbb::simple_partitioner());
1889  num_strings_not_translated_per_thread[0] += num_source_strings;
1890  } else {
1891  // The below logic, by executing low-level private variable accesses on both
1892  // dictionaries, is less clean than a previous variant that simply called
1893  // `getStringViews` from the source dictionary and then called `getBulk` on the
1894  // destination dictionary, but this version gets significantly better performance
1895  // (~2X), likely due to eliminating the overhead of writing out the string views and
1896  // then reading them back in (along with the associated cache misses)
1898  tbb::blocked_range<int32_t>(
1899  0,
1900  num_source_strings,
1901  thread_info.num_elems_per_thread /* tbb grain_size */),
1902  [&](const tbb::blocked_range<int32_t>& r) {
1903  const int32_t start_idx = r.begin();
1904  const int32_t end_idx = r.end();
1905  size_t num_strings_not_translated = 0;
1906  std::string string_ops_storage; // Needs to be thread local to back
1907  // string_view returned by string_ops()
1908  for (int32_t source_string_id = start_idx; source_string_id != end_idx;
1909  ++source_string_id) {
1910  const std::string_view source_str =
1911  has_string_ops ? string_ops(getStringFromStorageFast(source_string_id),
1912  string_ops_storage)
1913  : getStringFromStorageFast(source_string_id);
1914 
1915  if (source_str.empty()) {
1916  translated_ids[source_string_id] = inline_int_null_value<int32_t>();
1917  continue;
1918  }
1919  // Get the hash from this/the source dictionary's cache, as the function
1920  // will be the same for the dest_dict, sparing us having to recompute it
1921 
1922  // Todo(todd): Remove option to turn string hash cache off or at least
1923  // make a constexpr to avoid these branches when we expect it to be always
1924  // on going forward
1925  const string_dict_hash_t hash = (materialize_hashes_ && !has_string_ops)
1926  ? hash_cache_[source_string_id]
1927  : hash_string(source_str);
1928  const uint32_t hash_bucket = dest_dict->computeBucket(
1929  hash, source_str, dest_dict->string_id_string_dict_hash_table_);
1930  const auto translated_string_id =
1931  dest_dict->string_id_string_dict_hash_table_[hash_bucket];
1932  translated_ids[source_string_id] = translated_string_id;
1933 
1934  if (translated_string_id == StringDictionary::INVALID_STR_ID ||
1935  translated_string_id >= num_dest_strings) {
1936  if (dest_has_transients) {
1937  num_strings_not_translated +=
1938  dest_transient_lookup_callback(source_str, source_string_id);
1939  } else {
1940  num_strings_not_translated++;
1941  }
1942  continue;
1943  }
1944  }
1945  const size_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
1946  num_strings_not_translated_per_thread[tbb_thread_idx] +=
1947  num_strings_not_translated;
1948  },
1949  tbb::simple_partitioner());
1950  }
1951  });
1952  size_t total_num_strings_not_translated = 0;
1953  for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
1954  total_num_strings_not_translated += num_strings_not_translated_per_thread[thread_idx];
1955  }
1956  return total_num_strings_not_translated;
1957 }
string_dict_hash_t hash_string(const std::string_view &str)
const shared::StringDictKey & getDictKey() const noexcept
std::vector< string_dict_hash_t > hash_cache_
#define CHECK_GE(x, y)
Definition: Logger.h:306
std::shared_mutex rw_mutex_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
void order_translation_locks(const shared::StringDictKey &source_dict_key, const shared::StringDictKey &dest_dict_key, std::shared_lock< std::shared_mutex > &source_read_lock, std::shared_lock< std::shared_mutex > &dest_read_lock)
#define CHECK_LE(x, y)
Definition: Logger.h:304
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
uint32_t string_dict_hash_t
#define DEBUG_TIMER(name)
Definition: Logger.h:411
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

void StringDictionary::buildSortedCache ( )
private

Definition at line 1603 of file StringDictionary.cpp.

References mergeSortedCache(), sortCache(), sorted_cache, and str_count_.

Referenced by getCompare().

1603  {
1604  // This method is not thread-safe.
1605  const auto cur_cache_size = sorted_cache.size();
1606  std::vector<int32_t> temp_sorted_cache;
1607  for (size_t i = cur_cache_size; i < str_count_; i++) {
1608  temp_sorted_cache.push_back(i);
1609  }
1610  sortCache(temp_sorted_cache);
1611  mergeSortedCache(temp_sorted_cache);
1612 }
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
void sortCache(std::vector< int32_t > &cache)
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::checkAndConditionallyIncreaseOffsetCapacity ( const size_t  write_length)
private

Definition at line 1427 of file StringDictionary.cpp.

References addOffsetCapacity(), CHECK, CHECK_GE, heavyai::checked_mmap(), heavyai::checked_munmap(), isTemp_, offset_fd_, offset_file_size_, offset_map_, and str_count_.

// Makes sure the offset storage can hold `write_length` more bytes starting at
// str_count_ * sizeof(StringIdxEntry), growing it with addOffsetCapacity() when needed.
// The CHECKs after growing assert that the requested write now fits.
// NOTE(review): this uses `>=` while checkAndConditionallyIncreasePayloadCapacity uses
// `>`, so here a write that exactly fills the file still triggers growth (with
// min_capacity_needed == 0) — confirm whether the asymmetry is intended.
1428  {
// Byte offset where the next StringIdxEntry would be written.
1429  const size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
1430  if (offset_file_off + write_length >= offset_file_size_) {
// Bytes missing beyond the current end of the offset file.
1431  const size_t min_capacity_needed =
1432  write_length - (offset_file_size_ - offset_file_off);
1433  if (!isTemp_) {
1434  CHECK_GE(offset_fd_, 0);
// NOTE(review): listing lines 1435 and 1439 are not shown here; per the References
// list they presumably unmap the old offset_map_ (heavyai::checked_munmap) and remap
// the grown file (heavyai::checked_mmap) — confirm against StringDictionary.cpp.
1436  addOffsetCapacity(min_capacity_needed);
1437  CHECK(offset_file_off + write_length <= offset_file_size_);
1438  offset_map_ = reinterpret_cast<StringIdxEntry*>(
1440  } else {
// Temporary dictionaries have no backing file descriptor; only grow the capacity.
1441  addOffsetCapacity(min_capacity_needed);
1442  CHECK(offset_file_off + write_length <= offset_file_size_);
1443  }
1444  }
1445 }
StringIdxEntry * offset_map_
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:306
#define CHECK(condition)
Definition: Logger.h:291
void checked_munmap(void *addr, size_t length)
Definition: heavyai_fs.cpp:53
void * checked_mmap(const int fd, const size_t sz)
Definition: heavyai_fs.cpp:40

+ Here is the call graph for this function:

void StringDictionary::checkAndConditionallyIncreasePayloadCapacity ( const size_t  write_length)
private

Definition at line 1408 of file StringDictionary.cpp.

References addPayloadCapacity(), CHECK, CHECK_GE, heavyai::checked_mmap(), heavyai::checked_munmap(), isTemp_, payload_fd_, payload_file_off_, payload_file_size_, and payload_map_.

// Makes sure the payload storage can hold `write_length` more bytes starting at
// payload_file_off_, growing it with addPayloadCapacity() when the write would run past
// payload_file_size_. For non-temporary dictionaries payload_map_ is re-created with
// heavyai::checked_mmap over the grown file.
1409  {
1410  if (payload_file_off_ + write_length > payload_file_size_) {
// Bytes missing beyond the current end of the payload file.
1411  const size_t min_capacity_needed =
1412  write_length - (payload_file_size_ - payload_file_off_);
1413  if (!isTemp_) {
1414  CHECK_GE(payload_fd_, 0);
// NOTE(review): listing line 1415 is not shown; per the References list it presumably
// unmaps the old payload_map_ via heavyai::checked_munmap before growing — confirm.
1416  addPayloadCapacity(min_capacity_needed);
1417  CHECK(payload_file_off_ + write_length <= payload_file_size_);
// Re-map the (now larger) payload file.
1418  payload_map_ =
1419  reinterpret_cast<char*>(heavyai::checked_mmap(payload_fd_, payload_file_size_));
1420  } else {
// Temporary dictionaries have no backing file descriptor; only grow the capacity.
1421  addPayloadCapacity(min_capacity_needed);
1422  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1423  }
1424  }
1425 }
#define CHECK_GE(x, y)
Definition: Logger.h:306
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
#define CHECK(condition)
Definition: Logger.h:291
void checked_munmap(void *addr, size_t length)
Definition: heavyai_fs.cpp:53
void * checked_mmap(const int fd, const size_t sz)
Definition: heavyai_fs.cpp:40

+ Here is the call graph for this function:

bool StringDictionary::checkpoint ( )
noexcept

Definition at line 1580 of file StringDictionary.cpp.

References CHECK, client_, heavyai::fsync(), isClient(), isTemp_, heavyai::msync(), offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

Referenced by import_export::TypedImportBuffer::stringDictCheckpoint().

1580  {
1581  if (isClient()) {
1582  try {
1583  return client_->checkpoint();
1584  } catch (...) {
1585  return false;
1586  }
1587  }
1588  CHECK(!isTemp_);
1589  bool ret = true;
1590  ret = ret &&
1591  (heavyai::msync((void*)offset_map_, offset_file_size_, /*async=*/false) == 0);
1592  ret = ret &&
1593  (heavyai::msync((void*)payload_map_, payload_file_size_, /*async=*/false) == 0);
1594  ret = ret && (heavyai::fsync(offset_fd_) == 0);
1595  ret = ret && (heavyai::fsync(payload_fd_) == 0);
1596  return ret;
1597 }
StringIdxEntry * offset_map_
bool isClient() const noexcept
std::unique_ptr< StringDictionaryClient > client_
int msync(void *addr, size_t length, bool async)
Definition: heavyai_fs.cpp:57
int fsync(int fd)
Definition: heavyai_fs.cpp:62
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucket ( const string_dict_hash_t  hash,
const String &  input_string,
const std::vector< int32_t > &  string_id_string_dict_hash_table 
) const
privatenoexcept

Definition at line 1310 of file StringDictionary.cpp.

Referenced by buildDictionaryTranslationMap(), getBulk(), and getOrAddBulk().

// Probes string_id_string_dict_hash_table starting at `hash` masked to the table size,
// stepping one slot at a time (wrapping to 0 at the end). Returns the first slot that
// is either empty (INVALID_STR_ID) or holds a string byte-equal to `input_string`.
// NOTE(review): the `hash & (size - 1)` masking assumes the table size is a power of
// two, and the loop only terminates if an empty or matching slot exists — confirm
// callers guarantee both.
1313  {
1314  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1315  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1316  while (true) {
1317  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1318  if (candidate_string_id ==
1319  INVALID_STR_ID) { // In this case it means the slot is available for use
1320  break;
1321  }
// With materialized hashes, the cached hash is compared first so the string bytes are
// only fetched on a hash match. NOTE(review): listing line 1323 (the rest of this
// condition) is not shown here — confirm the non-materialized case against the source.
1322  if ((materialize_hashes_ && hash == hash_cache_[candidate_string_id]) ||
1324  const auto candidate_string = getStringFromStorageFast(candidate_string_id);
1325  if (input_string.size() == candidate_string.size() &&
1326  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1327  // found the string
1328  break;
1329  }
1330  }
1331  // wrap around
1332  if (++bucket == string_dict_hash_table_size) {
1333  bucket = 0;
1334  }
1335  }
1336  return bucket;
1337 }
std::vector< string_dict_hash_t > hash_cache_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucketFromStorageAndMemory ( const string_dict_hash_t  input_string_hash,
const String &  input_string,
const std::vector< int32_t > &  string_id_string_dict_hash_table,
const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids 
) const
privatenoexcept

memcmp(input_string.data(), candidate_storage_string.c_str_ptr, input_string.size())) {

Definition at line 1340 of file StringDictionary.cpp.

Referenced by getOrAddBulkParallel().

// Like computeBucket(), but candidate ids at or above storage_high_water_mark are
// resolved from the not-yet-persisted batch: id k maps to
// input_strings[string_memory_ids[k - storage_high_water_mark]]. Ids below the mark are
// read from storage. Returns the first empty slot or the slot holding `input_string`.
// NOTE(review): masking with `size() - 1` assumes a power-of-two table size — confirm.
1346  {
1347  uint32_t bucket = input_string_hash & (string_id_string_dict_hash_table.size() - 1);
1348  while (true) {
1349  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1350  if (candidate_string_id ==
1351  INVALID_STR_ID) { // In this case it means the slot is available for use
1352  break;
1353  }
// Only compare string bytes when hashes are not materialized or the cached hash matches.
// NOTE(review): this indexes hash_cache_ with ids that may belong to the in-memory
// batch; presumably hash_cache_ is populated for them beforehand — verify in caller.
1354  if (!materialize_hashes_ || (input_string_hash == hash_cache_[candidate_string_id])) {
// NOTE(review): `> 0` looks like it should be `>= 0`. When storage_high_water_mark is
// 0, candidate id 0 is part of the in-memory batch but falls into the storage branch
// below — confirm.
1355  if (candidate_string_id > 0 &&
1356  static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1357  // The candidate string is not in storage yet but in our string_memory_ids temp
1358  // buffer
1359  size_t memory_offset =
1360  static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1361  const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1362  if (input_string.size() == candidate_string.size() &&
1363  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1364  // found the string in the temp memory buffer
1365  break;
1366  }
1367  } else {
1368  // The candidate string is in storage, need to fetch it for comparison
1369  const auto candidate_storage_string =
1370  getStringFromStorageFast(candidate_string_id);
1371  if (input_string.size() == candidate_storage_string.size() &&
1372  !memcmp(input_string.data(),
1373  candidate_storage_string.data(),
1374  input_string.size())) {
// NOTE(review): listing lines 1375-1376 are not shown here — confirm against the source.
1377  // found the string in storage
1378  break;
1379  }
1380  }
1381  }
// Linear probe to the next slot, wrapping to the start of the table.
1382  if (++bucket == string_id_string_dict_hash_table.size()) {
1383  bucket = 0;
1384  }
1385  }
1386  return bucket;
1387 }
std::vector< string_dict_hash_t > hash_cache_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

uint32_t StringDictionary::computeUniqueBucketWithHash ( const string_dict_hash_t  hash,
const std::vector< int32_t > &  string_id_string_dict_hash_table 
)
privatenoexcept

Definition at line 1389 of file StringDictionary.cpp.

Referenced by increaseHashTableCapacity(), and processDictionaryFutures().

1391  {
1392  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1393  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1394  while (true) {
1395  if (string_id_string_dict_hash_table[bucket] ==
1396  INVALID_STR_ID) { // In this case it means the slot is available for use
1397  break;
1398  }
1399  collisions_++;
1400  // wrap around
1401  if (++bucket == string_dict_hash_table_size) {
1402  bucket = 0;
1403  }
1404  }
1405  return bucket;
1406 }
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

std::vector< std::string > StringDictionary::copyStrings ( ) const

Definition at line 1145 of file StringDictionary.cpp.

References threading_serial::async(), CHECK_EQ, CHECK_GT, CHECK_LE, gpu_enabled::copy(), cpu_threads(), getStringUnlocked(), isClient(), rw_mutex_, str_count_, and strings_cache_.

1145  {
1146  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1147  if (isClient()) {
1148  // TODO(miyu): support remote string dictionary
1149  throw std::runtime_error(
1150  "copying dictionaries from remote server is not supported yet.");
1151  }
1152 
1153  if (strings_cache_) {
1154  return *strings_cache_;
1155  }
1156 
1157  strings_cache_ = std::make_shared<std::vector<std::string>>();
1158  strings_cache_->reserve(str_count_);
1159  const bool multithreaded = str_count_ > 10000;
1160  const auto worker_count =
1161  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
1162  CHECK_GT(worker_count, 0UL);
1163  std::vector<std::vector<std::string>> worker_results(worker_count);
1164  auto copy = [this](std::vector<std::string>& str_list,
1165  const size_t start_id,
1166  const size_t end_id) {
1167  CHECK_LE(start_id, end_id);
1168  str_list.reserve(end_id - start_id);
1169  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
1170  str_list.push_back(getStringUnlocked(string_id));
1171  }
1172  };
1173  if (multithreaded) {
1174  std::vector<std::future<void>> workers;
1175  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
1176  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
1177  worker_idx < worker_count && start < str_count_;
1178  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
1179  workers.push_back(std::async(
1180  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
1181  }
1182  for (auto& worker : workers) {
1183  worker.get();
1184  }
1185  } else {
1186  CHECK_EQ(worker_results.size(), size_t(1));
1187  copy(worker_results[0], 0, str_count_);
1188  }
1189 
1190  for (const auto& worker_result : worker_results) {
1191  strings_cache_->insert(
1192  strings_cache_->end(), worker_result.begin(), worker_result.end());
1193  }
1194  return *strings_cache_;
1195 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
bool isClient() const noexcept
heavyai::unique_lock< heavyai::shared_mutex > write_lock
std::shared_mutex rw_mutex_
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::string getStringUnlocked(int32_t string_id) const noexcept
future< Result > async(Fn &&fn, Args &&...args)
std::shared_ptr< std::vector< std::string > > strings_cache_
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
#define CHECK_LE(x, y)
Definition: Logger.h:304
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

void StringDictionary::eachStringSerially ( int64_t const  generation,
StringCallback serial_callback 
) const

Definition at line 268 of file StringDictionary.cpp.

References CHECK_LE, client_, getStringFromStorageFast(), isClient(), anonymous_namespace{Utm.h}::n, rw_mutex_, storageEntryCount(), and str_count_.

Referenced by makeLambdaStringToId().

269  {
270  if (isClient()) {
271  // copyStrings() is not supported when isClient().
272  std::string str; // Import buffer. Placing outside of loop should reduce allocations.
273  size_t const n = std::min(static_cast<size_t>(generation), storageEntryCount());
274  CHECK_LE(n, static_cast<size_t>(std::numeric_limits<int32_t>::max()) + 1);
275  for (unsigned id = 0; id < n; ++id) {
276  {
277  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
278  client_->get_string(str, id);
279  }
280  serial_callback(str, id);
281  }
282  } else {
283  size_t const n = std::min(static_cast<size_t>(generation), str_count_);
284  CHECK_LE(n, static_cast<size_t>(std::numeric_limits<int32_t>::max()) + 1);
285  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
286  for (unsigned id = 0; id < n; ++id) {
287  serial_callback(getStringFromStorageFast(static_cast<int>(id)), id);
288  }
289  }
290 }
bool isClient() const noexcept
heavyai::shared_lock< heavyai::shared_mutex > read_lock
size_t storageEntryCount() const
std::shared_mutex rw_mutex_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
std::unique_ptr< StringDictionaryClient > client_
#define CHECK_LE(x, y)
Definition: Logger.h:304
constexpr double n
Definition: Utm.h:38

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool StringDictionary::fillRateIsHigh ( const size_t  num_strings) const
privatenoexcept

Definition at line 1197 of file StringDictionary.cpp.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

1197  {
1198  return string_id_string_dict_hash_table_.size() <= num_strings * 2;
1199 }
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the caller graph for this function:

template<class T , class String >
size_t StringDictionary::getBulk ( const std::vector< String > &  string_vec,
T *  encoded_vec 
) const

Definition at line 487 of file StringDictionary.cpp.

488  {
489  return getBulk(string_vec, encoded_vec, -1L /* generation */);
490 }
size_t getBulk(const std::vector< String > &string_vec, T *encoded_vec) const
template<class T , class String >
size_t StringDictionary::getBulk ( const std::vector< String > &  string_vec,
T *  encoded_vec,
const int64_t  generation 
) const

Definition at line 500 of file StringDictionary.cpp.

References CHECK_GE, CHECK_LE, computeBucket(), dict_key_, anonymous_namespace{StringDictionary.cpp}::hash_string(), INVALID_STR_ID, MAX_STRLEN, ThreadInfo::num_elems_per_thread, ThreadInfo::num_threads, threading_serial::parallel_for(), rw_mutex_, storageEntryCount(), string_id_string_dict_hash_table_, and anonymous_namespace{StringDictionary.cpp}::throw_string_too_long_error().

502  {
503  constexpr int64_t target_strings_per_thread{1000};
504  const int64_t num_lookup_strings = string_vec.size();
505  if (num_lookup_strings == 0) {
506  return 0;
507  }
508 
509  const ThreadInfo thread_info(
510  std::thread::hardware_concurrency(), num_lookup_strings, target_strings_per_thread);
511  CHECK_GE(thread_info.num_threads, 1L);
512  CHECK_GE(thread_info.num_elems_per_thread, 1L);
513 
514  std::vector<size_t> num_strings_not_found_per_thread(thread_info.num_threads, 0UL);
515 
516  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
517  const int64_t num_dict_strings = generation >= 0 ? generation : storageEntryCount();
518  const bool dictionary_is_empty = (num_dict_strings == 0);
519  if (dictionary_is_empty) {
520  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_lookup_strings),
521  [&](const tbb::blocked_range<int64_t>& r) {
522  const int64_t start_idx = r.begin();
523  const int64_t end_idx = r.end();
524  for (int64_t string_idx = start_idx; string_idx < end_idx;
525  ++string_idx) {
526  encoded_vec[string_idx] = StringDictionary::INVALID_STR_ID;
527  }
528  });
529  return num_lookup_strings;
530  }
531  // If we're here the generation-capped dictionary has strings in it
532  // that we need to look up against
533 
534  tbb::task_arena limited_arena(thread_info.num_threads);
535  limited_arena.execute([&] {
536  CHECK_LE(tbb::this_task_arena::max_concurrency(), thread_info.num_threads);
538  tbb::blocked_range<int64_t>(
539  0, num_lookup_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
540  [&](const tbb::blocked_range<int64_t>& r) {
541  const int64_t start_idx = r.begin();
542  const int64_t end_idx = r.end();
543  size_t num_strings_not_found = 0;
544  for (int64_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
545  const auto& input_string = string_vec[string_idx];
546  if (input_string.empty()) {
547  encoded_vec[string_idx] = inline_int_null_value<T>();
548  continue;
549  }
550  if (input_string.size() > StringDictionary::MAX_STRLEN) {
551  throw_string_too_long_error(input_string, dict_key_);
552  }
553  const string_dict_hash_t input_string_hash = hash_string(input_string);
554  uint32_t hash_bucket = computeBucket(
555  input_string_hash, input_string, string_id_string_dict_hash_table_);
556  // Will either be legit id or INVALID_STR_ID
557  const auto string_id = string_id_string_dict_hash_table_[hash_bucket];
558  if (string_id == StringDictionary::INVALID_STR_ID ||
559  string_id >= num_dict_strings) {
560  encoded_vec[string_idx] = StringDictionary::INVALID_STR_ID;
561  num_strings_not_found++;
562  continue;
563  }
564  encoded_vec[string_idx] = string_id;
565  }
566  const size_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
567  num_strings_not_found_per_thread[tbb_thread_idx] = num_strings_not_found;
568  },
569  tbb::simple_partitioner());
570  });
571 
572  size_t num_strings_not_found = 0;
573  for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
574  num_strings_not_found += num_strings_not_found_per_thread[thread_idx];
575  }
576  return num_strings_not_found;
577 }
void throw_string_too_long_error(std::string_view str, const shared::StringDictKey &dict_key)
string_dict_hash_t hash_string(const std::string_view &str)
heavyai::shared_lock< heavyai::shared_mutex > read_lock
size_t storageEntryCount() const
#define CHECK_GE(x, y)
Definition: Logger.h:306
std::shared_mutex rw_mutex_
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
const shared::StringDictKey dict_key_
#define CHECK_LE(x, y)
Definition: Logger.h:304
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getCompare ( const std::string &  pattern,
const std::string &  comp_operator,
const size_t  generation 
)

Definition at line 937 of file StringDictionary.cpp.

References anonymous_namespace{Utm.h}::a, buildSortedCache(), client_, compare_cache_, getEquals(), getStringFromStorage(), isClient(), gpu_enabled::lower_bound(), rw_mutex_, sorted_cache, str_count_, string_eq(), and string_lt().

939  {
940  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
941  if (isClient()) {
942  return client_->get_compare(pattern, comp_operator, generation);
943  }
944  std::vector<int32_t> ret;
945  if (str_count_ == 0) {
946  return ret;
947  }
948  if (sorted_cache.size() < str_count_) {
949  if (comp_operator == "=" || comp_operator == "<>") {
950  return getEquals(pattern, comp_operator, generation);
951  }
952 
954  }
955  auto cache_index = compare_cache_.get(pattern);
956 
957  if (!cache_index) {
958  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
959  const auto cache_itr = std::lower_bound(
960  sorted_cache.begin(),
961  sorted_cache.end(),
962  pattern,
963  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
964  auto a_str = this->getStringFromStorage(a);
965  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
966  });
967 
968  if (cache_itr == sorted_cache.end()) {
969  cache_index->index = sorted_cache.size() - 1;
970  cache_index->diff = 1;
971  } else {
972  const auto cache_str = getStringFromStorage(*cache_itr);
973  if (!string_eq(
974  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
975  cache_index->index = cache_itr - sorted_cache.begin() - 1;
976  cache_index->diff = 1;
977  } else {
978  cache_index->index = cache_itr - sorted_cache.begin();
979  cache_index->diff = 0;
980  }
981  }
982 
983  compare_cache_.put(pattern, cache_index);
984  }
985 
986  // since we have a cache in form of vector of ints which is sorted according to
987  // corresponding strings in the dictionary all we need is the index of the element
988  // which equal to the pattern that we are trying to match or the index of “biggest”
989  // element smaller than the pattern, to perform all the comparison operators over
990  // string. The search function guarantees we have such index so now it is just the
991  // matter to include all the elements in the result vector.
992 
993  // For < operator if the index that we have points to the element which is equal to
994  // the pattern that we are searching for we simply get all the elements less than the
995  // index. If the element pointed by the index is not equal to the pattern we are
996  // comparing with we also need to include that index in result vector, except when the
997  // index points to 0 and the pattern is lesser than the smallest value in the string
998  // dictionary.
999 
1000  if (comp_operator == "<") {
1001  size_t idx = cache_index->index;
1002  if (cache_index->diff) {
1003  idx = cache_index->index + 1;
1004  if (cache_index->index == 0 && cache_index->diff > 0) {
1005  idx = cache_index->index;
1006  }
1007  }
1008  for (size_t i = 0; i < idx; i++) {
1009  ret.push_back(sorted_cache[i]);
1010  }
1011 
1012  // For <= operator if the index that we have points to the element which is equal to
1013  // the pattern that we are searching for we want to include the element pointed by
1014  // the index in the result set. If the element pointed by the index is not equal to
1015  // the pattern we are comparing with we just want to include all the ids with index
1016  // less than the index that is cached, except when pattern that we are searching for
1017  // is smaller than the smallest string in the dictionary.
1018 
1019  } else if (comp_operator == "<=") {
1020  size_t idx = cache_index->index + 1;
1021  if (cache_index == 0 && cache_index->diff > 0) {
1022  idx = cache_index->index;
1023  }
1024  for (size_t i = 0; i < idx; i++) {
1025  ret.push_back(sorted_cache[i]);
1026  }
1027 
1028  // For > operator we want to get all the elements with index greater than the index
1029  // that we have except, when the pattern we are searching for is lesser than the
1030  // smallest string in the dictionary we also want to include the id of the index
1031  // that we have.
1032 
1033  } else if (comp_operator == ">") {
1034  size_t idx = cache_index->index + 1;
1035  if (cache_index->index == 0 && cache_index->diff > 0) {
1036  idx = cache_index->index;
1037  }
1038  for (size_t i = idx; i < sorted_cache.size(); i++) {
1039  ret.push_back(sorted_cache[i]);
1040  }
1041 
1042  // For >= operator when the indexed element that we have points to element which is
1043  // equal to the pattern we are searching for we want to include that in the result
1044  // vector. If the index that we have does not point to the string which is equal to
1045  // the pattern we are searching we don’t want to include that id into the result
1046  // vector except when the index is 0.
1047 
1048  } else if (comp_operator == ">=") {
1049  size_t idx = cache_index->index;
1050  if (cache_index->diff) {
1051  idx = cache_index->index + 1;
1052  if (cache_index->index == 0 && cache_index->diff > 0) {
1053  idx = cache_index->index;
1054  }
1055  }
1056  for (size_t i = idx; i < sorted_cache.size(); i++) {
1057  ret.push_back(sorted_cache[i]);
1058  }
1059  } else if (comp_operator == "=") {
1060  if (!cache_index->diff) {
1061  ret.push_back(sorted_cache[cache_index->index]);
1062  }
1063 
1064  // For <> operator it is simple matter of not including id of string which is equal
1065  // to pattern we are searching for.
1066  } else if (comp_operator == "<>") {
1067  if (!cache_index->diff) {
1068  size_t idx = cache_index->index;
1069  for (size_t i = 0; i < idx; i++) {
1070  ret.push_back(sorted_cache[i]);
1071  }
1072  ++idx;
1073  for (size_t i = idx; i < sorted_cache.size(); i++) {
1074  ret.push_back(sorted_cache[i]);
1075  }
1076  } else {
1077  for (size_t i = 0; i < sorted_cache.size(); i++) {
1078  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
1079  }
1080  }
1081 
1082  } else {
1083  std::runtime_error("Unsupported string comparison operator");
1084  }
1085  return ret;
1086 }
bool isClient() const noexcept
RUNTIME_EXPORT DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:330
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
heavyai::unique_lock< heavyai::shared_mutex > write_lock
std::shared_mutex rw_mutex_
std::unique_ptr< StringDictionaryClient > client_
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:302
constexpr double a
Definition: Utm.h:32
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

const shared::StringDictKey & StringDictionary::getDictKey ( ) const
noexcept

Definition at line 312 of file StringDictionary.cpp.

References dict_key_.

Referenced by RowSetMemoryOwner::addStringProxyIntersectionTranslationMap(), RowSetMemoryOwner::addStringProxyNumericTranslationMap(), RowSetMemoryOwner::addStringProxyUnionTranslationMap(), and buildDictionaryTranslationMap().

312  {
313  return dict_key_;
314 }
const shared::StringDictKey dict_key_

+ Here is the caller graph for this function:

std::vector< int32_t > StringDictionary::getEquals ( std::string  pattern,
std::string  comp_operator,
size_t  generation 
)
private

Definition at line 877 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, cpu_threads(), equal_cache_, getStringUnlocked(), MAX_STRLEN, run_benchmark_import::result, and str_count_.

Referenced by getCompare().

879  {
880  std::vector<int32_t> result;
881  auto eq_id_itr = equal_cache_.find(pattern);
882  int32_t eq_id = MAX_STRLEN + 1;
883  int32_t cur_size = str_count_;
884  if (eq_id_itr != equal_cache_.end()) {
885  auto eq_id = eq_id_itr->second;
886  if (comp_operator == "=") {
887  result.push_back(eq_id);
888  } else {
889  for (int32_t idx = 0; idx <= cur_size; idx++) {
890  if (idx == eq_id) {
891  continue;
892  }
893  result.push_back(idx);
894  }
895  }
896  } else {
897  std::vector<std::thread> workers;
898  int worker_count = cpu_threads();
899  CHECK_GT(worker_count, 0);
900  std::vector<std::vector<int32_t>> worker_results(worker_count);
901  CHECK_LE(generation, str_count_);
902  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
903  workers.emplace_back(
904  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
905  for (size_t string_id = worker_idx; string_id < generation;
906  string_id += worker_count) {
907  const auto str = getStringUnlocked(string_id);
908  if (str == pattern) {
909  worker_results[worker_idx].push_back(string_id);
910  }
911  }
912  });
913  }
914  for (auto& worker : workers) {
915  worker.join();
916  }
917  for (const auto& worker_result : worker_results) {
918  result.insert(result.end(), worker_result.begin(), worker_result.end());
919  }
920  if (result.size() > 0) {
921  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
922  CHECK(it_ok.second);
923  eq_id = result[0];
924  }
925  if (comp_operator == "<>") {
926  for (int32_t idx = 0; idx <= cur_size; idx++) {
927  if (idx == eq_id) {
928  continue;
929  }
930  result.push_back(idx);
931  }
932  }
933  }
934  return result;
935 }
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::string getStringUnlocked(int32_t string_id) const noexcept
std::map< std::string, int32_t > equal_cache_
#define CHECK_LE(x, y)
Definition: Logger.h:304
#define CHECK(condition)
Definition: Logger.h:291
static constexpr size_t MAX_STRLEN
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
template int32_t StringDictionary::getIdOfString ( const String &  ) const

Definition at line 744 of file StringDictionary.cpp.

References client_, getUnlocked(), isClient(), and rw_mutex_.

744  {
745  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
746  if (isClient()) {
747  if constexpr (std::is_same_v<std::string, std::decay_t<String>>) {
748  return client_->get(str);
749  } else {
750  return client_->get(std::string(str));
751  }
752  }
753  return getUnlocked(str);
754 }
bool isClient() const noexcept
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::shared_mutex rw_mutex_
std::unique_ptr< StringDictionaryClient > client_
int32_t getUnlocked(const std::string_view sv) const noexcept

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getLike ( const std::string &  pattern,
const bool  icase,
const bool  is_simple,
const char  escape,
const size_t  generation 
) const

Definition at line 824 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_like(), isClient(), like_cache_, rw_mutex_, and str_count_.

828  {
829  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
830  if (isClient()) {
831  return client_->get_like(pattern, icase, is_simple, escape, generation);
832  }
833  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
834  const auto it = like_cache_.find(cache_key);
835  if (it != like_cache_.end()) {
836  return it->second;
837  }
838  std::vector<int32_t> result;
839  std::vector<std::thread> workers;
840  int worker_count = cpu_threads();
841  CHECK_GT(worker_count, 0);
842  std::vector<std::vector<int32_t>> worker_results(worker_count);
843  CHECK_LE(generation, str_count_);
844  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
845  workers.emplace_back([&worker_results,
846  &pattern,
847  generation,
848  icase,
849  is_simple,
850  escape,
851  worker_idx,
852  worker_count,
853  this]() {
854  for (size_t string_id = worker_idx; string_id < generation;
855  string_id += worker_count) {
856  const auto str = getStringUnlocked(string_id);
857  if (is_like(str, pattern, icase, is_simple, escape)) {
858  worker_results[worker_idx].push_back(string_id);
859  }
860  }
861  });
862  }
863  for (auto& worker : workers) {
864  worker.join();
865  }
866  for (const auto& worker_result : worker_results) {
867  result.insert(result.end(), worker_result.begin(), worker_result.end());
868  }
869  // place result into cache for reuse if similar query
870  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
871 
872  CHECK(it_ok.second);
873 
874  return result;
875 }
bool isClient() const noexcept
heavyai::unique_lock< heavyai::shared_mutex > write_lock
std::shared_mutex rw_mutex_
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
#define CHECK_LE(x, y)
Definition: Logger.h:304
#define CHECK(condition)
Definition: Logger.h:291
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

size_t StringDictionary::getNumStringsFromStorage ( const size_t  storage_slots) const
private, noexcept

Retrieves the number of strings in storage by binary-searching for the first canary slot.

Parameters
storage_slots — number of storage entries to search when locating the first (lowest-index) canary
Returns
number of strings in storage

Definition at line 323 of file StringDictionary.cpp.

References CHECK_GE.

Referenced by StringDictionary().

324  {
325  if (storage_slots == 0) {
326  return 0;
327  }
328  // Must use signed integers since final binary search step can wrap to max size_t value
329  // if dictionary is empty
330  int64_t min_bound = 0;
331  int64_t max_bound = storage_slots - 1;
332  int64_t guess{0};
333  while (min_bound <= max_bound) {
334  guess = (max_bound + min_bound) / 2;
335  CHECK_GE(guess, 0);
336  if (getStringFromStorage(guess).canary) {
337  max_bound = guess - 1;
338  } else {
339  min_bound = guess + 1;
340  }
341  }
342  CHECK_GE(guess + (min_bound > guess ? 1 : 0), 0);
343  return guess + (min_bound > guess ? 1 : 0);
344 }
#define CHECK_GE(x, y)
Definition: Logger.h:306
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the caller graph for this function:

int32_t StringDictionary::getOrAdd ( const std::string &  str)
noexcept

Definition at line 380 of file StringDictionary.cpp.

References CHECK_EQ.

380  {
381  if (isClient()) {
382  std::vector<int32_t> string_ids;
383  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
384  CHECK_EQ(size_t(1), string_ids.size());
385  return string_ids.front();
386  }
387  return getOrAddImpl(str);
388 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
bool isClient() const noexcept
std::unique_ptr< StringDictionaryClient > client_
int32_t getOrAddImpl(const std::string_view &str) noexcept
template<class T , class String >
template void StringDictionary::getOrAddBulk ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 590 of file StringDictionary.cpp.

References appendToStorage(), CHECK, CHECK_LT, computeBucket(), dict_key_, fillRateIsHigh(), g_enable_stringdict_parallel, getOrAddBulkParallel(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), increaseHashTableCapacity(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, rw_mutex_, str_count_, and string_id_string_dict_hash_table_.

Referenced by import_export::TypedImportBuffer::addDictEncodedString(), ArrowForeignStorageBase::convertArrowDictionary(), ArrowForeignStorageBase::createDictionaryEncodedColumn(), foreign_storage::ParquetStringEncoder< V >::encodeAndCopyContiguous(), getOrAddBulkArray(), and populate_string_ids().

591  {
593  getOrAddBulkParallel(input_strings, output_string_ids);
594  return;
595  }
596  // Single-thread path.
597  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
598 
599  const size_t initial_str_count = str_count_;
600  size_t idx = 0;
601  for (const auto& input_string : input_strings) {
602  if (input_string.empty()) {
603  output_string_ids[idx++] = inline_int_null_value<T>();
604  continue;
605  }
606  CHECK(input_string.size() <= MAX_STRLEN);
607 
608  const string_dict_hash_t input_string_hash = hash_string(input_string);
609  uint32_t hash_bucket =
610  computeBucket(input_string_hash, input_string, string_id_string_dict_hash_table_);
612  output_string_ids[idx++] = string_id_string_dict_hash_table_[hash_bucket];
613  continue;
614  }
615  // need to add record to dictionary
616  // check there is room
617  if (str_count_ > static_cast<size_t>(max_valid_int_value<T>())) {
618  throw_encoding_error<T>(input_string, dict_key_);
619  }
621  << "Maximum number (" << str_count_
622  << ") of Dictionary encoded Strings reached for this column, offset path "
623  "for column is "
624  << offsets_path_;
625  if (fillRateIsHigh(str_count_)) {
626  // resize when more than 50% is full
628  hash_bucket = computeBucket(
629  input_string_hash, input_string, string_id_string_dict_hash_table_);
630  }
631  appendToStorage(input_string);
632 
633  if (materialize_hashes_) {
634  hash_cache_[str_count_] = input_string_hash;
635  }
636  const int32_t string_id = static_cast<int32_t>(str_count_);
637  string_id_string_dict_hash_table_[hash_bucket] = string_id;
638  output_string_ids[idx++] = string_id;
639  ++str_count_;
640  }
641  const size_t num_strings_added = str_count_ - initial_str_count;
642  if (num_strings_added > 0) {
644  }
645 }
void increaseHashTableCapacity() noexcept
string_dict_hash_t hash_string(const std::string_view &str)
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
heavyai::unique_lock< heavyai::shared_mutex > write_lock
std::shared_mutex rw_mutex_
static constexpr size_t MAX_STRCOUNT
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
void appendToStorage(const String str) noexcept
const shared::StringDictKey dict_key_
#define CHECK_LT(x, y)
Definition: Logger.h:303
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:291
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
bool g_enable_stringdict_parallel

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
template void StringDictionary::getOrAddBulkArray ( const std::vector< std::vector< String >> &  string_array_vec,
std::vector< std::vector< int32_t >> &  ids_array_vec 
)

Definition at line 439 of file StringDictionary.cpp.

References client_no_timeout_, and getOrAddBulk().

Referenced by import_export::TypedImportBuffer::addDictEncodedStringArray().

441  {
442  if (client_no_timeout_) {
443  client_no_timeout_->get_or_add_bulk_array(ids_array_vec, string_array_vec);
444  return;
445  }
446 
447  ids_array_vec.resize(string_array_vec.size());
448  for (size_t i = 0; i < string_array_vec.size(); i++) {
449  auto& strings = string_array_vec[i];
450  auto& ids = ids_array_vec[i];
451  ids.resize(strings.size());
452  getOrAddBulk(strings, &ids[0]);
453  }
454 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
std::unique_ptr< StringDictionaryClient > client_no_timeout_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class T , class String >
void StringDictionary::getOrAddBulkParallel ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 648 of file StringDictionary.cpp.

References appendToStorageBulk(), CHECK, CHECK_LT, computeBucketFromStorageAndMemory(), dict_key_, fillRateIsHigh(), hash_cache_, hashStrings(), increaseHashTableCapacityFromStorageAndMemory(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, rw_mutex_, str_count_, and string_id_string_dict_hash_table_.

Referenced by getOrAddBulk().

649  {
650  // Compute hashes of the input strings up front, and in parallel,
651  // as the string hashing does not need to be behind the subsequent write_lock
652  std::vector<string_dict_hash_t> input_strings_hashes(input_strings.size());
653  hashStrings(input_strings, input_strings_hashes);
654 
655  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
656  size_t shadow_str_count =
657  str_count_; // Need to shadow str_count_ now with bulk add methods
658  const size_t storage_high_water_mark = shadow_str_count;
659  std::vector<size_t> string_memory_ids;
660  size_t sum_new_string_lengths = 0;
661  string_memory_ids.reserve(input_strings.size());
662  size_t input_string_idx{0};
663  for (const auto& input_string : input_strings) {
664  // Currently we make empty strings null
665  if (input_string.empty()) {
666  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
667  continue;
668  }
669  // TODO: Recover gracefully if an input string is too long
670  CHECK(input_string.size() <= MAX_STRLEN);
671 
672  if (fillRateIsHigh(shadow_str_count)) {
673  // resize when more than 50% is full
675  storage_high_water_mark,
676  input_strings,
677  string_memory_ids,
678  input_strings_hashes);
679  }
680  // Compute the hash for this input_string
681  const string_dict_hash_t input_string_hash = input_strings_hashes[input_string_idx];
682 
683  const uint32_t hash_bucket =
684  computeBucketFromStorageAndMemory(input_string_hash,
685  input_string,
687  storage_high_water_mark,
688  input_strings,
689  string_memory_ids);
690 
691  // If the hash bucket is not empty, that is our string id
692  // (computeBucketFromStorageAndMemory) already checked to ensure the input string and
693  // bucket string are equal)
695  output_string_ids[input_string_idx++] =
697  continue;
698  }
699  // Did not find string, so need to add record to dictionary
700  // First check there is room
701  if (shadow_str_count > static_cast<size_t>(max_valid_int_value<T>())) {
702  throw_encoding_error<T>(input_string, dict_key_);
703  }
704  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
705  << "Maximum number (" << shadow_str_count
706  << ") of Dictionary encoded Strings reached for this column, offset path "
707  "for column is "
708  << offsets_path_;
709 
710  string_memory_ids.push_back(input_string_idx);
711  sum_new_string_lengths += input_string.size();
712  string_id_string_dict_hash_table_[hash_bucket] =
713  static_cast<int32_t>(shadow_str_count);
714  if (materialize_hashes_) {
715  hash_cache_[shadow_str_count] = input_string_hash;
716  }
717  output_string_ids[input_string_idx++] = shadow_str_count++;
718  }
719  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
720  const size_t num_strings_added = shadow_str_count - str_count_;
721  str_count_ = shadow_str_count;
722  if (num_strings_added > 0) {
724  }
725 }
uint32_t computeBucketFromStorageAndMemory(const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
heavyai::unique_lock< heavyai::shared_mutex > write_lock
std::shared_mutex rw_mutex_
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
const shared::StringDictKey dict_key_
#define CHECK_LT(x, y)
Definition: Logger.h:303
void hashStrings(const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
void increaseHashTableCapacityFromStorageAndMemory(const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:291
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int32_t StringDictionary::getOrAddImpl ( const std::string_view &  str)
private, noexcept

Definition at line 1257 of file StringDictionary.cpp.

References CHECK, CHECK_LT, and anonymous_namespace{StringDictionary.cpp}::hash_string().

1257  {
1258  // @TODO(wei) treat empty string as NULL for now
1259  if (str.size() == 0) {
1260  return inline_int_null_value<int32_t>();
1261  }
1262  CHECK(str.size() <= MAX_STRLEN);
1263  const string_dict_hash_t hash = hash_string(str);
1264  {
1265  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
1266  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1268  return string_id_string_dict_hash_table_[bucket];
1269  }
1270  }
1271  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1272  if (fillRateIsHigh(str_count_)) {
1273  // resize when more than 50% is full
1275  }
1276  // need to recalculate the bucket in case it changed before
1277  // we got the lock
1278  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1281  << "Maximum number (" << str_count_
1282  << ") of Dictionary encoded Strings reached for this column, offset path "
1283  "for column is "
1284  << offsets_path_;
1285  appendToStorage(str);
1286  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
1287  if (materialize_hashes_) {
1288  hash_cache_[str_count_] = hash;
1289  }
1290  ++str_count_;
1292  }
1293  return string_id_string_dict_hash_table_[bucket];
1294 }
void increaseHashTableCapacity() noexcept
string_dict_hash_t hash_string(const std::string_view &str)
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
heavyai::unique_lock< heavyai::shared_mutex > write_lock
std::shared_mutex rw_mutex_
static constexpr size_t MAX_STRCOUNT
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
void appendToStorage(const String str) noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:303
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:291
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getRegexpLike ( const std::string &  pattern,
const char  escape,
const size_t  generation 
) const

Definition at line 1098 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_regexp_like(), isClient(), regex_cache_, rw_mutex_, and str_count_.

1100  {
1101  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1102  if (isClient()) {
1103  return client_->get_regexp_like(pattern, escape, generation);
1104  }
1105  const auto cache_key = std::make_pair(pattern, escape);
1106  const auto it = regex_cache_.find(cache_key);
1107  if (it != regex_cache_.end()) {
1108  return it->second;
1109  }
1110  std::vector<int32_t> result;
1111  std::vector<std::thread> workers;
1112  int worker_count = cpu_threads();
1113  CHECK_GT(worker_count, 0);
1114  std::vector<std::vector<int32_t>> worker_results(worker_count);
1115  CHECK_LE(generation, str_count_);
1116  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
1117  workers.emplace_back([&worker_results,
1118  &pattern,
1119  generation,
1120  escape,
1121  worker_idx,
1122  worker_count,
1123  this]() {
1124  for (size_t string_id = worker_idx; string_id < generation;
1125  string_id += worker_count) {
1126  const auto str = getStringUnlocked(string_id);
1127  if (is_regexp_like(str, pattern, escape)) {
1128  worker_results[worker_idx].push_back(string_id);
1129  }
1130  }
1131  });
1132  }
1133  for (auto& worker : workers) {
1134  worker.join();
1135  }
1136  for (const auto& worker_result : worker_results) {
1137  result.insert(result.end(), worker_result.begin(), worker_result.end());
1138  }
1139  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
1140  CHECK(it_ok.second);
1141 
1142  return result;
1143 }
bool isClient() const noexcept
heavyai::unique_lock< heavyai::shared_mutex > write_lock
std::shared_mutex rw_mutex_
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
#define CHECK_LE(x, y)
Definition: Logger.h:304
#define CHECK(condition)
Definition: Logger.h:291
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

std::string StringDictionary::getString ( int32_t  string_id) const

Definition at line 766 of file StringDictionary.cpp.

References client_, getStringUnlocked(), isClient(), and rw_mutex_.

Referenced by StringValueConverter::convertToColumnarFormatFromDict(), and populate_string_ids().

766  {
767  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
768  if (isClient()) {
769  std::string ret;
770  client_->get_string(ret, string_id);
771  return ret;
772  }
773  return getStringUnlocked(string_id);
774 }
bool isClient() const noexcept
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::shared_mutex rw_mutex_
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::pair< char *, size_t > StringDictionary::getStringBytes ( int32_t  string_id) const
noexcept

Definition at line 781 of file StringDictionary.cpp.

References CHECK, CHECK_LE, and CHECK_LT.

782  {
783  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
784  CHECK(!isClient());
785  CHECK_LE(0, string_id);
786  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
787  return getStringBytesChecked(string_id);
788 }
bool isClient() const noexcept
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
std::shared_mutex rw_mutex_
#define CHECK_LT(x, y)
Definition: Logger.h:303
#define CHECK_LE(x, y)
Definition: Logger.h:304
#define CHECK(condition)
Definition: Logger.h:291
std::pair< char *, size_t > StringDictionary::getStringBytesChecked ( const int  string_id) const
private, noexcept

Definition at line 1302 of file StringDictionary.cpp.

References CHECK.

1303  {
1304  const auto str_canary = getStringFromStorage(string_id);
1305  CHECK(!str_canary.canary);
1306  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1307 }
#define CHECK(condition)
Definition: Logger.h:291
PayloadString getStringFromStorage(const int string_id) const noexcept
std::string StringDictionary::getStringChecked ( const int  string_id) const
private, noexcept

Definition at line 1296 of file StringDictionary.cpp.

References CHECK.

Referenced by increaseHashTableCapacity().

1296  {
1297  const auto str_canary = getStringFromStorage(string_id);
1298  CHECK(!str_canary.canary);
1299  return std::string(str_canary.c_str_ptr, str_canary.size);
1300 }
#define CHECK(condition)
Definition: Logger.h:291
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the caller graph for this function:

StringDictionary::PayloadString StringDictionary::getStringFromStorage ( const int  string_id) const
private, noexcept

Definition at line 1488 of file StringDictionary.cpp.

References CHECK_GE, StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

Referenced by getCompare(), mergeSortedCache(), sortCache(), and StringDictionary().

1489  {
1490  if (!isTemp_) {
1491  CHECK_GE(payload_fd_, 0);
1492  CHECK_GE(offset_fd_, 0);
1493  }
1494  CHECK_GE(string_id, 0);
1495  const StringIdxEntry* str_meta = offset_map_ + string_id;
1496  if (str_meta->size == 0xffff) {
1497  // hit the canary
1498  return {nullptr, 0, true};
1499  }
1500  return {payload_map_ + str_meta->off, str_meta->size, false};
1501 }
StringIdxEntry * offset_map_
#define CHECK_GE(x, y)
Definition: Logger.h:306

+ Here is the caller graph for this function:

std::string_view StringDictionary::getStringFromStorageFast ( const int  string_id) const
private, noexcept

Definition at line 1482 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

Referenced by buildDictionaryNumericTranslationMap(), buildDictionaryTranslationMap(), eachStringSerially(), and getStringViews().

1483  {
1484  const StringIdxEntry* str_meta = offset_map_ + string_id;
1485  return {payload_map_ + str_meta->off, str_meta->size};
1486 }
StringIdxEntry * offset_map_

+ Here is the caller graph for this function:

std::string StringDictionary::getStringUnlocked ( int32_t  string_id) const
private, noexcept

Definition at line 776 of file StringDictionary.cpp.

References CHECK_LT.

Referenced by copyStrings(), getEquals(), getLike(), getRegexpLike(), and getString().

776  {
777  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
778  return getStringChecked(string_id);
779 }
std::string getStringChecked(const int string_id) const noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:303

+ Here is the caller graph for this function:

std::vector< std::string_view > StringDictionary::getStringViews ( ) const

Definition at line 1770 of file StringDictionary.cpp.

References storageEntryCount().

1770  {
1772 }
size_t storageEntryCount() const
std::vector< std::string_view > getStringViews() const

+ Here is the call graph for this function:

std::vector< std::string_view > StringDictionary::getStringViews ( const size_t  generation) const

Definition at line 1716 of file StringDictionary.cpp.

References CHECK_GE, CHECK_LE, DEBUG_TIMER, getStringFromStorageFast(), MAX_STRCOUNT, ThreadInfo::num_elems_per_thread, ThreadInfo::num_threads, threading_serial::parallel_for(), rw_mutex_, and storageEntryCount().

1717  {
1718  auto timer = DEBUG_TIMER(__func__);
1719  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
1720  const int64_t num_strings = generation >= 0 ? generation : storageEntryCount();
1721  CHECK_LE(num_strings, static_cast<int64_t>(StringDictionary::MAX_STRCOUNT));
1722  // The CHECK_LE below is currently redundant with the check
1723  // above against MAX_STRCOUNT, however given we iterate using
1724  // int32_t types for efficiency (to match type expected by
1725  // getStringFromStorageFast, check that the # of strings is also
1726  // in the int32_t range in case MAX_STRCOUNT is changed
1727 
1728  // Todo(todd): consider aliasing the max logical type width
1729  // (currently int32_t) throughout StringDictionary
1730  CHECK_LE(num_strings, std::numeric_limits<int32_t>::max());
1731 
1732  std::vector<std::string_view> string_views(num_strings);
1733  // We can bail early if the generation-specified dictionary is empty
1734  if (num_strings == 0) {
1735  return string_views;
1736  }
1737  constexpr int64_t tbb_parallel_threshold{1000};
1738  if (num_strings < tbb_parallel_threshold) {
1739  // Use int32_t to match type expected by getStringFromStorageFast
1740  for (int32_t string_idx = 0; string_idx < num_strings; ++string_idx) {
1741  string_views[string_idx] = getStringFromStorageFast(string_idx);
1742  }
1743  } else {
1744  constexpr int64_t target_strings_per_thread{1000};
1745  const ThreadInfo thread_info(
1746  std::thread::hardware_concurrency(), num_strings, target_strings_per_thread);
1747  CHECK_GE(thread_info.num_threads, 1L);
1748  CHECK_GE(thread_info.num_elems_per_thread, 1L);
1749 
1750  tbb::task_arena limited_arena(thread_info.num_threads);
1751  CHECK_LE(tbb::this_task_arena::max_concurrency(), thread_info.num_threads);
1752  limited_arena.execute([&] {
1754  tbb::blocked_range<int64_t>(
1755  0, num_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
1756  [&](const tbb::blocked_range<int64_t>& r) {
1757  // r should be in range of int32_t per CHECK above
1758  const int32_t start_idx = r.begin();
1759  const int32_t end_idx = r.end();
1760  for (int32_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
1761  string_views[string_idx] = getStringFromStorageFast(string_idx);
1762  }
1763  },
1764  tbb::simple_partitioner());
1765  });
1766  }
1767  return string_views;
1768 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
size_t storageEntryCount() const
#define CHECK_GE(x, y)
Definition: Logger.h:306
std::shared_mutex rw_mutex_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr size_t MAX_STRCOUNT
#define CHECK_LE(x, y)
Definition: Logger.h:304
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
#define DEBUG_TIMER(name)
Definition: Logger.h:411

+ Here is the call graph for this function:

int32_t StringDictionary::getUnlocked ( const std::string_view  sv) const
private, noexcept

Definition at line 759 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::hash_string().

Referenced by getIdOfString().

759  {
760  const string_dict_hash_t hash = hash_string(sv);
763  return str_id;
764 }
string_dict_hash_t hash_string(const std::string_view &str)
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::hashStrings ( const std::vector< String > &  string_vec,
std::vector< string_dict_hash_t > &  hashes 
) const
private, noexcept

Method to hash a vector of strings in parallel.

Parameters
string_vec — input vector of strings to be hashed
hashes — output space for the hashes; must be pre-sized to string_vec.size() (checked with CHECK_EQ). Entries corresponding to empty input strings are left unchanged.

Definition at line 470 of file StringDictionary.cpp.

References CHECK_EQ, anonymous_namespace{StringDictionary.cpp}::hash_string(), and threading_serial::parallel_for().

Referenced by getOrAddBulkParallel().

472  {
473  CHECK_EQ(string_vec.size(), hashes.size());
474 
475  tbb::parallel_for(tbb::blocked_range<size_t>(0, string_vec.size()),
476  [&string_vec, &hashes](const tbb::blocked_range<size_t>& r) {
477  for (size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
478  if (string_vec[curr_id].empty()) {
479  continue;
480  }
481  hashes[curr_id] = hash_string(string_vec[curr_id]);
482  }
483  });
484 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
string_dict_hash_t hash_string(const std::string_view &str)
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::increaseHashTableCapacity ( )
private, noexcept

Definition at line 1201 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), getStringChecked(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), INVALID_STR_ID, materialize_hashes_, str_count_, and string_id_string_dict_hash_table_.

Referenced by getOrAddBulk().

1201  {
1202  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1203  INVALID_STR_ID);
1204 
1205  if (materialize_hashes_) {
1206  for (size_t i = 0; i != str_count_; ++i) {
1207  const string_dict_hash_t hash = hash_cache_[i];
1208  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1209  new_str_ids[bucket] = i;
1210  }
1211  hash_cache_.resize(hash_cache_.size() * 2);
1212  } else {
1213  for (size_t i = 0; i != str_count_; ++i) {
1214  const auto str = getStringChecked(i);
1215  const string_dict_hash_t hash = hash_string(str);
1216  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1217  new_str_ids[bucket] = i;
1218  }
1219  }
1220  string_id_string_dict_hash_table_.swap(new_str_ids);
1221 }
string_dict_hash_t hash_string(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
static constexpr int32_t INVALID_STR_ID
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::increaseHashTableCapacityFromStorageAndMemory ( const size_t  str_count,
const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const std::vector< string_dict_hash_t > &  input_strings_hashes 
)
private noexcept

Definition at line 1224 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::hash_string().

Referenced by getOrAddBulkParallel().

1230  {
1231  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1232  INVALID_STR_ID);
1233  if (materialize_hashes_) {
1234  for (size_t i = 0; i != str_count; ++i) {
1235  const string_dict_hash_t hash = hash_cache_[i];
1236  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1237  new_str_ids[bucket] = i;
1238  }
1239  hash_cache_.resize(hash_cache_.size() * 2);
1240  } else {
1241  for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1242  const auto storage_string = getStringChecked(storage_idx);
1243  const string_dict_hash_t hash = hash_string(storage_string);
1244  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1245  new_str_ids[bucket] = storage_idx;
1246  }
1247  for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1248  const size_t string_memory_id = string_memory_ids[memory_idx];
1249  const uint32_t bucket = computeUniqueBucketWithHash(
1250  input_strings_hashes[string_memory_id], new_str_ids);
1251  new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1252  }
1253  }
1254  string_id_string_dict_hash_table_.swap(new_str_ids);
1255 }
string_dict_hash_t hash_string(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
static constexpr int32_t INVALID_STR_ID
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::insertInSortedCache ( std::string  str,
int32_t  str_id 
)
private
void StringDictionary::invalidateInvertedIndex ( )
private noexcept

Definition at line 1563 of file StringDictionary.cpp.

References compare_cache_, equal_cache_, like_cache_, regex_cache_, and gpu_enabled::swap().

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

1563  {
1564  if (!like_cache_.empty()) {
1565  decltype(like_cache_)().swap(like_cache_);
1566  }
1567  if (!regex_cache_.empty()) {
1568  decltype(regex_cache_)().swap(regex_cache_);
1569  }
1570  if (!equal_cache_.empty()) {
1571  decltype(equal_cache_)().swap(equal_cache_);
1572  }
1573  compare_cache_.invalidateInvertedIndex();
1574 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::map< std::string, int32_t > equal_cache_
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool StringDictionary::isClient ( ) const
noexcept

Definition at line 1599 of file StringDictionary.cpp.

References client_.

Referenced by checkpoint(), copyStrings(), eachStringSerially(), getCompare(), getIdOfString(), getLike(), getRegexpLike(), getString(), makeLambdaStringToId(), storageEntryCount(), and ~StringDictionary().

1599  {
1600  return static_cast<bool>(client_);
1601 }
std::unique_ptr< StringDictionaryClient > client_

+ Here is the caller graph for this function:

std::function< int32_t(std::string const &)> StringDictionary::makeLambdaStringToId ( ) const

Definition at line 255 of file StringDictionary.cpp.

References CHECK, eachStringSerially(), INVALID_STR_ID, and isClient().

256  {
257  CHECK(isClient());
258  constexpr size_t big_gen = static_cast<size_t>(std::numeric_limits<size_t>::max());
259  MapMaker map_maker;
260  eachStringSerially(big_gen, map_maker);
261  return [map{map_maker.moveMap()}](std::string const& str) {
262  auto const itr = map.find(str);
263  return itr == map.cend() ? INVALID_STR_ID : itr->second;
264  };
265 }
bool isClient() const noexcept
static constexpr int32_t INVALID_STR_ID
void eachStringSerially(int64_t const generation, StringCallback &) const
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

void StringDictionary::mergeSortedCache ( std::vector< int32_t > &  temp_sorted_cache)
private

Definition at line 1627 of file StringDictionary.cpp.

References getStringFromStorage(), sorted_cache, and string_lt().

Referenced by buildSortedCache().

1627  {
1628  // this method is not thread safe
1629  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1630  size_t t_idx = 0, s_idx = 0, idx = 0;
1631  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1632  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1633  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1634  const auto insert_from_temp_cache =
1635  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1636  if (insert_from_temp_cache) {
1637  updated_cache[idx] = temp_sorted_cache[t_idx++];
1638  } else {
1639  updated_cache[idx] = sorted_cache[s_idx++];
1640  }
1641  }
1642  while (t_idx < temp_sorted_cache.size()) {
1643  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1644  }
1645  while (s_idx < sorted_cache.size()) {
1646  updated_cache[idx++] = sorted_cache[s_idx++];
1647  }
1648  sorted_cache.swap(updated_cache);
1649 }
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:302
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_array_ids ( std::vector< std::vector< int32_t >> &  dest_array_ids,
StringDictionary dest_dict,
const std::vector< std::vector< int32_t >> &  source_array_ids,
const StringDictionary source_dict 
)
static

Definition at line 1675 of file StringDictionary.cpp.

References threading_serial::async(), and populate_string_ids().

Referenced by DictionaryValueConverter< TARGET_TYPE >::processArrayBuffer().

1679  {
1680  dest_array_ids.resize(source_array_ids.size());
1681 
1682  std::atomic<size_t> row_idx{0};
1683  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1684  int thread_id) {
1685  for (;;) {
1686  auto row = row_idx.fetch_add(1);
1687 
1688  if (row >= dest_array_ids.size()) {
1689  return;
1690  }
1691  const auto& source_ids = source_array_ids[row];
1692  auto& dest_ids = dest_array_ids[row];
1693  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1694  }
1695  };
1696 
1697  const int num_worker_threads = std::thread::hardware_concurrency();
1698 
1699  if (source_array_ids.size() / num_worker_threads > 10) {
1700  std::vector<std::future<void>> worker_threads;
1701  for (int i = 0; i < num_worker_threads; ++i) {
1702  worker_threads.push_back(std::async(std::launch::async, processor, i));
1703  }
1704 
1705  for (auto& child : worker_threads) {
1706  child.wait();
1707  }
1708  for (auto& child : worker_threads) {
1709  child.get();
1710  }
1711  } else {
1712  processor(0);
1713  }
1714 }
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::vector< std::string const * > &transient_string_vec={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
future< Result > async(Fn &&fn, Args &&...args)
ThreadId thread_id()
Definition: Logger.cpp:871

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_ids ( std::vector< int32_t > &  dest_ids,
StringDictionary dest_dict,
const std::vector< int32_t > &  source_ids,
const StringDictionary source_dict,
const std::vector< std::string const * > &  transient_string_vec = {} 
)
static

Populates the provided dest_ids vector with the string ids corresponding to the given source strings.

Given a vector of source string ids and the corresponding source dictionary, this method populates a vector of destination string ids, either by returning the ids of matching strings already present in the destination dictionary or by creating new entries in that dictionary. Source string ids can also be transient (negative) if they were created by a function (e.g., LOWER/UPPER). An ordered vector of transient string values, indexed via StringDictionaryProxy::transientIdToIndex(), is provided to handle this use case.

Parameters
dest_ids - vector of destination string ids to be populated
dest_dict - destination dictionary
source_ids - vector of source string ids for which destination ids are needed
source_dict - source dictionary
transient_string_vec - ordered vector of string value pointers

Definition at line 1651 of file StringDictionary.cpp.

References CHECK_LT, getOrAddBulk(), getString(), and StringDictionaryProxy::transientIdToIndex().

Referenced by populate_string_array_ids(), and DictionaryValueConverter< TARGET_TYPE >::processBuffer().

1656  {
1657  std::vector<std::string> strings;
1658 
1659  for (const int32_t source_id : source_ids) {
1660  if (source_id == std::numeric_limits<int32_t>::min()) {
1661  strings.emplace_back("");
1662  } else if (source_id < 0) {
1663  unsigned const string_index = StringDictionaryProxy::transientIdToIndex(source_id);
1664  CHECK_LT(string_index, transient_string_vec.size()) << "source_id=" << source_id;
1665  strings.emplace_back(*transient_string_vec[string_index]);
1666  } else {
1667  strings.push_back(source_dict->getString(source_id));
1668  }
1669  }
1670 
1671  dest_ids.resize(strings.size());
1672  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1673 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
#define CHECK_LT(x, y)
Definition: Logger.h:303
std::string getString(int32_t string_id) const
static unsigned transientIdToIndex(int32_t const id)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::processDictionaryFutures ( std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &  dictionary_futures)
private

Definition at line 292 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), hash_cache_, materialize_hashes_, payload_file_off_, str_count_, and string_id_string_dict_hash_table_.

Referenced by StringDictionary().

294  {
295  for (auto& dictionary_future : dictionary_futures) {
296  dictionary_future.wait();
297  const auto hashVec = dictionary_future.get();
298  for (const auto& hash : hashVec) {
299  const uint32_t bucket =
300  computeUniqueBucketWithHash(hash.first, string_id_string_dict_hash_table_);
301  payload_file_off_ += hash.second;
302  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
303  if (materialize_hashes_) {
304  hash_cache_[str_count_] = hash.first;
305  }
306  ++str_count_;
307  }
308  }
309  dictionary_futures.clear();
310 }
std::vector< string_dict_hash_t > hash_cache_
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::sortCache ( std::vector< int32_t > &  cache)
private

Definition at line 1614 of file StringDictionary.cpp.

References getStringFromStorage(), gpu_enabled::sort(), and string_lt().

Referenced by buildSortedCache().

1614  {
1615  // This method is not thread-safe.
1616 
1617  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1618  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1619 
1620  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1621  auto a_str = this->getStringFromStorage(a);
1622  auto b_str = this->getStringFromStorage(b);
1623  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1624  });
1625 }
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:302
constexpr double a
Definition: Utm.h:32
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t StringDictionary::storageEntryCount ( ) const

Definition at line 790 of file StringDictionary.cpp.

References client_, isClient(), rw_mutex_, and str_count_.

Referenced by buildDictionaryTranslationMap(), eachStringSerially(), getBulk(), and getStringViews().

790  {
791  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
792  if (isClient()) {
793  return client_->storage_entry_count();
794  }
795  return str_count_;
796 }
bool isClient() const noexcept
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::shared_mutex rw_mutex_
std::unique_ptr< StringDictionaryClient > client_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::update_leaf ( const LeafHostInfo host_info)

Definition at line 378 of file StringDictionary.cpp.

378 {}

Friends And Related Function Documentation

friend class StringLocalCallback
friend

Definition at line 79 of file StringDictionary.h.

Member Data Documentation

char* StringDictionary::CANARY_BUFFER {nullptr}
private

Definition at line 291 of file StringDictionary.h.

Referenced by ~StringDictionary().

size_t StringDictionary::canary_buffer_size = 0
private

Definition at line 292 of file StringDictionary.h.

std::unique_ptr<StringDictionaryClient> StringDictionary::client_
mutable private
std::unique_ptr<StringDictionaryClient> StringDictionary::client_no_timeout_
mutable private

Definition at line 289 of file StringDictionary.h.

Referenced by buildDictionaryTranslationMap(), and getOrAddBulkArray().

size_t StringDictionary::collisions_
private

Definition at line 267 of file StringDictionary.h.

Referenced by StringDictionary().

DictionaryCache<std::string, compare_cache_value_t> StringDictionary::compare_cache_
mutable private

Definition at line 286 of file StringDictionary.h.

Referenced by getCompare(), and invalidateInvertedIndex().

const shared::StringDictKey StringDictionary::dict_key_
private

Definition at line 264 of file StringDictionary.h.

Referenced by getBulk(), getDictKey(), getOrAddBulk(), and getOrAddBulkParallel().

std::map<std::string, int32_t> StringDictionary::equal_cache_
mutable private

Definition at line 285 of file StringDictionary.h.

Referenced by getEquals(), and invalidateInvertedIndex().

const std::string StringDictionary::folder_
private

Definition at line 265 of file StringDictionary.h.

std::map<std::tuple<std::string, bool, bool, char>, std::vector<int32_t> > StringDictionary::like_cache_
mutable private

Definition at line 283 of file StringDictionary.h.

Referenced by getLike(), and invalidateInvertedIndex().

bool StringDictionary::materialize_hashes_
private
constexpr size_t StringDictionary::MAX_STRCOUNT = (1U << 31) - 1
static
int StringDictionary::offset_fd_
private
size_t StringDictionary::offset_file_size_
private
StringIdxEntry* StringDictionary::offset_map_
private
std::string StringDictionary::offsets_path_
private

Definition at line 273 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and StringDictionary().

int StringDictionary::payload_fd_
private
size_t StringDictionary::payload_file_off_
private
size_t StringDictionary::payload_file_size_
private
char* StringDictionary::payload_map_
private
std::map<std::pair<std::string, char>, std::vector<int32_t> > StringDictionary::regex_cache_
mutable private

Definition at line 284 of file StringDictionary.h.

Referenced by getRegexpLike(), and invalidateInvertedIndex().

std::vector<int32_t> StringDictionary::sorted_cache
private

Definition at line 270 of file StringDictionary.h.

Referenced by buildSortedCache(), getCompare(), and mergeSortedCache().

std::vector<int32_t> StringDictionary::string_id_string_dict_hash_table_
private
std::shared_ptr<std::vector<std::string> > StringDictionary::strings_cache_
mutable private

Definition at line 287 of file StringDictionary.h.

Referenced by copyStrings().


The documentation for this class was generated from the following files: