OmniSciDB  94e8789169
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
StringDictionary Class Reference

#include <StringDictionary.h>

+ Collaboration diagram for StringDictionary:

Classes

struct  compare_cache_value_t
 
struct  PayloadString
 
struct  StringIdxEntry
 

Public Member Functions

 StringDictionary (const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
 
 StringDictionary (const LeafHostInfo &host, const DictRef dict_ref)
 
 ~StringDictionary () noexcept
 
int32_t getOrAdd (const std::string &str) noexcept
 
template<class T , class String >
void getOrAddBulk (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class T , class String >
void getOrAddBulkParallel (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class String >
void getOrAddBulkArray (const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
 
int32_t getIdOfString (const std::string &str) const
 
std::string getString (int32_t string_id) const
 
std::pair< char *, size_t > getStringBytes (int32_t string_id) const noexcept
 
size_t storageEntryCount () const
 
std::vector< int32_t > getLike (const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
 
std::vector< int32_t > getCompare (const std::string &pattern, const std::string &comp_operator, const size_t generation)
 
std::vector< int32_t > getRegexpLike (const std::string &pattern, const char escape, const size_t generation) const
 
std::shared_ptr< const
std::vector< std::string > > 
copyStrings () const
 
bool checkpoint () noexcept
 

Static Public Member Functions

static void populate_string_ids (std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
 Populates provided dest_ids vector with string ids corresponding to given source strings. More...
 
static void populate_string_array_ids (std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
 

Static Public Attributes

static constexpr int32_t INVALID_STR_ID = -1
 
static constexpr size_t MAX_STRLEN = (1 << 15) - 1
 
static constexpr size_t MAX_STRCOUNT = (1U << 31) - 1
 

Private Member Functions

void processDictionaryFutures (std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
 
size_t getNumStringsFromStorage (const size_t storage_slots) const noexcept
 
bool fillRateIsHigh (const size_t num_strings) const noexcept
 
void increaseHashTableCapacity () noexcept
 
template<class String >
void increaseHashTableCapacityFromStorageAndMemory (const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
 
int32_t getOrAddImpl (const std::string_view &str) noexcept
 
template<class String >
void hashStrings (const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
 
template<class T , class String >
void getOrAddBulkRemote (const std::vector< String > &string_vec, T *encoded_vec)
 
int32_t getUnlocked (const std::string &str) const noexcept
 
std::string getStringUnlocked (int32_t string_id) const noexcept
 
std::string getStringChecked (const int string_id) const noexcept
 
std::pair< char *, size_t > getStringBytesChecked (const int string_id) const noexcept
 
template<class String >
uint32_t computeBucket (const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
 
template<class String >
uint32_t computeBucketFromStorageAndMemory (const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
 
uint32_t computeUniqueBucketWithHash (const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
 
void checkAndConditionallyIncreasePayloadCapacity (const size_t write_length)
 
void checkAndConditionallyIncreaseOffsetCapacity (const size_t write_length)
 
template<class String >
void appendToStorage (const String str) noexcept
 
template<class String >
void appendToStorageBulk (const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
 
PayloadString getStringFromStorage (const int string_id) const noexcept
 
std::string_view getStringFromStorageFast (const int string_id) const noexcept
 
void addPayloadCapacity (const size_t min_capacity_requested=0) noexcept
 
void addOffsetCapacity (const size_t min_capacity_requested=0) noexcept
 
size_t addStorageCapacity (int fd, const size_t min_capacity_requested=0) noexcept
 
void * addMemoryCapacity (void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
 
void invalidateInvertedIndex () noexcept
 
std::vector< int32_t > getEquals (std::string pattern, std::string comp_operator, size_t generation)
 
void buildSortedCache ()
 
void insertInSortedCache (std::string str, int32_t str_id)
 
void sortCache (std::vector< int32_t > &cache)
 
void mergeSortedCache (std::vector< int32_t > &temp_sorted_cache)
 
compare_cache_value_t * binary_search_cache (const std::string &pattern) const
 

Private Attributes

size_t str_count_
 
size_t collisions_
 
std::vector< int32_t > string_id_string_dict_hash_table_
 
std::vector< string_dict_hash_t > hash_cache_
 
std::vector< int32_t > sorted_cache
 
bool isTemp_
 
bool materialize_hashes_
 
std::string offsets_path_
 
int payload_fd_
 
int offset_fd_
 
StringIdxEntry * offset_map_
 
char * payload_map_
 
size_t offset_file_size_
 
size_t payload_file_size_
 
size_t payload_file_off_
 
mapd_shared_mutex rw_mutex_
 
std::map< std::tuple
< std::string, bool, bool,
char >, std::vector< int32_t > > 
like_cache_
 
std::map< std::pair
< std::string, char >
, std::vector< int32_t > > 
regex_cache_
 
std::map< std::string, int32_t > equal_cache_
 
DictionaryCache< std::string,
compare_cache_value_t > 
compare_cache_
 
std::shared_ptr< std::vector
< std::string > > 
strings_cache_
 
std::unique_ptr
< StringDictionaryClient > 
client_
 
std::unique_ptr
< StringDictionaryClient > 
client_no_timeout_
 
char * CANARY_BUFFER {nullptr}
 
size_t canary_buffer_size = 0
 

Detailed Description

Definition at line 44 of file StringDictionary.h.

Constructor & Destructor Documentation

StringDictionary::StringDictionary ( const std::string &  folder,
const bool  isTemp,
const bool  recover,
const bool  materializeHashes = false,
size_t  initial_capacity = 256 
)

Definition at line 95 of file StringDictionary.cpp.

References addOffsetCapacity(), addPayloadCapacity(), CHECK_EQ, omnisci::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_open(), collisions_, omnisci::file_size(), getNumStringsFromStorage(), getStringFromStorage(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), INVALID_STR_ID, isTemp_, LOG, materialize_hashes_, offset_fd_, offset_file_size_, offset_map_, offsets_path_, payload_fd_, payload_file_size_, payload_map_, processDictionaryFutures(), anonymous_namespace{StringDictionary.cpp}::round_up_p2(), rw_mutex_, str_count_, string_id_string_dict_hash_table_, VLOG, and logger::WARNING.

100  : str_count_(0)
102  , hash_cache_(initial_capacity)
103  , isTemp_(isTemp)
104  , materialize_hashes_(materializeHashes)
105  , payload_fd_(-1)
106  , offset_fd_(-1)
107  , offset_map_(nullptr)
108  , payload_map_(nullptr)
109  , offset_file_size_(0)
110  , payload_file_size_(0)
111  , payload_file_off_(0)
112  , strings_cache_(nullptr) {
113  if (!isTemp && folder.empty()) {
114  return;
115  }
116 
117  // initial capacity must be a power of two for efficient bucket computation
118  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
119  if (!isTemp_) {
120  boost::filesystem::path storage_path(folder);
121  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
122  const auto payload_path =
123  (storage_path / boost::filesystem::path("DictPayload")).string();
124  payload_fd_ = checked_open(payload_path.c_str(), recover);
125  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
128  }
129  bool storage_is_empty = false;
130  if (payload_file_size_ == 0) {
131  storage_is_empty = true;
133  }
134  if (offset_file_size_ == 0) {
136  }
137  if (!isTemp_) { // we never mmap or recover temp dictionaries
138  payload_map_ =
139  reinterpret_cast<char*>(omnisci::checked_mmap(payload_fd_, payload_file_size_));
140  offset_map_ = reinterpret_cast<StringIdxEntry*>(
142  if (recover) {
143  const size_t bytes = omnisci::file_size(offset_fd_);
144  if (bytes % sizeof(StringIdxEntry) != 0) {
145  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
146  }
147  const uint64_t str_count =
148  storage_is_empty ? 0 : getNumStringsFromStorage(bytes / sizeof(StringIdxEntry));
149  collisions_ = 0;
150  // at this point we know the size of the StringDict we need to load
151  // so lets reallocate the vector to the correct size
152  const uint64_t max_entries =
153  std::max(round_up_p2(str_count * 2 + 1),
154  round_up_p2(std::max(initial_capacity, static_cast<size_t>(1))));
155  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
156  string_id_string_dict_hash_table_.swap(new_str_ids);
157  if (materialize_hashes_) {
158  std::vector<string_dict_hash_t> new_hash_cache(max_entries / 2);
159  hash_cache_.swap(new_hash_cache);
160  }
161  // Bail early if we know we don't have strings to add (i.e. a new or empty
162  // dictionary)
163  if (str_count == 0) {
164  return;
165  }
166 
167  unsigned string_id = 0;
168  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
169 
170  uint32_t thread_inits = 0;
171  const auto thread_count = std::thread::hardware_concurrency();
172  const uint32_t items_per_thread = std::max<uint32_t>(
173  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
174  std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>
175  dictionary_futures;
176  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
177  dictionary_futures.emplace_back(std::async(
178  std::launch::async, [string_id, str_count, items_per_thread, this] {
179  std::vector<std::pair<string_dict_hash_t, unsigned int>> hashVec;
180  for (uint32_t curr_id = string_id;
181  curr_id < string_id + items_per_thread && curr_id < str_count;
182  curr_id++) {
183  const auto recovered = getStringFromStorage(curr_id);
184  if (recovered.canary) {
185  // hit the canary, recovery finished
186  break;
187  } else {
188  std::string_view temp(recovered.c_str_ptr, recovered.size);
189  hashVec.emplace_back(std::make_pair(hash_string(temp), temp.size()));
190  }
191  }
192  return hashVec;
193  }));
194  thread_inits++;
195  if (thread_inits % thread_count == 0) {
196  processDictionaryFutures(dictionary_futures);
197  }
198  }
199  // gather last few threads
200  if (dictionary_futures.size() != 0) {
201  processDictionaryFutures(dictionary_futures);
202  }
203  VLOG(1) << "Opened string dictionary " << folder << " # Strings: " << str_count_
204  << " Hash table size: " << string_id_string_dict_hash_table_.size()
205  << " Fill rate: "
206  << static_cast<double>(str_count_) * 100.0 /
208  << "% Collisions: " << collisions_;
209  }
210  }
211 }
StringIdxEntry * offset_map_
#define CHECK_EQ(x, y)
Definition: Logger.h:205
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38
string_dict_hash_t hash_string(const std::string_view &str)
#define LOG(tag)
Definition: Logger.h:188
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
std::vector< string_dict_hash_t > hash_cache_
std::string offsets_path_
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
mapd_shared_mutex rw_mutex_
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
int checked_open(const char *path, const bool recover)
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
size_t getNumStringsFromStorage(const size_t storage_slots) const noexcept
const uint64_t round_up_p2(const uint64_t num)
std::vector< int32_t > string_id_string_dict_hash_table_
mapd_unique_lock< mapd_shared_mutex > write_lock
PayloadString getStringFromStorage(const int string_id) const noexcept
#define VLOG(n)
Definition: Logger.h:291
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31

+ Here is the call graph for this function:

StringDictionary::StringDictionary ( const LeafHostInfo &  host,
const DictRef  dict_ref 
)

Definition at line 263 of file StringDictionary.cpp.

264  : strings_cache_(nullptr)
265  , client_(new StringDictionaryClient(host, dict_ref, true))
266  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
std::unique_ptr< StringDictionaryClient > client_
std::shared_ptr< std::vector< std::string > > strings_cache_
std::unique_ptr< StringDictionaryClient > client_no_timeout_
StringDictionary::~StringDictionary ( )
noexcept

Definition at line 268 of file StringDictionary.cpp.

References CANARY_BUFFER, CHECK, CHECK_GE, omnisci::checked_munmap(), client_, omnisci::close(), isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

268  {
269  free(CANARY_BUFFER);
270  if (client_) {
271  return;
272  }
273  if (payload_map_) {
274  if (!isTemp_) {
278  CHECK_GE(payload_fd_, 0);
280  CHECK_GE(offset_fd_, 0);
282  } else {
284  free(payload_map_);
285  free(offset_map_);
286  }
287  }
288 }
StringIdxEntry * offset_map_
#define CHECK_GE(x, y)
Definition: Logger.h:210
std::unique_ptr< StringDictionaryClient > client_
void close(const int fd)
Definition: omnisci_fs.cpp:68
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

Member Function Documentation

void * StringDictionary::addMemoryCapacity ( void *  addr,
size_t &  mem_size,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1348 of file StringDictionary.cpp.

References CHECK, and anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE.

1350  {
1351  const size_t canary_buff_size_to_add =
1352  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1353  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1354  if (canary_buffer_size < canary_buff_size_to_add) {
1355  CANARY_BUFFER =
1356  reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1357  canary_buffer_size = canary_buff_size_to_add;
1359  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1360  }
1361  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
1362  CHECK(new_addr);
1363  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
1364  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
1365  mem_size += canary_buff_size_to_add;
1366  return new_addr;
1367 }
#define CHECK(condition)
Definition: Logger.h:197
void StringDictionary::addOffsetCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1318 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreaseOffsetCapacity(), and StringDictionary().

1318  {
1319  if (!isTemp_) {
1320  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1321  } else {
1322  offset_map_ = static_cast<StringIdxEntry*>(
1323  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1324  }
1325 }
StringIdxEntry * offset_map_
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

void StringDictionary::addPayloadCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1309 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreasePayloadCapacity(), and StringDictionary().

1309  {
1310  if (!isTemp_) {
1311  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1312  } else {
1313  payload_map_ = static_cast<char*>(
1314  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1315  }
1316 }
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

size_t StringDictionary::addStorageCapacity ( int  fd,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1327 of file StringDictionary.cpp.

References CHECK, CHECK_NE, anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE, and File_Namespace::write().

1329  {
1330  const size_t canary_buff_size_to_add =
1331  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1332  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1333 
1334  if (canary_buffer_size < canary_buff_size_to_add) {
1335  CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1336  canary_buffer_size = canary_buff_size_to_add;
1338  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1339  }
1340 
1341  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1342  const auto write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
1343  CHECK(write_return > 0 &&
1344  (static_cast<size_t>(write_return) == canary_buff_size_to_add));
1345  return canary_buff_size_to_add;
1346 }
#define CHECK_NE(x, y)
Definition: Logger.h:206
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:126
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

template<class String >
void StringDictionary::appendToStorage ( const String  str)
private noexcept

Definition at line 1254 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::size.

Referenced by getOrAddBulk().

1254  {
1255  // write the payload
1257  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1258 
1259  // write the offset and length
1260  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1261  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1262 
1264  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1265 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::appendToStorageBulk ( const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const size_t  sum_new_strings_lengths 
)
private noexcept

Definition at line 1268 of file StringDictionary.cpp.

References generate_TableFunctionsFactory_init::i.

Referenced by getOrAddBulkParallel().

1271  {
1272  const size_t num_strings = string_memory_ids.size();
1273 
1274  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1275  checkAndConditionallyIncreaseOffsetCapacity(sizeof(StringIdxEntry) * num_strings);
1276 
1277  for (size_t i = 0; i < num_strings; ++i) {
1278  const size_t string_idx = string_memory_ids[i];
1279  const String str = input_strings[string_idx];
1280  const size_t str_size(str.size());
1281  memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1282  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
1283  payload_file_off_ += str_size; // Need to increment after we've defined str_meta
1284  memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
1285  }
1286 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

compare_cache_value_t* StringDictionary::binary_search_cache ( const std::string &  pattern) const
private
void StringDictionary::buildSortedCache ( )
private

Definition at line 1401 of file StringDictionary.cpp.

References generate_TableFunctionsFactory_init::i, mergeSortedCache(), sortCache(), sorted_cache, and str_count_.

Referenced by getCompare().

1401  {
1402  // This method is not thread-safe.
1403  const auto cur_cache_size = sorted_cache.size();
1404  std::vector<int32_t> temp_sorted_cache;
1405  for (size_t i = cur_cache_size; i < str_count_; i++) {
1406  temp_sorted_cache.push_back(i);
1407  }
1408  sortCache(temp_sorted_cache);
1409  mergeSortedCache(temp_sorted_cache);
1410 }
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
void sortCache(std::vector< int32_t > &cache)
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::checkAndConditionallyIncreaseOffsetCapacity ( const size_t  write_length)
private

Definition at line 1233 of file StringDictionary.cpp.

References addOffsetCapacity(), CHECK, CHECK_GE, omnisci::checked_mmap(), omnisci::checked_munmap(), isTemp_, offset_fd_, offset_file_size_, offset_map_, and str_count_.

1234  {
1235  const size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
1236  if (offset_file_off + write_length >= offset_file_size_) {
1237  const size_t min_capacity_needed =
1238  write_length - (offset_file_size_ - offset_file_off);
1239  if (!isTemp_) {
1240  CHECK_GE(offset_fd_, 0);
1242  addOffsetCapacity(min_capacity_needed);
1243  CHECK(offset_file_off + write_length <= offset_file_size_);
1244  offset_map_ = reinterpret_cast<StringIdxEntry*>(
1246  } else {
1247  addOffsetCapacity(min_capacity_needed);
1248  CHECK(offset_file_off + write_length <= offset_file_size_);
1249  }
1250  }
1251 }
StringIdxEntry * offset_map_
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:210
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

void StringDictionary::checkAndConditionallyIncreasePayloadCapacity ( const size_t  write_length)
private

Definition at line 1214 of file StringDictionary.cpp.

References addPayloadCapacity(), CHECK, CHECK_GE, omnisci::checked_mmap(), omnisci::checked_munmap(), isTemp_, payload_fd_, payload_file_off_, payload_file_size_, and payload_map_.

1215  {
1216  if (payload_file_off_ + write_length > payload_file_size_) {
1217  const size_t min_capacity_needed =
1218  write_length - (payload_file_size_ - payload_file_off_);
1219  if (!isTemp_) {
1220  CHECK_GE(payload_fd_, 0);
1222  addPayloadCapacity(min_capacity_needed);
1223  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1224  payload_map_ =
1225  reinterpret_cast<char*>(omnisci::checked_mmap(payload_fd_, payload_file_size_));
1226  } else {
1227  addPayloadCapacity(min_capacity_needed);
1228  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1229  }
1230  }
1231 }
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38
#define CHECK_GE(x, y)
Definition: Logger.h:210
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

bool StringDictionary::checkpoint ( )
noexcept

Definition at line 1382 of file StringDictionary.cpp.

References CHECK, client_, omnisci::fsync(), isTemp_, omnisci::msync(), offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

Referenced by import_export::TypedImportBuffer::stringDictCheckpoint().

1382  {
1383  if (client_) {
1384  try {
1385  return client_->checkpoint();
1386  } catch (...) {
1387  return false;
1388  }
1389  }
1390  CHECK(!isTemp_);
1391  bool ret = true;
1392  ret = ret &&
1393  (omnisci::msync((void*)offset_map_, offset_file_size_, /*async=*/false) == 0);
1394  ret = ret &&
1395  (omnisci::msync((void*)payload_map_, payload_file_size_, /*async=*/false) == 0);
1396  ret = ret && (omnisci::fsync(offset_fd_) == 0);
1397  ret = ret && (omnisci::fsync(payload_fd_) == 0);
1398  return ret;
1399 }
StringIdxEntry * offset_map_
std::unique_ptr< StringDictionaryClient > client_
int fsync(int fd)
Definition: omnisci_fs.cpp:60
int msync(void *addr, size_t length, bool async)
Definition: omnisci_fs.cpp:55
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucket ( const string_dict_hash_t  hash,
const String &  input_string,
const std::vector< int32_t > &  string_id_string_dict_hash_table 
) const
private noexcept

Definition at line 1116 of file StringDictionary.cpp.

Referenced by getOrAddBulk().

1119  {
1120  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1121  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1122  while (true) {
1123  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1124  if (candidate_string_id ==
1125  INVALID_STR_ID) { // In this case it means the slot is available for use
1126  break;
1127  }
1128  if ((materialize_hashes_ && hash == hash_cache_[candidate_string_id]) ||
1130  const auto candidate_string = getStringFromStorageFast(candidate_string_id);
1131  if (input_string.size() == candidate_string.size() &&
1132  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1133  // found the string
1134  break;
1135  }
1136  }
1137  // wrap around
1138  if (++bucket == string_dict_hash_table_size) {
1139  bucket = 0;
1140  }
1141  }
1142  return bucket;
1143 }
std::vector< string_dict_hash_t > hash_cache_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucketFromStorageAndMemory ( const string_dict_hash_t  input_string_hash,
const String &  input_string,
const std::vector< int32_t > &  string_id_string_dict_hash_table,
const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids 
) const
private noexcept

memcmp(input_string.data(), candidate_storage_string.c_str_ptr, input_string.size())) {

Definition at line 1146 of file StringDictionary.cpp.

Referenced by getOrAddBulkParallel().

1152  {
1153  uint32_t bucket = input_string_hash & (string_id_string_dict_hash_table.size() - 1);
1154  while (true) {
1155  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1156  if (candidate_string_id ==
1157  INVALID_STR_ID) { // In this case it means the slot is available for use
1158  break;
1159  }
1160  if (!materialize_hashes_ || (input_string_hash == hash_cache_[candidate_string_id])) {
1161  if (candidate_string_id > 0 &&
1162  static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1163  // The candidate string is not in storage yet but in our string_memory_ids temp
1164  // buffer
1165  size_t memory_offset =
1166  static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1167  const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1168  if (input_string.size() == candidate_string.size() &&
1169  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1170  // found the string in the temp memory buffer
1171  break;
1172  }
1173  } else {
1174  // The candidate string is in storage, need to fetch it for comparison
1175  const auto candidate_storage_string =
1176  getStringFromStorageFast(candidate_string_id);
1177  if (input_string.size() == candidate_storage_string.size() &&
1178  !memcmp(input_string.data(),
1179  candidate_storage_string.data(),
1180  input_string.size())) {
1183  // found the string in storage
1184  break;
1185  }
1186  }
1187  }
1188  if (++bucket == string_id_string_dict_hash_table.size()) {
1189  bucket = 0;
1190  }
1191  }
1192  return bucket;
1193 }
std::vector< string_dict_hash_t > hash_cache_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

uint32_t StringDictionary::computeUniqueBucketWithHash ( const string_dict_hash_t  hash,
const std::vector< int32_t > &  string_id_string_dict_hash_table 
)
private noexcept

Definition at line 1195 of file StringDictionary.cpp.

Referenced by increaseHashTableCapacity(), and processDictionaryFutures().

1197  {
1198  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1199  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1200  while (true) {
1201  if (string_id_string_dict_hash_table[bucket] ==
1202  INVALID_STR_ID) { // In this case it means the slot is available for use
1203  break;
1204  }
1205  collisions_++;
1206  // wrap around
1207  if (++bucket == string_dict_hash_table_size) {
1208  bucket = 0;
1209  }
1210  }
1211  return bucket;
1212 }
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

std::shared_ptr< const std::vector< std::string > > StringDictionary::copyStrings ( ) const

Definition at line 951 of file StringDictionary.cpp.

References CHECK_EQ, CHECK_GT, CHECK_LE, client_, gpu_enabled::copy(), cpu_threads(), getStringUnlocked(), rw_mutex_, str_count_, and strings_cache_.

951  {
952  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
953  if (client_) {
954  // TODO(miyu): support remote string dictionary
955  throw std::runtime_error(
956  "copying dictionaries from remote server is not supported yet.");
957  }
958 
959  if (strings_cache_) {
960  return strings_cache_;
961  }
962 
963  strings_cache_ = std::make_shared<std::vector<std::string>>();
964  strings_cache_->reserve(str_count_);
965  const bool multithreaded = str_count_ > 10000;
966  const auto worker_count =
967  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
968  CHECK_GT(worker_count, 0UL);
969  std::vector<std::vector<std::string>> worker_results(worker_count);
970  auto copy = [this](std::vector<std::string>& str_list,
971  const size_t start_id,
972  const size_t end_id) {
973  CHECK_LE(start_id, end_id);
974  str_list.reserve(end_id - start_id);
975  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
976  str_list.push_back(getStringUnlocked(string_id));
977  }
978  };
979  if (multithreaded) {
980  std::vector<std::future<void>> workers;
981  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
982  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
983  worker_idx < worker_count && start < str_count_;
984  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
985  workers.push_back(std::async(
986  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
987  }
988  for (auto& worker : workers) {
989  worker.get();
990  }
991  } else {
992  CHECK_EQ(worker_results.size(), size_t(1));
993  copy(worker_results[0], 0, str_count_);
994  }
995 
996  for (const auto& worker_result : worker_results) {
997  strings_cache_->insert(
998  strings_cache_->end(), worker_result.begin(), worker_result.end());
999  }
1000  return strings_cache_;
1001 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
std::shared_ptr< std::vector< std::string > > strings_cache_
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:208
mapd_unique_lock< mapd_shared_mutex > write_lock
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

bool StringDictionary::fillRateIsHigh ( const size_t  num_strings) const
private, noexcept

Definition at line 1003 of file StringDictionary.cpp.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

1003  {
1004  return string_id_string_dict_hash_table_.size() <= num_strings * 2;
1005 }
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the caller graph for this function:

std::vector< int32_t > StringDictionary::getCompare ( const std::string &  pattern,
const std::string &  comp_operator,
const size_t  generation 
)

Definition at line 743 of file StringDictionary.cpp.

References buildSortedCache(), client_, compare_cache_, getEquals(), getStringFromStorage(), generate_TableFunctionsFactory_init::i, gpu_enabled::lower_bound(), rw_mutex_, sorted_cache, str_count_, string_eq(), and string_lt().

745  {
746  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
747  if (client_) {
748  return client_->get_compare(pattern, comp_operator, generation);
749  }
750  std::vector<int32_t> ret;
751  if (str_count_ == 0) {
752  return ret;
753  }
754  if (sorted_cache.size() < str_count_) {
755  if (comp_operator == "=" || comp_operator == "<>") {
756  return getEquals(pattern, comp_operator, generation);
757  }
758 
760  }
761  auto cache_index = compare_cache_.get(pattern);
762 
763  if (!cache_index) {
764  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
765  const auto cache_itr = std::lower_bound(
766  sorted_cache.begin(),
767  sorted_cache.end(),
768  pattern,
769  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
770  auto a_str = this->getStringFromStorage(a);
771  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
772  });
773 
774  if (cache_itr == sorted_cache.end()) {
775  cache_index->index = sorted_cache.size() - 1;
776  cache_index->diff = 1;
777  } else {
778  const auto cache_str = getStringFromStorage(*cache_itr);
779  if (!string_eq(
780  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
781  cache_index->index = cache_itr - sorted_cache.begin() - 1;
782  cache_index->diff = 1;
783  } else {
784  cache_index->index = cache_itr - sorted_cache.begin();
785  cache_index->diff = 0;
786  }
787  }
788 
789  compare_cache_.put(pattern, cache_index);
790  }
791 
792  // since we have a cache in form of vector of ints which is sorted according to
793  // corresponding strings in the dictionary all we need is the index of the element
794  // which equal to the pattern that we are trying to match or the index of “biggest”
795  // element smaller than the pattern, to perform all the comparison operators over
796  // string. The search function guarantees we have such index so now it is just the
797  // matter to include all the elements in the result vector.
798 
799  // For < operator if the index that we have points to the element which is equal to
800  // the pattern that we are searching for we simply get all the elements less than the
801  // index. If the element pointed by the index is not equal to the pattern we are
802  // comparing with we also need to include that index in result vector, except when the
803  // index points to 0 and the pattern is lesser than the smallest value in the string
804  // dictionary.
805 
806  if (comp_operator == "<") {
807  size_t idx = cache_index->index;
808  if (cache_index->diff) {
809  idx = cache_index->index + 1;
810  if (cache_index->index == 0 && cache_index->diff > 0) {
811  idx = cache_index->index;
812  }
813  }
814  for (size_t i = 0; i < idx; i++) {
815  ret.push_back(sorted_cache[i]);
816  }
817 
818  // For <= operator if the index that we have points to the element which is equal to
819  // the pattern that we are searching for we want to include the element pointed by
820  // the index in the result set. If the element pointed by the index is not equal to
821  // the pattern we are comparing with we just want to include all the ids with index
822  // less than the index that is cached, except when pattern that we are searching for
823  // is smaller than the smallest string in the dictionary.
824 
825  } else if (comp_operator == "<=") {
826  size_t idx = cache_index->index + 1;
827  if (cache_index == 0 && cache_index->diff > 0) {
828  idx = cache_index->index;
829  }
830  for (size_t i = 0; i < idx; i++) {
831  ret.push_back(sorted_cache[i]);
832  }
833 
834  // For > operator we want to get all the elements with index greater than the index
835  // that we have except, when the pattern we are searching for is lesser than the
836  // smallest string in the dictionary we also want to include the id of the index
837  // that we have.
838 
839  } else if (comp_operator == ">") {
840  size_t idx = cache_index->index + 1;
841  if (cache_index->index == 0 && cache_index->diff > 0) {
842  idx = cache_index->index;
843  }
844  for (size_t i = idx; i < sorted_cache.size(); i++) {
845  ret.push_back(sorted_cache[i]);
846  }
847 
848  // For >= operator when the indexed element that we have points to element which is
849  // equal to the pattern we are searching for we want to include that in the result
850  // vector. If the index that we have does not point to the string which is equal to
851  // the pattern we are searching we don’t want to include that id into the result
852  // vector except when the index is 0.
853 
854  } else if (comp_operator == ">=") {
855  size_t idx = cache_index->index;
856  if (cache_index->diff) {
857  idx = cache_index->index + 1;
858  if (cache_index->index == 0 && cache_index->diff > 0) {
859  idx = cache_index->index;
860  }
861  }
862  for (size_t i = idx; i < sorted_cache.size(); i++) {
863  ret.push_back(sorted_cache[i]);
864  }
865  } else if (comp_operator == "=") {
866  if (!cache_index->diff) {
867  ret.push_back(sorted_cache[cache_index->index]);
868  }
869 
870  // For <> operator it is simple matter of not including id of string which is equal
871  // to pattern we are searching for.
872  } else if (comp_operator == "<>") {
873  if (!cache_index->diff) {
874  size_t idx = cache_index->index;
875  for (size_t i = 0; i < idx; i++) {
876  ret.push_back(sorted_cache[i]);
877  }
878  ++idx;
879  for (size_t i = idx; i < sorted_cache.size(); i++) {
880  ret.push_back(sorted_cache[i]);
881  }
882  } else {
883  for (size_t i = 0; i < sorted_cache.size(); i++) {
884  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
885  }
886  }
887 
888  } else {
889  std::runtime_error("Unsupported string comparison operator");
890  }
891  return ret;
892 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
std::unique_ptr< StringDictionaryClient > client_
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
mapd_shared_mutex rw_mutex_
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
mapd_unique_lock< mapd_shared_mutex > write_lock
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getEquals ( std::string  pattern,
std::string  comp_operator,
size_t  generation 
)
private

Definition at line 683 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, cpu_threads(), equal_cache_, getStringUnlocked(), MAX_STRLEN, run_benchmark_import::result, and str_count_.

Referenced by getCompare().

685  {
686  std::vector<int32_t> result;
687  auto eq_id_itr = equal_cache_.find(pattern);
688  int32_t eq_id = MAX_STRLEN + 1;
689  int32_t cur_size = str_count_;
690  if (eq_id_itr != equal_cache_.end()) {
691  auto eq_id = eq_id_itr->second;
692  if (comp_operator == "=") {
693  result.push_back(eq_id);
694  } else {
695  for (int32_t idx = 0; idx <= cur_size; idx++) {
696  if (idx == eq_id) {
697  continue;
698  }
699  result.push_back(idx);
700  }
701  }
702  } else {
703  std::vector<std::thread> workers;
704  int worker_count = cpu_threads();
705  CHECK_GT(worker_count, 0);
706  std::vector<std::vector<int32_t>> worker_results(worker_count);
707  CHECK_LE(generation, str_count_);
708  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
709  workers.emplace_back(
710  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
711  for (size_t string_id = worker_idx; string_id < generation;
712  string_id += worker_count) {
713  const auto str = getStringUnlocked(string_id);
714  if (str == pattern) {
715  worker_results[worker_idx].push_back(string_id);
716  }
717  }
718  });
719  }
720  for (auto& worker : workers) {
721  worker.join();
722  }
723  for (const auto& worker_result : worker_results) {
724  result.insert(result.end(), worker_result.begin(), worker_result.end());
725  }
726  if (result.size() > 0) {
727  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
728  CHECK(it_ok.second);
729  eq_id = result[0];
730  }
731  if (comp_operator == "<>") {
732  for (int32_t idx = 0; idx <= cur_size; idx++) {
733  if (idx == eq_id) {
734  continue;
735  }
736  result.push_back(idx);
737  }
738  }
739  }
740  return result;
741 }
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::string getStringUnlocked(int32_t string_id) const noexcept
std::map< std::string, int32_t > equal_cache_
#define CHECK_LE(x, y)
Definition: Logger.h:208
#define CHECK(condition)
Definition: Logger.h:197
static constexpr size_t MAX_STRLEN
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int32_t StringDictionary::getIdOfString ( const std::string &  str) const

Definition at line 557 of file StringDictionary.cpp.

References client_, getUnlocked(), and rw_mutex_.

557  {
558  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
559  if (client_) {
560  return client_->get(str);
561  }
562  return getUnlocked(str);
563 }
int32_t getUnlocked(const std::string &str) const noexcept
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getLike ( const std::string &  pattern,
const bool  icase,
const bool  is_simple,
const char  escape,
const size_t  generation 
) const

Definition at line 630 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_like(), like_cache_, run_benchmark_import::result, rw_mutex_, and str_count_.

634  {
635  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
636  if (client_) {
637  return client_->get_like(pattern, icase, is_simple, escape, generation);
638  }
639  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
640  const auto it = like_cache_.find(cache_key);
641  if (it != like_cache_.end()) {
642  return it->second;
643  }
644  std::vector<int32_t> result;
645  std::vector<std::thread> workers;
646  int worker_count = cpu_threads();
647  CHECK_GT(worker_count, 0);
648  std::vector<std::vector<int32_t>> worker_results(worker_count);
649  CHECK_LE(generation, str_count_);
650  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
651  workers.emplace_back([&worker_results,
652  &pattern,
653  generation,
654  icase,
655  is_simple,
656  escape,
657  worker_idx,
658  worker_count,
659  this]() {
660  for (size_t string_id = worker_idx; string_id < generation;
661  string_id += worker_count) {
662  const auto str = getStringUnlocked(string_id);
663  if (is_like(str, pattern, icase, is_simple, escape)) {
664  worker_results[worker_idx].push_back(string_id);
665  }
666  }
667  });
668  }
669  for (auto& worker : workers) {
670  worker.join();
671  }
672  for (const auto& worker_result : worker_results) {
673  result.insert(result.end(), worker_result.begin(), worker_result.end());
674  }
675  // place result into cache for reuse if similar query
676  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
677 
678  CHECK(it_ok.second);
679 
680  return result;
681 }
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_mutex rw_mutex_
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
#define CHECK_LE(x, y)
Definition: Logger.h:208
#define CHECK(condition)
Definition: Logger.h:197
mapd_unique_lock< mapd_shared_mutex > write_lock
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

size_t StringDictionary::getNumStringsFromStorage ( const size_t  storage_slots) const
private, noexcept

Method to retrieve number of strings in storage via a binary search for the first canary

Parameters
storage_slots: number of storage entries we should search to find the minimum canary
Returns
number of strings in storage

Definition at line 240 of file StringDictionary.cpp.

References CHECK_GE.

Referenced by StringDictionary().

241  {
242  if (storage_slots == 0) {
243  return 0;
244  }
245  // Must use signed integers since final binary search step can wrap to max size_t value
246  // if dictionary is empty
247  int64_t min_bound = 0;
248  int64_t max_bound = storage_slots - 1;
249  int64_t guess{0};
250  while (min_bound <= max_bound) {
251  guess = (max_bound + min_bound) / 2;
252  CHECK_GE(guess, 0);
253  if (getStringFromStorage(guess).canary) {
254  max_bound = guess - 1;
255  } else {
256  min_bound = guess + 1;
257  }
258  }
259  CHECK_GE(guess + (min_bound > guess ? 1 : 0), 0);
260  return guess + (min_bound > guess ? 1 : 0);
261 }
#define CHECK_GE(x, y)
Definition: Logger.h:210
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the caller graph for this function:

int32_t StringDictionary::getOrAdd ( const std::string &  str)
noexcept

Definition at line 290 of file StringDictionary.cpp.

References CHECK_EQ.

290  {
291  if (client_) {
292  std::vector<int32_t> string_ids;
293  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
294  CHECK_EQ(size_t(1), string_ids.size());
295  return string_ids.front();
296  }
297  return getOrAddImpl(str);
298 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::unique_ptr< StringDictionaryClient > client_
int32_t getOrAddImpl(const std::string_view &str) noexcept
template<class T , class String >
template void StringDictionary::getOrAddBulk ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 351 of file StringDictionary.cpp.

References appendToStorage(), CHECK, CHECK_LT, client_no_timeout_, computeBucket(), fillRateIsHigh(), g_enable_stringdict_parallel, getOrAddBulkParallel(), getOrAddBulkRemote(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), increaseHashTableCapacity(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, rw_mutex_, str_count_, and string_id_string_dict_hash_table_.

Referenced by import_export::TypedImportBuffer::addDictEncodedString(), ArrowForeignStorageBase::convertArrowDictionary(), ArrowForeignStorageBase::createDictionaryEncodedColumn(), foreign_storage::ParquetStringEncoder< V >::encodeAndCopyContiguous(), getOrAddBulkArray(), and populate_string_ids().

352  {
354  getOrAddBulkParallel(input_strings, output_string_ids);
355  return;
356  }
357  // Single-thread path.
358  if (client_no_timeout_) {
359  getOrAddBulkRemote(input_strings, output_string_ids);
360  return;
361  }
362  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
363 
364  const size_t initial_str_count = str_count_;
365  size_t idx = 0;
366  for (const auto& input_string : input_strings) {
367  if (input_string.empty()) {
368  output_string_ids[idx++] = inline_int_null_value<T>();
369  continue;
370  }
371  CHECK(input_string.size() <= MAX_STRLEN);
372 
373  const string_dict_hash_t input_string_hash = hash_string(input_string);
374  uint32_t hash_bucket =
375  computeBucket(input_string_hash, input_string, string_id_string_dict_hash_table_);
377  output_string_ids[idx++] = string_id_string_dict_hash_table_[hash_bucket];
378  continue;
379  }
380  // need to add record to dictionary
381  // check there is room
382  if (str_count_ == static_cast<size_t>(max_valid_int_value<T>())) {
383  log_encoding_error<T>(input_string);
384  output_string_ids[idx++] = inline_int_null_value<T>();
385  continue;
386  }
388  << "Maximum number (" << str_count_
389  << ") of Dictionary encoded Strings reached for this column, offset path "
390  "for column is "
391  << offsets_path_;
392  if (fillRateIsHigh(str_count_)) {
393  // resize when more than 50% is full
395  hash_bucket = computeBucket(
396  input_string_hash, input_string, string_id_string_dict_hash_table_);
397  }
398  appendToStorage(input_string);
399 
400  if (materialize_hashes_) {
401  hash_cache_[str_count_] = input_string_hash;
402  }
403  const int32_t string_id = static_cast<int32_t>(str_count_);
404  string_id_string_dict_hash_table_[hash_bucket] = string_id;
405  output_string_ids[idx++] = string_id;
406  ++str_count_;
407  }
408  const size_t num_strings_added = str_count_ - initial_str_count;
409  if (num_strings_added > 0) {
411  }
412 }
void increaseHashTableCapacity() noexcept
string_dict_hash_t hash_string(const std::string_view &str)
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
static constexpr size_t MAX_STRCOUNT
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
mapd_shared_mutex rw_mutex_
void appendToStorage(const String str) noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:207
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:197
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
mapd_unique_lock< mapd_shared_mutex > write_lock
bool g_enable_stringdict_parallel
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
template void StringDictionary::getOrAddBulkArray ( const std::vector< std::vector< String >> &  string_array_vec,
std::vector< std::vector< int32_t >> &  ids_array_vec 
)

Definition at line 312 of file StringDictionary.cpp.

References getOrAddBulk(), and generate_TableFunctionsFactory_init::i.

Referenced by import_export::TypedImportBuffer::addDictEncodedStringArray().

314  {
315  ids_array_vec.resize(string_array_vec.size());
316  for (size_t i = 0; i < string_array_vec.size(); i++) {
317  auto& strings = string_array_vec[i];
318  auto& ids = ids_array_vec[i];
319  ids.resize(strings.size());
320  getOrAddBulk(strings, &ids[0]);
321  }
322 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class T , class String >
void StringDictionary::getOrAddBulkParallel ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 415 of file StringDictionary.cpp.

References appendToStorageBulk(), CHECK, CHECK_LT, client_no_timeout_, computeBucketFromStorageAndMemory(), fillRateIsHigh(), getOrAddBulkRemote(), hash_cache_, hashStrings(), increaseHashTableCapacityFromStorageAndMemory(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, rw_mutex_, str_count_, and string_id_string_dict_hash_table_.

Referenced by getOrAddBulk().

416  {
417  if (client_no_timeout_) {
418  getOrAddBulkRemote(input_strings, output_string_ids);
419  return;
420  }
421  // Compute hashes of the input strings up front, and in parallel,
422  // as the string hashing does not need to be behind the subsequent write_lock
423  std::vector<string_dict_hash_t> input_strings_hashes(input_strings.size());
424  hashStrings(input_strings, input_strings_hashes);
425 
426  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
427  size_t shadow_str_count =
428  str_count_; // Need to shadow str_count_ now with bulk add methods
429  const size_t storage_high_water_mark = shadow_str_count;
430  std::vector<size_t> string_memory_ids;
431  size_t sum_new_string_lengths = 0;
432  string_memory_ids.reserve(input_strings.size());
433  size_t input_string_idx{0};
434  for (const auto& input_string : input_strings) {
435  // Currently we make empty strings null
436  if (input_string.empty()) {
437  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
438  continue;
439  }
440  // TODO: Recover gracefully if an input string is too long
441  CHECK(input_string.size() <= MAX_STRLEN);
442 
443  if (fillRateIsHigh(shadow_str_count)) {
444  // resize when more than 50% is full
446  storage_high_water_mark,
447  input_strings,
448  string_memory_ids,
449  input_strings_hashes);
450  }
451  // Compute the hash for this input_string
452  const string_dict_hash_t input_string_hash = input_strings_hashes[input_string_idx];
453 
454  const uint32_t hash_bucket =
455  computeBucketFromStorageAndMemory(input_string_hash,
456  input_string,
458  storage_high_water_mark,
459  input_strings,
460  string_memory_ids);
461 
462  // If the hash bucket is not empty, that is our string id
463  // (computeBucketFromStorageAndMemory) already checked to ensure the input string and
464  // bucket string are equal)
466  output_string_ids[input_string_idx++] =
468  continue;
469  }
470  // Did not find string, so need to add record to dictionary
471  // First check there is room
472  if (shadow_str_count == static_cast<size_t>(max_valid_int_value<T>())) {
473  log_encoding_error<T>(input_string);
474  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
475  continue;
476  }
477  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
478  << "Maximum number (" << shadow_str_count
479  << ") of Dictionary encoded Strings reached for this column, offset path "
480  "for column is "
481  << offsets_path_;
482 
483  string_memory_ids.push_back(input_string_idx);
484  sum_new_string_lengths += input_string.size();
485  string_id_string_dict_hash_table_[hash_bucket] =
486  static_cast<int32_t>(shadow_str_count);
487  if (materialize_hashes_) {
488  hash_cache_[shadow_str_count] = input_string_hash;
489  }
490  output_string_ids[input_string_idx++] = shadow_str_count++;
491  }
492  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
493  const size_t num_strings_added = shadow_str_count - str_count_;
494  str_count_ = shadow_str_count;
495  if (num_strings_added > 0) {
497  }
498 }
uint32_t computeBucketFromStorageAndMemory(const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
mapd_shared_mutex rw_mutex_
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:207
void hashStrings(const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void increaseHashTableCapacityFromStorageAndMemory(const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:197
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN
mapd_unique_lock< mapd_shared_mutex > write_lock
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class T , class String >
template void StringDictionary::getOrAddBulkRemote ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)
private

Definition at line 517 of file StringDictionary.cpp.

References CHECK, client_no_timeout_, and generate_TableFunctionsFactory_init::i.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

518  {
520  std::vector<int32_t> string_ids;
521  client_no_timeout_->get_or_add_bulk(string_ids, string_vec);
522  size_t out_idx{0};
523  for (size_t i = 0; i < string_ids.size(); ++i) {
524  const auto string_id = string_ids[i];
525  const bool invalid = string_id > max_valid_int_value<T>();
526  if (invalid || string_id == inline_int_null_value<int32_t>()) {
527  if (invalid) {
528  log_encoding_error<T>(string_vec[i]);
529  }
530  encoded_vec[out_idx++] = inline_int_null_value<T>();
531  continue;
532  }
533  encoded_vec[out_idx++] = string_id;
534  }
535 }
std::unique_ptr< StringDictionaryClient > client_no_timeout_
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the caller graph for this function:

int32_t StringDictionary::getOrAddImpl ( const std::string_view &  str)
private, noexcept

Definition at line 1063 of file StringDictionary.cpp.

References CHECK, CHECK_LT, and anonymous_namespace{StringDictionary.cpp}::hash_string().

1063  {
1064  // @TODO(wei) treat empty string as NULL for now
1065  if (str.size() == 0) {
1066  return inline_int_null_value<int32_t>();
1067  }
1068  CHECK(str.size() <= MAX_STRLEN);
1069  const string_dict_hash_t hash = hash_string(str);
1070  {
1071  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
1072  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1074  return string_id_string_dict_hash_table_[bucket];
1075  }
1076  }
1077  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
1078  if (fillRateIsHigh(str_count_)) {
1079  // resize when more than 50% is full
1081  }
1082  // need to recalculate the bucket in case it changed before
1083  // we got the lock
1084  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1087  << "Maximum number (" << str_count_
1088  << ") of Dictionary encoded Strings reached for this column, offset path "
1089  "for column is "
1090  << offsets_path_;
1091  appendToStorage(str);
1092  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
1093  if (materialize_hashes_) {
1094  hash_cache_[str_count_] = hash;
1095  }
1096  ++str_count_;
1098  }
1099  return string_id_string_dict_hash_table_[bucket];
1100 }
void increaseHashTableCapacity() noexcept
string_dict_hash_t hash_string(const std::string_view &str)
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
static constexpr size_t MAX_STRCOUNT
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
mapd_shared_mutex rw_mutex_
void appendToStorage(const String str) noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:207
mapd_shared_lock< mapd_shared_mutex > read_lock
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:197
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN
mapd_unique_lock< mapd_shared_mutex > write_lock

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getRegexpLike ( const std::string &  pattern,
const char  escape,
const size_t  generation 
) const

Definition at line 904 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_regexp_like(), regex_cache_, run_benchmark_import::result, rw_mutex_, and str_count_.

906  {
907  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
908  if (client_) {
909  return client_->get_regexp_like(pattern, escape, generation);
910  }
911  const auto cache_key = std::make_pair(pattern, escape);
912  const auto it = regex_cache_.find(cache_key);
913  if (it != regex_cache_.end()) {
914  return it->second;
915  }
916  std::vector<int32_t> result;
917  std::vector<std::thread> workers;
918  int worker_count = cpu_threads();
919  CHECK_GT(worker_count, 0);
920  std::vector<std::vector<int32_t>> worker_results(worker_count);
921  CHECK_LE(generation, str_count_);
922  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
923  workers.emplace_back([&worker_results,
924  &pattern,
925  generation,
926  escape,
927  worker_idx,
928  worker_count,
929  this]() {
930  for (size_t string_id = worker_idx; string_id < generation;
931  string_id += worker_count) {
932  const auto str = getStringUnlocked(string_id);
933  if (is_regexp_like(str, pattern, escape)) {
934  worker_results[worker_idx].push_back(string_id);
935  }
936  }
937  });
938  }
939  for (auto& worker : workers) {
940  worker.join();
941  }
942  for (const auto& worker_result : worker_results) {
943  result.insert(result.end(), worker_result.begin(), worker_result.end());
944  }
945  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
946  CHECK(it_ok.second);
947 
948  return result;
949 }
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:208
#define CHECK(condition)
Definition: Logger.h:197
mapd_unique_lock< mapd_shared_mutex > write_lock
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

std::string StringDictionary::getString ( int32_t  string_id) const

Definition at line 572 of file StringDictionary.cpp.

References client_, getStringUnlocked(), and rw_mutex_.

Referenced by StringValueConverter::convertToColumnarFormatFromDict(), and populate_string_ids().

572  {
573  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
574  if (client_) {
575  std::string ret;
576  client_->get_string(ret, string_id);
577  return ret;
578  }
579  return getStringUnlocked(string_id);
580 }
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::pair< char *, size_t > StringDictionary::getStringBytes ( int32_t  string_id) const
noexcept

Definition at line 587 of file StringDictionary.cpp.

References CHECK, CHECK_LE, and CHECK_LT.

588  {
589  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
590  CHECK(!client_);
591  CHECK_LE(0, string_id);
592  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
593  return getStringBytesChecked(string_id);
594 }
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
#define CHECK_LT(x, y)
Definition: Logger.h:207
#define CHECK_LE(x, y)
Definition: Logger.h:208
mapd_shared_lock< mapd_shared_mutex > read_lock
#define CHECK(condition)
Definition: Logger.h:197
std::pair< char *, size_t > StringDictionary::getStringBytesChecked ( const int  string_id) const
private noexcept

Definition at line 1108 of file StringDictionary.cpp.

References CHECK.

1109  {
1110  const auto str_canary = getStringFromStorage(string_id);
1111  CHECK(!str_canary.canary);
1112  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1113 }
#define CHECK(condition)
Definition: Logger.h:197
PayloadString getStringFromStorage(const int string_id) const noexcept
std::string StringDictionary::getStringChecked ( const int  string_id) const
private noexcept

Definition at line 1102 of file StringDictionary.cpp.

References CHECK.

Referenced by increaseHashTableCapacity().

1102  {
1103  const auto str_canary = getStringFromStorage(string_id);
1104  CHECK(!str_canary.canary);
1105  return std::string(str_canary.c_str_ptr, str_canary.size);
1106 }
#define CHECK(condition)
Definition: Logger.h:197
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the caller graph for this function:

StringDictionary::PayloadString StringDictionary::getStringFromStorage ( const int  string_id) const
private noexcept

Definition at line 1294 of file StringDictionary.cpp.

References CHECK_GE, StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

Referenced by getCompare(), mergeSortedCache(), sortCache(), and StringDictionary().

1295  {
1296  if (!isTemp_) {
1297  CHECK_GE(payload_fd_, 0);
1298  CHECK_GE(offset_fd_, 0);
1299  }
1300  CHECK_GE(string_id, 0);
1301  const StringIdxEntry* str_meta = offset_map_ + string_id;
1302  if (str_meta->size == 0xffff) {
1303  // hit the canary
1304  return {nullptr, 0, true};
1305  }
1306  return {payload_map_ + str_meta->off, str_meta->size, false};
1307 }
StringIdxEntry * offset_map_
#define CHECK_GE(x, y)
Definition: Logger.h:210

+ Here is the caller graph for this function:

std::string_view StringDictionary::getStringFromStorageFast ( const int  string_id) const
private noexcept

Definition at line 1288 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

1289  {
1290  const StringIdxEntry* str_meta = offset_map_ + string_id;
1291  return {payload_map_ + str_meta->off, str_meta->size};
1292 }
StringIdxEntry * offset_map_
std::string StringDictionary::getStringUnlocked ( int32_t  string_id) const
private noexcept

Definition at line 582 of file StringDictionary.cpp.

References CHECK_LT.

Referenced by copyStrings(), getEquals(), getLike(), getRegexpLike(), and getString().

582  {
583  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
584  return getStringChecked(string_id);
585 }
std::string getStringChecked(const int string_id) const noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:207

+ Here is the caller graph for this function:

int32_t StringDictionary::getUnlocked ( const std::string &  str) const
private noexcept

Definition at line 565 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::hash_string().

Referenced by getIdOfString().

565  {
566  const string_dict_hash_t hash = hash_string(str);
569  return str_id;
570 }
string_dict_hash_t hash_string(const std::string_view &str)
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::hashStrings ( const std::vector< String > &  string_vec,
std::vector< string_dict_hash_t > &  hashes 
) const
private noexcept

Method to hash a vector of strings in parallel.

Parameters
string_vec: input vector of strings to be hashed
hashes: space for the output - should be pre-sized to match string_vec size

Definition at line 334 of file StringDictionary.cpp.

References CHECK_EQ, anonymous_namespace{StringDictionary.cpp}::hash_string(), and generate_TableFunctionsFactory_init::r.

Referenced by getOrAddBulkParallel().

336  {
337  CHECK_EQ(string_vec.size(), hashes.size());
338 
339  tbb::parallel_for(tbb::blocked_range<size_t>(0, string_vec.size()),
340  [&string_vec, &hashes](const tbb::blocked_range<size_t>& r) {
341  for (size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
342  if (string_vec[curr_id].empty()) {
343  continue;
344  }
345  hashes[curr_id] = hash_string(string_vec[curr_id]);
346  }
347  });
348 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
string_dict_hash_t hash_string(const std::string_view &str)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::increaseHashTableCapacity ( )
private noexcept

Definition at line 1007 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), getStringChecked(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), generate_TableFunctionsFactory_init::i, INVALID_STR_ID, materialize_hashes_, str_count_, and string_id_string_dict_hash_table_.

Referenced by getOrAddBulk().

1007  {
1008  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1009  INVALID_STR_ID);
1010 
1011  if (materialize_hashes_) {
1012  for (size_t i = 0; i != str_count_; ++i) {
1013  const string_dict_hash_t hash = hash_cache_[i];
1014  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1015  new_str_ids[bucket] = i;
1016  }
1017  hash_cache_.resize(hash_cache_.size() * 2);
1018  } else {
1019  for (size_t i = 0; i != str_count_; ++i) {
1020  const auto str = getStringChecked(i);
1021  const string_dict_hash_t hash = hash_string(str);
1022  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1023  new_str_ids[bucket] = i;
1024  }
1025  }
1026  string_id_string_dict_hash_table_.swap(new_str_ids);
1027 }
string_dict_hash_t hash_string(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
static constexpr int32_t INVALID_STR_ID
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::increaseHashTableCapacityFromStorageAndMemory ( const size_t  str_count,
const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const std::vector< string_dict_hash_t > &  input_strings_hashes 
)
private noexcept

Definition at line 1030 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::hash_string(), and generate_TableFunctionsFactory_init::i.

Referenced by getOrAddBulkParallel().

1036  {
1037  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1038  INVALID_STR_ID);
1039  if (materialize_hashes_) {
1040  for (size_t i = 0; i != str_count; ++i) {
1041  const string_dict_hash_t hash = hash_cache_[i];
1042  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1043  new_str_ids[bucket] = i;
1044  }
1045  hash_cache_.resize(hash_cache_.size() * 2);
1046  } else {
1047  for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1048  const auto storage_string = getStringChecked(storage_idx);
1049  const string_dict_hash_t hash = hash_string(storage_string);
1050  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1051  new_str_ids[bucket] = storage_idx;
1052  }
1053  for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1054  const size_t string_memory_id = string_memory_ids[memory_idx];
1055  const uint32_t bucket = computeUniqueBucketWithHash(
1056  input_strings_hashes[string_memory_id], new_str_ids);
1057  new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1058  }
1059  }
1060  string_id_string_dict_hash_table_.swap(new_str_ids);
1061 }
string_dict_hash_t hash_string(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
static constexpr int32_t INVALID_STR_ID
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::insertInSortedCache ( std::string  str,
int32_t  str_id 
)
private
void StringDictionary::invalidateInvertedIndex ( )
private noexcept

Definition at line 1369 of file StringDictionary.cpp.

References compare_cache_, equal_cache_, like_cache_, regex_cache_, and gpu_enabled::swap().

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

1369  {
1370  if (!like_cache_.empty()) {
1371  decltype(like_cache_)().swap(like_cache_);
1372  }
1373  if (!regex_cache_.empty()) {
1374  decltype(regex_cache_)().swap(regex_cache_);
1375  }
1376  if (!equal_cache_.empty()) {
1377  decltype(equal_cache_)().swap(equal_cache_);
1378  }
1379  compare_cache_.invalidateInvertedIndex();
1380 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::map< std::string, int32_t > equal_cache_
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::mergeSortedCache ( std::vector< int32_t > &  temp_sorted_cache)
private

Definition at line 1425 of file StringDictionary.cpp.

References getStringFromStorage(), sorted_cache, and string_lt().

Referenced by buildSortedCache().

1425  {
1426  // this method is not thread safe
1427  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1428  size_t t_idx = 0, s_idx = 0, idx = 0;
1429  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1430  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1431  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1432  const auto insert_from_temp_cache =
1433  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1434  if (insert_from_temp_cache) {
1435  updated_cache[idx] = temp_sorted_cache[t_idx++];
1436  } else {
1437  updated_cache[idx] = sorted_cache[s_idx++];
1438  }
1439  }
1440  while (t_idx < temp_sorted_cache.size()) {
1441  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1442  }
1443  while (s_idx < sorted_cache.size()) {
1444  updated_cache[idx++] = sorted_cache[s_idx++];
1445  }
1446  sorted_cache.swap(updated_cache);
1447 }
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_array_ids ( std::vector< std::vector< int32_t >> &  dest_array_ids,
StringDictionary *  dest_dict,
const std::vector< std::vector< int32_t >> &  source_array_ids,
const StringDictionary *  source_dict 
)
static

Definition at line 1476 of file StringDictionary.cpp.

References generate_TableFunctionsFactory_init::i, populate_string_ids(), and logger::thread_id().

Referenced by DictionaryValueConverter< TARGET_TYPE >::processArrayBuffer().

1480  {
1481  dest_array_ids.resize(source_array_ids.size());
1482 
1483  std::atomic<size_t> row_idx{0};
1484  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1485  int thread_id) {
1486  for (;;) {
1487  auto row = row_idx.fetch_add(1);
1488 
1489  if (row >= dest_array_ids.size()) {
1490  return;
1491  }
1492  const auto& source_ids = source_array_ids[row];
1493  auto& dest_ids = dest_array_ids[row];
1494  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1495  }
1496  };
1497 
1498  const int num_worker_threads = std::thread::hardware_concurrency();
1499 
1500  if (source_array_ids.size() / num_worker_threads > 10) {
1501  std::vector<std::future<void>> worker_threads;
1502  for (int i = 0; i < num_worker_threads; ++i) {
1503  worker_threads.push_back(std::async(std::launch::async, processor, i));
1504  }
1505 
1506  for (auto& child : worker_threads) {
1507  child.wait();
1508  }
1509  for (auto& child : worker_threads) {
1510  child.get();
1511  }
1512  } else {
1513  processor(0);
1514  }
1515 }
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
ThreadId thread_id()
Definition: Logger.cpp:732

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_ids ( std::vector< int32_t > &  dest_ids,
StringDictionary *  dest_dict,
const std::vector< int32_t > &  source_ids,
const StringDictionary *  source_dict,
const std::map< int32_t, std::string >  transient_mapping = {} 
)
static

Populates provided dest_ids vector with string ids corresponding to given source strings.

Given a vector of source string ids and the corresponding source dictionary, this method populates a vector of destination string ids by either returning the string id of matching strings in the destination dictionary or creating new entries in that dictionary. Source string ids can also be transient if they were created by a function (e.g., the LOWER/UPPER functions). A map of transient string ids to string values is provided in order to handle this use case.

Parameters
dest_ids - vector of destination string ids to be populated
dest_dict - destination dictionary
source_ids - vector of source string ids for which destination ids are needed
source_dict - source dictionary
transient_mapping - map of transient source string ids to string values

Definition at line 1449 of file StringDictionary.cpp.

References getOrAddBulk(), and getString().

Referenced by populate_string_array_ids(), and DictionaryValueConverter< TARGET_TYPE >::processBuffer().

1454  {
1455  std::vector<std::string> strings;
1456 
1457  for (const int32_t source_id : source_ids) {
1458  if (source_id == std::numeric_limits<int32_t>::min()) {
1459  strings.emplace_back("");
1460  } else if (source_id < 0) {
1461  if (auto string_itr = transient_mapping.find(source_id);
1462  string_itr != transient_mapping.end()) {
1463  strings.emplace_back(string_itr->second);
1464  } else {
1465  throw std::runtime_error("Unexpected negative source ID");
1466  }
1467  } else {
1468  strings.push_back(source_dict->getString(source_id));
1469  }
1470  }
1471 
1472  dest_ids.resize(strings.size());
1473  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1474 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
std::string getString(int32_t string_id) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::processDictionaryFutures ( std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &  dictionary_futures)
private

Definition at line 213 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), hash_cache_, materialize_hashes_, payload_file_off_, str_count_, and string_id_string_dict_hash_table_.

Referenced by StringDictionary().

215  {
216  for (auto& dictionary_future : dictionary_futures) {
217  dictionary_future.wait();
218  const auto hashVec = dictionary_future.get();
219  for (const auto& hash : hashVec) {
220  const uint32_t bucket =
222  payload_file_off_ += hash.second;
223  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
224  if (materialize_hashes_) {
225  hash_cache_[str_count_] = hash.first;
226  }
227  ++str_count_;
228  }
229  }
230  dictionary_futures.clear();
231 }
std::vector< string_dict_hash_t > hash_cache_
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::sortCache ( std::vector< int32_t > &  cache)
private

Definition at line 1412 of file StringDictionary.cpp.

References getStringFromStorage(), gpu_enabled::sort(), and string_lt().

Referenced by buildSortedCache().

1412  {
1413  // This method is not thread-safe.
1414 
1415  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1416  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1417 
1418  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1419  auto a_str = this->getStringFromStorage(a);
1420  auto b_str = this->getStringFromStorage(b);
1421  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1422  });
1423 }
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t StringDictionary::storageEntryCount ( ) const

Definition at line 596 of file StringDictionary.cpp.

References client_, rw_mutex_, and str_count_.

596  {
597  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
598  if (client_) {
599  return client_->storage_entry_count();
600  }
601  return str_count_;
602 }
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock

Member Data Documentation

char* StringDictionary::CANARY_BUFFER {nullptr}
private

Definition at line 231 of file StringDictionary.h.

Referenced by ~StringDictionary().

size_t StringDictionary::canary_buffer_size = 0
private

Definition at line 232 of file StringDictionary.h.

std::unique_ptr<StringDictionaryClient> StringDictionary::client_
private
std::unique_ptr<StringDictionaryClient> StringDictionary::client_no_timeout_
private

Definition at line 229 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and getOrAddBulkRemote().

size_t StringDictionary::collisions_
private

Definition at line 207 of file StringDictionary.h.

Referenced by StringDictionary().

DictionaryCache<std::string, compare_cache_value_t> StringDictionary::compare_cache_
mutable private

Definition at line 226 of file StringDictionary.h.

Referenced by getCompare(), and invalidateInvertedIndex().

std::map<std::string, int32_t> StringDictionary::equal_cache_
mutable private

Definition at line 225 of file StringDictionary.h.

Referenced by getEquals(), and invalidateInvertedIndex().

std::vector<string_dict_hash_t> StringDictionary::hash_cache_
private
std::map<std::tuple<std::string, bool, bool, char>, std::vector<int32_t> > StringDictionary::like_cache_
mutable private

Definition at line 223 of file StringDictionary.h.

Referenced by getLike(), and invalidateInvertedIndex().

bool StringDictionary::materialize_hashes_
private
constexpr size_t StringDictionary::MAX_STRCOUNT = (1U << 31) - 1
static

Definition at line 117 of file StringDictionary.h.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

int StringDictionary::offset_fd_
private
size_t StringDictionary::offset_file_size_
private
StringIdxEntry* StringDictionary::offset_map_
private
std::string StringDictionary::offsets_path_
private

Definition at line 213 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and StringDictionary().

int StringDictionary::payload_fd_
private
size_t StringDictionary::payload_file_off_
private
size_t StringDictionary::payload_file_size_
private
char* StringDictionary::payload_map_
private
std::map<std::pair<std::string, char>, std::vector<int32_t> > StringDictionary::regex_cache_
mutable private

Definition at line 224 of file StringDictionary.h.

Referenced by getRegexpLike(), and invalidateInvertedIndex().

std::vector<int32_t> StringDictionary::sorted_cache
private

Definition at line 210 of file StringDictionary.h.

Referenced by buildSortedCache(), getCompare(), and mergeSortedCache().

std::vector<int32_t> StringDictionary::string_id_string_dict_hash_table_
private
std::shared_ptr<std::vector<std::string> > StringDictionary::strings_cache_
mutable private

Definition at line 227 of file StringDictionary.h.

Referenced by copyStrings().


The documentation for this class was generated from the following files: