OmniSciDB  0fdbebe030
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
StringDictionary Class Reference

#include <StringDictionary.h>

+ Collaboration diagram for StringDictionary:

Classes

struct  compare_cache_value_t
 
struct  PayloadString
 
struct  StringIdxEntry
 

Public Member Functions

 StringDictionary (const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
 
 StringDictionary (const LeafHostInfo &host, const DictRef dict_ref)
 
 ~StringDictionary () noexcept
 
int32_t getOrAdd (const std::string &str) noexcept
 
template<class T , class String >
void getOrAddBulk (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class T , class String >
void getOrAddBulkParallel (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class String >
void getOrAddBulkArray (const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
 
int32_t getIdOfString (const std::string &str) const
 
std::string getString (int32_t string_id) const
 
std::pair< char *, size_t > getStringBytes (int32_t string_id) const noexcept
 
size_t storageEntryCount () const
 
std::vector< int32_t > getLike (const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
 
std::vector< int32_t > getCompare (const std::string &pattern, const std::string &comp_operator, const size_t generation)
 
std::vector< int32_t > getRegexpLike (const std::string &pattern, const char escape, const size_t generation) const
 
std::shared_ptr< const
std::vector< std::string > > 
copyStrings () const
 
bool checkpoint () noexcept
 

Static Public Member Functions

static void populate_string_ids (std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
 Populates provided dest_ids vector with string ids corresponding to given source strings. More...
 
static void populate_string_array_ids (std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
 

Static Public Attributes

static constexpr int32_t INVALID_STR_ID = -1
 
static constexpr size_t MAX_STRLEN = (1 << 15) - 1
 
static constexpr size_t MAX_STRCOUNT = (1U << 31) - 1
 

Private Member Functions

void processDictionaryFutures (std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
 
bool fillRateIsHigh (const size_t num_strings) const noexcept
 
void increaseCapacity () noexcept
 
template<class String >
void increaseCapacityFromStorageAndMemory (const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< uint32_t > &input_strings_rk_hashes) noexcept
 
int32_t getOrAddImpl (const std::string &str) noexcept
 
template<class String >
void hashStrings (const std::vector< String > &string_vec, std::vector< uint32_t > &hashes) const noexcept
 
template<class T , class String >
void getOrAddBulkRemote (const std::vector< String > &string_vec, T *encoded_vec)
 
int32_t getUnlocked (const std::string &str) const noexcept
 
std::string getStringUnlocked (int32_t string_id) const noexcept
 
std::string getStringChecked (const int string_id) const noexcept
 
std::pair< char *, size_t > getStringBytesChecked (const int string_id) const noexcept
 
template<class String >
uint32_t computeBucket (const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept
 
template<class String >
uint32_t computeBucketFromStorageAndMemory (const uint32_t input_string_rk_hash, const String &input_string, const std::vector< int32_t > &string_id_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
 
uint32_t computeUniqueBucketWithHash (const uint32_t hash, const std::vector< int32_t > &data) const noexcept
 
void checkAndConditionallyIncreasePayloadCapacity (const size_t write_length)
 
void checkAndConditionallyIncreaseOffsetCapacity (const size_t write_length)
 
template<class String >
void appendToStorage (String str) noexcept
 
template<class String >
void appendToStorageBulk (const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
 
PayloadString getStringFromStorage (const int string_id) const noexcept
 
std::string_view getStringFromStorageFast (const int string_id) const noexcept
 
void addPayloadCapacity (const size_t min_capacity_requested=0) noexcept
 
void addOffsetCapacity (const size_t min_capacity_requested=0) noexcept
 
size_t addStorageCapacity (int fd, const size_t min_capacity_requested=0) noexcept
 
void * addMemoryCapacity (void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
 
void invalidateInvertedIndex () noexcept
 
std::vector< int32_t > getEquals (std::string pattern, std::string comp_operator, size_t generation)
 
void buildSortedCache ()
 
void insertInSortedCache (std::string str, int32_t str_id)
 
void sortCache (std::vector< int32_t > &cache)
 
void mergeSortedCache (std::vector< int32_t > &temp_sorted_cache)
 
compare_cache_value_t * binary_search_cache (const std::string &pattern) const
 

Private Attributes

size_t str_count_
 
std::vector< int32_t > string_id_hash_table_
 
std::vector< uint32_t > rk_hashes_
 
std::vector< int32_t > sorted_cache
 
bool isTemp_
 
bool materialize_hashes_
 
std::string offsets_path_
 
int payload_fd_
 
int offset_fd_
 
StringIdxEntry * offset_map_
 
char * payload_map_
 
size_t offset_file_size_
 
size_t payload_file_size_
 
size_t payload_file_off_
 
mapd_shared_mutex rw_mutex_
 
std::map< std::tuple
< std::string, bool, bool,
char >, std::vector< int32_t > > 
like_cache_
 
std::map< std::pair
< std::string, char >
, std::vector< int32_t > > 
regex_cache_
 
std::map< std::string, int32_t > equal_cache_
 
DictionaryCache< std::string,
compare_cache_value_t > 
compare_cache_
 
std::shared_ptr< std::vector
< std::string > > 
strings_cache_
 
std::unique_ptr
< StringDictionaryClient > 
client_
 
std::unique_ptr
< StringDictionaryClient > 
client_no_timeout_
 
char * CANARY_BUFFER {nullptr}
 
size_t canary_buffer_size = 0
 

Detailed Description

Definition at line 43 of file StringDictionary.h.

Constructor & Destructor Documentation

StringDictionary::StringDictionary ( const std::string &  folder,
const bool  isTemp,
const bool  recover,
const bool  materializeHashes = false,
size_t  initial_capacity = 256 
)

Definition at line 109 of file StringDictionary.cpp.

References addOffsetCapacity(), addPayloadCapacity(), CHECK_EQ, anonymous_namespace{StringDictionary.cpp}::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_open(), anonymous_namespace{StringDictionary.cpp}::file_size(), getStringFromStorage(), INVALID_STR_ID, isTemp_, LOG, materialize_hashes_, offset_fd_, offset_file_size_, offset_map_, offsets_path_, payload_fd_, payload_file_size_, payload_map_, processDictionaryFutures(), anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, anonymous_namespace{StringDictionary.cpp}::round_up_p2(), rw_mutex_, string_id_hash_table_, and logger::WARNING.

// Local/temporary dictionary constructor (listing of StringDictionary.cpp).
// For a persistent dictionary it opens the DictPayload/DictOffsets files under
// `folder` and mmaps them; when `recover` is set it re-hashes every persisted
// string in parallel to rebuild string_id_hash_table_ (and rk_hashes_ when
// hashes are materialized). Temporary dictionaries are never mmapped or recovered.
// NOTE: source lines 140, 145 and 148 are not shown in this listing (the
// numbering skips them); the References list names file_size(),
// addPayloadCapacity() and addOffsetCapacity(), which presumably appear there.
114  : str_count_(0)
115  , string_id_hash_table_(initial_capacity, INVALID_STR_ID)
116  , rk_hashes_(initial_capacity)
117  , isTemp_(isTemp)
118  , materialize_hashes_(materializeHashes)
119  , payload_fd_(-1)
120  , offset_fd_(-1)
121  , offset_map_(nullptr)
122  , payload_map_(nullptr)
123  , offset_file_size_(0)
124  , payload_file_size_(0)
125  , payload_file_off_(0)
126  , strings_cache_(nullptr) {
// A non-temp dictionary with no folder returns here without opening any storage.
127  if (!isTemp && folder.empty()) {
128  return;
129  }
130 
131  // initial capacity must be a power of two for efficient bucket computation
132  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
133  if (!isTemp_) {
134  boost::filesystem::path storage_path(folder);
135  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
136  const auto payload_path =
137  (storage_path / boost::filesystem::path("DictPayload")).string();
138  payload_fd_ = checked_open(payload_path.c_str(), recover);
139  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
141  offset_file_size_ = file_size(offset_fd_);
142  }
143 
// Empty stores (new or temp dictionaries) get an initial capacity here
// (body lines 145/148 elided in this listing).
144  if (payload_file_size_ == 0) {
146  }
147  if (offset_file_size_ == 0) {
149  }
150  if (!isTemp_) { // we never mmap or recover temp dictionaries
151  payload_map_ = reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
152  offset_map_ =
153  reinterpret_cast<StringIdxEntry*>(checked_mmap(offset_fd_, offset_file_size_));
154  if (recover) {
155  const size_t bytes = file_size(offset_fd_);
// A size that is not a whole number of entries is only logged; the
// trailing partial entry is ignored by the integer division below.
156  if (bytes % sizeof(StringIdxEntry) != 0) {
157  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
158  }
159  const uint64_t str_count = bytes / sizeof(StringIdxEntry);
160  // at this point we know the size of the StringDict we need to load
161  // so lets reallocate the vector to the correct size
// Table size >= 2 * str_count + 1 keeps the hash table under half full;
// round_up_p2 presumably rounds up to a power of two (required by the mask
// in the bucket computations) -- confirm.
162  const uint64_t max_entries = round_up_p2(str_count * 2 + 1);
163  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
164  string_id_hash_table_.swap(new_str_ids);
165  if (materialize_hashes_) {
166  std::vector<uint32_t> new_rk_hashes(max_entries / 2);
167  rk_hashes_.swap(new_rk_hashes);
168  }
169  unsigned string_id = 0;
170  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
171 
172  uint32_t thread_inits = 0;
// NOTE(review): std::thread::hardware_concurrency() may return 0 when the
// value is not computable; `str_count / thread_count` (line 175) and
// `thread_inits % thread_count` (line 197) would then divide by zero.
// Consider clamping it to at least 1.
173  const auto thread_count = std::thread::hardware_concurrency();
// Each task covers between 2000 and 200000 ids.
174  const uint32_t items_per_thread = std::max<uint32_t>(
175  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
176  std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>
177  dictionary_futures;
// Launch one async task per contiguous id range. Each task returns
// (rk_hash, length) pairs for the ids it read, stopping early at the first
// canary entry. Every thread_count launches, the pending futures are handed
// to processDictionaryFutures() (presumably inserts the hashes and clears
// the vector -- confirm).
178  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
179  dictionary_futures.emplace_back(std::async(
180  std::launch::async, [string_id, str_count, items_per_thread, this] {
181  std::vector<std::pair<uint32_t, unsigned int>> hashVec;
182  for (uint32_t curr_id = string_id;
183  curr_id < string_id + items_per_thread && curr_id < str_count;
184  curr_id++) {
185  const auto recovered = getStringFromStorage(curr_id);
186  if (recovered.canary) {
187  // hit the canary, recovery finished
188  break;
189  } else {
190  std::string temp(recovered.c_str_ptr, recovered.size);
191  hashVec.emplace_back(std::make_pair(rk_hash(temp), temp.size()));
192  }
193  }
194  return hashVec;
195  }));
196  thread_inits++;
197  if (thread_inits % thread_count == 0) {
198  processDictionaryFutures(dictionary_futures);
199  }
200  }
201  // gather last few threads
202  if (dictionary_futures.size() != 0) {
203  processDictionaryFutures(dictionary_futures);
204  }
205  }
206  }
207 }
StringIdxEntry * offset_map_
#define CHECK_EQ(x, y)
Definition: Logger.h:205
uint32_t rk_hash(const std::string_view &str)
#define LOG(tag)
Definition: Logger.h:188
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
std::string offsets_path_
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
mapd_shared_mutex rw_mutex_
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
int checked_open(const char *path, const bool recover)
std::vector< int32_t > string_id_hash_table_
void * checked_mmap(const int fd, const size_t sz)
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
const uint64_t round_up_p2(const uint64_t num)
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

StringDictionary::StringDictionary ( const LeafHostInfo host,
const DictRef  dict_ref 
)

Definition at line 228 of file StringDictionary.cpp.

229  : strings_cache_(nullptr)
230  , client_(new StringDictionaryClient(host, dict_ref, true))
231  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
std::unique_ptr< StringDictionaryClient > client_
std::shared_ptr< std::vector< std::string > > strings_cache_
std::unique_ptr< StringDictionaryClient > client_no_timeout_
StringDictionary::~StringDictionary ( )
noexcept

Definition at line 233 of file StringDictionary.cpp.

References CANARY_BUFFER, CHECK(), CHECK_GE, anonymous_namespace{StringDictionary.cpp}::checked_munmap(), client_, File_Namespace::close(), isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

// Destructor: frees the canary scratch buffer and, for local dictionaries,
// releases the payload/offset storage. Remote dictionaries return right after
// freeing CANARY_BUFFER; their clients are released by the unique_ptr members.
// NOTE: source lines 240-242, 244 and 248 are not shown in this listing; the
// References list names checked_munmap() and close(), which presumably unmap
// the files and close payload_fd_ there -- confirm in StringDictionary.cpp.
233  {
234  free(CANARY_BUFFER);
235  if (client_) {
236  return;
237  }
238  if (payload_map_) {
239  if (!isTemp_) {
243  CHECK_GE(payload_fd_, 0);
245  CHECK_GE(offset_fd_, 0);
// NOTE(review): called with an int fd, so this is presumably POSIX close(2),
// not the File_Namespace::close(FILE*) overload the page links to.
246  close(offset_fd_);
247  } else {
// Temp dictionaries own realloc'ed heap buffers (see addMemoryCapacity()).
249  free(payload_map_);
250  free(offset_map_);
251  }
252  }
253 }
StringIdxEntry * offset_map_
void checked_munmap(void *addr, size_t length)
#define CHECK_GE(x, y)
Definition: Logger.h:210
std::unique_ptr< StringDictionaryClient > client_
CHECK(cgen_state)
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:101

+ Here is the call graph for this function:

Member Function Documentation

void * StringDictionary::addMemoryCapacity ( void *  addr,
size_t &  mem_size,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1311 of file StringDictionary.cpp.

References CHECK(), and anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE.

// Grows the heap buffer `addr` (currently `mem_size` bytes) by a page-multiple
// amount that is at least 1024 pages and strictly larger than
// min_capacity_requested. The new tail is filled with 0xff canary bytes,
// mem_size is increased by the amount added, and the (possibly moved) buffer
// is returned. Callers must use the returned pointer, because realloc may
// invalidate `addr`.
// NOTE: source line 1322 is not shown in this listing.
1313  {
1314  const size_t canary_buff_size_to_add =
1315  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1316  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
// Reuse the shared canary scratch buffer; resize only when the size changes.
// NOTE(review): unless elided line 1322 checks it, a failed realloc here
// leaves CANARY_BUFFER null (leaking the old block) before the memset below.
1317  if (canary_buffer_size != canary_buff_size_to_add) {
1318  CANARY_BUFFER =
1319  reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1320  canary_buffer_size = canary_buff_size_to_add;
1321  }
1323  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1324  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
1325  CHECK(new_addr);
1326  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
// NOTE(review): memcpy returns its destination, so this CHECK cannot fail;
// it only wraps the copy.
1327  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
1328  mem_size += canary_buff_size_to_add;
1329  return new_addr;
1330 }
CHECK(cgen_state)

+ Here is the call graph for this function:

void StringDictionary::addOffsetCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1281 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreaseOffsetCapacity(), and StringDictionary().

1281  {
1282  if (!isTemp_) {
1283  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1284  } else {
1285  offset_map_ = static_cast<StringIdxEntry*>(
1286  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1287  }
1288 }
StringIdxEntry * offset_map_
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

void StringDictionary::addPayloadCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1272 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreasePayloadCapacity(), and StringDictionary().

1272  {
1273  if (!isTemp_) {
1274  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1275  } else {
1276  payload_map_ = static_cast<char*>(
1277  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1278  }
1279 }
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

size_t StringDictionary::addStorageCapacity ( int  fd,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1290 of file StringDictionary.cpp.

References CHECK(), CHECK_NE, anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE, and File_Namespace::write().

// Extends the file behind `fd` by appending a page-multiple run of 0xff canary
// bytes. The run is at least 1024 pages and strictly larger than
// min_capacity_requested. Returns the number of bytes appended; the caller adds
// this to its recorded file size and remaps.
// NOTE: source line 1301 is not shown in this listing.
1292  {
1293  const size_t canary_buff_size_to_add =
1294  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1295  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1296 
// Reuse the shared canary scratch buffer; resize only when the size changes.
1297  if (canary_buffer_size != canary_buff_size_to_add) {
1298  CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1299  canary_buffer_size = canary_buff_size_to_add;
1300  }
1302  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1303 
1304  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
// NOTE(review): this 3-argument call matches POSIX write(2), not the
// File_Namespace::write overload the page links to. A short write is
// treated as fatal by the CHECK below rather than retried.
1305  ssize_t write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
1306  CHECK(write_return > 0 &&
1307  (static_cast<size_t>(write_return) == canary_buff_size_to_add));
1308  return canary_buff_size_to_add;
1309 }
CHECK(cgen_state)
#define CHECK_NE(x, y)
Definition: Logger.h:206
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:120

+ Here is the call graph for this function:

template<class String >
void StringDictionary::appendToStorage ( String  str)
private noexcept

Definition at line 1217 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::size.

Referenced by getOrAddBulk().

// Appends one string at the end of the payload and writes its
// (offset, length) entry at index str_count_ of the offset map. It advances
// payload_file_off_ but not str_count_ (the caller, e.g. getOrAddBulk, is
// presumably responsible for that).
// NOTE: source lines 1219 and 1226 are not shown in this listing. The tooltip
// list names checkAndConditionallyIncreasePayloadCapacity() and
// checkAndConditionallyIncreaseOffsetCapacity(), presumably the capacity checks
// that precede each memcpy -- confirm in StringDictionary.cpp.
1217  {
1218  // write the payload
1220  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1221 
1222  // write the offset and length
1223  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1224  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1225 
1227  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1228 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::appendToStorageBulk ( const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const size_t  sum_new_strings_lengths 
)
private noexcept

Definition at line 1231 of file StringDictionary.cpp.

Referenced by getOrAddBulkParallel().

1234  {
1235  const size_t num_strings = string_memory_ids.size();
1236 
1237  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1238  checkAndConditionallyIncreaseOffsetCapacity(sizeof(StringIdxEntry) * num_strings);
1239 
1240  for (size_t i = 0; i < num_strings; ++i) {
1241  const size_t string_idx = string_memory_ids[i];
1242  const String str = input_strings[string_idx];
1243  const size_t str_size(str.size());
1244  memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1245  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
1246  payload_file_off_ += str_size; // Need to increment after we've defined str_meta
1247  memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
1248  }
1249 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

compare_cache_value_t* StringDictionary::binary_search_cache ( const std::string &  pattern) const
private
void StringDictionary::buildSortedCache ( )
private

Definition at line 1362 of file StringDictionary.cpp.

References mergeSortedCache(), sortCache(), sorted_cache, and str_count_.

Referenced by getCompare().

1362  {
1363  // This method is not thread-safe.
1364  const auto cur_cache_size = sorted_cache.size();
1365  std::vector<int32_t> temp_sorted_cache;
1366  for (size_t i = cur_cache_size; i < str_count_; i++) {
1367  temp_sorted_cache.push_back(i);
1368  }
1369  sortCache(temp_sorted_cache);
1370  mergeSortedCache(temp_sorted_cache);
1371 }
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
void sortCache(std::vector< int32_t > &cache)
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::checkAndConditionallyIncreaseOffsetCapacity ( const size_t  write_length)
private

Definition at line 1196 of file StringDictionary.cpp.

References addOffsetCapacity(), CHECK(), CHECK_GE, anonymous_namespace{StringDictionary.cpp}::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_munmap(), isTemp_, offset_fd_, offset_file_size_, offset_map_, and str_count_.

// Ensures the offset store can take `write_length` more bytes after the last
// entry (slot str_count_). If it cannot, the store is grown by at least the
// shortfall; for persistent dictionaries offset_map_ is then re-mmapped at the
// new size.
// NOTE: source line 1204 is not shown in this listing; the References list
// names checked_munmap(), presumably used there to drop the old mapping.
1197  {
1198  size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
// NOTE(review): uses `>=` while the payload variant uses `>`, so this grows
// one call earlier when the region is exactly full (min_capacity_needed is 0
// in that case). Harmless, but inconsistent.
1199  if (offset_file_off + write_length >= offset_file_size_) {
1200  const size_t min_capacity_needed =
1201  write_length - (offset_file_size_ - offset_file_off);
1202  if (!isTemp_) {
1203  CHECK_GE(offset_fd_, 0);
1205  addOffsetCapacity(min_capacity_needed);
1206  CHECK(offset_file_off + write_length <= offset_file_size_);
1207  offset_map_ =
1208  reinterpret_cast<StringIdxEntry*>(checked_mmap(offset_fd_, offset_file_size_));
1209  } else {
1210  addOffsetCapacity(min_capacity_needed);
1211  CHECK(offset_file_off + write_length <= offset_file_size_);
1212  }
1213  }
1214 }
StringIdxEntry * offset_map_
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
void checked_munmap(void *addr, size_t length)
#define CHECK_GE(x, y)
Definition: Logger.h:210
CHECK(cgen_state)
void * checked_mmap(const int fd, const size_t sz)

+ Here is the call graph for this function:

void StringDictionary::checkAndConditionallyIncreasePayloadCapacity ( const size_t  write_length)
private

Definition at line 1177 of file StringDictionary.cpp.

References addPayloadCapacity(), CHECK(), CHECK_GE, anonymous_namespace{StringDictionary.cpp}::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_munmap(), isTemp_, payload_fd_, payload_file_off_, payload_file_size_, and payload_map_.

// Ensures the payload store can take `write_length` more bytes at
// payload_file_off_. If it cannot, the store is grown by at least the
// shortfall; for persistent dictionaries payload_map_ is then re-mmapped at
// the new size.
// NOTE: source line 1184 is not shown in this listing; the References list
// names checked_munmap(), presumably used there to drop the old mapping.
1178  {
1179  if (payload_file_off_ + write_length > payload_file_size_) {
1180  const size_t min_capacity_needed =
1181  write_length - (payload_file_size_ - payload_file_off_);
1182  if (!isTemp_) {
1183  CHECK_GE(payload_fd_, 0);
1185  addPayloadCapacity(min_capacity_needed);
1186  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1187  payload_map_ =
1188  reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
1189  } else {
1190  addPayloadCapacity(min_capacity_needed);
1191  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1192  }
1193  }
1194 }
void checked_munmap(void *addr, size_t length)
#define CHECK_GE(x, y)
Definition: Logger.h:210
CHECK(cgen_state)
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
void * checked_mmap(const int fd, const size_t sz)

+ Here is the call graph for this function:

bool StringDictionary::checkpoint ( )
noexcept

Definition at line 1345 of file StringDictionary.cpp.

References CHECK(), client_, isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

Referenced by Importer_NS::TypedImportBuffer::stringDictCheckpoint().

1345  {
1346  if (client_) {
1347  try {
1348  return client_->checkpoint();
1349  } catch (...) {
1350  return false;
1351  }
1352  }
1353  CHECK(!isTemp_);
1354  bool ret = true;
1355  ret = ret && (msync((void*)offset_map_, offset_file_size_, MS_SYNC) == 0);
1356  ret = ret && (msync((void*)payload_map_, payload_file_size_, MS_SYNC) == 0);
1357  ret = ret && (fsync(offset_fd_) == 0);
1358  ret = ret && (fsync(payload_fd_) == 0);
1359  return ret;
1360 }
StringIdxEntry * offset_map_
std::unique_ptr< StringDictionaryClient > client_
CHECK(cgen_state)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucket ( const uint32_t  hash,
const String &  str,
const std::vector< int32_t > &  data 
) const
private noexcept

Definition at line 1082 of file StringDictionary.cpp.

Referenced by getOrAddBulk().

1085  {
1086  auto bucket = hash & (data.size() - 1);
1087  while (true) {
1088  const int32_t candidate_string_id = data[bucket];
1089  if (candidate_string_id ==
1090  INVALID_STR_ID) { // In this case it means the slot is available for use
1091  break;
1092  }
1093  if (materialize_hashes_ && hash != rk_hashes_[candidate_string_id]) {
1094  continue;
1095  }
1096  const auto old_str = getStringFromStorageFast(candidate_string_id);
1097  if (str.size() == old_str.size() && !memcmp(str.data(), old_str.data(), str.size())) {
1098  // found the string
1099  break;
1100  }
1101  // wrap around
1102  if (++bucket == data.size()) {
1103  bucket = 0;
1104  }
1105  }
1106  return bucket;
1107 }
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
std::vector< uint32_t > rk_hashes_

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucketFromStorageAndMemory ( const uint32_t  input_string_rk_hash,
const String &  input_string,
const std::vector< int32_t > &  string_id_hash_table,
const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids 
) const
private noexcept

memcmp(input_string.data(), candidate_storage_string.c_str_ptr, input_string.size())) {

Definition at line 1110 of file StringDictionary.cpp.

Referenced by getOrAddBulkParallel().

// Linear-probes `string_id_hash_table` from `input_string_rk_hash` for
// `input_string`. Returns the slot that already holds an equal string, or the
// first empty (INVALID_STR_ID) slot. Candidate ids at or above
// storage_high_water_mark refer to strings not yet written to storage; they are
// compared against `input_strings[string_memory_ids[id - high_water_mark]]`
// instead of the payload.
// NOTE: source lines 1146-1147 are not shown in this listing.
1116  {
1117  auto bucket = input_string_rk_hash & (string_id_hash_table.size() - 1);
1118  while (true) {
1119  const int32_t candidate_string_id = string_id_hash_table[bucket];
1120  if (candidate_string_id ==
1121  INVALID_STR_ID) { // In this case it means the slot is available for use
1122  break;
1123  }
// With materialized hashes, a hash mismatch skips the byte comparison and
// falls through to the bucket advance below.
1124  if (!materialize_hashes_ ||
1125  (input_string_rk_hash == rk_hashes_[candidate_string_id])) {
// NOTE(review): candidate_string_id is already known to be non-negative here
// (it is not INVALID_STR_ID == -1), so `> 0` only excludes id 0. When
// storage_high_water_mark == 0, id 0 lives in the in-memory batch, yet
// this sends it to the storage branch, which reads offset entry 0 --
// presumably not written yet. Confirm and consider dropping the `> 0` term.
1126  if (candidate_string_id > 0 &&
1127  static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1128  // The candidate string is not in storage yet but in our string_memory_ids temp
1129  // buffer
1130  size_t memory_offset =
1131  static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1132  const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1133  if (input_string.size() == candidate_string.size() &&
1134  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1135  // found the string in the temp memory buffer
1136  break;
1137  }
1138  } else {
1139  // The candidate string is in storage, need to fetch it for comparison
1140  const auto candidate_storage_string =
1141  getStringFromStorageFast(candidate_string_id);
1142  if (input_string.size() == candidate_storage_string.size() &&
1143  !memcmp(input_string.data(),
1144  candidate_storage_string.data(),
1145  input_string.size())) {
1148  // found the string in storage
1149  break;
1150  }
1151  }
1152  }
// wrap around
1153  if (++bucket == string_id_hash_table.size()) {
1154  bucket = 0;
1155  }
1156  }
1157  return bucket;
1158 }
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
std::vector< uint32_t > rk_hashes_

+ Here is the caller graph for this function:

uint32_t StringDictionary::computeUniqueBucketWithHash ( const uint32_t  hash,
const std::vector< int32_t > &  data 
) const
private noexcept

Definition at line 1160 of file StringDictionary.cpp.

Referenced by increaseCapacity(), and processDictionaryFutures().

1162  {
1163  auto bucket = hash & (data.size() - 1);
1164  while (true) {
1165  if (data[bucket] ==
1166  INVALID_STR_ID) { // In this case it means the slot is available for use
1167  break;
1168  }
1169  // wrap around
1170  if (++bucket == data.size()) {
1171  bucket = 0;
1172  }
1173  }
1174  return bucket;
1175 }
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

std::shared_ptr< const std::vector< std::string > > StringDictionary::copyStrings ( ) const

Definition at line 915 of file StringDictionary.cpp.

References CHECK_EQ, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), rw_mutex_, Asio::start(), str_count_, and strings_cache_.

915  {
916  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
917  if (client_) {
918  // TODO(miyu): support remote string dictionary
919  throw std::runtime_error(
920  "copying dictionaries from remote server is not supported yet.");
921  }
922 
923  if (strings_cache_) {
924  return strings_cache_;
925  }
926 
927  strings_cache_ = std::make_shared<std::vector<std::string>>();
928  strings_cache_->reserve(str_count_);
929  const bool multithreaded = str_count_ > 10000;
930  const auto worker_count =
931  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
932  CHECK_GT(worker_count, 0UL);
933  std::vector<std::vector<std::string>> worker_results(worker_count);
934  auto copy = [this](std::vector<std::string>& str_list,
935  const size_t start_id,
936  const size_t end_id) {
937  CHECK_LE(start_id, end_id);
938  str_list.reserve(end_id - start_id);
939  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
940  str_list.push_back(getStringUnlocked(string_id));
941  }
942  };
943  if (multithreaded) {
944  std::vector<std::future<void>> workers;
945  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
946  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
947  worker_idx < worker_count && start < str_count_;
948  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
949  workers.push_back(std::async(
950  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
951  }
952  for (auto& worker : workers) {
953  worker.get();
954  }
955  } else {
956  CHECK_EQ(worker_results.size(), size_t(1));
957  copy(worker_results[0], 0, str_count_);
958  }
959 
960  for (const auto& worker_result : worker_results) {
961  strings_cache_->insert(
962  strings_cache_->end(), worker_result.begin(), worker_result.end());
963  }
964  return strings_cache_;
965 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
#define CHECK_GT(x, y)
Definition: Logger.h:209
void start()
Definition: Asio.cpp:33
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
std::shared_ptr< std::vector< std::string > > strings_cache_
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:208
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

bool StringDictionary::fillRateIsHigh ( const size_t  num_strings) const
private noexcept

Definition at line 967 of file StringDictionary.cpp.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

967  {
968  return string_id_hash_table_.size() <= num_strings * 2;
969 }
std::vector< int32_t > string_id_hash_table_

+ Here is the caller graph for this function:

std::vector< int32_t > StringDictionary::getCompare ( const std::string &  pattern,
const std::string &  comp_operator,
const size_t  generation 
)

Definition at line 707 of file StringDictionary.cpp.

References buildSortedCache(), client_, compare_cache_, getEquals(), getStringFromStorage(), rw_mutex_, sorted_cache, str_count_, string_eq(), and string_lt().

709  {
710  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
711  if (client_) {
712  return client_->get_compare(pattern, comp_operator, generation);
713  }
714  std::vector<int32_t> ret;
715  if (str_count_ == 0) {
716  return ret;
717  }
718  if (sorted_cache.size() < str_count_) {
719  if (comp_operator == "=" || comp_operator == "<>") {
720  return getEquals(pattern, comp_operator, generation);
721  }
722 
724  }
725  auto cache_index = compare_cache_.get(pattern);
726 
727  if (!cache_index) {
728  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
729  const auto cache_itr = std::lower_bound(
730  sorted_cache.begin(),
731  sorted_cache.end(),
732  pattern,
733  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
734  auto a_str = this->getStringFromStorage(a);
735  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
736  });
737 
738  if (cache_itr == sorted_cache.end()) {
739  cache_index->index = sorted_cache.size() - 1;
740  cache_index->diff = 1;
741  } else {
742  const auto cache_str = getStringFromStorage(*cache_itr);
743  if (!string_eq(
744  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
745  cache_index->index = cache_itr - sorted_cache.begin() - 1;
746  cache_index->diff = 1;
747  } else {
748  cache_index->index = cache_itr - sorted_cache.begin();
749  cache_index->diff = 0;
750  }
751  }
752 
753  compare_cache_.put(pattern, cache_index);
754  }
755 
756  // since we have a cache in form of vector of ints which is sorted according to
757  // corresponding strings in the dictionary all we need is the index of the element
758  // which equal to the pattern that we are trying to match or the index of “biggest”
759  // element smaller than the pattern, to perform all the comparison operators over
760  // string. The search function guarantees we have such index so now it is just the
761  // matter to include all the elements in the result vector.
762 
763  // For < operator if the index that we have points to the element which is equal to
764  // the pattern that we are searching for we simply get all the elements less than the
765  // index. If the element pointed by the index is not equal to the pattern we are
766  // comparing with we also need to include that index in result vector, except when the
767  // index points to 0 and the pattern is lesser than the smallest value in the string
768  // dictionary.
769 
770  if (comp_operator == "<") {
771  size_t idx = cache_index->index;
772  if (cache_index->diff) {
773  idx = cache_index->index + 1;
774  if (cache_index->index == 0 && cache_index->diff > 0) {
775  idx = cache_index->index;
776  }
777  }
778  for (size_t i = 0; i < idx; i++) {
779  ret.push_back(sorted_cache[i]);
780  }
781 
782  // For <= operator if the index that we have points to the element which is equal to
783  // the pattern that we are searching for we want to include the element pointed by
784  // the index in the result set. If the element pointed by the index is not equal to
785  // the pattern we are comparing with we just want to include all the ids with index
786  // less than the index that is cached, except when pattern that we are searching for
787  // is smaller than the smallest string in the dictionary.
788 
789  } else if (comp_operator == "<=") {
790  size_t idx = cache_index->index + 1;
791  if (cache_index == 0 && cache_index->diff > 0) {
792  idx = cache_index->index;
793  }
794  for (size_t i = 0; i < idx; i++) {
795  ret.push_back(sorted_cache[i]);
796  }
797 
798  // For > operator we want to get all the elements with index greater than the index
799  // that we have except, when the pattern we are searching for is lesser than the
800  // smallest string in the dictionary we also want to include the id of the index
801  // that we have.
802 
803  } else if (comp_operator == ">") {
804  size_t idx = cache_index->index + 1;
805  if (cache_index->index == 0 && cache_index->diff > 0) {
806  idx = cache_index->index;
807  }
808  for (size_t i = idx; i < sorted_cache.size(); i++) {
809  ret.push_back(sorted_cache[i]);
810  }
811 
812  // For >= operator when the indexed element that we have points to element which is
813  // equal to the pattern we are searching for we want to include that in the result
814  // vector. If the index that we have does not point to the string which is equal to
815  // the pattern we are searching we don’t want to include that id into the result
816  // vector except when the index is 0.
817 
818  } else if (comp_operator == ">=") {
819  size_t idx = cache_index->index;
820  if (cache_index->diff) {
821  idx = cache_index->index + 1;
822  if (cache_index->index == 0 && cache_index->diff > 0) {
823  idx = cache_index->index;
824  }
825  }
826  for (size_t i = idx; i < sorted_cache.size(); i++) {
827  ret.push_back(sorted_cache[i]);
828  }
829  } else if (comp_operator == "=") {
830  if (!cache_index->diff) {
831  ret.push_back(sorted_cache[cache_index->index]);
832  }
833 
834  // For <> operator it is simple matter of not including id of string which is equal
835  // to pattern we are searching for.
836  } else if (comp_operator == "<>") {
837  if (!cache_index->diff) {
838  size_t idx = cache_index->index;
839  for (size_t i = 0; i < idx; i++) {
840  ret.push_back(sorted_cache[i]);
841  }
842  ++idx;
843  for (size_t i = idx; i < sorted_cache.size(); i++) {
844  ret.push_back(sorted_cache[i]);
845  }
846  } else {
847  for (size_t i = 0; i < sorted_cache.size(); i++) {
848  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
849  }
850  }
851 
852  } else {
853  std::runtime_error("Unsupported string comparison operator");
854  }
855  return ret;
856 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
std::unique_ptr< StringDictionaryClient > client_
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
mapd_shared_mutex rw_mutex_
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getEquals ( std::string  pattern,
std::string  comp_operator,
size_t  generation 
)
private

Definition at line 647 of file StringDictionary.cpp.

References CHECK(), CHECK_GT, CHECK_LE, cpu_threads(), equal_cache_, getStringUnlocked(), MAX_STRLEN, run_benchmark_import::result, and str_count_.

Referenced by getCompare().

649  {
650  std::vector<int32_t> result;
651  auto eq_id_itr = equal_cache_.find(pattern);
652  int32_t eq_id = MAX_STRLEN + 1;
653  int32_t cur_size = str_count_;
654  if (eq_id_itr != equal_cache_.end()) {
655  auto eq_id = eq_id_itr->second;
656  if (comp_operator == "=") {
657  result.push_back(eq_id);
658  } else {
659  for (int32_t idx = 0; idx <= cur_size; idx++) {
660  if (idx == eq_id) {
661  continue;
662  }
663  result.push_back(idx);
664  }
665  }
666  } else {
667  std::vector<std::thread> workers;
668  int worker_count = cpu_threads();
669  CHECK_GT(worker_count, 0);
670  std::vector<std::vector<int32_t>> worker_results(worker_count);
671  CHECK_LE(generation, str_count_);
672  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
673  workers.emplace_back(
674  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
675  for (size_t string_id = worker_idx; string_id < generation;
676  string_id += worker_count) {
677  const auto str = getStringUnlocked(string_id);
678  if (str == pattern) {
679  worker_results[worker_idx].push_back(string_id);
680  }
681  }
682  });
683  }
684  for (auto& worker : workers) {
685  worker.join();
686  }
687  for (const auto& worker_result : worker_results) {
688  result.insert(result.end(), worker_result.begin(), worker_result.end());
689  }
690  if (result.size() > 0) {
691  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
692  CHECK(it_ok.second);
693  eq_id = result[0];
694  }
695  if (comp_operator == "<>") {
696  for (int32_t idx = 0; idx <= cur_size; idx++) {
697  if (idx == eq_id) {
698  continue;
699  }
700  result.push_back(idx);
701  }
702  }
703  }
704  return result;
705 }
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::string getStringUnlocked(int32_t string_id) const noexcept
CHECK(cgen_state)
std::map< std::string, int32_t > equal_cache_
#define CHECK_LE(x, y)
Definition: Logger.h:208
static constexpr size_t MAX_STRLEN
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int32_t StringDictionary::getIdOfString ( const std::string &  str) const

Definition at line 522 of file StringDictionary.cpp.

References client_, getUnlocked(), and rw_mutex_.

522  {
523  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
524  if (client_) {
525  return client_->get(str);
526  }
527  return getUnlocked(str);
528 }
int32_t getUnlocked(const std::string &str) const noexcept
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getLike ( const std::string &  pattern,
const bool  icase,
const bool  is_simple,
const char  escape,
const size_t  generation 
) const

Definition at line 594 of file StringDictionary.cpp.

References CHECK(), CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_like(), like_cache_, run_benchmark_import::result, rw_mutex_, and str_count_.

598  {
599  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
600  if (client_) {
601  return client_->get_like(pattern, icase, is_simple, escape, generation);
602  }
603  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
604  const auto it = like_cache_.find(cache_key);
605  if (it != like_cache_.end()) {
606  return it->second;
607  }
608  std::vector<int32_t> result;
609  std::vector<std::thread> workers;
610  int worker_count = cpu_threads();
611  CHECK_GT(worker_count, 0);
612  std::vector<std::vector<int32_t>> worker_results(worker_count);
613  CHECK_LE(generation, str_count_);
614  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
615  workers.emplace_back([&worker_results,
616  &pattern,
617  generation,
618  icase,
619  is_simple,
620  escape,
621  worker_idx,
622  worker_count,
623  this]() {
624  for (size_t string_id = worker_idx; string_id < generation;
625  string_id += worker_count) {
626  const auto str = getStringUnlocked(string_id);
627  if (is_like(str, pattern, icase, is_simple, escape)) {
628  worker_results[worker_idx].push_back(string_id);
629  }
630  }
631  });
632  }
633  for (auto& worker : workers) {
634  worker.join();
635  }
636  for (const auto& worker_result : worker_results) {
637  result.insert(result.end(), worker_result.begin(), worker_result.end());
638  }
639  // place result into cache for reuse if similar query
640  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
641 
642  CHECK(it_ok.second);
643 
644  return result;
645 }
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
CHECK(cgen_state)
mapd_shared_mutex rw_mutex_
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
#define CHECK_LE(x, y)
Definition: Logger.h:208
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

int32_t StringDictionary::getOrAdd ( const std::string &  str)
noexcept

Definition at line 255 of file StringDictionary.cpp.

References CHECK_EQ.

255  {
256  if (client_) {
257  std::vector<int32_t> string_ids;
258  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
259  CHECK_EQ(size_t(1), string_ids.size());
260  return string_ids.front();
261  }
262  return getOrAddImpl(str);
263 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
int32_t getOrAddImpl(const std::string &str) noexcept
std::unique_ptr< StringDictionaryClient > client_
template<class T , class String >
void StringDictionary::getOrAddBulk ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 327 of file StringDictionary.cpp.

References appendToStorage(), CHECK(), CHECK_LT, client_no_timeout_, computeBucket(), fillRateIsHigh(), g_enable_stringdict_parallel, getOrAddBulkParallel(), getOrAddBulkRemote(), increaseCapacity(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, rw_mutex_, str_count_, and string_id_hash_table_.

Referenced by Importer_NS::TypedImportBuffer::addDictEncodedString(), getOrAddBulkArray(), and populate_string_ids().

// Encodes every string in input_strings into output_string_ids (one output slot
// per input, in order), adding strings that are not yet in the dictionary.
//  - Empty strings are encoded as inline_int_null_value<T>().
//  - Once str_count_ equals max_valid_int_value<T>(), a string that would need a
//    new id is reported via log_encoding_error<T>() and encoded as null instead.
// Line 330 hands the whole batch to getOrAddBulkParallel() (under a condition on
// the missing line 329). When client_no_timeout_ is set the batch goes to
// getOrAddBulkRemote(). Otherwise it runs single-threaded under the exclusive lock.
// NOTE(review): this listing is missing source lines 329, 362, 369 and 382.
// Judging by the References list, they presumably hold the
// g_enable_stringdict_parallel check, the CHECK_LT(str_count_, MAX_STRCOUNT) that
// the message on 363-366 belongs to, increaseCapacity(), and
// invalidateInvertedIndex(). Confirm against StringDictionary.cpp.
328  {
330  getOrAddBulkParallel(input_strings, output_string_ids);
331  return;
332  }
333  // Single-thread path.
334  if (client_no_timeout_) {
335  getOrAddBulkRemote(input_strings, output_string_ids);
336  return;
337  }
338  size_t out_idx{0};
339  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
340 
// out_idx advances exactly once per input string on every path below, so
// output_string_ids needs input_strings.size() slots.
341  for (const auto& str : input_strings) {
342  if (str.empty()) {
343  output_string_ids[out_idx++] = inline_int_null_value<T>();
344  continue;
345  }
346  CHECK(str.size() <= MAX_STRLEN);
347  uint32_t bucket;
348  const uint32_t hash = rk_hash(str);
349  bucket = computeBucket(hash, str, string_id_hash_table_);
350  if (string_id_hash_table_[bucket] != INVALID_STR_ID) {
351  output_string_ids[out_idx++] = string_id_hash_table_[bucket];
352  continue;
353  }
354  // need to add record to dictionary
355  // check there is room
356  if (str_count_ == static_cast<size_t>(max_valid_int_value<T>())) {
357  log_encoding_error<T>(str);
358  output_string_ids[out_idx++] = inline_int_null_value<T>();
359  continue;
360  }
// NOTE(review): this condition is always true here, because a non-empty
// bucket already took the `continue` on lines 350-353.
361  if (string_id_hash_table_[bucket] == INVALID_STR_ID) {
363  << "Maximum number (" << str_count_
364  << ") of Dictionary encoded Strings reached for this column, offset path "
365  "for column is "
366  << offsets_path_;
367  if (fillRateIsHigh(str_count_)) {
368  // resize when more than 50% is full
370  bucket = computeBucket(hash, str, string_id_hash_table_);
371  }
372  appendToStorage(str);
373 
// The new string takes the next sequential id, the current str_count_.
374  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
375  if (materialize_hashes_) {
376  rk_hashes_[str_count_] = hash;
377  }
378  ++str_count_;
379  }
380  output_string_ids[out_idx++] = string_id_hash_table_[bucket];
381  }
383 }
uint32_t rk_hash(const std::string_view &str)
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
void increaseCapacity() noexcept
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
CHECK(cgen_state)
mapd_shared_mutex rw_mutex_
std::vector< int32_t > string_id_hash_table_
#define CHECK_LT(x, y)
Definition: Logger.h:207
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void invalidateInvertedIndex() noexcept
uint32_t computeBucket(const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
bool g_enable_stringdict_parallel
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)
void appendToStorage(String str) noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::getOrAddBulkArray ( const std::vector< std::vector< String >> &  string_array_vec,
std::vector< std::vector< int32_t >> &  ids_array_vec 
)

Definition at line 277 of file StringDictionary.cpp.

References getOrAddBulk().

Referenced by Importer_NS::TypedImportBuffer::addDictEncodedStringArray().

279  {
280  ids_array_vec.resize(string_array_vec.size());
281  for (size_t i = 0; i < string_array_vec.size(); i++) {
282  auto& strings = string_array_vec[i];
283  auto& ids = ids_array_vec[i];
284  ids.resize(strings.size());
285  getOrAddBulk(strings, &ids[0]);
286  }
287 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class T , class String >
void StringDictionary::getOrAddBulkParallel ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 386 of file StringDictionary.cpp.

References appendToStorageBulk(), CHECK(), CHECK_LT, client_no_timeout_, computeBucketFromStorageAndMemory(), fillRateIsHigh(), getOrAddBulkRemote(), hashStrings(), increaseCapacityFromStorageAndMemory(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, rk_hashes_, rw_mutex_, str_count_, and string_id_hash_table_.

Referenced by getOrAddBulk().

// Bulk get-or-add that rk_hashes all inputs with hashStrings() before taking the
// exclusive lock. It then assigns ids against a local shadow of str_count_ and
// writes all new strings to storage with one appendToStorageBulk() call at the end.
//  - Empty strings are encoded as inline_int_null_value<T>().
//  - Once the shadow count equals max_valid_int_value<T>(), a string that would
//    need a new id is reported via log_encoding_error<T>() and encoded as null.
// When client_no_timeout_ is set, the batch goes to getOrAddBulkRemote() instead.
// NOTE(review): this listing is missing source lines 426 and 462. Per the
// References list, they presumably pass string_id_hash_table_ to
// computeBucketFromStorageAndMemory() and call invalidateInvertedIndex().
// Confirm against StringDictionary.cpp.
387  {
388  if (client_no_timeout_) {
389  getOrAddBulkRemote(input_strings, output_string_ids);
390  return;
391  }
392  // Run rk_hash on the input strings up front, and in parallel,
393  // as the string hashing does not need to be behind the subsequent write_lock
394  std::vector<uint32_t> input_strings_rk_hashes(input_strings.size());
395  hashStrings(input_strings, input_strings_rk_hashes);
396 
397  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
398  size_t shadow_str_count =
399  str_count_; // Need to shadow str_count_ now with bulk add methods
400  const size_t storage_high_water_mark = shadow_str_count;
// Indices (into input_strings) of the strings added by this call. Until
// appendToStorageBulk() runs, these strings exist only in input_strings. That is
// presumably why the bucket and resize helpers receive storage_high_water_mark
// and this list (TODO confirm).
401  std::vector<size_t> string_memory_ids;
402  size_t sum_new_string_lengths = 0;
403  string_memory_ids.reserve(input_strings.size());
404  size_t input_string_idx{0};
405  for (const auto& input_string : input_strings) {
406  // Currently we make empty strings null
407  if (input_string.empty()) {
408  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
409  continue;
410  }
411  // TODO: Recover gracefully if an input string is too long
412  CHECK(input_string.size() <= MAX_STRLEN);
413 
414  if (fillRateIsHigh(shadow_str_count)) {
415  // resize when more than 50% is full
416  increaseCapacityFromStorageAndMemory(storage_high_water_mark,
417  input_strings,
418  string_memory_ids,
419  input_strings_rk_hashes);
420  }
421  // Get the rk_hash for this input_string
422  const uint32_t input_string_rk_hash = input_strings_rk_hashes[input_string_idx];
423 
424  uint32_t hash_bucket = computeBucketFromStorageAndMemory(input_string_rk_hash,
425  input_string,
427  storage_high_water_mark,
428  input_strings,
429  string_memory_ids);
430 
431  // If the hash bucket is not empty, that is our string id
432  // (computeBucketFromStorageAndMemory already checked that the input string and
433  // the bucket string are equal)
434  if (string_id_hash_table_[hash_bucket] != INVALID_STR_ID) {
435  output_string_ids[input_string_idx++] = string_id_hash_table_[hash_bucket];
436  continue;
437  }
438  // Did not find string, so need to add record to dictionary
439  // First check there is room
440  if (shadow_str_count == static_cast<size_t>(max_valid_int_value<T>())) {
441  log_encoding_error<T>(input_string);
442  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
443  continue;
444  }
445  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
446  << "Maximum number (" << shadow_str_count
447  << ") of Dictionary encoded Strings reached for this column, offset path "
448  "for column is "
449  << offsets_path_;
450 
// The new string takes the next sequential id. str_count_ itself is updated
// only after the bulk storage append below.
451  string_memory_ids.push_back(input_string_idx);
452  sum_new_string_lengths += input_string.size();
453  string_id_hash_table_[hash_bucket] = static_cast<int32_t>(shadow_str_count);
454  if (materialize_hashes_) {
455  rk_hashes_[shadow_str_count] = input_string_rk_hash;
456  }
457  output_string_ids[input_string_idx++] = shadow_str_count++;
458  }
459  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
460  str_count_ = shadow_str_count;
461 
463 }
void hashStrings(const std::vector< String > &string_vec, std::vector< uint32_t > &hashes) const noexcept
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
CHECK(cgen_state)
mapd_shared_mutex rw_mutex_
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
std::vector< int32_t > string_id_hash_table_
#define CHECK_LT(x, y)
Definition: Logger.h:207
void increaseCapacityFromStorageAndMemory(const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< uint32_t > &input_strings_rk_hashes) noexcept
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void invalidateInvertedIndex() noexcept
static constexpr size_t MAX_STRLEN
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)
uint32_t computeBucketFromStorageAndMemory(const uint32_t input_string_rk_hash, const String &input_string, const std::vector< int32_t > &string_id_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class T , class String >
void StringDictionary::getOrAddBulkRemote ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)
private

Definition at line 482 of file StringDictionary.cpp.

References CHECK(), and client_no_timeout_.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

// Encodes string_vec through client_no_timeout_'s get_or_add_bulk() call and
// copies the returned ids into encoded_vec.
//  - Ids greater than max_valid_int_value<T>() are reported via
//    log_encoding_error<T>() and stored as inline_int_null_value<T>().
//  - Ids equal to inline_int_null_value<int32_t>() are stored as the null
//    value of T.
// NOTE(review): the loop is bounded by string_ids.size() but also indexes
// string_vec[i], so it assumes the client returns exactly one id per input
// string (TODO confirm). encoded_vec needs string_ids.size() slots.
// NOTE(review): source line 484 is missing from this listing. Per the
// References list it is presumably CHECK(client_no_timeout_); confirm.
483  {
485  std::vector<int32_t> string_ids;
486  client_no_timeout_->get_or_add_bulk(string_ids, string_vec);
487  size_t out_idx{0};
488  for (size_t i = 0; i < string_ids.size(); ++i) {
489  const auto string_id = string_ids[i];
490  const bool invalid = string_id > max_valid_int_value<T>();
491  if (invalid || string_id == inline_int_null_value<int32_t>()) {
492  if (invalid) {
493  log_encoding_error<T>(string_vec[i]);
494  }
495  encoded_vec[out_idx++] = inline_int_null_value<T>();
496  continue;
497  }
498  encoded_vec[out_idx++] = string_id;
499  }
500 }
CHECK(cgen_state)
std::unique_ptr< StringDictionaryClient > client_no_timeout_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int32_t StringDictionary::getOrAddImpl ( const std::string &  str)
private noexcept

Definition at line 1027 of file StringDictionary.cpp.

References CHECK(), CHECK_LT, and anonymous_namespace{StringDictionary.cpp}::rk_hash().

// Returns the id of str, adding str to the dictionary if it is absent.
// Empty strings are not stored; they map to inline_int_null_value<int32_t>().
// The hash table is probed first under the shared lock. Only on a miss is the
// exclusive lock taken, the bucket recomputed, and the string appended.
// NOTE(review): source lines 1047 and 1063 are missing from this listing.
// Line 1047 is presumably the CHECK_LT(str_count_, MAX_STRCOUNT) that the
// "Maximum number" message on 1048-1051 belongs to. Line 1063 may be the
// invalidateInvertedIndex() call listed among the tooltips. Confirm.
1027  {
1028  // @TODO(wei) treat empty string as NULL for now
1029  if (str.size() == 0) {
1030  return inline_int_null_value<int32_t>();
1031  }
1032  CHECK(str.size() <= MAX_STRLEN);
1033  uint32_t bucket;
1034  const uint32_t hash = rk_hash(str);
// Fast path: most lookups hit an existing string and need only the shared lock.
1035  {
1036  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
1037  bucket = computeBucket(hash, str, string_id_hash_table_);
1038  if (string_id_hash_table_[bucket] != INVALID_STR_ID) {
1039  return string_id_hash_table_[bucket];
1040  }
1041  }
1042  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
1043  // need to recalculate the bucket in case it changed before
1044  // we got the lock
1045  bucket = computeBucket(hash, str, string_id_hash_table_);
1046  if (string_id_hash_table_[bucket] == INVALID_STR_ID) {
1048  << "Maximum number (" << str_count_
1049  << ") of Dictionary encoded Strings reached for this column, offset path "
1050  "for column is "
1051  << offsets_path_;
1052  if (fillRateIsHigh(str_count_)) {
1053  // resize when more than 50% is full
1054  increaseCapacity();
1055  bucket = computeBucket(hash, str, string_id_hash_table_);
1056  }
1057  appendToStorage(str);
// The new string takes the next sequential id, the current str_count_.
1058  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
1059  if (materialize_hashes_) {
1060  rk_hashes_[str_count_] = hash;
1061  }
1062  ++str_count_;
1064  }
1065  return string_id_hash_table_[bucket];
1066 }
uint32_t rk_hash(const std::string_view &str)
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
void increaseCapacity() noexcept
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
CHECK(cgen_state)
mapd_shared_mutex rw_mutex_
std::vector< int32_t > string_id_hash_table_
#define CHECK_LT(x, y)
Definition: Logger.h:207
void invalidateInvertedIndex() noexcept
uint32_t computeBucket(const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept
static constexpr size_t MAX_STRLEN
void appendToStorage(String str) noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getRegexpLike ( const std::string &  pattern,
const char  escape,
const size_t  generation 
) const

Definition at line 868 of file StringDictionary.cpp.

References CHECK(), CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_regexp_like(), regex_cache_, run_benchmark_import::result, rw_mutex_, and str_count_.

870  {
871  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
872  if (client_) {
873  return client_->get_regexp_like(pattern, escape, generation);
874  }
875  const auto cache_key = std::make_pair(pattern, escape);
876  const auto it = regex_cache_.find(cache_key);
877  if (it != regex_cache_.end()) {
878  return it->second;
879  }
880  std::vector<int32_t> result;
881  std::vector<std::thread> workers;
882  int worker_count = cpu_threads();
883  CHECK_GT(worker_count, 0);
884  std::vector<std::vector<int32_t>> worker_results(worker_count);
885  CHECK_LE(generation, str_count_);
886  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
887  workers.emplace_back([&worker_results,
888  &pattern,
889  generation,
890  escape,
891  worker_idx,
892  worker_count,
893  this]() {
894  for (size_t string_id = worker_idx; string_id < generation;
895  string_id += worker_count) {
896  const auto str = getStringUnlocked(string_id);
897  if (is_regexp_like(str, pattern, escape)) {
898  worker_results[worker_idx].push_back(string_id);
899  }
900  }
901  });
902  }
903  for (auto& worker : workers) {
904  worker.join();
905  }
906  for (const auto& worker_result : worker_results) {
907  result.insert(result.end(), worker_result.begin(), worker_result.end());
908  }
909  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
910  CHECK(it_ok.second);
911 
912  return result;
913 }
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
CHECK(cgen_state)
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:208
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

std::string StringDictionary::getString ( int32_t  string_id) const

Definition at line 536 of file StringDictionary.cpp.

References client_, getStringUnlocked(), and rw_mutex_.

Referenced by StringValueConverter::convertToColumnarFormatFromDict(), and populate_string_ids().

536  {
537  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
538  if (client_) {
539  std::string ret;
540  client_->get_string(ret, string_id);
541  return ret;
542  }
543  return getStringUnlocked(string_id);
544 }
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_mutex rw_mutex_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::pair< char *, size_t > StringDictionary::getStringBytes ( int32_t  string_id) const
noexcept

Definition at line 551 of file StringDictionary.cpp.

References CHECK(), CHECK_LE, and CHECK_LT.

552  {
553  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
554  CHECK(!client_);
555  CHECK_LE(0, string_id);
556  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
557  return getStringBytesChecked(string_id);
558 }
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
std::unique_ptr< StringDictionaryClient > client_
CHECK(cgen_state)
mapd_shared_mutex rw_mutex_
#define CHECK_LT(x, y)
Definition: Logger.h:207
#define CHECK_LE(x, y)
Definition: Logger.h:208

+ Here is the call graph for this function:

std::pair< char *, size_t > StringDictionary::getStringBytesChecked ( const int  string_id) const
private noexcept

Definition at line 1074 of file StringDictionary.cpp.

References CHECK().

1075  {
1076  const auto str_canary = getStringFromStorage(string_id);
1077  CHECK(!str_canary.canary);
1078  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1079 }
CHECK(cgen_state)
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the call graph for this function:

std::string StringDictionary::getStringChecked ( const int  string_id) const
private noexcept

Definition at line 1068 of file StringDictionary.cpp.

References CHECK().

Referenced by increaseCapacity().

1068  {
1069  const auto str_canary = getStringFromStorage(string_id);
1070  CHECK(!str_canary.canary);
1071  return std::string(str_canary.c_str_ptr, str_canary.size);
1072 }
CHECK(cgen_state)
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

StringDictionary::PayloadString StringDictionary::getStringFromStorage ( const int  string_id) const
private noexcept

Definition at line 1257 of file StringDictionary.cpp.

References CHECK_GE, StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

Referenced by getCompare(), mergeSortedCache(), sortCache(), and StringDictionary().

1258  {
1259  if (!isTemp_) {
1260  CHECK_GE(payload_fd_, 0);
1261  CHECK_GE(offset_fd_, 0);
1262  }
1263  CHECK_GE(string_id, 0);
1264  const StringIdxEntry* str_meta = offset_map_ + string_id;
1265  if (str_meta->size == 0xffff) {
1266  // hit the canary
1267  return {nullptr, 0, true};
1268  }
1269  return {payload_map_ + str_meta->off, str_meta->size, false};
1270 }
StringIdxEntry * offset_map_
#define CHECK_GE(x, y)
Definition: Logger.h:210

+ Here is the caller graph for this function:

std::string_view StringDictionary::getStringFromStorageFast ( const int  string_id) const
private noexcept

Definition at line 1251 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

1252  {
1253  const StringIdxEntry* str_meta = offset_map_ + string_id;
1254  return {payload_map_ + str_meta->off, str_meta->size};
1255 }
StringIdxEntry * offset_map_
std::string StringDictionary::getStringUnlocked ( int32_t  string_id) const
private noexcept

Definition at line 546 of file StringDictionary.cpp.

References CHECK_LT.

Referenced by copyStrings(), getEquals(), getLike(), getRegexpLike(), and getString().

546  {
547  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
548  return getStringChecked(string_id);
549 }
std::string getStringChecked(const int string_id) const noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:207

+ Here is the caller graph for this function:

int32_t StringDictionary::getUnlocked ( const std::string &  str) const
private, noexcept

Definition at line 530 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::rk_hash().

Referenced by getIdOfString().

530  {
531  const uint32_t hash = rk_hash(str);
532  auto str_id = string_id_hash_table_[computeBucket(hash, str, string_id_hash_table_)];
533  return str_id;
534 }
uint32_t rk_hash(const std::string_view &str)
std::vector< int32_t > string_id_hash_table_
uint32_t computeBucket(const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::hashStrings ( const std::vector< String > &  string_vec,
std::vector< uint32_t > &  hashes 
) const
private, noexcept

Method to rk_hash a vector of strings in parallel.

Parameters
string_vec — input vector of strings to be hashed
hashes — space for the output; should be pre-sized to match the size of string_vec

Definition at line 299 of file StringDictionary.cpp.

References CHECK_EQ, and anonymous_namespace{StringDictionary.cpp}::rk_hash().

Referenced by getOrAddBulkParallel().

300  {
301  CHECK_EQ(string_vec.size(), hashes.size());
302  const size_t min_target_strings_per_thread{2000};
303  const size_t str_count = string_vec.size();
304  const size_t max_thread_count = std::thread::hardware_concurrency();
305  const size_t items_per_thread =
306  std::max<size_t>(min_target_strings_per_thread, str_count / max_thread_count + 1);
307 
308  std::vector<std::thread> workers;
309  for (size_t string_id = 0; string_id < str_count; string_id += items_per_thread) {
310  workers.emplace_back(
311  [&string_vec, &hashes, string_id, str_count, items_per_thread]() {
312  const size_t end_id = std::min(string_id + items_per_thread, str_count);
313  for (size_t curr_id = string_id; curr_id < end_id; ++curr_id) {
314  if (string_vec[curr_id].empty()) {
315  continue;
316  }
317  hashes[curr_id] = rk_hash(string_vec[curr_id]);
318  }
319  });
320  }
321  for (auto& worker : workers) {
322  worker.join();
323  }
324 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
uint32_t rk_hash(const std::string_view &str)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::increaseCapacity ( )
private, noexcept

Definition at line 971 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), getStringChecked(), INVALID_STR_ID, materialize_hashes_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, str_count_, and string_id_hash_table_.

Referenced by getOrAddBulk().

971  {
972  std::vector<int32_t> new_str_ids(string_id_hash_table_.size() * 2, INVALID_STR_ID);
973 
974  if (materialize_hashes_) {
975  for (size_t i = 0; i < string_id_hash_table_.size(); ++i) {
976  if (string_id_hash_table_[i] != INVALID_STR_ID) {
977  const uint32_t hash = rk_hashes_[string_id_hash_table_[i]];
978  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
979  new_str_ids[bucket] = string_id_hash_table_[i];
980  }
981  }
982  rk_hashes_.resize(rk_hashes_.size() * 2);
983  } else {
984  for (size_t i = 0; i < str_count_; ++i) {
985  const auto str = getStringChecked(i);
986  const uint32_t hash = rk_hash(str);
987  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
988  new_str_ids[bucket] = i;
989  }
990  }
991  string_id_hash_table_.swap(new_str_ids);
992 }
uint32_t rk_hash(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
std::vector< int32_t > string_id_hash_table_
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) const noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::increaseCapacityFromStorageAndMemory ( const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const std::vector< uint32_t > &  input_strings_rk_hashes 
)
private, noexcept

Definition at line 995 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::rk_hash().

Referenced by getOrAddBulkParallel().

999  {
1000  std::vector<int32_t> new_str_ids(string_id_hash_table_.size() * 2, INVALID_STR_ID);
1001  if (materialize_hashes_) {
1002  for (size_t i = 0; i < string_id_hash_table_.size(); ++i) {
1003  if (string_id_hash_table_[i] != INVALID_STR_ID) {
1004  const uint32_t hash = rk_hashes_[string_id_hash_table_[i]];
1005  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1006  new_str_ids[bucket] = string_id_hash_table_[i];
1007  }
1008  }
1009  rk_hashes_.resize(rk_hashes_.size() * 2);
1010  } else {
1011  for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1012  const auto storage_string = getStringChecked(storage_idx);
1013  const uint32_t hash = rk_hash(storage_string);
1014  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1015  new_str_ids[bucket] = storage_idx;
1016  }
1017  for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1018  size_t string_memory_id = string_memory_ids[memory_idx];
1019  uint32_t bucket = computeUniqueBucketWithHash(
1020  input_strings_rk_hashes[string_memory_id], new_str_ids);
1021  new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1022  }
1023  }
1024  string_id_hash_table_.swap(new_str_ids);
1025 }
uint32_t rk_hash(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
std::vector< int32_t > string_id_hash_table_
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) const noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::insertInSortedCache ( std::string  str,
int32_t  str_id 
)
private
void StringDictionary::invalidateInvertedIndex ( )
private, noexcept

Definition at line 1332 of file StringDictionary.cpp.

References compare_cache_, equal_cache_, like_cache_, and regex_cache_.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

1332  {
1333  if (!like_cache_.empty()) {
1334  decltype(like_cache_)().swap(like_cache_);
1335  }
1336  if (!regex_cache_.empty()) {
1337  decltype(regex_cache_)().swap(regex_cache_);
1338  }
1339  if (!equal_cache_.empty()) {
1340  decltype(equal_cache_)().swap(equal_cache_);
1341  }
1342  compare_cache_.invalidateInvertedIndex();
1343 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::map< std::string, int32_t > equal_cache_
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_

+ Here is the caller graph for this function:

void StringDictionary::mergeSortedCache ( std::vector< int32_t > &  temp_sorted_cache)
private

Definition at line 1386 of file StringDictionary.cpp.

References getStringFromStorage(), sorted_cache, and string_lt().

Referenced by buildSortedCache().

1386  {
1387  // this method is not thread safe
1388  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1389  size_t t_idx = 0, s_idx = 0, idx = 0;
1390  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1391  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1392  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1393  const auto insert_from_temp_cache =
1394  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1395  if (insert_from_temp_cache) {
1396  updated_cache[idx] = temp_sorted_cache[t_idx++];
1397  } else {
1398  updated_cache[idx] = sorted_cache[s_idx++];
1399  }
1400  }
1401  while (t_idx < temp_sorted_cache.size()) {
1402  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1403  }
1404  while (s_idx < sorted_cache.size()) {
1405  updated_cache[idx++] = sorted_cache[s_idx++];
1406  }
1407  sorted_cache.swap(updated_cache);
1408 }
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_array_ids ( std::vector< std::vector< int32_t >> &  dest_array_ids,
StringDictionary dest_dict,
const std::vector< std::vector< int32_t >> &  source_array_ids,
const StringDictionary source_dict 
)
static

Definition at line 1437 of file StringDictionary.cpp.

References populate_string_ids(), and logger::thread_id().

Referenced by DictionaryValueConverter< TARGET_TYPE >::processArrayBuffer().

1441  {
1442  dest_array_ids.resize(source_array_ids.size());
1443 
1444  std::atomic<size_t> row_idx{0};
1445  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1446  int thread_id) {
1447  for (;;) {
1448  auto row = row_idx.fetch_add(1);
1449 
1450  if (row >= dest_array_ids.size()) {
1451  return;
1452  }
1453  const auto& source_ids = source_array_ids[row];
1454  auto& dest_ids = dest_array_ids[row];
1455  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1456  }
1457  };
1458 
1459  const int num_worker_threads = std::thread::hardware_concurrency();
1460 
1461  if (source_array_ids.size() / num_worker_threads > 10) {
1462  std::vector<std::future<void>> worker_threads;
1463  for (int i = 0; i < num_worker_threads; ++i) {
1464  worker_threads.push_back(std::async(std::launch::async, processor, i));
1465  }
1466 
1467  for (auto& child : worker_threads) {
1468  child.wait();
1469  }
1470  for (auto& child : worker_threads) {
1471  child.get();
1472  }
1473  } else {
1474  processor(0);
1475  }
1476 }
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
ThreadId thread_id()
Definition: Logger.cpp:715

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_ids ( std::vector< int32_t > &  dest_ids,
StringDictionary dest_dict,
const std::vector< int32_t > &  source_ids,
const StringDictionary source_dict,
const std::map< int32_t, std::string >  transient_mapping = {} 
)
static

Populates provided dest_ids vector with string ids corresponding to given source strings.

Given a vector of source string ids and corresponding source dictionary, this method populates a vector of destination string ids by either returning the string id of matching strings in the destination dictionary or creating new entries in the dictionary. Source string ids can also be transient if they were created by a function (e.g., LOWER/UPPER functions). A map of transient string ids to string values is provided in order to handle this use case.

Parameters
dest_ids — vector of destination string ids to be populated
dest_dict — destination dictionary
source_ids — vector of source string ids for which destination ids are needed
source_dict — source dictionary
transient_mapping — map of transient source string ids to string values

Definition at line 1410 of file StringDictionary.cpp.

References getOrAddBulk(), and getString().

Referenced by populate_string_array_ids(), and DictionaryValueConverter< TARGET_TYPE >::processBuffer().

1415  {
1416  std::vector<std::string> strings;
1417 
1418  for (const int32_t source_id : source_ids) {
1419  if (source_id == std::numeric_limits<int32_t>::min()) {
1420  strings.emplace_back("");
1421  } else if (source_id < 0) {
1422  if (auto string_itr = transient_mapping.find(source_id);
1423  string_itr != transient_mapping.end()) {
1424  strings.emplace_back(string_itr->second);
1425  } else {
1426  throw std::runtime_error("Unexpected negative source ID");
1427  }
1428  } else {
1429  strings.push_back(source_dict->getString(source_id));
1430  }
1431  }
1432 
1433  dest_ids.resize(strings.size());
1434  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1435 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
std::string getString(int32_t string_id) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::processDictionaryFutures ( std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &  dictionary_futures)
private

Definition at line 209 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), materialize_hashes_, payload_file_off_, rk_hashes_, str_count_, and string_id_hash_table_.

Referenced by StringDictionary().

211  {
212  for (auto& dictionary_future : dictionary_futures) {
213  dictionary_future.wait();
214  auto hashVec = dictionary_future.get();
215  for (auto& hash : hashVec) {
216  uint32_t bucket = computeUniqueBucketWithHash(hash.first, string_id_hash_table_);
217  payload_file_off_ += hash.second;
218  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
219  if (materialize_hashes_) {
220  rk_hashes_[str_count_] = hash.first;
221  }
222  ++str_count_;
223  }
224  }
225  dictionary_futures.clear();
226 }
std::vector< int32_t > string_id_hash_table_
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) const noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::sortCache ( std::vector< int32_t > &  cache)
private

Definition at line 1373 of file StringDictionary.cpp.

References getStringFromStorage(), and string_lt().

Referenced by buildSortedCache().

1373  {
1374  // This method is not thread-safe.
1375 
1376  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1377  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1378 
1379  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1380  auto a_str = this->getStringFromStorage(a);
1381  auto b_str = this->getStringFromStorage(b);
1382  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1383  });
1384 }
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t StringDictionary::storageEntryCount ( ) const

Definition at line 560 of file StringDictionary.cpp.

References client_, rw_mutex_, and str_count_.

560  {
561  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
562  if (client_) {
563  return client_->storage_entry_count();
564  }
565  return str_count_;
566 }
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_

Member Data Documentation

char* StringDictionary::CANARY_BUFFER {nullptr}
private

Definition at line 225 of file StringDictionary.h.

Referenced by ~StringDictionary().

size_t StringDictionary::canary_buffer_size = 0
private

Definition at line 226 of file StringDictionary.h.

std::unique_ptr<StringDictionaryClient> StringDictionary::client_
private
std::unique_ptr<StringDictionaryClient> StringDictionary::client_no_timeout_
private

Definition at line 223 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and getOrAddBulkRemote().

DictionaryCache<std::string, compare_cache_value_t> StringDictionary::compare_cache_
mutable, private

Definition at line 220 of file StringDictionary.h.

Referenced by getCompare(), and invalidateInvertedIndex().

std::map<std::string, int32_t> StringDictionary::equal_cache_
mutable, private

Definition at line 219 of file StringDictionary.h.

Referenced by getEquals(), and invalidateInvertedIndex().

std::map<std::tuple<std::string, bool, bool, char>, std::vector<int32_t> > StringDictionary::like_cache_
mutable, private

Definition at line 217 of file StringDictionary.h.

Referenced by getLike(), and invalidateInvertedIndex().

bool StringDictionary::materialize_hashes_
private
constexpr size_t StringDictionary::MAX_STRCOUNT = (1U << 31) - 1
static

Definition at line 116 of file StringDictionary.h.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

int StringDictionary::offset_fd_
private
size_t StringDictionary::offset_file_size_
private
StringIdxEntry* StringDictionary::offset_map_
private
std::string StringDictionary::offsets_path_
private

Definition at line 207 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and StringDictionary().

int StringDictionary::payload_fd_
private
size_t StringDictionary::payload_file_off_
private
size_t StringDictionary::payload_file_size_
private
char* StringDictionary::payload_map_
private
std::map<std::pair<std::string, char>, std::vector<int32_t> > StringDictionary::regex_cache_
mutable, private

Definition at line 218 of file StringDictionary.h.

Referenced by getRegexpLike(), and invalidateInvertedIndex().

std::vector<uint32_t> StringDictionary::rk_hashes_
private
std::vector<int32_t> StringDictionary::sorted_cache
private

Definition at line 204 of file StringDictionary.h.

Referenced by buildSortedCache(), getCompare(), and mergeSortedCache().

std::vector<int32_t> StringDictionary::string_id_hash_table_
private
std::shared_ptr<std::vector<std::string> > StringDictionary::strings_cache_
mutable, private

Definition at line 221 of file StringDictionary.h.

Referenced by copyStrings().


The documentation for this class was generated from the following files: