OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
StringDictionary Class Reference

#include <StringDictionary.h>

+ Collaboration diagram for StringDictionary:

Classes

struct  compare_cache_value_t
 
struct  PayloadString
 
class  StringCallback
 
struct  StringIdxEntry
 

Public Member Functions

 StringDictionary (const DictRef &dict_ref, const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
 
 StringDictionary (const LeafHostInfo &host, const DictRef dict_ref)
 
 ~StringDictionary () noexcept
 
int32_t getDbId () const noexcept
 
int32_t getDictId () const noexcept
 
void eachStringSerially (int64_t const generation, StringCallback &) const
 
std::function< int32_t(std::string
const &)> 
makeLambdaStringToId () const
 
int32_t getOrAdd (const std::string &str) noexcept
 
template<class T , class String >
size_t getBulk (const std::vector< String > &string_vec, T *encoded_vec) const
 
template<class T , class String >
size_t getBulk (const std::vector< String > &string_vec, T *encoded_vec, const int64_t generation) const
 
template<class T , class String >
void getOrAddBulk (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class T , class String >
void getOrAddBulkParallel (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class String >
void getOrAddBulkArray (const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
 
template<class String >
int32_t getIdOfString (const String &) const
 
std::string getString (int32_t string_id) const
 
std::pair< char *, size_t > getStringBytes (int32_t string_id) const noexcept
 
size_t storageEntryCount () const
 
std::vector< int32_t > getLike (const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
 
std::vector< int32_t > getCompare (const std::string &pattern, const std::string &comp_operator, const size_t generation)
 
std::vector< int32_t > getRegexpLike (const std::string &pattern, const char escape, const size_t generation) const
 
std::vector< std::string > copyStrings () const
 
std::vector< std::string_view > getStringViews () const
 
std::vector< std::string_view > getStringViews (const size_t generation) const
 
std::vector< int32_t > buildDictionaryTranslationMap (const std::shared_ptr< StringDictionary > dest_dict, StringLookupCallback const &dest_transient_lookup_callback) const
 
size_t buildDictionaryTranslationMap (const StringDictionary *dest_dict, int32_t *translated_ids, const int64_t source_generation, const int64_t dest_generation, const bool dest_has_transients, StringLookupCallback const &dest_transient_lookup_callback, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
 
bool checkpoint () noexcept
 
bool isClient () const noexcept
 
void update_leaf (const LeafHostInfo &host_info)
 

Static Public Member Functions

static void populate_string_ids (std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::vector< std::string const * > &transient_string_vec={})
 Populates provided dest_ids vector with string ids corresponding to given source strings. More...
 
static void populate_string_array_ids (std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
 

Static Public Attributes

static constexpr int32_t INVALID_STR_ID = -1
 
static constexpr size_t MAX_STRLEN = (1 << 15) - 1
 
static constexpr size_t MAX_STRCOUNT = (1U << 31) - 1
 

Private Member Functions

void processDictionaryFutures (std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
 
size_t getNumStringsFromStorage (const size_t storage_slots) const noexcept
 
bool fillRateIsHigh (const size_t num_strings) const noexcept
 
void increaseHashTableCapacity () noexcept
 
template<class String >
void increaseHashTableCapacityFromStorageAndMemory (const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
 
int32_t getOrAddImpl (const std::string_view &str) noexcept
 
template<class String >
void hashStrings (const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
 
int32_t getUnlocked (const std::string_view sv) const noexcept
 
std::string getStringUnlocked (int32_t string_id) const noexcept
 
std::string getStringChecked (const int string_id) const noexcept
 
std::pair< char *, size_t > getStringBytesChecked (const int string_id) const noexcept
 
template<class String >
uint32_t computeBucket (const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
 
template<class String >
uint32_t computeBucketFromStorageAndMemory (const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
 
uint32_t computeUniqueBucketWithHash (const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
 
void checkAndConditionallyIncreasePayloadCapacity (const size_t write_length)
 
void checkAndConditionallyIncreaseOffsetCapacity (const size_t write_length)
 
template<class String >
void appendToStorage (const String str) noexcept
 
template<class String >
void appendToStorageBulk (const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
 
PayloadString getStringFromStorage (const int string_id) const noexcept
 
std::string_view getStringFromStorageFast (const int string_id) const noexcept
 
void addPayloadCapacity (const size_t min_capacity_requested=0) noexcept
 
void addOffsetCapacity (const size_t min_capacity_requested=0) noexcept
 
size_t addStorageCapacity (int fd, const size_t min_capacity_requested=0) noexcept
 
void * addMemoryCapacity (void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
 
void invalidateInvertedIndex () noexcept
 
std::vector< int32_t > getEquals (std::string pattern, std::string comp_operator, size_t generation)
 
void buildSortedCache ()
 
void insertInSortedCache (std::string str, int32_t str_id)
 
void sortCache (std::vector< int32_t > &cache)
 
void mergeSortedCache (std::vector< int32_t > &temp_sorted_cache)
 
compare_cache_value_t * binary_search_cache (const std::string &pattern) const
 

Private Attributes

const DictRef dict_ref_
 
const std::string folder_
 
size_t str_count_
 
size_t collisions_
 
std::vector< int32_t > string_id_string_dict_hash_table_
 
std::vector< string_dict_hash_t > hash_cache_
 
std::vector< int32_t > sorted_cache
 
bool isTemp_
 
bool materialize_hashes_
 
std::string offsets_path_
 
int payload_fd_
 
int offset_fd_
 
StringIdxEntry * offset_map_
 
char * payload_map_
 
size_t offset_file_size_
 
size_t payload_file_size_
 
size_t payload_file_off_
 
std::shared_mutex rw_mutex_
 
std::map< std::tuple
< std::string, bool, bool,
char >, std::vector< int32_t > > 
like_cache_
 
std::map< std::pair
< std::string, char >
, std::vector< int32_t > > 
regex_cache_
 
std::map< std::string, int32_t > equal_cache_
 
DictionaryCache< std::string,
compare_cache_value_t >
compare_cache_
 
std::shared_ptr< std::vector
< std::string > > 
strings_cache_
 
std::unique_ptr
< StringDictionaryClient >
client_
 
std::unique_ptr
< StringDictionaryClient >
client_no_timeout_
 
char * CANARY_BUFFER {nullptr}
 
size_t canary_buffer_size = 0
 

Friends

class StringLocalCallback
 

Detailed Description

Definition at line 54 of file StringDictionary.h.

Constructor & Destructor Documentation

StringDictionary::StringDictionary ( const DictRef &  dict_ref,
const std::string &  folder,
const bool  isTemp,
const bool  recover,
const bool  materializeHashes = false,
size_t  initial_capacity = 256 
)

Definition at line 117 of file StringDictionary.cpp.

References addOffsetCapacity(), addPayloadCapacity(), threading_serial::async(), CHECK_EQ, heavyai::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_open(), collisions_, heavyai::file_size(), getNumStringsFromStorage(), getStringFromStorage(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), INVALID_STR_ID, isTemp_, LOG, materialize_hashes_, offset_fd_, offset_file_size_, offset_map_, offsets_path_, payload_fd_, payload_file_size_, payload_map_, processDictionaryFutures(), anonymous_namespace{StringDictionary.cpp}::round_up_p2(), rw_mutex_, str_count_, string_id_string_dict_hash_table_, VLOG, and logger::WARNING.

123  : dict_ref_(dict_ref)
124  , folder_(folder)
125  , str_count_(0)
127  , hash_cache_(initial_capacity)
128  , isTemp_(isTemp)
129  , materialize_hashes_(materializeHashes)
130  , payload_fd_(-1)
131  , offset_fd_(-1)
132  , offset_map_(nullptr)
133  , payload_map_(nullptr)
134  , offset_file_size_(0)
135  , payload_file_size_(0)
136  , payload_file_off_(0)
137  , strings_cache_(nullptr) {
138  if (!isTemp && folder.empty()) {
139  return;
140  }
141 
142  // initial capacity must be a power of two for efficient bucket computation
143  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
144  if (!isTemp_) {
145  boost::filesystem::path storage_path(folder);
146  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
147  const auto payload_path =
148  (storage_path / boost::filesystem::path("DictPayload")).string();
149  payload_fd_ = checked_open(payload_path.c_str(), recover);
150  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
153  }
154  bool storage_is_empty = false;
155  if (payload_file_size_ == 0) {
156  storage_is_empty = true;
158  }
159  if (offset_file_size_ == 0) {
161  }
162  if (!isTemp_) { // we never mmap or recover temp dictionaries
163  payload_map_ =
164  reinterpret_cast<char*>(heavyai::checked_mmap(payload_fd_, payload_file_size_));
165  offset_map_ = reinterpret_cast<StringIdxEntry*>(
167  if (recover) {
168  const size_t bytes = heavyai::file_size(offset_fd_);
169  if (bytes % sizeof(StringIdxEntry) != 0) {
170  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
171  }
172  const uint64_t str_count =
173  storage_is_empty ? 0 : getNumStringsFromStorage(bytes / sizeof(StringIdxEntry));
174  collisions_ = 0;
175  // at this point we know the size of the StringDict we need to load
176  // so lets reallocate the vector to the correct size
177  const uint64_t max_entries =
178  std::max(round_up_p2(str_count * 2 + 1),
179  round_up_p2(std::max(initial_capacity, static_cast<size_t>(1))));
180  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
181  string_id_string_dict_hash_table_.swap(new_str_ids);
182  if (materialize_hashes_) {
183  std::vector<string_dict_hash_t> new_hash_cache(max_entries / 2);
184  hash_cache_.swap(new_hash_cache);
185  }
186  // Bail early if we know we don't have strings to add (i.e. a new or empty
187  // dictionary)
188  if (str_count == 0) {
189  return;
190  }
191 
192  unsigned string_id = 0;
193  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
194 
195  uint32_t thread_inits = 0;
196  const auto thread_count = std::thread::hardware_concurrency();
197  const uint32_t items_per_thread = std::max<uint32_t>(
198  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
199  std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>
200  dictionary_futures;
201  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
202  dictionary_futures.emplace_back(std::async(
203  std::launch::async, [string_id, str_count, items_per_thread, this] {
204  std::vector<std::pair<string_dict_hash_t, unsigned int>> hashVec;
205  for (uint32_t curr_id = string_id;
206  curr_id < string_id + items_per_thread && curr_id < str_count;
207  curr_id++) {
208  const auto recovered = getStringFromStorage(curr_id);
209  if (recovered.canary) {
210  // hit the canary, recovery finished
211  break;
212  } else {
213  std::string_view temp(recovered.c_str_ptr, recovered.size);
214  hashVec.emplace_back(std::make_pair(hash_string(temp), temp.size()));
215  }
216  }
217  return hashVec;
218  }));
219  thread_inits++;
220  if (thread_inits % thread_count == 0) {
221  processDictionaryFutures(dictionary_futures);
222  }
223  }
224  // gather last few threads
225  if (dictionary_futures.size() != 0) {
226  processDictionaryFutures(dictionary_futures);
227  }
228  VLOG(1) << "Opened string dictionary " << folder << " # Strings: " << str_count_
229  << " Hash table size: " << string_id_string_dict_hash_table_.size()
230  << " Fill rate: "
231  << static_cast<double>(str_count_) * 100.0 /
233  << "% Collisions: " << collisions_;
234  }
235  }
236 }
StringIdxEntry * offset_map_
#define CHECK_EQ(x, y)
Definition: Logger.h:231
string_dict_hash_t hash_string(const std::string_view &str)
#define LOG(tag)
Definition: Logger.h:217
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
std::vector< string_dict_hash_t > hash_cache_
std::string offsets_path_
const DictRef dict_ref_
std::shared_mutex rw_mutex_
future< Result > async(Fn &&fn, Args &&...args)
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
const std::string folder_
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
int checked_open(const char *path, const bool recover)
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
size_t getNumStringsFromStorage(const size_t storage_slots) const noexcept
const uint64_t round_up_p2(const uint64_t num)
std::vector< int32_t > string_id_string_dict_hash_table_
mapd_unique_lock< mapd_shared_mutex > write_lock
PayloadString getStringFromStorage(const int string_id) const noexcept
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31
#define VLOG(n)
Definition: Logger.h:317
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38

+ Here is the call graph for this function:

StringDictionary::StringDictionary ( const LeafHostInfo &  host,
const DictRef  dict_ref 
)

Definition at line 349 of file StringDictionary.cpp.

350  : dict_ref_(dict_ref)
351  , folder_("DB_" + std::to_string(dict_ref.dbId) + "_DICT_" +
352  std::to_string(dict_ref.dictId))
353  , strings_cache_(nullptr)
354  , client_(new StringDictionaryClient(host, dict_ref, true))
355  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
const DictRef dict_ref_
std::string to_string(char const *&&v)
std::unique_ptr< StringDictionaryClient > client_
std::shared_ptr< std::vector< std::string > > strings_cache_
const std::string folder_
int32_t dictId
Definition: DictRef.h:11
std::unique_ptr< StringDictionaryClient > client_no_timeout_
int32_t dbId
Definition: DictRef.h:10
StringDictionary::~StringDictionary ( )
noexcept

Definition at line 357 of file StringDictionary.cpp.

References CANARY_BUFFER, CHECK, CHECK_GE, heavyai::checked_munmap(), heavyai::close(), isClient(), isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

357  {
358  free(CANARY_BUFFER);
359  if (isClient()) {
360  return;
361  }
362  if (payload_map_) {
363  if (!isTemp_) {
367  CHECK_GE(payload_fd_, 0);
369  CHECK_GE(offset_fd_, 0);
371  } else {
373  free(payload_map_);
374  free(offset_map_);
375  }
376  }
377 }
StringIdxEntry * offset_map_
bool isClient() const noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:236
#define CHECK(condition)
Definition: Logger.h:223
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
void close(const int fd)
Definition: omnisci_fs.cpp:68

+ Here is the call graph for this function:

Member Function Documentation

void * StringDictionary::addMemoryCapacity ( void *  addr,
size_t &  mem_size,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1543 of file StringDictionary.cpp.

References CHECK, and anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE.

1545  {
1546  const size_t canary_buff_size_to_add =
1547  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1548  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1549  if (canary_buffer_size < canary_buff_size_to_add) {
1550  CANARY_BUFFER =
1551  reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1552  canary_buffer_size = canary_buff_size_to_add;
1554  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1555  }
1556  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
1557  CHECK(new_addr);
1558  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
1559  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
1560  mem_size += canary_buff_size_to_add;
1561  return new_addr;
1562 }
#define CHECK(condition)
Definition: Logger.h:223
void StringDictionary::addOffsetCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1513 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreaseOffsetCapacity(), and StringDictionary().

1513  {
1514  if (!isTemp_) {
1515  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1516  } else {
1517  offset_map_ = static_cast<StringIdxEntry*>(
1518  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1519  }
1520 }
StringIdxEntry * offset_map_
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

void StringDictionary::addPayloadCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1504 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreasePayloadCapacity(), and StringDictionary().

1504  {
1505  if (!isTemp_) {
1506  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1507  } else {
1508  payload_map_ = static_cast<char*>(
1509  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1510  }
1511 }
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

size_t StringDictionary::addStorageCapacity ( int  fd,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1522 of file StringDictionary.cpp.

References CHECK, CHECK_NE, anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE, and File_Namespace::write().

1524  {
1525  const size_t canary_buff_size_to_add =
1526  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1527  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1528 
1529  if (canary_buffer_size < canary_buff_size_to_add) {
1530  CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1531  canary_buffer_size = canary_buff_size_to_add;
1533  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1534  }
1535 
1536  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1537  const auto write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
1538  CHECK(write_return > 0 &&
1539  (static_cast<size_t>(write_return) == canary_buff_size_to_add));
1540  return canary_buff_size_to_add;
1541 }
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:150
#define CHECK_NE(x, y)
Definition: Logger.h:232
#define CHECK(condition)
Definition: Logger.h:223

+ Here is the call graph for this function:

template<class String >
void StringDictionary::appendToStorage ( const String  str)
private noexcept

Definition at line 1449 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::size.

Referenced by getOrAddBulk().

1449  {
1450  // write the payload
1452  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1453 
1454  // write the offset and length
1455  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1456  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1457 
1459  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1460 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::appendToStorageBulk ( const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const size_t  sum_new_strings_lengths 
)
private noexcept

Definition at line 1463 of file StringDictionary.cpp.

Referenced by getOrAddBulkParallel().

1466  {
1467  const size_t num_strings = string_memory_ids.size();
1468 
1469  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1470  checkAndConditionallyIncreaseOffsetCapacity(sizeof(StringIdxEntry) * num_strings);
1471 
1472  for (size_t i = 0; i < num_strings; ++i) {
1473  const size_t string_idx = string_memory_ids[i];
1474  const String str = input_strings[string_idx];
1475  const size_t str_size(str.size());
1476  memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1477  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
1478  payload_file_off_ += str_size; // Need to increment after we've defined str_meta
1479  memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
1480  }
1481 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

compare_cache_value_t* StringDictionary::binary_search_cache ( const std::string &  pattern) const
private
std::vector< int32_t > StringDictionary::buildDictionaryTranslationMap ( const std::shared_ptr< StringDictionary > dest_dict,
StringLookupCallback const &  dest_transient_lookup_callback 
) const

Definition at line 1775 of file StringDictionary.cpp.

References DEBUG_TIMER, and storageEntryCount().

1777  {
1778  auto timer = DEBUG_TIMER(__func__);
1779  const size_t num_source_strings = storageEntryCount();
1780  const size_t num_dest_strings = dest_dict->storageEntryCount();
1781  std::vector<int32_t> translated_ids(num_source_strings);
1782 
1783  buildDictionaryTranslationMap(dest_dict.get(),
1784  translated_ids.data(),
1785  num_source_strings,
1786  num_dest_strings,
1787  true, // Just assume true for dest_has_transients as this
1788  // function is only used for testing currently
1789  dest_transient_lookup_callback,
1790  {});
1791  return translated_ids;
1792 }
std::vector< int32_t > buildDictionaryTranslationMap(const std::shared_ptr< StringDictionary > dest_dict, StringLookupCallback const &dest_transient_lookup_callback) const
size_t storageEntryCount() const
#define DEBUG_TIMER(name)
Definition: Logger.h:370

+ Here is the call graph for this function:

size_t StringDictionary::buildDictionaryTranslationMap ( const StringDictionary *  dest_dict,
int32_t *  translated_ids,
const int64_t  source_generation,
const int64_t  dest_generation,
const bool  dest_has_transients,
StringLookupCallback const &  dest_transient_lookup_callback,
const std::vector< StringOps_Namespace::StringOpInfo > &  string_op_infos 
) const

Definition at line 1817 of file StringDictionary.cpp.

References CHECK_GE, CHECK_LE, client_no_timeout_, computeBucket(), DEBUG_TIMER, getDbId(), getDictId(), getStringFromStorageFast(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), INVALID_STR_ID, materialize_hashes_, ThreadInfo::num_elems_per_thread, ThreadInfo::num_threads, order_translation_locks(), threading_serial::parallel_for(), rw_mutex_, str_count_, and string_id_string_dict_hash_table_.

1824  {
1825  auto timer = DEBUG_TIMER(__func__);
1826  CHECK_GE(source_generation, 0L);
1827  CHECK_GE(dest_generation, 0L);
1828  const int64_t num_source_strings = source_generation;
1829  const int64_t num_dest_strings = dest_generation;
1830 
1831  // We can bail early if there are no source strings to translate
1832  if (num_source_strings == 0L) {
1833  return 0;
1834  }
1835 
1836  // If here we should should have local dictionaries.
1837  // Note case of transient source dictionaries that aren't
1838  // seen as remote (they have no client_no_timeout_) is covered
1839  // by early bail above on num_source_strings == 0
1840  if (dest_dict->client_no_timeout_) {
1841  throw std::runtime_error(
1842  "Cannot translate between a local source and remote destination dictionary.");
1843  }
1844 
1845  // Sort this/source dict and dest dict on folder_ so we can enforce
1846  // lock ordering and avoid deadlocks
1847  std::shared_lock<std::shared_mutex> source_read_lock(rw_mutex_, std::defer_lock);
1848  std::shared_lock<std::shared_mutex> dest_read_lock(dest_dict->rw_mutex_,
1849  std::defer_lock);
1851  getDictId(),
1852  dest_dict->getDbId(),
1853  dest_dict->getDictId(),
1854  source_read_lock,
1855  dest_read_lock);
1856 
1857  // For both source and destination dictionaries we cap the max
1858  // entries to be translated/translated to at the supplied
1859  // generation arguments, if valid (i.e. >= 0), otherwise just the
1860  // size of each dictionary
1861 
1862  CHECK_LE(num_source_strings, static_cast<int64_t>(str_count_));
1863  CHECK_LE(num_dest_strings, static_cast<int64_t>(dest_dict->str_count_));
1864  const bool dest_dictionary_is_empty = (num_dest_strings == 0);
1865 
1866  constexpr int64_t target_strings_per_thread{1000};
1867  const ThreadInfo thread_info(
1868  std::thread::hardware_concurrency(), num_source_strings, target_strings_per_thread);
1869  CHECK_GE(thread_info.num_threads, 1L);
1870  CHECK_GE(thread_info.num_elems_per_thread, 1L);
1871 
1872  // We use a tbb::task_arena to cap the number of threads, has been
1873  // in other contexts been shown to exhibit better performance when low
1874  // numbers of threads are needed than just letting tbb figure the number of threads,
1875  // but should benchmark in this specific context
1876 
1877  const StringOps_Namespace::StringOps string_ops(string_op_infos);
1878  const bool has_string_ops = string_ops.size();
1879 
1880  tbb::task_arena limited_arena(thread_info.num_threads);
1881  std::vector<size_t> num_strings_not_translated_per_thread(thread_info.num_threads, 0UL);
1882  constexpr bool short_circuit_empty_dictionary_translations{false};
1883  limited_arena.execute([&] {
1884  CHECK_LE(tbb::this_task_arena::max_concurrency(), thread_info.num_threads);
1885  if (short_circuit_empty_dictionary_translations && dest_dictionary_is_empty) {
1887  tbb::blocked_range<int32_t>(
1888  0,
1889  num_source_strings,
1890  thread_info.num_elems_per_thread /* tbb grain_size */),
1891  [&](const tbb::blocked_range<int32_t>& r) {
1892  const int32_t start_idx = r.begin();
1893  const int32_t end_idx = r.end();
1894  for (int32_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
1895  translated_ids[string_idx] = INVALID_STR_ID;
1896  }
1897  },
1898  tbb::simple_partitioner());
1899  num_strings_not_translated_per_thread[0] += num_source_strings;
1900  } else {
1901  // The below logic, by executing low-level private variable accesses on both
1902  // dictionaries, is less clean than a previous variant that simply called
1903  // `getStringViews` from the source dictionary and then called `getBulk` on the
1904  // destination dictionary, but this version gets significantly better performance
1905  // (~2X), likely due to eliminating the overhead of writing out the string views and
1906  // then reading them back in (along with the associated cache misses)
1908  tbb::blocked_range<int32_t>(
1909  0,
1910  num_source_strings,
1911  thread_info.num_elems_per_thread /* tbb grain_size */),
1912  [&](const tbb::blocked_range<int32_t>& r) {
1913  const int32_t start_idx = r.begin();
1914  const int32_t end_idx = r.end();
1915  size_t num_strings_not_translated = 0;
1916  std::string string_ops_storage; // Needs to be thread local to back
1917  // string_view returned by string_ops()
1918  for (int32_t source_string_id = start_idx; source_string_id != end_idx;
1919  ++source_string_id) {
1920  const std::string_view source_str =
1921  has_string_ops ? string_ops(getStringFromStorageFast(source_string_id),
1922  string_ops_storage)
1923  : getStringFromStorageFast(source_string_id);
1924 
1925  if (source_str.empty()) {
1926  translated_ids[source_string_id] = inline_int_null_value<int32_t>();
1927  continue;
1928  }
1929  // Get the hash from this/the source dictionary's cache, as the function
1930  // will be the same for the dest_dict, sparing us having to recompute it
1931 
1932  // Todo(todd): Remove option to turn string hash cache off or at least
1933  // make a constexpr to avoid these branches when we expect it to be always
1934  // on going forward
1935  const string_dict_hash_t hash = (materialize_hashes_ && !has_string_ops)
1936  ? hash_cache_[source_string_id]
1937  : hash_string(source_str);
1938  const uint32_t hash_bucket = dest_dict->computeBucket(
1939  hash, source_str, dest_dict->string_id_string_dict_hash_table_);
1940  const auto translated_string_id =
1941  dest_dict->string_id_string_dict_hash_table_[hash_bucket];
1942  translated_ids[source_string_id] = translated_string_id;
1943 
1944  if (translated_string_id == StringDictionary::INVALID_STR_ID ||
1945  translated_string_id >= num_dest_strings) {
1946  if (dest_has_transients) {
1947  num_strings_not_translated +=
1948  dest_transient_lookup_callback(source_str, source_string_id);
1949  } else {
1950  num_strings_not_translated++;
1951  }
1952  continue;
1953  }
1954  }
1955  const size_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
1956  num_strings_not_translated_per_thread[tbb_thread_idx] +=
1957  num_strings_not_translated;
1958  },
1959  tbb::simple_partitioner());
1960  }
1961  });
1962  size_t total_num_strings_not_translated = 0;
1963  for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
1964  total_num_strings_not_translated += num_strings_not_translated_per_thread[thread_idx];
1965  }
1966  return total_num_strings_not_translated;
1967 }
string_dict_hash_t hash_string(const std::string_view &str)
std::vector< string_dict_hash_t > hash_cache_
#define CHECK_GE(x, y)
Definition: Logger.h:236
std::shared_mutex rw_mutex_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
void order_translation_locks(const int32_t source_db_id, const int32_t source_dict_id, const int32_t dest_db_id, const int32_t dest_dict_id, std::shared_lock< std::shared_mutex > &source_read_lock, std::shared_lock< std::shared_mutex > &dest_read_lock)
#define CHECK_LE(x, y)
Definition: Logger.h:234
int32_t getDictId() const noexcept
int32_t getDbId() const noexcept
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
uint32_t string_dict_hash_t
#define DEBUG_TIMER(name)
Definition: Logger.h:370
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

void StringDictionary::buildSortedCache ( )
private

Definition at line 1604 of file StringDictionary.cpp.

References mergeSortedCache(), sortCache(), sorted_cache, and str_count_.

Referenced by getCompare().

1604  {
1605  // This method is not thread-safe.
1606  const auto cur_cache_size = sorted_cache.size();
1607  std::vector<int32_t> temp_sorted_cache;
1608  for (size_t i = cur_cache_size; i < str_count_; i++) {
1609  temp_sorted_cache.push_back(i);
1610  }
1611  sortCache(temp_sorted_cache);
1612  mergeSortedCache(temp_sorted_cache);
1613 }
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
void sortCache(std::vector< int32_t > &cache)
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::checkAndConditionallyIncreaseOffsetCapacity ( const size_t  write_length)
private

Definition at line 1428 of file StringDictionary.cpp.

References addOffsetCapacity(), CHECK, CHECK_GE, heavyai::checked_mmap(), heavyai::checked_munmap(), isTemp_, offset_fd_, offset_file_size_, offset_map_, and str_count_.

1429  {
1430  const size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
1431  if (offset_file_off + write_length >= offset_file_size_) {
1432  const size_t min_capacity_needed =
1433  write_length - (offset_file_size_ - offset_file_off);
1434  if (!isTemp_) {
1435  CHECK_GE(offset_fd_, 0);
1437  addOffsetCapacity(min_capacity_needed);
1438  CHECK(offset_file_off + write_length <= offset_file_size_);
1439  offset_map_ = reinterpret_cast<StringIdxEntry*>(
1441  } else {
1442  addOffsetCapacity(min_capacity_needed);
1443  CHECK(offset_file_off + write_length <= offset_file_size_);
1444  }
1445  }
1446 }
StringIdxEntry * offset_map_
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:236
#define CHECK(condition)
Definition: Logger.h:223
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38

+ Here is the call graph for this function:

void StringDictionary::checkAndConditionallyIncreasePayloadCapacity ( const size_t  write_length)
private

Definition at line 1409 of file StringDictionary.cpp.

References addPayloadCapacity(), CHECK, CHECK_GE, heavyai::checked_mmap(), heavyai::checked_munmap(), isTemp_, payload_fd_, payload_file_off_, payload_file_size_, and payload_map_.

1410  {
1411  if (payload_file_off_ + write_length > payload_file_size_) {
1412  const size_t min_capacity_needed =
1413  write_length - (payload_file_size_ - payload_file_off_);
1414  if (!isTemp_) {
1415  CHECK_GE(payload_fd_, 0);
1417  addPayloadCapacity(min_capacity_needed);
1418  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1419  payload_map_ =
1420  reinterpret_cast<char*>(heavyai::checked_mmap(payload_fd_, payload_file_size_));
1421  } else {
1422  addPayloadCapacity(min_capacity_needed);
1423  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1424  }
1425  }
1426 }
#define CHECK_GE(x, y)
Definition: Logger.h:236
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
#define CHECK(condition)
Definition: Logger.h:223
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38

+ Here is the call graph for this function:

bool StringDictionary::checkpoint ( )
noexcept

Definition at line 1581 of file StringDictionary.cpp.

References CHECK, client_, heavyai::fsync(), isClient(), isTemp_, heavyai::msync(), offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

Referenced by import_export::TypedImportBuffer::stringDictCheckpoint().

1581  {
1582  if (isClient()) {
1583  try {
1584  return client_->checkpoint();
1585  } catch (...) {
1586  return false;
1587  }
1588  }
1589  CHECK(!isTemp_);
1590  bool ret = true;
1591  ret = ret &&
1592  (heavyai::msync((void*)offset_map_, offset_file_size_, /*async=*/false) == 0);
1593  ret = ret &&
1594  (heavyai::msync((void*)payload_map_, payload_file_size_, /*async=*/false) == 0);
1595  ret = ret && (heavyai::fsync(offset_fd_) == 0);
1596  ret = ret && (heavyai::fsync(payload_fd_) == 0);
1597  return ret;
1598 }
StringIdxEntry * offset_map_
bool isClient() const noexcept
std::unique_ptr< StringDictionaryClient > client_
int msync(void *addr, size_t length, bool async)
Definition: omnisci_fs.cpp:55
int fsync(int fd)
Definition: omnisci_fs.cpp:60
#define CHECK(condition)
Definition: Logger.h:223

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucket ( const string_dict_hash_t  hash,
const String &  input_string,
const std::vector< int32_t > &  string_id_string_dict_hash_table 
) const
private noexcept

Definition at line 1311 of file StringDictionary.cpp.

Referenced by buildDictionaryTranslationMap(), getBulk(), and getOrAddBulk().

1314  {
1315  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1316  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1317  while (true) {
1318  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1319  if (candidate_string_id ==
1320  INVALID_STR_ID) { // In this case it means the slot is available for use
1321  break;
1322  }
1323  if ((materialize_hashes_ && hash == hash_cache_[candidate_string_id]) ||
1325  const auto candidate_string = getStringFromStorageFast(candidate_string_id);
1326  if (input_string.size() == candidate_string.size() &&
1327  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1328  // found the string
1329  break;
1330  }
1331  }
1332  // wrap around
1333  if (++bucket == string_dict_hash_table_size) {
1334  bucket = 0;
1335  }
1336  }
1337  return bucket;
1338 }
std::vector< string_dict_hash_t > hash_cache_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucketFromStorageAndMemory ( const string_dict_hash_t  input_string_hash,
const String &  input_string,
const std::vector< int32_t > &  string_id_string_dict_hash_table,
const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids 
) const
private noexcept

memcmp(input_string.data(), candidate_storage_string.c_str_ptr, input_string.size())) {

Definition at line 1341 of file StringDictionary.cpp.

Referenced by getOrAddBulkParallel().

1347  {
1348  uint32_t bucket = input_string_hash & (string_id_string_dict_hash_table.size() - 1);
1349  while (true) {
1350  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1351  if (candidate_string_id ==
1352  INVALID_STR_ID) { // In this case it means the slot is available for use
1353  break;
1354  }
1355  if (!materialize_hashes_ || (input_string_hash == hash_cache_[candidate_string_id])) {
1356  if (candidate_string_id > 0 &&
1357  static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1358  // The candidate string is not in storage yet but in our string_memory_ids temp
1359  // buffer
1360  size_t memory_offset =
1361  static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1362  const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1363  if (input_string.size() == candidate_string.size() &&
1364  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1365  // found the string in the temp memory buffer
1366  break;
1367  }
1368  } else {
1369  // The candidate string is in storage, need to fetch it for comparison
1370  const auto candidate_storage_string =
1371  getStringFromStorageFast(candidate_string_id);
1372  if (input_string.size() == candidate_storage_string.size() &&
1373  !memcmp(input_string.data(),
1374  candidate_storage_string.data(),
1375  input_string.size())) {
1378  // found the string in storage
1379  break;
1380  }
1381  }
1382  }
1383  if (++bucket == string_id_string_dict_hash_table.size()) {
1384  bucket = 0;
1385  }
1386  }
1387  return bucket;
1388 }
std::vector< string_dict_hash_t > hash_cache_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

uint32_t StringDictionary::computeUniqueBucketWithHash ( const string_dict_hash_t  hash,
const std::vector< int32_t > &  string_id_string_dict_hash_table 
)
private noexcept

Definition at line 1390 of file StringDictionary.cpp.

Referenced by increaseHashTableCapacity(), and processDictionaryFutures().

1392  {
1393  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1394  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1395  while (true) {
1396  if (string_id_string_dict_hash_table[bucket] ==
1397  INVALID_STR_ID) { // In this case it means the slot is available for use
1398  break;
1399  }
1400  collisions_++;
1401  // wrap around
1402  if (++bucket == string_dict_hash_table_size) {
1403  bucket = 0;
1404  }
1405  }
1406  return bucket;
1407 }
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

std::vector< std::string > StringDictionary::copyStrings ( ) const

Definition at line 1146 of file StringDictionary.cpp.

References threading_serial::async(), CHECK_EQ, CHECK_GT, CHECK_LE, gpu_enabled::copy(), cpu_threads(), getStringUnlocked(), isClient(), rw_mutex_, str_count_, and strings_cache_.

1146  {
1147  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1148  if (isClient()) {
1149  // TODO(miyu): support remote string dictionary
1150  throw std::runtime_error(
1151  "copying dictionaries from remote server is not supported yet.");
1152  }
1153 
1154  if (strings_cache_) {
1155  return *strings_cache_;
1156  }
1157 
1158  strings_cache_ = std::make_shared<std::vector<std::string>>();
1159  strings_cache_->reserve(str_count_);
1160  const bool multithreaded = str_count_ > 10000;
1161  const auto worker_count =
1162  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
1163  CHECK_GT(worker_count, 0UL);
1164  std::vector<std::vector<std::string>> worker_results(worker_count);
1165  auto copy = [this](std::vector<std::string>& str_list,
1166  const size_t start_id,
1167  const size_t end_id) {
1168  CHECK_LE(start_id, end_id);
1169  str_list.reserve(end_id - start_id);
1170  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
1171  str_list.push_back(getStringUnlocked(string_id));
1172  }
1173  };
1174  if (multithreaded) {
1175  std::vector<std::future<void>> workers;
1176  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
1177  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
1178  worker_idx < worker_count && start < str_count_;
1179  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
1180  workers.push_back(std::async(
1181  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
1182  }
1183  for (auto& worker : workers) {
1184  worker.get();
1185  }
1186  } else {
1187  CHECK_EQ(worker_results.size(), size_t(1));
1188  copy(worker_results[0], 0, str_count_);
1189  }
1190 
1191  for (const auto& worker_result : worker_results) {
1192  strings_cache_->insert(
1193  strings_cache_->end(), worker_result.begin(), worker_result.end());
1194  }
1195  return *strings_cache_;
1196 }
#define CHECK_EQ(x, y)
Definition: Logger.h:231
bool isClient() const noexcept
std::shared_mutex rw_mutex_
#define CHECK_GT(x, y)
Definition: Logger.h:235
std::string getStringUnlocked(int32_t string_id) const noexcept
future< Result > async(Fn &&fn, Args &&...args)
std::shared_ptr< std::vector< std::string > > strings_cache_
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
#define CHECK_LE(x, y)
Definition: Logger.h:234
mapd_unique_lock< mapd_shared_mutex > write_lock
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

void StringDictionary::eachStringSerially ( int64_t const  generation,
StringCallback serial_callback 
) const

Definition at line 267 of file StringDictionary.cpp.

References CHECK_LE, client_, getStringFromStorageFast(), isClient(), anonymous_namespace{Utm.h}::n, rw_mutex_, storageEntryCount(), and str_count_.

Referenced by makeLambdaStringToId().

268  {
269  if (isClient()) {
270  // copyStrings() is not supported when isClient().
271  std::string str; // Import buffer. Placing outside of loop should reduce allocations.
272  size_t const n = std::min(static_cast<size_t>(generation), storageEntryCount());
273  CHECK_LE(n, static_cast<size_t>(std::numeric_limits<int32_t>::max()) + 1);
274  for (unsigned id = 0; id < n; ++id) {
275  {
276  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
277  client_->get_string(str, id);
278  }
279  serial_callback(str, id);
280  }
281  } else {
282  size_t const n = std::min(static_cast<size_t>(generation), str_count_);
283  CHECK_LE(n, static_cast<size_t>(std::numeric_limits<int32_t>::max()) + 1);
284  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
285  for (unsigned id = 0; id < n; ++id) {
286  serial_callback(getStringFromStorageFast(static_cast<int>(id)), id);
287  }
288  }
289 }
bool isClient() const noexcept
size_t storageEntryCount() const
std::shared_mutex rw_mutex_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
std::unique_ptr< StringDictionaryClient > client_
#define CHECK_LE(x, y)
Definition: Logger.h:234
mapd_shared_lock< mapd_shared_mutex > read_lock
constexpr double n
Definition: Utm.h:38

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool StringDictionary::fillRateIsHigh ( const size_t  num_strings) const
private noexcept

Definition at line 1198 of file StringDictionary.cpp.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

1198  {
1199  return string_id_string_dict_hash_table_.size() <= num_strings * 2;
1200 }
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the caller graph for this function:

template<class T , class String >
size_t StringDictionary::getBulk ( const std::vector< String > &  string_vec,
T *  encoded_vec 
) const

Definition at line 488 of file StringDictionary.cpp.

489  {
490  return getBulk(string_vec, encoded_vec, -1L /* generation */);
491 }
size_t getBulk(const std::vector< String > &string_vec, T *encoded_vec) const
template<class T , class String >
size_t StringDictionary::getBulk ( const std::vector< String > &  string_vec,
T *  encoded_vec,
const int64_t  generation 
) const

Definition at line 501 of file StringDictionary.cpp.

References CHECK_GE, CHECK_LE, computeBucket(), dict_ref_, anonymous_namespace{StringDictionary.cpp}::hash_string(), INVALID_STR_ID, MAX_STRLEN, ThreadInfo::num_elems_per_thread, ThreadInfo::num_threads, threading_serial::parallel_for(), rw_mutex_, storageEntryCount(), string_id_string_dict_hash_table_, and anonymous_namespace{StringDictionary.cpp}::throw_string_too_long_error().

503  {
504  constexpr int64_t target_strings_per_thread{1000};
505  const int64_t num_lookup_strings = string_vec.size();
506  if (num_lookup_strings == 0) {
507  return 0;
508  }
509 
510  const ThreadInfo thread_info(
511  std::thread::hardware_concurrency(), num_lookup_strings, target_strings_per_thread);
512  CHECK_GE(thread_info.num_threads, 1L);
513  CHECK_GE(thread_info.num_elems_per_thread, 1L);
514 
515  std::vector<size_t> num_strings_not_found_per_thread(thread_info.num_threads, 0UL);
516 
517  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
518  const int64_t num_dict_strings = generation >= 0 ? generation : storageEntryCount();
519  const bool dictionary_is_empty = (num_dict_strings == 0);
520  if (dictionary_is_empty) {
521  tbb::parallel_for(tbb::blocked_range<int64_t>(0, num_lookup_strings),
522  [&](const tbb::blocked_range<int64_t>& r) {
523  const int64_t start_idx = r.begin();
524  const int64_t end_idx = r.end();
525  for (int64_t string_idx = start_idx; string_idx < end_idx;
526  ++string_idx) {
527  encoded_vec[string_idx] = StringDictionary::INVALID_STR_ID;
528  }
529  });
530  return num_lookup_strings;
531  }
532  // If we're here the generation-capped dictionary has strings in it
533  // that we need to look up against
534 
535  tbb::task_arena limited_arena(thread_info.num_threads);
536  limited_arena.execute([&] {
537  CHECK_LE(tbb::this_task_arena::max_concurrency(), thread_info.num_threads);
539  tbb::blocked_range<int64_t>(
540  0, num_lookup_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
541  [&](const tbb::blocked_range<int64_t>& r) {
542  const int64_t start_idx = r.begin();
543  const int64_t end_idx = r.end();
544  size_t num_strings_not_found = 0;
545  for (int64_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
546  const auto& input_string = string_vec[string_idx];
547  if (input_string.empty()) {
548  encoded_vec[string_idx] = inline_int_null_value<T>();
549  continue;
550  }
551  if (input_string.size() > StringDictionary::MAX_STRLEN) {
552  throw_string_too_long_error(input_string, dict_ref_);
553  }
554  const string_dict_hash_t input_string_hash = hash_string(input_string);
555  uint32_t hash_bucket = computeBucket(
556  input_string_hash, input_string, string_id_string_dict_hash_table_);
557  // Will either be legit id or INVALID_STR_ID
558  const auto string_id = string_id_string_dict_hash_table_[hash_bucket];
559  if (string_id == StringDictionary::INVALID_STR_ID ||
560  string_id >= num_dict_strings) {
561  encoded_vec[string_idx] = StringDictionary::INVALID_STR_ID;
562  num_strings_not_found++;
563  continue;
564  }
565  encoded_vec[string_idx] = string_id;
566  }
567  const size_t tbb_thread_idx = tbb::this_task_arena::current_thread_index();
568  num_strings_not_found_per_thread[tbb_thread_idx] = num_strings_not_found;
569  },
570  tbb::simple_partitioner());
571  });
572 
573  size_t num_strings_not_found = 0;
574  for (int64_t thread_idx = 0; thread_idx < thread_info.num_threads; ++thread_idx) {
575  num_strings_not_found += num_strings_not_found_per_thread[thread_idx];
576  }
577  return num_strings_not_found;
578 }
string_dict_hash_t hash_string(const std::string_view &str)
size_t storageEntryCount() const
#define CHECK_GE(x, y)
Definition: Logger.h:236
const DictRef dict_ref_
std::shared_mutex rw_mutex_
void throw_string_too_long_error(std::string_view str, const DictRef &dict_ref)
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
#define CHECK_LE(x, y)
Definition: Logger.h:234
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
mapd_shared_lock< mapd_shared_mutex > read_lock
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getCompare ( const std::string &  pattern,
const std::string &  comp_operator,
const size_t  generation 
)

Definition at line 938 of file StringDictionary.cpp.

References anonymous_namespace{Utm.h}::a, buildSortedCache(), client_, compare_cache_, getEquals(), getStringFromStorage(), isClient(), gpu_enabled::lower_bound(), rw_mutex_, sorted_cache, str_count_, string_eq(), and string_lt().

940  {
941  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
942  if (isClient()) {
943  return client_->get_compare(pattern, comp_operator, generation);
944  }
945  std::vector<int32_t> ret;
946  if (str_count_ == 0) {
947  return ret;
948  }
949  if (sorted_cache.size() < str_count_) {
950  if (comp_operator == "=" || comp_operator == "<>") {
951  return getEquals(pattern, comp_operator, generation);
952  }
953 
955  }
956  auto cache_index = compare_cache_.get(pattern);
957 
958  if (!cache_index) {
959  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
960  const auto cache_itr = std::lower_bound(
961  sorted_cache.begin(),
962  sorted_cache.end(),
963  pattern,
964  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
965  auto a_str = this->getStringFromStorage(a);
966  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
967  });
968 
969  if (cache_itr == sorted_cache.end()) {
970  cache_index->index = sorted_cache.size() - 1;
971  cache_index->diff = 1;
972  } else {
973  const auto cache_str = getStringFromStorage(*cache_itr);
974  if (!string_eq(
975  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
976  cache_index->index = cache_itr - sorted_cache.begin() - 1;
977  cache_index->diff = 1;
978  } else {
979  cache_index->index = cache_itr - sorted_cache.begin();
980  cache_index->diff = 0;
981  }
982  }
983 
984  compare_cache_.put(pattern, cache_index);
985  }
986 
987  // since we have a cache in form of vector of ints which is sorted according to
988  // corresponding strings in the dictionary all we need is the index of the element
989  // which equal to the pattern that we are trying to match or the index of “biggest”
990  // element smaller than the pattern, to perform all the comparison operators over
991  // string. The search function guarantees we have such index so now it is just the
992  // matter to include all the elements in the result vector.
993 
994  // For < operator if the index that we have points to the element which is equal to
995  // the pattern that we are searching for we simply get all the elements less than the
996  // index. If the element pointed by the index is not equal to the pattern we are
997  // comparing with we also need to include that index in result vector, except when the
998  // index points to 0 and the pattern is lesser than the smallest value in the string
999  // dictionary.
1000 
1001  if (comp_operator == "<") {
1002  size_t idx = cache_index->index;
1003  if (cache_index->diff) {
1004  idx = cache_index->index + 1;
1005  if (cache_index->index == 0 && cache_index->diff > 0) {
1006  idx = cache_index->index;
1007  }
1008  }
1009  for (size_t i = 0; i < idx; i++) {
1010  ret.push_back(sorted_cache[i]);
1011  }
1012 
1013  // For <= operator if the index that we have points to the element which is equal to
1014  // the pattern that we are searching for we want to include the element pointed by
1015  // the index in the result set. If the element pointed by the index is not equal to
1016  // the pattern we are comparing with we just want to include all the ids with index
1017  // less than the index that is cached, except when pattern that we are searching for
1018  // is smaller than the smallest string in the dictionary.
1019 
1020  } else if (comp_operator == "<=") {
1021  size_t idx = cache_index->index + 1;
1022  if (cache_index == 0 && cache_index->diff > 0) {
1023  idx = cache_index->index;
1024  }
1025  for (size_t i = 0; i < idx; i++) {
1026  ret.push_back(sorted_cache[i]);
1027  }
1028 
1029  // For > operator we want to get all the elements with index greater than the index
1030  // that we have except, when the pattern we are searching for is lesser than the
1031  // smallest string in the dictionary we also want to include the id of the index
1032  // that we have.
1033 
1034  } else if (comp_operator == ">") {
1035  size_t idx = cache_index->index + 1;
1036  if (cache_index->index == 0 && cache_index->diff > 0) {
1037  idx = cache_index->index;
1038  }
1039  for (size_t i = idx; i < sorted_cache.size(); i++) {
1040  ret.push_back(sorted_cache[i]);
1041  }
1042 
1043  // For >= operator when the indexed element that we have points to element which is
1044  // equal to the pattern we are searching for we want to include that in the result
1045  // vector. If the index that we have does not point to the string which is equal to
1046  // the pattern we are searching we don’t want to include that id into the result
1047  // vector except when the index is 0.
1048 
1049  } else if (comp_operator == ">=") {
1050  size_t idx = cache_index->index;
1051  if (cache_index->diff) {
1052  idx = cache_index->index + 1;
1053  if (cache_index->index == 0 && cache_index->diff > 0) {
1054  idx = cache_index->index;
1055  }
1056  }
1057  for (size_t i = idx; i < sorted_cache.size(); i++) {
1058  ret.push_back(sorted_cache[i]);
1059  }
1060  } else if (comp_operator == "=") {
1061  if (!cache_index->diff) {
1062  ret.push_back(sorted_cache[cache_index->index]);
1063  }
1064 
1065  // For <> operator it is simple matter of not including id of string which is equal
1066  // to pattern we are searching for.
1067  } else if (comp_operator == "<>") {
1068  if (!cache_index->diff) {
1069  size_t idx = cache_index->index;
1070  for (size_t i = 0; i < idx; i++) {
1071  ret.push_back(sorted_cache[i]);
1072  }
1073  ++idx;
1074  for (size_t i = idx; i < sorted_cache.size(); i++) {
1075  ret.push_back(sorted_cache[i]);
1076  }
1077  } else {
1078  for (size_t i = 0; i < sorted_cache.size(); i++) {
1079  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
1080  }
1081  }
1082 
1083  } else {
1084  std::runtime_error("Unsupported string comparison operator");
1085  }
1086  return ret;
1087 }
bool isClient() const noexcept
RUNTIME_EXPORT DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
std::shared_mutex rw_mutex_
std::unique_ptr< StringDictionaryClient > client_
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
constexpr double a
Definition: Utm.h:32
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
mapd_unique_lock< mapd_shared_mutex > write_lock
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

int32_t StringDictionary::getDbId ( ) const
noexcept

Definition at line 311 of file StringDictionary.cpp.

References dict_ref_t::dbId, and dict_ref_.

Referenced by buildDictionaryTranslationMap().

311  {
312  return dict_ref_.dbId;
313 }
const DictRef dict_ref_
int32_t dbId
Definition: DictRef.h:10

+ Here is the caller graph for this function:

int32_t StringDictionary::getDictId ( ) const
noexcept

Definition at line 315 of file StringDictionary.cpp.

References dict_ref_, and dict_ref_t::dictId.

Referenced by RowSetMemoryOwner::addStringProxyIntersectionTranslationMap(), RowSetMemoryOwner::addStringProxyUnionTranslationMap(), and buildDictionaryTranslationMap().

315  {
316  return dict_ref_.dictId;
317 }
const DictRef dict_ref_
int32_t dictId
Definition: DictRef.h:11

+ Here is the caller graph for this function:

std::vector< int32_t > StringDictionary::getEquals ( std::string  pattern,
std::string  comp_operator,
size_t  generation 
)
private

Definition at line 878 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, cpu_threads(), equal_cache_, getStringUnlocked(), MAX_STRLEN, run_benchmark_import::result, and str_count_.

Referenced by getCompare().

880  {
881  std::vector<int32_t> result;
882  auto eq_id_itr = equal_cache_.find(pattern);
883  int32_t eq_id = MAX_STRLEN + 1;
884  int32_t cur_size = str_count_;
885  if (eq_id_itr != equal_cache_.end()) {
886  auto eq_id = eq_id_itr->second;
887  if (comp_operator == "=") {
888  result.push_back(eq_id);
889  } else {
890  for (int32_t idx = 0; idx <= cur_size; idx++) {
891  if (idx == eq_id) {
892  continue;
893  }
894  result.push_back(idx);
895  }
896  }
897  } else {
898  std::vector<std::thread> workers;
899  int worker_count = cpu_threads();
900  CHECK_GT(worker_count, 0);
901  std::vector<std::vector<int32_t>> worker_results(worker_count);
902  CHECK_LE(generation, str_count_);
903  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
904  workers.emplace_back(
905  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
906  for (size_t string_id = worker_idx; string_id < generation;
907  string_id += worker_count) {
908  const auto str = getStringUnlocked(string_id);
909  if (str == pattern) {
910  worker_results[worker_idx].push_back(string_id);
911  }
912  }
913  });
914  }
915  for (auto& worker : workers) {
916  worker.join();
917  }
918  for (const auto& worker_result : worker_results) {
919  result.insert(result.end(), worker_result.begin(), worker_result.end());
920  }
921  if (result.size() > 0) {
922  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
923  CHECK(it_ok.second);
924  eq_id = result[0];
925  }
926  if (comp_operator == "<>") {
927  for (int32_t idx = 0; idx <= cur_size; idx++) {
928  if (idx == eq_id) {
929  continue;
930  }
931  result.push_back(idx);
932  }
933  }
934  }
935  return result;
936 }
#define CHECK_GT(x, y)
Definition: Logger.h:235
std::string getStringUnlocked(int32_t string_id) const noexcept
std::map< std::string, int32_t > equal_cache_
#define CHECK_LE(x, y)
Definition: Logger.h:234
#define CHECK(condition)
Definition: Logger.h:223
static constexpr size_t MAX_STRLEN
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
int32_t StringDictionary::getIdOfString ( const String &  str ) const

Definition at line 745 of file StringDictionary.cpp.

References client_, getUnlocked(), isClient(), and rw_mutex_.

745  {
746  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
747  if (isClient()) {
748  if constexpr (std::is_same_v<std::string, std::decay_t<String>>) {
749  return client_->get(str);
750  } else {
751  return client_->get(std::string(str));
752  }
753  }
754  return getUnlocked(str);
755 }
bool isClient() const noexcept
std::shared_mutex rw_mutex_
std::unique_ptr< StringDictionaryClient > client_
int32_t getUnlocked(const std::string_view sv) const noexcept
mapd_shared_lock< mapd_shared_mutex > read_lock

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getLike ( const std::string &  pattern,
const bool  icase,
const bool  is_simple,
const char  escape,
const size_t  generation 
) const

Definition at line 825 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_like(), isClient(), like_cache_, rw_mutex_, and str_count_.

829  {
830  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
831  if (isClient()) {
832  return client_->get_like(pattern, icase, is_simple, escape, generation);
833  }
834  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
835  const auto it = like_cache_.find(cache_key);
836  if (it != like_cache_.end()) {
837  return it->second;
838  }
839  std::vector<int32_t> result;
840  std::vector<std::thread> workers;
841  int worker_count = cpu_threads();
842  CHECK_GT(worker_count, 0);
843  std::vector<std::vector<int32_t>> worker_results(worker_count);
844  CHECK_LE(generation, str_count_);
845  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
846  workers.emplace_back([&worker_results,
847  &pattern,
848  generation,
849  icase,
850  is_simple,
851  escape,
852  worker_idx,
853  worker_count,
854  this]() {
855  for (size_t string_id = worker_idx; string_id < generation;
856  string_id += worker_count) {
857  const auto str = getStringUnlocked(string_id);
858  if (is_like(str, pattern, icase, is_simple, escape)) {
859  worker_results[worker_idx].push_back(string_id);
860  }
861  }
862  });
863  }
864  for (auto& worker : workers) {
865  worker.join();
866  }
867  for (const auto& worker_result : worker_results) {
868  result.insert(result.end(), worker_result.begin(), worker_result.end());
869  }
870  // place result into cache for reuse if similar query
871  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
872 
873  CHECK(it_ok.second);
874 
875  return result;
876 }
bool isClient() const noexcept
std::shared_mutex rw_mutex_
#define CHECK_GT(x, y)
Definition: Logger.h:235
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
#define CHECK_LE(x, y)
Definition: Logger.h:234
#define CHECK(condition)
Definition: Logger.h:223
mapd_unique_lock< mapd_shared_mutex > write_lock
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

size_t StringDictionary::getNumStringsFromStorage ( const size_t  storage_slots) const
private noexcept

Method to retrieve number of strings in storage via a binary search for the first canary

Parameters
storage_slots: number of storage entries to search for the first canary
Returns
number of strings in storage

Definition at line 326 of file StringDictionary.cpp.

References CHECK_GE.

Referenced by StringDictionary().

327  {
328  if (storage_slots == 0) {
329  return 0;
330  }
331  // Must use signed integers since final binary search step can wrap to max size_t value
332  // if dictionary is empty
333  int64_t min_bound = 0;
334  int64_t max_bound = storage_slots - 1;
335  int64_t guess{0};
336  while (min_bound <= max_bound) {
337  guess = (max_bound + min_bound) / 2;
338  CHECK_GE(guess, 0);
339  if (getStringFromStorage(guess).canary) {
340  max_bound = guess - 1;
341  } else {
342  min_bound = guess + 1;
343  }
344  }
345  CHECK_GE(guess + (min_bound > guess ? 1 : 0), 0);
346  return guess + (min_bound > guess ? 1 : 0);
347 }
#define CHECK_GE(x, y)
Definition: Logger.h:236
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the caller graph for this function:

int32_t StringDictionary::getOrAdd ( const std::string &  str)
noexcept

Definition at line 382 of file StringDictionary.cpp.

References CHECK_EQ.

382  {
383  if (isClient()) {
384  std::vector<int32_t> string_ids;
385  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
386  CHECK_EQ(size_t(1), string_ids.size());
387  return string_ids.front();
388  }
389  return getOrAddImpl(str);
390 }
#define CHECK_EQ(x, y)
Definition: Logger.h:231
bool isClient() const noexcept
std::unique_ptr< StringDictionaryClient > client_
int32_t getOrAddImpl(const std::string_view &str) noexcept
template<class T , class String >
template void StringDictionary::getOrAddBulk ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 591 of file StringDictionary.cpp.

References appendToStorage(), CHECK, CHECK_LT, computeBucket(), dict_ref_, fillRateIsHigh(), g_enable_stringdict_parallel, getOrAddBulkParallel(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), increaseHashTableCapacity(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, rw_mutex_, str_count_, and string_id_string_dict_hash_table_.

Referenced by import_export::TypedImportBuffer::addDictEncodedString(), ArrowForeignStorageBase::convertArrowDictionary(), ArrowForeignStorageBase::createDictionaryEncodedColumn(), foreign_storage::ParquetStringEncoder< V >::encodeAndCopyContiguous(), getOrAddBulkArray(), and populate_string_ids().

592  {
593  if (g_enable_stringdict_parallel) {
594  getOrAddBulkParallel(input_strings, output_string_ids);
595  return;
596  }
597  // Single-thread path.
598  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
599 
600  const size_t initial_str_count = str_count_;
601  size_t idx = 0;
602  for (const auto& input_string : input_strings) {
603  if (input_string.empty()) {
604  output_string_ids[idx++] = inline_int_null_value<T>();
605  continue;
606  }
607  CHECK(input_string.size() <= MAX_STRLEN);
608 
609  const string_dict_hash_t input_string_hash = hash_string(input_string);
610  uint32_t hash_bucket =
611  computeBucket(input_string_hash, input_string, string_id_string_dict_hash_table_);
612  if (string_id_string_dict_hash_table_[hash_bucket] != INVALID_STR_ID) {
613  output_string_ids[idx++] = string_id_string_dict_hash_table_[hash_bucket];
614  continue;
615  }
616  // need to add record to dictionary
617  // check there is room
618  if (str_count_ > static_cast<size_t>(max_valid_int_value<T>())) {
619  throw_encoding_error<T>(input_string, dict_ref_);
620  }
621  CHECK_LT(str_count_, MAX_STRCOUNT)
622  << "Maximum number (" << str_count_
623  << ") of Dictionary encoded Strings reached for this column, offset path "
624  "for column is "
625  << offsets_path_;
626  if (fillRateIsHigh(str_count_)) {
627  // resize when more than 50% is full
628  increaseHashTableCapacity();
629  hash_bucket = computeBucket(
630  input_string_hash, input_string, string_id_string_dict_hash_table_);
631  }
632  appendToStorage(input_string);
633 
634  if (materialize_hashes_) {
635  hash_cache_[str_count_] = input_string_hash;
636  }
637  const int32_t string_id = static_cast<int32_t>(str_count_);
638  string_id_string_dict_hash_table_[hash_bucket] = string_id;
639  output_string_ids[idx++] = string_id;
640  ++str_count_;
641  }
642  const size_t num_strings_added = str_count_ - initial_str_count;
643  if (num_strings_added > 0) {
644  invalidateInvertedIndex();
645  }
646 }
void increaseHashTableCapacity() noexcept
string_dict_hash_t hash_string(const std::string_view &str)
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
const DictRef dict_ref_
std::shared_mutex rw_mutex_
static constexpr size_t MAX_STRCOUNT
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
void appendToStorage(const String str) noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:233
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:223
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
mapd_unique_lock< mapd_shared_mutex > write_lock
bool g_enable_stringdict_parallel

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
template void StringDictionary::getOrAddBulkArray ( const std::vector< std::vector< String >> &  string_array_vec,
std::vector< std::vector< int32_t >> &  ids_array_vec 
)

Definition at line 440 of file StringDictionary.cpp.

References client_no_timeout_, and getOrAddBulk().

Referenced by import_export::TypedImportBuffer::addDictEncodedStringArray().

442  {
443  if (client_no_timeout_) {
444  client_no_timeout_->get_or_add_bulk_array(ids_array_vec, string_array_vec);
445  return;
446  }
447 
448  ids_array_vec.resize(string_array_vec.size());
449  for (size_t i = 0; i < string_array_vec.size(); i++) {
450  auto& strings = string_array_vec[i];
451  auto& ids = ids_array_vec[i];
452  ids.resize(strings.size());
453  getOrAddBulk(strings, &ids[0]);
454  }
455 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
std::unique_ptr< StringDictionaryClient > client_no_timeout_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class T , class String >
void StringDictionary::getOrAddBulkParallel ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 649 of file StringDictionary.cpp.

References appendToStorageBulk(), CHECK, CHECK_LT, computeBucketFromStorageAndMemory(), dict_ref_, fillRateIsHigh(), hash_cache_, hashStrings(), increaseHashTableCapacityFromStorageAndMemory(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, rw_mutex_, str_count_, and string_id_string_dict_hash_table_.

Referenced by getOrAddBulk().

650  {
651  // Compute hashes of the input strings up front, and in parallel,
652  // as the string hashing does not need to be behind the subsequent write_lock
653  std::vector<string_dict_hash_t> input_strings_hashes(input_strings.size());
654  hashStrings(input_strings, input_strings_hashes);
655 
656  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
657  size_t shadow_str_count =
658  str_count_; // Need to shadow str_count_ now with bulk add methods
659  const size_t storage_high_water_mark = shadow_str_count;
660  std::vector<size_t> string_memory_ids;
661  size_t sum_new_string_lengths = 0;
662  string_memory_ids.reserve(input_strings.size());
663  size_t input_string_idx{0};
664  for (const auto& input_string : input_strings) {
665  // Currently we make empty strings null
666  if (input_string.empty()) {
667  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
668  continue;
669  }
670  // TODO: Recover gracefully if an input string is too long
671  CHECK(input_string.size() <= MAX_STRLEN);
672 
673  if (fillRateIsHigh(shadow_str_count)) {
674  // resize when more than 50% is full
675  increaseHashTableCapacityFromStorageAndMemory(shadow_str_count,
676  storage_high_water_mark,
677  input_strings,
678  string_memory_ids,
679  input_strings_hashes);
680  }
681  // Compute the hash for this input_string
682  const string_dict_hash_t input_string_hash = input_strings_hashes[input_string_idx];
683 
684  const uint32_t hash_bucket =
685  computeBucketFromStorageAndMemory(input_string_hash,
686  input_string,
687  string_id_string_dict_hash_table_,
688  storage_high_water_mark,
689  input_strings,
690  string_memory_ids);
691 
692  // If the hash bucket is not empty, that is our string id
693  // (computeBucketFromStorageAndMemory) already checked to ensure the input string and
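694  // bucket string are equal)
695  if (string_id_string_dict_hash_table_[hash_bucket] != INVALID_STR_ID) {
696  output_string_ids[input_string_idx++] =
697  string_id_string_dict_hash_table_[hash_bucket];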
698  continue;
699  }
700  // Did not find string, so need to add record to dictionary
701  // First check there is room
702  if (shadow_str_count > static_cast<size_t>(max_valid_int_value<T>())) {
703  throw_encoding_error<T>(input_string, dict_ref_);
704  }
705  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
706  << "Maximum number (" << shadow_str_count
707  << ") of Dictionary encoded Strings reached for this column, offset path "
708  "for column is "
709  << offsets_path_;
710 
711  string_memory_ids.push_back(input_string_idx);
712  sum_new_string_lengths += input_string.size();
713  string_id_string_dict_hash_table_[hash_bucket] =
714  static_cast<int32_t>(shadow_str_count);
715  if (materialize_hashes_) {
716  hash_cache_[shadow_str_count] = input_string_hash;
717  }
718  output_string_ids[input_string_idx++] = shadow_str_count++;
719  }
720  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
721  const size_t num_strings_added = shadow_str_count - str_count_;
722  str_count_ = shadow_str_count;
723  if (num_strings_added > 0) {
724  invalidateInvertedIndex();
725  }
726 }
uint32_t computeBucketFromStorageAndMemory(const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
const DictRef dict_ref_
std::shared_mutex rw_mutex_
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:233
void hashStrings(const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
void increaseHashTableCapacityFromStorageAndMemory(const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:223
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN
mapd_unique_lock< mapd_shared_mutex > write_lock

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int32_t StringDictionary::getOrAddImpl ( const std::string_view &  str)
private noexcept

Definition at line 1258 of file StringDictionary.cpp.

References CHECK, CHECK_LT, and anonymous_namespace{StringDictionary.cpp}::hash_string().

1258  {
1259  // @TODO(wei) treat empty string as NULL for now
1260  if (str.size() == 0) {
1261  return inline_int_null_value<int32_t>();
1262  }
1263  CHECK(str.size() <= MAX_STRLEN);
1264  const string_dict_hash_t hash = hash_string(str);
1265  {
1266  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
1267  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1268  if (string_id_string_dict_hash_table_[bucket] != INVALID_STR_ID) {
1269  return string_id_string_dict_hash_table_[bucket];
1270  }
1271  }
1272  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1273  if (fillRateIsHigh(str_count_)) {
1274  // resize when more than 50% is full
1275  increaseHashTableCapacity();
1276  }
1277  // need to recalculate the bucket in case it changed before
1278  // we got the lock
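1279  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1280  if (string_id_string_dict_hash_table_[bucket] == INVALID_STR_ID) {
1281  CHECK_LT(str_count_, MAX_STRCOUNT)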
1282  << "Maximum number (" << str_count_
1283  << ") of Dictionary encoded Strings reached for this column, offset path "
1284  "for column is "
1285  << offsets_path_;
1286  appendToStorage(str);
1287  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
1288  if (materialize_hashes_) {
1289  hash_cache_[str_count_] = hash;
1290  }
1291  ++str_count_;
1292  invalidateInvertedIndex();
1293  }
1294  return string_id_string_dict_hash_table_[bucket];
1295 }
void increaseHashTableCapacity() noexcept
string_dict_hash_t hash_string(const std::string_view &str)
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
std::shared_mutex rw_mutex_
static constexpr size_t MAX_STRCOUNT
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
void appendToStorage(const String str) noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:233
mapd_shared_lock< mapd_shared_mutex > read_lock
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:223
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN
mapd_unique_lock< mapd_shared_mutex > write_lock

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getRegexpLike ( const std::string &  pattern,
const char  escape,
const size_t  generation 
) const

Definition at line 1099 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_regexp_like(), isClient(), regex_cache_, rw_mutex_, and str_count_.

1101  {
1102  std::lock_guard<std::shared_mutex> write_lock(rw_mutex_);
1103  if (isClient()) {
1104  return client_->get_regexp_like(pattern, escape, generation);
1105  }
1106  const auto cache_key = std::make_pair(pattern, escape);
1107  const auto it = regex_cache_.find(cache_key);
1108  if (it != regex_cache_.end()) {
1109  return it->second;
1110  }
1111  std::vector<int32_t> result;
1112  std::vector<std::thread> workers;
1113  int worker_count = cpu_threads();
1114  CHECK_GT(worker_count, 0);
1115  std::vector<std::vector<int32_t>> worker_results(worker_count);
1116  CHECK_LE(generation, str_count_);
1117  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
1118  workers.emplace_back([&worker_results,
1119  &pattern,
1120  generation,
1121  escape,
1122  worker_idx,
1123  worker_count,
1124  this]() {
1125  for (size_t string_id = worker_idx; string_id < generation;
1126  string_id += worker_count) {
1127  const auto str = getStringUnlocked(string_id);
1128  if (is_regexp_like(str, pattern, escape)) {
1129  worker_results[worker_idx].push_back(string_id);
1130  }
1131  }
1132  });
1133  }
1134  for (auto& worker : workers) {
1135  worker.join();
1136  }
1137  for (const auto& worker_result : worker_results) {
1138  result.insert(result.end(), worker_result.begin(), worker_result.end());
1139  }
1140  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
1141  CHECK(it_ok.second);
1142 
1143  return result;
1144 }
bool isClient() const noexcept
std::shared_mutex rw_mutex_
#define CHECK_GT(x, y)
Definition: Logger.h:235
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
#define CHECK_LE(x, y)
Definition: Logger.h:234
#define CHECK(condition)
Definition: Logger.h:223
mapd_unique_lock< mapd_shared_mutex > write_lock
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

std::string StringDictionary::getString ( int32_t  string_id) const

Definition at line 767 of file StringDictionary.cpp.

References client_, getStringUnlocked(), isClient(), and rw_mutex_.

Referenced by StringValueConverter::convertToColumnarFormatFromDict(), and populate_string_ids().

767  {
768  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
769  if (isClient()) {
770  std::string ret;
771  client_->get_string(ret, string_id);
772  return ret;
773  }
774  return getStringUnlocked(string_id);
775 }
bool isClient() const noexcept
std::shared_mutex rw_mutex_
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_lock< mapd_shared_mutex > read_lock

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::pair< char *, size_t > StringDictionary::getStringBytes ( int32_t  string_id) const
noexcept

Definition at line 782 of file StringDictionary.cpp.

References CHECK, CHECK_LE, and CHECK_LT.

783  {
784  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
785  CHECK(!isClient());
786  CHECK_LE(0, string_id);
787  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
788  return getStringBytesChecked(string_id);
789 }
bool isClient() const noexcept
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
std::shared_mutex rw_mutex_
#define CHECK_LT(x, y)
Definition: Logger.h:233
#define CHECK_LE(x, y)
Definition: Logger.h:234
mapd_shared_lock< mapd_shared_mutex > read_lock
#define CHECK(condition)
Definition: Logger.h:223
std::pair< char *, size_t > StringDictionary::getStringBytesChecked ( const int  string_id) const
private noexcept

Definition at line 1303 of file StringDictionary.cpp.

References CHECK.

1304  {
1305  const auto str_canary = getStringFromStorage(string_id);
1306  CHECK(!str_canary.canary);
1307  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1308 }
#define CHECK(condition)
Definition: Logger.h:223
PayloadString getStringFromStorage(const int string_id) const noexcept
std::string StringDictionary::getStringChecked ( const int  string_id) const
private noexcept

Definition at line 1297 of file StringDictionary.cpp.

References CHECK.

Referenced by getStringUnlocked(), increaseHashTableCapacity(), and increaseHashTableCapacityFromStorageAndMemory().

1297  {
1298  const auto str_canary = getStringFromStorage(string_id);
1299  CHECK(!str_canary.canary);
1300  return std::string(str_canary.c_str_ptr, str_canary.size);
1301 }
#define CHECK(condition)
Definition: Logger.h:223
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the caller graph for this function:

StringDictionary::PayloadString StringDictionary::getStringFromStorage ( const int  string_id) const
private noexcept

Definition at line 1489 of file StringDictionary.cpp.

References CHECK_GE, StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

Referenced by getCompare(), getNumStringsFromStorage(), getStringBytesChecked(), getStringChecked(), mergeSortedCache(), sortCache(), and StringDictionary().

1490  {
1491  if (!isTemp_) {
1492  CHECK_GE(payload_fd_, 0);
1493  CHECK_GE(offset_fd_, 0);
1494  }
1495  CHECK_GE(string_id, 0);
1496  const StringIdxEntry* str_meta = offset_map_ + string_id;
1497  if (str_meta->size == 0xffff) {
1498  // hit the canary
1499  return {nullptr, 0, true};
1500  }
1501  return {payload_map_ + str_meta->off, str_meta->size, false};
1502 }
StringIdxEntry * offset_map_
#define CHECK_GE(x, y)
Definition: Logger.h:236

+ Here is the caller graph for this function:

std::string_view StringDictionary::getStringFromStorageFast ( const int  string_id) const
private noexcept

Definition at line 1483 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

Referenced by buildDictionaryTranslationMap(), eachStringSerially(), and getStringViews().

1484  {
1485  const StringIdxEntry* str_meta = offset_map_ + string_id;
1486  return {payload_map_ + str_meta->off, str_meta->size};
1487 }
StringIdxEntry * offset_map_

+ Here is the caller graph for this function:

std::string StringDictionary::getStringUnlocked ( int32_t  string_id) const
private noexcept

Definition at line 777 of file StringDictionary.cpp.

References CHECK_LT.

Referenced by copyStrings(), getEquals(), getLike(), getRegexpLike(), and getString().

777  {
778  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
779  return getStringChecked(string_id);
780 }
std::string getStringChecked(const int string_id) const noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:233

+ Here is the caller graph for this function:

std::vector< std::string_view > StringDictionary::getStringViews ( ) const

Definition at line 1771 of file StringDictionary.cpp.

References storageEntryCount().

1771  {
1772  return getStringViews(storageEntryCount());
1773 }
size_t storageEntryCount() const
std::vector< std::string_view > getStringViews() const

+ Here is the call graph for this function:

std::vector< std::string_view > StringDictionary::getStringViews ( const size_t  generation) const

Definition at line 1717 of file StringDictionary.cpp.

References CHECK_GE, CHECK_LE, DEBUG_TIMER, getStringFromStorageFast(), MAX_STRCOUNT, ThreadInfo::num_elems_per_thread, ThreadInfo::num_threads, threading_serial::parallel_for(), rw_mutex_, and storageEntryCount().

1718  {
1719  auto timer = DEBUG_TIMER(__func__);
1720  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
1721  const int64_t num_strings = generation >= 0 ? generation : storageEntryCount();
1722  CHECK_LE(num_strings, static_cast<int64_t>(StringDictionary::MAX_STRCOUNT));
1723  // The CHECK_LE below is currently redundant with the check
1724  // above against MAX_STRCOUNT, however given we iterate using
1725  // int32_t types for efficiency (to match type expected by
1726  // getStringFromStorageFast, check that the # of strings is also
1727  // in the int32_t range in case MAX_STRCOUNT is changed
1728 
1729  // Todo(todd): consider aliasing the max logical type width
1730  // (currently int32_t) throughout StringDictionary
1731  CHECK_LE(num_strings, std::numeric_limits<int32_t>::max());
1732 
1733  std::vector<std::string_view> string_views(num_strings);
1734  // We can bail early if the generation-specified dictionary is empty
1735  if (num_strings == 0) {
1736  return string_views;
1737  }
1738  constexpr int64_t tbb_parallel_threshold{1000};
1739  if (num_strings < tbb_parallel_threshold) {
1740  // Use int32_t to match type expected by getStringFromStorageFast
1741  for (int32_t string_idx = 0; string_idx < num_strings; ++string_idx) {
1742  string_views[string_idx] = getStringFromStorageFast(string_idx);
1743  }
1744  } else {
1745  constexpr int64_t target_strings_per_thread{1000};
1746  const ThreadInfo thread_info(
1747  std::thread::hardware_concurrency(), num_strings, target_strings_per_thread);
1748  CHECK_GE(thread_info.num_threads, 1L);
1749  CHECK_GE(thread_info.num_elems_per_thread, 1L);
1750 
1751  tbb::task_arena limited_arena(thread_info.num_threads);
1752  CHECK_LE(tbb::this_task_arena::max_concurrency(), thread_info.num_threads);
1753  limited_arena.execute([&] {
1754  tbb::parallel_for(
1755  tbb::blocked_range<int64_t>(
1756  0, num_strings, thread_info.num_elems_per_thread /* tbb grain_size */),
1757  [&](const tbb::blocked_range<int64_t>& r) {
1758  // r should be in range of int32_t per CHECK above
1759  const int32_t start_idx = r.begin();
1760  const int32_t end_idx = r.end();
1761  for (int32_t string_idx = start_idx; string_idx != end_idx; ++string_idx) {
1762  string_views[string_idx] = getStringFromStorageFast(string_idx);
1763  }
1764  },
1765  tbb::simple_partitioner());
1766  });
1767  }
1768  return string_views;
1769 }
size_t storageEntryCount() const
#define CHECK_GE(x, y)
Definition: Logger.h:236
std::shared_mutex rw_mutex_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr size_t MAX_STRCOUNT
#define CHECK_LE(x, y)
Definition: Logger.h:234
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
mapd_shared_lock< mapd_shared_mutex > read_lock
#define DEBUG_TIMER(name)
Definition: Logger.h:370

+ Here is the call graph for this function:

int32_t StringDictionary::getUnlocked ( const std::string_view  sv) const
private noexcept

Definition at line 760 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::hash_string().

Referenced by getIdOfString().

760  {
761  const string_dict_hash_t hash = hash_string(sv);
764  return str_id;
765 }
string_dict_hash_t hash_string(const std::string_view &str)
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::hashStrings ( const std::vector< String > &  string_vec,
std::vector< string_dict_hash_t > &  hashes 
) const
private noexcept

Method to hash a vector of strings in parallel.

Parameters
string_vec: input vector of strings to be hashed (empty strings are skipped, leaving their hash slots untouched)
hashes: space for the output; must be pre-sized to match the size of string_vec (enforced with CHECK_EQ)

Definition at line 471 of file StringDictionary.cpp.

References CHECK_EQ, anonymous_namespace{StringDictionary.cpp}::hash_string(), and threading_serial::parallel_for().

Referenced by getOrAddBulkParallel().

473  {
474  CHECK_EQ(string_vec.size(), hashes.size());
475 
476  tbb::parallel_for(tbb::blocked_range<size_t>(0, string_vec.size()),
477  [&string_vec, &hashes](const tbb::blocked_range<size_t>& r) {
478  for (size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
479  if (string_vec[curr_id].empty()) {
480  continue;
481  }
482  hashes[curr_id] = hash_string(string_vec[curr_id]);
483  }
484  });
485 }
#define CHECK_EQ(x, y)
Definition: Logger.h:231
string_dict_hash_t hash_string(const std::string_view &str)
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::increaseHashTableCapacity ( )
private noexcept

Definition at line 1202 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), getStringChecked(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), INVALID_STR_ID, materialize_hashes_, str_count_, and string_id_string_dict_hash_table_.

Referenced by getOrAddBulk() and getOrAddImpl().

1202  {
1203  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1204  INVALID_STR_ID);
1205 
1206  if (materialize_hashes_) {
1207  for (size_t i = 0; i != str_count_; ++i) {
1208  const string_dict_hash_t hash = hash_cache_[i];
1209  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1210  new_str_ids[bucket] = i;
1211  }
1212  hash_cache_.resize(hash_cache_.size() * 2);
1213  } else {
1214  for (size_t i = 0; i != str_count_; ++i) {
1215  const auto str = getStringChecked(i);
1216  const string_dict_hash_t hash = hash_string(str);
1217  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1218  new_str_ids[bucket] = i;
1219  }
1220  }
1221  string_id_string_dict_hash_table_.swap(new_str_ids);
1222 }
string_dict_hash_t hash_string(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
static constexpr int32_t INVALID_STR_ID
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::increaseHashTableCapacityFromStorageAndMemory ( const size_t  str_count,
const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const std::vector< string_dict_hash_t > &  input_strings_hashes 
)
private noexcept

Definition at line 1225 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::hash_string().

Referenced by getOrAddBulkParallel().

1231  {
1232  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1233  INVALID_STR_ID);
1234  if (materialize_hashes_) {
1235  for (size_t i = 0; i != str_count; ++i) {
1236  const string_dict_hash_t hash = hash_cache_[i];
1237  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1238  new_str_ids[bucket] = i;
1239  }
1240  hash_cache_.resize(hash_cache_.size() * 2);
1241  } else {
1242  for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1243  const auto storage_string = getStringChecked(storage_idx);
1244  const string_dict_hash_t hash = hash_string(storage_string);
1245  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1246  new_str_ids[bucket] = storage_idx;
1247  }
1248  for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1249  const size_t string_memory_id = string_memory_ids[memory_idx];
1250  const uint32_t bucket = computeUniqueBucketWithHash(
1251  input_strings_hashes[string_memory_id], new_str_ids);
1252  new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1253  }
1254  }
1255  string_id_string_dict_hash_table_.swap(new_str_ids);
1256 }
string_dict_hash_t hash_string(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
static constexpr int32_t INVALID_STR_ID
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::insertInSortedCache ( std::string  str,
int32_t  str_id 
)
private
void StringDictionary::invalidateInvertedIndex ( )
private noexcept

Definition at line 1564 of file StringDictionary.cpp.

References compare_cache_, equal_cache_, like_cache_, regex_cache_, and gpu_enabled::swap().

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

1564  {
1565  if (!like_cache_.empty()) {
1566  decltype(like_cache_)().swap(like_cache_);
1567  }
1568  if (!regex_cache_.empty()) {
1569  decltype(regex_cache_)().swap(regex_cache_);
1570  }
1571  if (!equal_cache_.empty()) {
1572  decltype(equal_cache_)().swap(equal_cache_);
1573  }
1574  compare_cache_.invalidateInvertedIndex();
1575 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::map< std::string, int32_t > equal_cache_
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool StringDictionary::isClient ( ) const
noexcept

Definition at line 1600 of file StringDictionary.cpp.

References client_.

Referenced by checkpoint(), copyStrings(), eachStringSerially(), getCompare(), getIdOfString(), getLike(), getRegexpLike(), getString(), makeLambdaStringToId(), storageEntryCount(), and ~StringDictionary().

1600  {
1601  return static_cast<bool>(client_);
1602 }
std::unique_ptr< StringDictionaryClient > client_

+ Here is the caller graph for this function:

std::function< int32_t(std::string const &)> StringDictionary::makeLambdaStringToId ( ) const

Definition at line 254 of file StringDictionary.cpp.

References CHECK, eachStringSerially(), INVALID_STR_ID, and isClient().

255  {
256  CHECK(isClient());
257  constexpr size_t big_gen = static_cast<size_t>(std::numeric_limits<size_t>::max());
258  MapMaker map_maker;
259  eachStringSerially(big_gen, map_maker);
260  return [map{map_maker.moveMap()}](std::string const& str) {
261  auto const itr = map.find(str);
262  return itr == map.cend() ? INVALID_STR_ID : itr->second;
263  };
264 }
bool isClient() const noexcept
static constexpr int32_t INVALID_STR_ID
void eachStringSerially(int64_t const generation, StringCallback &) const
#define CHECK(condition)
Definition: Logger.h:223

+ Here is the call graph for this function:

void StringDictionary::mergeSortedCache ( std::vector< int32_t > &  temp_sorted_cache)
private

Definition at line 1628 of file StringDictionary.cpp.

References getStringFromStorage(), sorted_cache, and string_lt().

Referenced by buildSortedCache().

1628  {
1629  // this method is not thread safe
1630  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1631  size_t t_idx = 0, s_idx = 0, idx = 0;
1632  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1633  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1634  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1635  const auto insert_from_temp_cache =
1636  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1637  if (insert_from_temp_cache) {
1638  updated_cache[idx] = temp_sorted_cache[t_idx++];
1639  } else {
1640  updated_cache[idx] = sorted_cache[s_idx++];
1641  }
1642  }
1643  while (t_idx < temp_sorted_cache.size()) {
1644  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1645  }
1646  while (s_idx < sorted_cache.size()) {
1647  updated_cache[idx++] = sorted_cache[s_idx++];
1648  }
1649  sorted_cache.swap(updated_cache);
1650 }
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_array_ids ( std::vector< std::vector< int32_t >> &  dest_array_ids,
StringDictionary *  dest_dict,
const std::vector< std::vector< int32_t >> &  source_array_ids,
const StringDictionary *  source_dict 
)
static

Definition at line 1676 of file StringDictionary.cpp.

References threading_serial::async(), populate_string_ids(), and logger::thread_id().

Referenced by DictionaryValueConverter< TARGET_TYPE >::processArrayBuffer().

1680  {
1681  dest_array_ids.resize(source_array_ids.size());
1682 
1683  std::atomic<size_t> row_idx{0};
1684  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1685  int thread_id) {
1686  for (;;) {
1687  auto row = row_idx.fetch_add(1);
1688 
1689  if (row >= dest_array_ids.size()) {
1690  return;
1691  }
1692  const auto& source_ids = source_array_ids[row];
1693  auto& dest_ids = dest_array_ids[row];
1694  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1695  }
1696  };
1697 
1698  const int num_worker_threads = std::thread::hardware_concurrency();
1699 
1700  if (source_array_ids.size() / num_worker_threads > 10) {
1701  std::vector<std::future<void>> worker_threads;
1702  for (int i = 0; i < num_worker_threads; ++i) {
1703  worker_threads.push_back(std::async(std::launch::async, processor, i));
1704  }
1705 
1706  for (auto& child : worker_threads) {
1707  child.wait();
1708  }
1709  for (auto& child : worker_threads) {
1710  child.get();
1711  }
1712  } else {
1713  processor(0);
1714  }
1715 }
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::vector< std::string const * > &transient_string_vec={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
future< Result > async(Fn &&fn, Args &&...args)
ThreadId thread_id()
Definition: Logger.cpp:817

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_ids ( std::vector< int32_t > &  dest_ids,
StringDictionary *  dest_dict,
const std::vector< int32_t > &  source_ids,
const StringDictionary *  source_dict,
const std::vector< std::string const * > &  transient_string_vec = {} 
)
static

Populates provided dest_ids vector with string ids corresponding to given source strings.

Given a vector of source string ids and corresponding source dictionary, this method populates a vector of destination string ids by either returning the string id of matching strings in the destination dictionary or creating new entries in the dictionary. Source string ids can also be transient if they were created by a function (e.g LOWER/UPPER functions). A map of transient string ids to string values is provided in order to handle this use case.

Parameters
dest_ids- vector of destination string ids to be populated
dest_dict- destination dictionary
source_ids- vector of source string ids for which destination ids are needed
source_dict- source dictionary
transient_string_vec- ordered vector of string value pointers

Definition at line 1652 of file StringDictionary.cpp.

References CHECK_LT, getOrAddBulk(), getString(), and StringDictionaryProxy::transientIdToIndex().

Referenced by populate_string_array_ids(), and DictionaryValueConverter< TARGET_TYPE >::processBuffer().

1657  {
1658  std::vector<std::string> strings;
1659 
1660  for (const int32_t source_id : source_ids) {
1661  if (source_id == std::numeric_limits<int32_t>::min()) {
1662  strings.emplace_back("");
1663  } else if (source_id < 0) {
1664  unsigned const string_index = StringDictionaryProxy::transientIdToIndex(source_id);
1665  CHECK_LT(string_index, transient_string_vec.size()) << "source_id=" << source_id;
1666  strings.emplace_back(*transient_string_vec[string_index]);
1667  } else {
1668  strings.push_back(source_dict->getString(source_id));
1669  }
1670  }
1671 
1672  dest_ids.resize(strings.size());
1673  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1674 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
#define CHECK_LT(x, y)
Definition: Logger.h:233
std::string getString(int32_t string_id) const
static unsigned transientIdToIndex(int32_t const id)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::processDictionaryFutures ( std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &  dictionary_futures)
private

Definition at line 291 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), hash_cache_, materialize_hashes_, payload_file_off_, str_count_, and string_id_string_dict_hash_table_.

Referenced by StringDictionary().

293  {
294  for (auto& dictionary_future : dictionary_futures) {
295  dictionary_future.wait();
296  const auto hashVec = dictionary_future.get();
297  for (const auto& hash : hashVec) {
298  const uint32_t bucket =
299  computeUniqueBucketWithHash(hash.first, string_id_string_dict_hash_table_);
300  payload_file_off_ += hash.second;
301  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
302  if (materialize_hashes_) {
303  hash_cache_[str_count_] = hash.first;
304  }
305  ++str_count_;
306  }
307  }
308  dictionary_futures.clear();
309 }
std::vector< string_dict_hash_t > hash_cache_
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::sortCache ( std::vector< int32_t > &  cache)
private

Definition at line 1615 of file StringDictionary.cpp.

References anonymous_namespace{Utm.h}::a, getStringFromStorage(), gpu_enabled::sort(), and string_lt().

Referenced by buildSortedCache().

1615  {
1616  // This method is not thread-safe.
1617 
1618  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1619  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1620 
1621  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1622  auto a_str = this->getStringFromStorage(a);
1623  auto b_str = this->getStringFromStorage(b);
1624  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1625  });
1626 }
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
constexpr double a
Definition: Utm.h:32
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t StringDictionary::storageEntryCount ( ) const

Definition at line 791 of file StringDictionary.cpp.

References client_, isClient(), rw_mutex_, and str_count_.

Referenced by buildDictionaryTranslationMap(), eachStringSerially(), getBulk(), and getStringViews().

791  {
792  std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
793  if (isClient()) {
794  return client_->storage_entry_count();
795  }
796  return str_count_;
797 }
bool isClient() const noexcept
std::shared_mutex rw_mutex_
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_lock< mapd_shared_mutex > read_lock

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::update_leaf ( const LeafHostInfo host_info)

Definition at line 379 of file StringDictionary.cpp.

379  {
380 }

Friends And Related Function Documentation

friend class StringLocalCallback
friend

Definition at line 80 of file StringDictionary.h.

Member Data Documentation

char* StringDictionary::CANARY_BUFFER {nullptr}
private

Definition at line 287 of file StringDictionary.h.

Referenced by ~StringDictionary().

size_t StringDictionary::canary_buffer_size = 0
private

Definition at line 288 of file StringDictionary.h.

std::unique_ptr<StringDictionaryClient> StringDictionary::client_
mutable private
std::unique_ptr<StringDictionaryClient> StringDictionary::client_no_timeout_
mutable private

Definition at line 285 of file StringDictionary.h.

Referenced by buildDictionaryTranslationMap(), and getOrAddBulkArray().

size_t StringDictionary::collisions_
private

Definition at line 263 of file StringDictionary.h.

Referenced by StringDictionary().

DictionaryCache<std::string, compare_cache_value_t> StringDictionary::compare_cache_
mutable private

Definition at line 282 of file StringDictionary.h.

Referenced by getCompare(), and invalidateInvertedIndex().

const DictRef StringDictionary::dict_ref_
private

Definition at line 260 of file StringDictionary.h.

Referenced by getBulk(), getDbId(), getDictId(), getOrAddBulk(), and getOrAddBulkParallel().

std::map<std::string, int32_t> StringDictionary::equal_cache_
mutable private

Definition at line 281 of file StringDictionary.h.

Referenced by getEquals(), and invalidateInvertedIndex().

const std::string StringDictionary::folder_
private

Definition at line 261 of file StringDictionary.h.

std::map<std::tuple<std::string, bool, bool, char>, std::vector<int32_t> > StringDictionary::like_cache_
mutable private

Definition at line 279 of file StringDictionary.h.

Referenced by getLike(), and invalidateInvertedIndex().

bool StringDictionary::materialize_hashes_
private
constexpr size_t StringDictionary::MAX_STRCOUNT = (1U << 31) - 1
static
int StringDictionary::offset_fd_
private
size_t StringDictionary::offset_file_size_
private
StringIdxEntry* StringDictionary::offset_map_
private
std::string StringDictionary::offsets_path_
private

Definition at line 269 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and StringDictionary().

int StringDictionary::payload_fd_
private
size_t StringDictionary::payload_file_off_
private
size_t StringDictionary::payload_file_size_
private
char* StringDictionary::payload_map_
private
std::map<std::pair<std::string, char>, std::vector<int32_t> > StringDictionary::regex_cache_
mutable private

Definition at line 280 of file StringDictionary.h.

Referenced by getRegexpLike(), and invalidateInvertedIndex().

std::vector<int32_t> StringDictionary::sorted_cache
private

Definition at line 266 of file StringDictionary.h.

Referenced by buildSortedCache(), getCompare(), and mergeSortedCache().

std::vector<int32_t> StringDictionary::string_id_string_dict_hash_table_
private
std::shared_ptr<std::vector<std::string> > StringDictionary::strings_cache_
mutable private

Definition at line 283 of file StringDictionary.h.

Referenced by copyStrings().


The documentation for this class was generated from the following files: