OmniSciDB  fe05a0c208
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
StringDictionary Class Reference

#include <StringDictionary.h>

+ Collaboration diagram for StringDictionary:

Classes

struct  compare_cache_value_t
 
struct  PayloadString
 
struct  StringIdxEntry
 

Public Member Functions

 StringDictionary (const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
 
 StringDictionary (const LeafHostInfo &host, const DictRef dict_ref)
 
 ~StringDictionary () noexcept
 
int32_t getOrAdd (const std::string &str) noexcept
 
template<class T , class String >
void getOrAddBulk (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class T , class String >
void getOrAddBulkParallel (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class String >
void getOrAddBulkArray (const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
 
int32_t getIdOfString (const std::string &str) const
 
std::string getString (int32_t string_id) const
 
std::pair< char *, size_t > getStringBytes (int32_t string_id) const noexcept
 
size_t storageEntryCount () const
 
std::vector< int32_t > getLike (const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
 
std::vector< int32_t > getCompare (const std::string &pattern, const std::string &comp_operator, const size_t generation)
 
std::vector< int32_t > getRegexpLike (const std::string &pattern, const char escape, const size_t generation) const
 
std::shared_ptr< const
std::vector< std::string > > 
copyStrings () const
 
bool checkpoint () noexcept
 

Static Public Member Functions

static void populate_string_ids (std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
 Populates provided dest_ids vector with string ids corresponding to given source strings. More...
 
static void populate_string_array_ids (std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
 

Static Public Attributes

static constexpr int32_t INVALID_STR_ID = -1
 
static constexpr size_t MAX_STRLEN = (1 << 15) - 1
 
static constexpr size_t MAX_STRCOUNT = (1U << 31) - 1
 

Private Member Functions

void processDictionaryFutures (std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
 
size_t getNumStringsFromStorage (const size_t storage_slots) const noexcept
 
bool fillRateIsHigh (const size_t num_strings) const noexcept
 
void increaseHashTableCapacity () noexcept
 
template<class String >
void increaseHashTableCapacityFromStorageAndMemory (const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
 
int32_t getOrAddImpl (const std::string_view &str) noexcept
 
template<class String >
void hashStrings (const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
 
template<class T , class String >
void getOrAddBulkRemote (const std::vector< String > &string_vec, T *encoded_vec)
 
int32_t getUnlocked (const std::string &str) const noexcept
 
std::string getStringUnlocked (int32_t string_id) const noexcept
 
std::string getStringChecked (const int string_id) const noexcept
 
std::pair< char *, size_t > getStringBytesChecked (const int string_id) const noexcept
 
template<class String >
uint32_t computeBucket (const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
 
template<class String >
uint32_t computeBucketFromStorageAndMemory (const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
 
uint32_t computeUniqueBucketWithHash (const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
 
void checkAndConditionallyIncreasePayloadCapacity (const size_t write_length)
 
void checkAndConditionallyIncreaseOffsetCapacity (const size_t write_length)
 
template<class String >
void appendToStorage (const String str) noexcept
 
template<class String >
void appendToStorageBulk (const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
 
PayloadString getStringFromStorage (const int string_id) const noexcept
 
std::string_view getStringFromStorageFast (const int string_id) const noexcept
 
void addPayloadCapacity (const size_t min_capacity_requested=0) noexcept
 
void addOffsetCapacity (const size_t min_capacity_requested=0) noexcept
 
size_t addStorageCapacity (int fd, const size_t min_capacity_requested=0) noexcept
 
void * addMemoryCapacity (void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
 
void invalidateInvertedIndex () noexcept
 
std::vector< int32_t > getEquals (std::string pattern, std::string comp_operator, size_t generation)
 
void buildSortedCache ()
 
void insertInSortedCache (std::string str, int32_t str_id)
 
void sortCache (std::vector< int32_t > &cache)
 
void mergeSortedCache (std::vector< int32_t > &temp_sorted_cache)
 
compare_cache_value_t * binary_search_cache (const std::string &pattern) const
 

Private Attributes

const std::string folder_
 
size_t str_count_
 
size_t collisions_
 
std::vector< int32_t > string_id_string_dict_hash_table_
 
std::vector< string_dict_hash_t > hash_cache_
 
std::vector< int32_t > sorted_cache
 
bool isTemp_
 
bool materialize_hashes_
 
std::string offsets_path_
 
int payload_fd_
 
int offset_fd_
 
StringIdxEntry * offset_map_
 
char * payload_map_
 
size_t offset_file_size_
 
size_t payload_file_size_
 
size_t payload_file_off_
 
mapd_shared_mutex rw_mutex_
 
std::map< std::tuple
< std::string, bool, bool,
char >, std::vector< int32_t > > 
like_cache_
 
std::map< std::pair
< std::string, char >
, std::vector< int32_t > > 
regex_cache_
 
std::map< std::string, int32_t > equal_cache_
 
DictionaryCache< std::string,
compare_cache_value_t > 
compare_cache_
 
std::shared_ptr< std::vector
< std::string > > 
strings_cache_
 
std::unique_ptr
< StringDictionaryClient > 
client_
 
std::unique_ptr
< StringDictionaryClient > 
client_no_timeout_
 
char * CANARY_BUFFER {nullptr}
 
size_t canary_buffer_size = 0
 

Detailed Description

Definition at line 44 of file StringDictionary.h.

Constructor & Destructor Documentation

StringDictionary::StringDictionary ( const std::string &  folder,
const bool  isTemp,
const bool  recover,
const bool  materializeHashes = false,
size_t  initial_capacity = 256 
)

Definition at line 95 of file StringDictionary.cpp.

References addOffsetCapacity(), addPayloadCapacity(), CHECK_EQ, omnisci::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_open(), collisions_, omnisci::file_size(), getNumStringsFromStorage(), getStringFromStorage(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), INVALID_STR_ID, isTemp_, LOG, materialize_hashes_, offset_fd_, offset_file_size_, offset_map_, offsets_path_, payload_fd_, payload_file_size_, payload_map_, processDictionaryFutures(), anonymous_namespace{StringDictionary.cpp}::round_up_p2(), rw_mutex_, str_count_, string_id_string_dict_hash_table_, VLOG, and logger::WARNING.

100  : folder_(folder)
101  , str_count_(0)
103  , hash_cache_(initial_capacity)
104  , isTemp_(isTemp)
105  , materialize_hashes_(materializeHashes)
106  , payload_fd_(-1)
107  , offset_fd_(-1)
108  , offset_map_(nullptr)
109  , payload_map_(nullptr)
110  , offset_file_size_(0)
111  , payload_file_size_(0)
112  , payload_file_off_(0)
113  , strings_cache_(nullptr) {
114  if (!isTemp && folder.empty()) {
115  return;
116  }
117 
118  // initial capacity must be a power of two for efficient bucket computation
119  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
120  if (!isTemp_) {
121  boost::filesystem::path storage_path(folder);
122  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
123  const auto payload_path =
124  (storage_path / boost::filesystem::path("DictPayload")).string();
125  payload_fd_ = checked_open(payload_path.c_str(), recover);
126  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
129  }
130  bool storage_is_empty = false;
131  if (payload_file_size_ == 0) {
132  storage_is_empty = true;
134  }
135  if (offset_file_size_ == 0) {
137  }
138  if (!isTemp_) { // we never mmap or recover temp dictionaries
139  payload_map_ =
140  reinterpret_cast<char*>(omnisci::checked_mmap(payload_fd_, payload_file_size_));
141  offset_map_ = reinterpret_cast<StringIdxEntry*>(
143  if (recover) {
144  const size_t bytes = omnisci::file_size(offset_fd_);
145  if (bytes % sizeof(StringIdxEntry) != 0) {
146  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
147  }
148  const uint64_t str_count =
149  storage_is_empty ? 0 : getNumStringsFromStorage(bytes / sizeof(StringIdxEntry));
150  collisions_ = 0;
151  // at this point we know the size of the StringDict we need to load
152  // so lets reallocate the vector to the correct size
153  const uint64_t max_entries =
154  std::max(round_up_p2(str_count * 2 + 1),
155  round_up_p2(std::max(initial_capacity, static_cast<size_t>(1))));
156  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
157  string_id_string_dict_hash_table_.swap(new_str_ids);
158  if (materialize_hashes_) {
159  std::vector<string_dict_hash_t> new_hash_cache(max_entries / 2);
160  hash_cache_.swap(new_hash_cache);
161  }
162  // Bail early if we know we don't have strings to add (i.e. a new or empty
163  // dictionary)
164  if (str_count == 0) {
165  return;
166  }
167 
168  unsigned string_id = 0;
169  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
170 
171  uint32_t thread_inits = 0;
172  const auto thread_count = std::thread::hardware_concurrency();
173  const uint32_t items_per_thread = std::max<uint32_t>(
174  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
175  std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>
176  dictionary_futures;
177  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
178  dictionary_futures.emplace_back(std::async(
179  std::launch::async, [string_id, str_count, items_per_thread, this] {
180  std::vector<std::pair<string_dict_hash_t, unsigned int>> hashVec;
181  for (uint32_t curr_id = string_id;
182  curr_id < string_id + items_per_thread && curr_id < str_count;
183  curr_id++) {
184  const auto recovered = getStringFromStorage(curr_id);
185  if (recovered.canary) {
186  // hit the canary, recovery finished
187  break;
188  } else {
189  std::string_view temp(recovered.c_str_ptr, recovered.size);
190  hashVec.emplace_back(std::make_pair(hash_string(temp), temp.size()));
191  }
192  }
193  return hashVec;
194  }));
195  thread_inits++;
196  if (thread_inits % thread_count == 0) {
197  processDictionaryFutures(dictionary_futures);
198  }
199  }
200  // gather last few threads
201  if (dictionary_futures.size() != 0) {
202  processDictionaryFutures(dictionary_futures);
203  }
204  VLOG(1) << "Opened string dictionary " << folder << " # Strings: " << str_count_
205  << " Hash table size: " << string_id_string_dict_hash_table_.size()
206  << " Fill rate: "
207  << static_cast<double>(str_count_) * 100.0 /
209  << "% Collisions: " << collisions_;
210  }
211  }
212 }
StringIdxEntry * offset_map_
#define CHECK_EQ(x, y)
Definition: Logger.h:211
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38
string_dict_hash_t hash_string(const std::string_view &str)
#define LOG(tag)
Definition: Logger.h:194
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
std::vector< string_dict_hash_t > hash_cache_
std::string offsets_path_
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
const std::string folder_
mapd_shared_mutex rw_mutex_
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
int checked_open(const char *path, const bool recover)
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
size_t getNumStringsFromStorage(const size_t storage_slots) const noexcept
const uint64_t round_up_p2(const uint64_t num)
std::vector< int32_t > string_id_string_dict_hash_table_
mapd_unique_lock< mapd_shared_mutex > write_lock
PayloadString getStringFromStorage(const int string_id) const noexcept
#define VLOG(n)
Definition: Logger.h:297
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31

+ Here is the call graph for this function:

StringDictionary::StringDictionary ( const LeafHostInfo host,
const DictRef  dict_ref 
)

Definition at line 264 of file StringDictionary.cpp.

265  : folder_("DB_" + std::to_string(dict_ref.dbId) + "_DICT_" +
266  std::to_string(dict_ref.dictId))
267  , strings_cache_(nullptr)
268  , client_(new StringDictionaryClient(host, dict_ref, true))
269  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
std::string to_string(char const *&&v)
std::unique_ptr< StringDictionaryClient > client_
std::shared_ptr< std::vector< std::string > > strings_cache_
const std::string folder_
int32_t dictId
Definition: DictRef.h:10
std::unique_ptr< StringDictionaryClient > client_no_timeout_
int32_t dbId
Definition: DictRef.h:9
StringDictionary::~StringDictionary ( )
noexcept

Definition at line 271 of file StringDictionary.cpp.

References CANARY_BUFFER, CHECK, CHECK_GE, omnisci::checked_munmap(), client_, omnisci::close(), isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

271  {
272  free(CANARY_BUFFER);
273  if (client_) {
274  return;
275  }
276  if (payload_map_) {
277  if (!isTemp_) {
281  CHECK_GE(payload_fd_, 0);
283  CHECK_GE(offset_fd_, 0);
285  } else {
287  free(payload_map_);
288  free(offset_map_);
289  }
290  }
291 }
StringIdxEntry * offset_map_
#define CHECK_GE(x, y)
Definition: Logger.h:216
std::unique_ptr< StringDictionaryClient > client_
void close(const int fd)
Definition: omnisci_fs.cpp:68
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
#define CHECK(condition)
Definition: Logger.h:203

+ Here is the call graph for this function:

Member Function Documentation

void * StringDictionary::addMemoryCapacity ( void *  addr,
size_t &  mem_size,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1352 of file StringDictionary.cpp.

References CHECK, and anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE.

1354  {
1355  const size_t canary_buff_size_to_add =
1356  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1357  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1358  if (canary_buffer_size < canary_buff_size_to_add) {
1359  CANARY_BUFFER =
1360  reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1361  canary_buffer_size = canary_buff_size_to_add;
1363  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1364  }
1365  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
1366  CHECK(new_addr);
1367  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
1368  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
1369  mem_size += canary_buff_size_to_add;
1370  return new_addr;
1371 }
#define CHECK(condition)
Definition: Logger.h:203
void StringDictionary::addOffsetCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1322 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreaseOffsetCapacity(), and StringDictionary().

1322  {
1323  if (!isTemp_) {
1324  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1325  } else {
1326  offset_map_ = static_cast<StringIdxEntry*>(
1327  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1328  }
1329 }
StringIdxEntry * offset_map_
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

void StringDictionary::addPayloadCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1313 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreasePayloadCapacity(), and StringDictionary().

1313  {
1314  if (!isTemp_) {
1315  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1316  } else {
1317  payload_map_ = static_cast<char*>(
1318  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1319  }
1320 }
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

size_t StringDictionary::addStorageCapacity ( int  fd,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1331 of file StringDictionary.cpp.

References CHECK, CHECK_NE, anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE, and File_Namespace::write().

1333  {
1334  const size_t canary_buff_size_to_add =
1335  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1336  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1337 
1338  if (canary_buffer_size < canary_buff_size_to_add) {
1339  CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1340  canary_buffer_size = canary_buff_size_to_add;
1342  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1343  }
1344 
1345  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1346  const auto write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
1347  CHECK(write_return > 0 &&
1348  (static_cast<size_t>(write_return) == canary_buff_size_to_add));
1349  return canary_buff_size_to_add;
1350 }
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:141
#define CHECK_NE(x, y)
Definition: Logger.h:212
#define CHECK(condition)
Definition: Logger.h:203

+ Here is the call graph for this function:

template<class String >
void StringDictionary::appendToStorage ( const String  str)
private noexcept

Definition at line 1258 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::size.

Referenced by getOrAddBulk().

1258  {
1259  // write the payload
1261  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1262 
1263  // write the offset and length
1264  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1265  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1266 
1268  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1269 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::appendToStorageBulk ( const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const size_t  sum_new_strings_lengths 
)
private noexcept

Definition at line 1272 of file StringDictionary.cpp.

References i.

Referenced by getOrAddBulkParallel().

1275  {
1276  const size_t num_strings = string_memory_ids.size();
1277 
1278  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1279  checkAndConditionallyIncreaseOffsetCapacity(sizeof(StringIdxEntry) * num_strings);
1280 
1281  for (size_t i = 0; i < num_strings; ++i) {
1282  const size_t string_idx = string_memory_ids[i];
1283  const String str = input_strings[string_idx];
1284  const size_t str_size(str.size());
1285  memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1286  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
1287  payload_file_off_ += str_size; // Need to increment after we've defined str_meta
1288  memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
1289  }
1290 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

compare_cache_value_t* StringDictionary::binary_search_cache ( const std::string &  pattern) const
private
void StringDictionary::buildSortedCache ( )
private

Definition at line 1409 of file StringDictionary.cpp.

References i, mergeSortedCache(), sortCache(), sorted_cache, and str_count_.

Referenced by getCompare().

1409  {
1410  // This method is not thread-safe.
1411  const auto cur_cache_size = sorted_cache.size();
1412  std::vector<int32_t> temp_sorted_cache;
1413  for (size_t i = cur_cache_size; i < str_count_; i++) {
1414  temp_sorted_cache.push_back(i);
1415  }
1416  sortCache(temp_sorted_cache);
1417  mergeSortedCache(temp_sorted_cache);
1418 }
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
void sortCache(std::vector< int32_t > &cache)
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::checkAndConditionallyIncreaseOffsetCapacity ( const size_t  write_length)
private

Definition at line 1237 of file StringDictionary.cpp.

References addOffsetCapacity(), CHECK, CHECK_GE, omnisci::checked_mmap(), omnisci::checked_munmap(), isTemp_, offset_fd_, offset_file_size_, offset_map_, and str_count_.

1238  {
1239  const size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
1240  if (offset_file_off + write_length >= offset_file_size_) {
1241  const size_t min_capacity_needed =
1242  write_length - (offset_file_size_ - offset_file_off);
1243  if (!isTemp_) {
1244  CHECK_GE(offset_fd_, 0);
1246  addOffsetCapacity(min_capacity_needed);
1247  CHECK(offset_file_off + write_length <= offset_file_size_);
1248  offset_map_ = reinterpret_cast<StringIdxEntry*>(
1250  } else {
1251  addOffsetCapacity(min_capacity_needed);
1252  CHECK(offset_file_off + write_length <= offset_file_size_);
1253  }
1254  }
1255 }
StringIdxEntry * offset_map_
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:216
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
#define CHECK(condition)
Definition: Logger.h:203

+ Here is the call graph for this function:

void StringDictionary::checkAndConditionallyIncreasePayloadCapacity ( const size_t  write_length)
private

Definition at line 1218 of file StringDictionary.cpp.

References addPayloadCapacity(), CHECK, CHECK_GE, omnisci::checked_mmap(), omnisci::checked_munmap(), isTemp_, payload_fd_, payload_file_off_, payload_file_size_, and payload_map_.

1219  {
1220  if (payload_file_off_ + write_length > payload_file_size_) {
1221  const size_t min_capacity_needed =
1222  write_length - (payload_file_size_ - payload_file_off_);
1223  if (!isTemp_) {
1224  CHECK_GE(payload_fd_, 0);
1226  addPayloadCapacity(min_capacity_needed);
1227  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1228  payload_map_ =
1229  reinterpret_cast<char*>(omnisci::checked_mmap(payload_fd_, payload_file_size_));
1230  } else {
1231  addPayloadCapacity(min_capacity_needed);
1232  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1233  }
1234  }
1235 }
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38
#define CHECK_GE(x, y)
Definition: Logger.h:216
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
#define CHECK(condition)
Definition: Logger.h:203

+ Here is the call graph for this function:

bool StringDictionary::checkpoint ( )
noexcept

Definition at line 1390 of file StringDictionary.cpp.

References CHECK, client_, omnisci::fsync(), isTemp_, omnisci::msync(), offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

Referenced by import_export::TypedImportBuffer::stringDictCheckpoint().

1390  {
1391  if (client_) {
1392  try {
1393  return client_->checkpoint();
1394  } catch (...) {
1395  return false;
1396  }
1397  }
1398  CHECK(!isTemp_);
1399  bool ret = true;
1400  ret = ret &&
1401  (omnisci::msync((void*)offset_map_, offset_file_size_, /*async=*/false) == 0);
1402  ret = ret &&
1403  (omnisci::msync((void*)payload_map_, payload_file_size_, /*async=*/false) == 0);
1404  ret = ret && (omnisci::fsync(offset_fd_) == 0);
1405  ret = ret && (omnisci::fsync(payload_fd_) == 0);
1406  return ret;
1407 }
StringIdxEntry * offset_map_
std::unique_ptr< StringDictionaryClient > client_
int fsync(int fd)
Definition: omnisci_fs.cpp:60
int msync(void *addr, size_t length, bool async)
Definition: omnisci_fs.cpp:55
#define CHECK(condition)
Definition: Logger.h:203

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucket ( const string_dict_hash_t  hash,
const String &  input_string,
const std::vector< int32_t > &  string_id_string_dict_hash_table 
) const
privatenoexcept

Definition at line 1120 of file StringDictionary.cpp.

Referenced by getOrAddBulk().

1123  {
1124  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1125  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1126  while (true) {
1127  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1128  if (candidate_string_id ==
1129  INVALID_STR_ID) { // In this case it means the slot is available for use
1130  break;
1131  }
1132  if ((materialize_hashes_ && hash == hash_cache_[candidate_string_id]) ||
1134  const auto candidate_string = getStringFromStorageFast(candidate_string_id);
1135  if (input_string.size() == candidate_string.size() &&
1136  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1137  // found the string
1138  break;
1139  }
1140  }
1141  // wrap around
1142  if (++bucket == string_dict_hash_table_size) {
1143  bucket = 0;
1144  }
1145  }
1146  return bucket;
1147 }
std::vector< string_dict_hash_t > hash_cache_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucketFromStorageAndMemory ( const string_dict_hash_t  input_string_hash,
const String &  input_string,
const std::vector< int32_t > &  string_id_string_dict_hash_table,
const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids 
) const
private noexcept

memcmp(input_string.data(), candidate_storage_string.c_str_ptr, input_string.size())) {

Definition at line 1150 of file StringDictionary.cpp.

Referenced by getOrAddBulkParallel().

1156  {
1157  uint32_t bucket = input_string_hash & (string_id_string_dict_hash_table.size() - 1);
1158  while (true) {
1159  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1160  if (candidate_string_id ==
1161  INVALID_STR_ID) { // In this case it means the slot is available for use
1162  break;
1163  }
1164  if (!materialize_hashes_ || (input_string_hash == hash_cache_[candidate_string_id])) {
1165  if (candidate_string_id > 0 &&
1166  static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1167  // The candidate string is not in storage yet but in our string_memory_ids temp
1168  // buffer
1169  size_t memory_offset =
1170  static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1171  const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1172  if (input_string.size() == candidate_string.size() &&
1173  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1174  // found the string in the temp memory buffer
1175  break;
1176  }
1177  } else {
1178  // The candidate string is in storage, need to fetch it for comparison
1179  const auto candidate_storage_string =
1180  getStringFromStorageFast(candidate_string_id);
1181  if (input_string.size() == candidate_storage_string.size() &&
1182  !memcmp(input_string.data(),
1183  candidate_storage_string.data(),
1184  input_string.size())) {
1187  // found the string in storage
1188  break;
1189  }
1190  }
1191  }
1192  if (++bucket == string_id_string_dict_hash_table.size()) {
1193  bucket = 0;
1194  }
1195  }
1196  return bucket;
1197 }
std::vector< string_dict_hash_t > hash_cache_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

uint32_t StringDictionary::computeUniqueBucketWithHash ( const string_dict_hash_t  hash,
const std::vector< int32_t > &  string_id_string_dict_hash_table 
)
private noexcept

Definition at line 1199 of file StringDictionary.cpp.

Referenced by increaseHashTableCapacity(), and processDictionaryFutures().

1201  {
1202  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1203  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1204  while (true) {
1205  if (string_id_string_dict_hash_table[bucket] ==
1206  INVALID_STR_ID) { // In this case it means the slot is available for use
1207  break;
1208  }
1209  collisions_++;
1210  // wrap around
1211  if (++bucket == string_dict_hash_table_size) {
1212  bucket = 0;
1213  }
1214  }
1215  return bucket;
1216 }
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

std::shared_ptr< const std::vector< std::string > > StringDictionary::copyStrings ( ) const

Definition at line 955 of file StringDictionary.cpp.

References CHECK_EQ, CHECK_GT, CHECK_LE, client_, gpu_enabled::copy(), cpu_threads(), getStringUnlocked(), rw_mutex_, str_count_, and strings_cache_.

955  {
956  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
957  if (client_) {
958  // TODO(miyu): support remote string dictionary
959  throw std::runtime_error(
960  "copying dictionaries from remote server is not supported yet.");
961  }
962 
963  if (strings_cache_) {
964  return strings_cache_;
965  }
966 
967  strings_cache_ = std::make_shared<std::vector<std::string>>();
968  strings_cache_->reserve(str_count_);
969  const bool multithreaded = str_count_ > 10000;
970  const auto worker_count =
971  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
972  CHECK_GT(worker_count, 0UL);
973  std::vector<std::vector<std::string>> worker_results(worker_count);
974  auto copy = [this](std::vector<std::string>& str_list,
975  const size_t start_id,
976  const size_t end_id) {
977  CHECK_LE(start_id, end_id);
978  str_list.reserve(end_id - start_id);
979  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
980  str_list.push_back(getStringUnlocked(string_id));
981  }
982  };
983  if (multithreaded) {
984  std::vector<std::future<void>> workers;
985  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
986  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
987  worker_idx < worker_count && start < str_count_;
988  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
989  workers.push_back(std::async(
990  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
991  }
992  for (auto& worker : workers) {
993  worker.get();
994  }
995  } else {
996  CHECK_EQ(worker_results.size(), size_t(1));
997  copy(worker_results[0], 0, str_count_);
998  }
999 
1000  for (const auto& worker_result : worker_results) {
1001  strings_cache_->insert(
1002  strings_cache_->end(), worker_result.begin(), worker_result.end());
1003  }
1004  return strings_cache_;
1005 }
#define CHECK_EQ(x, y)
Definition: Logger.h:211
#define CHECK_GT(x, y)
Definition: Logger.h:215
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
std::shared_ptr< std::vector< std::string > > strings_cache_
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:214
mapd_unique_lock< mapd_shared_mutex > write_lock
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

bool StringDictionary::fillRateIsHigh ( const size_t  num_strings) const
private noexcept

Definition at line 1007 of file StringDictionary.cpp.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

1007  {
1008  return string_id_string_dict_hash_table_.size() <= num_strings * 2;
1009 }
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the caller graph for this function:

std::vector< int32_t > StringDictionary::getCompare ( const std::string &  pattern,
const std::string &  comp_operator,
const size_t  generation 
)

Definition at line 747 of file StringDictionary.cpp.

References buildSortedCache(), client_, compare_cache_, getEquals(), getStringFromStorage(), i, gpu_enabled::lower_bound(), rw_mutex_, sorted_cache, str_count_, string_eq(), and string_lt().

749  {
750  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
751  if (client_) {
752  return client_->get_compare(pattern, comp_operator, generation);
753  }
754  std::vector<int32_t> ret;
755  if (str_count_ == 0) {
756  return ret;
757  }
758  if (sorted_cache.size() < str_count_) {
759  if (comp_operator == "=" || comp_operator == "<>") {
760  return getEquals(pattern, comp_operator, generation);
761  }
762 
764  }
765  auto cache_index = compare_cache_.get(pattern);
766 
767  if (!cache_index) {
768  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
769  const auto cache_itr = std::lower_bound(
770  sorted_cache.begin(),
771  sorted_cache.end(),
772  pattern,
773  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
774  auto a_str = this->getStringFromStorage(a);
775  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
776  });
777 
778  if (cache_itr == sorted_cache.end()) {
779  cache_index->index = sorted_cache.size() - 1;
780  cache_index->diff = 1;
781  } else {
782  const auto cache_str = getStringFromStorage(*cache_itr);
783  if (!string_eq(
784  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
785  cache_index->index = cache_itr - sorted_cache.begin() - 1;
786  cache_index->diff = 1;
787  } else {
788  cache_index->index = cache_itr - sorted_cache.begin();
789  cache_index->diff = 0;
790  }
791  }
792 
793  compare_cache_.put(pattern, cache_index);
794  }
795 
796  // since we have a cache in form of vector of ints which is sorted according to
797  // corresponding strings in the dictionary all we need is the index of the element
798  // which equal to the pattern that we are trying to match or the index of “biggest”
799  // element smaller than the pattern, to perform all the comparison operators over
800  // string. The search function guarantees we have such index so now it is just the
801  // matter to include all the elements in the result vector.
802 
803  // For < operator if the index that we have points to the element which is equal to
804  // the pattern that we are searching for we simply get all the elements less than the
805  // index. If the element pointed by the index is not equal to the pattern we are
806  // comparing with we also need to include that index in result vector, except when the
807  // index points to 0 and the pattern is lesser than the smallest value in the string
808  // dictionary.
809 
810  if (comp_operator == "<") {
811  size_t idx = cache_index->index;
812  if (cache_index->diff) {
813  idx = cache_index->index + 1;
814  if (cache_index->index == 0 && cache_index->diff > 0) {
815  idx = cache_index->index;
816  }
817  }
818  for (size_t i = 0; i < idx; i++) {
819  ret.push_back(sorted_cache[i]);
820  }
821 
822  // For <= operator if the index that we have points to the element which is equal to
823  // the pattern that we are searching for we want to include the element pointed by
824  // the index in the result set. If the element pointed by the index is not equal to
825  // the pattern we are comparing with we just want to include all the ids with index
826  // less than the index that is cached, except when pattern that we are searching for
827  // is smaller than the smallest string in the dictionary.
828 
829  } else if (comp_operator == "<=") {
830  size_t idx = cache_index->index + 1;
831  if (cache_index == 0 && cache_index->diff > 0) {
832  idx = cache_index->index;
833  }
834  for (size_t i = 0; i < idx; i++) {
835  ret.push_back(sorted_cache[i]);
836  }
837 
838  // For > operator we want to get all the elements with index greater than the index
839  // that we have except, when the pattern we are searching for is lesser than the
840  // smallest string in the dictionary we also want to include the id of the index
841  // that we have.
842 
843  } else if (comp_operator == ">") {
844  size_t idx = cache_index->index + 1;
845  if (cache_index->index == 0 && cache_index->diff > 0) {
846  idx = cache_index->index;
847  }
848  for (size_t i = idx; i < sorted_cache.size(); i++) {
849  ret.push_back(sorted_cache[i]);
850  }
851 
852  // For >= operator when the indexed element that we have points to element which is
853  // equal to the pattern we are searching for we want to include that in the result
854  // vector. If the index that we have does not point to the string which is equal to
855  // the pattern we are searching we don’t want to include that id into the result
856  // vector except when the index is 0.
857 
858  } else if (comp_operator == ">=") {
859  size_t idx = cache_index->index;
860  if (cache_index->diff) {
861  idx = cache_index->index + 1;
862  if (cache_index->index == 0 && cache_index->diff > 0) {
863  idx = cache_index->index;
864  }
865  }
866  for (size_t i = idx; i < sorted_cache.size(); i++) {
867  ret.push_back(sorted_cache[i]);
868  }
869  } else if (comp_operator == "=") {
870  if (!cache_index->diff) {
871  ret.push_back(sorted_cache[cache_index->index]);
872  }
873 
874  // For <> operator it is simple matter of not including id of string which is equal
875  // to pattern we are searching for.
876  } else if (comp_operator == "<>") {
877  if (!cache_index->diff) {
878  size_t idx = cache_index->index;
879  for (size_t i = 0; i < idx; i++) {
880  ret.push_back(sorted_cache[i]);
881  }
882  ++idx;
883  for (size_t i = idx; i < sorted_cache.size(); i++) {
884  ret.push_back(sorted_cache[i]);
885  }
886  } else {
887  for (size_t i = 0; i < sorted_cache.size(); i++) {
888  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
889  }
890  }
891 
892  } else {
893  std::runtime_error("Unsupported string comparison operator");
894  }
895  return ret;
896 }
RUNTIME_EXPORT DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
#define const
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
std::unique_ptr< StringDictionaryClient > client_
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
mapd_shared_mutex rw_mutex_
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
mapd_unique_lock< mapd_shared_mutex > write_lock
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getEquals ( std::string  pattern,
std::string  comp_operator,
size_t  generation 
)
private

Definition at line 687 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, cpu_threads(), equal_cache_, getStringUnlocked(), MAX_STRLEN, run_benchmark_import::result, and str_count_.

Referenced by getCompare().

689  {
690  std::vector<int32_t> result;
691  auto eq_id_itr = equal_cache_.find(pattern);
692  int32_t eq_id = MAX_STRLEN + 1;
693  int32_t cur_size = str_count_;
694  if (eq_id_itr != equal_cache_.end()) {
695  auto eq_id = eq_id_itr->second;
696  if (comp_operator == "=") {
697  result.push_back(eq_id);
698  } else {
699  for (int32_t idx = 0; idx <= cur_size; idx++) {
700  if (idx == eq_id) {
701  continue;
702  }
703  result.push_back(idx);
704  }
705  }
706  } else {
707  std::vector<std::thread> workers;
708  int worker_count = cpu_threads();
709  CHECK_GT(worker_count, 0);
710  std::vector<std::vector<int32_t>> worker_results(worker_count);
711  CHECK_LE(generation, str_count_);
712  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
713  workers.emplace_back(
714  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
715  for (size_t string_id = worker_idx; string_id < generation;
716  string_id += worker_count) {
717  const auto str = getStringUnlocked(string_id);
718  if (str == pattern) {
719  worker_results[worker_idx].push_back(string_id);
720  }
721  }
722  });
723  }
724  for (auto& worker : workers) {
725  worker.join();
726  }
727  for (const auto& worker_result : worker_results) {
728  result.insert(result.end(), worker_result.begin(), worker_result.end());
729  }
730  if (result.size() > 0) {
731  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
732  CHECK(it_ok.second);
733  eq_id = result[0];
734  }
735  if (comp_operator == "<>") {
736  for (int32_t idx = 0; idx <= cur_size; idx++) {
737  if (idx == eq_id) {
738  continue;
739  }
740  result.push_back(idx);
741  }
742  }
743  }
744  return result;
745 }
#define CHECK_GT(x, y)
Definition: Logger.h:215
std::string getStringUnlocked(int32_t string_id) const noexcept
std::map< std::string, int32_t > equal_cache_
#define CHECK_LE(x, y)
Definition: Logger.h:214
#define CHECK(condition)
Definition: Logger.h:203
static constexpr size_t MAX_STRLEN
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int32_t StringDictionary::getIdOfString ( const std::string &  str) const

Definition at line 561 of file StringDictionary.cpp.

References client_, getUnlocked(), and rw_mutex_.

561  {
562  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
563  if (client_) {
564  return client_->get(str);
565  }
566  return getUnlocked(str);
567 }
int32_t getUnlocked(const std::string &str) const noexcept
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getLike ( const std::string &  pattern,
const bool  icase,
const bool  is_simple,
const char  escape,
const size_t  generation 
) const

Definition at line 634 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_like(), like_cache_, run_benchmark_import::result, rw_mutex_, and str_count_.

638  {
639  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
640  if (client_) {
641  return client_->get_like(pattern, icase, is_simple, escape, generation);
642  }
643  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
644  const auto it = like_cache_.find(cache_key);
645  if (it != like_cache_.end()) {
646  return it->second;
647  }
648  std::vector<int32_t> result;
649  std::vector<std::thread> workers;
650  int worker_count = cpu_threads();
651  CHECK_GT(worker_count, 0);
652  std::vector<std::vector<int32_t>> worker_results(worker_count);
653  CHECK_LE(generation, str_count_);
654  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
655  workers.emplace_back([&worker_results,
656  &pattern,
657  generation,
658  icase,
659  is_simple,
660  escape,
661  worker_idx,
662  worker_count,
663  this]() {
664  for (size_t string_id = worker_idx; string_id < generation;
665  string_id += worker_count) {
666  const auto str = getStringUnlocked(string_id);
667  if (is_like(str, pattern, icase, is_simple, escape)) {
668  worker_results[worker_idx].push_back(string_id);
669  }
670  }
671  });
672  }
673  for (auto& worker : workers) {
674  worker.join();
675  }
676  for (const auto& worker_result : worker_results) {
677  result.insert(result.end(), worker_result.begin(), worker_result.end());
678  }
679  // place result into cache for reuse if similar query
680  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
681 
682  CHECK(it_ok.second);
683 
684  return result;
685 }
#define CHECK_GT(x, y)
Definition: Logger.h:215
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_mutex rw_mutex_
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
#define CHECK_LE(x, y)
Definition: Logger.h:214
#define CHECK(condition)
Definition: Logger.h:203
mapd_unique_lock< mapd_shared_mutex > write_lock
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

size_t StringDictionary::getNumStringsFromStorage ( const size_t  storage_slots) const
private noexcept

Retrieves the number of strings in storage by binary-searching for the first canary slot.

Parameters
storage_slots — number of storage entries to search when looking for the first (lowest-index) canary
Returns
number of strings in storage

Definition at line 241 of file StringDictionary.cpp.

References CHECK_GE.

Referenced by StringDictionary().

242  {
243  if (storage_slots == 0) {
244  return 0;
245  }
246  // Must use signed integers since final binary search step can wrap to max size_t value
247  // if dictionary is empty
248  int64_t min_bound = 0;
249  int64_t max_bound = storage_slots - 1;
250  int64_t guess{0};
251  while (min_bound <= max_bound) {
252  guess = (max_bound + min_bound) / 2;
253  CHECK_GE(guess, 0);
254  if (getStringFromStorage(guess).canary) {
255  max_bound = guess - 1;
256  } else {
257  min_bound = guess + 1;
258  }
259  }
260  CHECK_GE(guess + (min_bound > guess ? 1 : 0), 0);
261  return guess + (min_bound > guess ? 1 : 0);
262 }
#define CHECK_GE(x, y)
Definition: Logger.h:216
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the caller graph for this function:

int32_t StringDictionary::getOrAdd ( const std::string &  str)
noexcept

Definition at line 293 of file StringDictionary.cpp.

References CHECK_EQ.

293  {
294  if (client_) {
295  std::vector<int32_t> string_ids;
296  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
297  CHECK_EQ(size_t(1), string_ids.size());
298  return string_ids.front();
299  }
300  return getOrAddImpl(str);
301 }
#define CHECK_EQ(x, y)
Definition: Logger.h:211
std::unique_ptr< StringDictionaryClient > client_
int32_t getOrAddImpl(const std::string_view &str) noexcept
template<class T , class String >
template void StringDictionary::getOrAddBulk ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 359 of file StringDictionary.cpp.

References appendToStorage(), CHECK, CHECK_LT, client_no_timeout_, computeBucket(), fillRateIsHigh(), folder_, g_enable_stringdict_parallel, getOrAddBulkParallel(), getOrAddBulkRemote(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), increaseHashTableCapacity(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, rw_mutex_, str_count_, and string_id_string_dict_hash_table_.

Referenced by import_export::TypedImportBuffer::addDictEncodedString(), ArrowForeignStorageBase::convertArrowDictionary(), ArrowForeignStorageBase::createDictionaryEncodedColumn(), foreign_storage::ParquetStringEncoder< V >::encodeAndCopyContiguous(), getOrAddBulkArray(), and populate_string_ids().

// getOrAddBulk: single-threaded bulk encode. Writes one id per input string
// into output_string_ids (element type T); empty strings become
// inline_int_null_value<T>(). Strings not yet present are appended to storage
// and get the next id (the current str_count_). Remote dictionaries
// (client_no_timeout_) are delegated to getOrAddBulkRemote(). If the next id
// would not fit in T, throw_encoding_error<T>() is called.
// NOTE(review): source lines 361, 384, 393, 400 and 416 are missing from this
// rendering. Judging from the References list they are presumably:
// - 361: the g_enable_stringdict_parallel test guarding the parallel path;
// - 384: the bucket-occupied test against INVALID_STR_ID;
// - 393: the CHECK_LT(..., MAX_STRCOUNT) head;
// - 400: the increaseHashTableCapacity() call;
// - 416: invalidateInvertedIndex().
// Confirm against StringDictionary.cpp.
360  {
362  getOrAddBulkParallel(input_strings, output_string_ids);
363  return;
364  }
365  // Single-thread path.
366  if (client_no_timeout_) {
367  getOrAddBulkRemote(input_strings, output_string_ids);
368  return;
369  }
// The loop below writes the hash table, hash_cache_ and str_count_, so the
// lock is held exclusively for the whole batch.
370  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
371 
372  const size_t initial_str_count = str_count_;
373  size_t idx = 0;
374  for (const auto& input_string : input_strings) {
375  if (input_string.empty()) {
376  output_string_ids[idx++] = inline_int_null_value<T>();
377  continue;
378  }
379  CHECK(input_string.size() <= MAX_STRLEN);
380 
381  const string_dict_hash_t input_string_hash = hash_string(input_string);
382  uint32_t hash_bucket =
383  computeBucket(input_string_hash, input_string, string_id_string_dict_hash_table_);
385  output_string_ids[idx++] = string_id_string_dict_hash_table_[hash_bucket];
386  continue;
387  }
388  // need to add record to dictionary
389  // check there is room
390  if (str_count_ > static_cast<size_t>(max_valid_int_value<T>())) {
391  throw_encoding_error<T>(input_string, folder_);
392  }
394  << "Maximum number (" << str_count_
395  << ") of Dictionary encoded Strings reached for this column, offset path "
396  "for column is "
397  << offsets_path_;
398  if (fillRateIsHigh(str_count_)) {
399  // resize when more than 50% is full
// Growing the table moves entries, so the bucket must be recomputed.
401  hash_bucket = computeBucket(
402  input_string_hash, input_string, string_id_string_dict_hash_table_);
403  }
404  appendToStorage(input_string);
405 
406  if (materialize_hashes_) {
407  hash_cache_[str_count_] = input_string_hash;
408  }
// The new string's id is its storage position, i.e. the count before increment.
409  const int32_t string_id = static_cast<int32_t>(str_count_);
410  string_id_string_dict_hash_table_[hash_bucket] = string_id;
411  output_string_ids[idx++] = string_id;
412  ++str_count_;
413  }
414  const size_t num_strings_added = str_count_ - initial_str_count;
415  if (num_strings_added > 0) {
417  }
418 }
void increaseHashTableCapacity() noexcept
string_dict_hash_t hash_string(const std::string_view &str)
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
static constexpr size_t MAX_STRCOUNT
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
const std::string folder_
mapd_shared_mutex rw_mutex_
void appendToStorage(const String str) noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:213
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:203
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
mapd_unique_lock< mapd_shared_mutex > write_lock
bool g_enable_stringdict_parallel
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
template void StringDictionary::getOrAddBulkArray ( const std::vector< std::vector< String >> &  string_array_vec,
std::vector< std::vector< int32_t >> &  ids_array_vec 
)

Definition at line 320 of file StringDictionary.cpp.

References getOrAddBulk(), and i.

Referenced by import_export::TypedImportBuffer::addDictEncodedStringArray().

322  {
323  ids_array_vec.resize(string_array_vec.size());
324  for (size_t i = 0; i < string_array_vec.size(); i++) {
325  auto& strings = string_array_vec[i];
326  auto& ids = ids_array_vec[i];
327  ids.resize(strings.size());
328  getOrAddBulk(strings, &ids[0]);
329  }
330 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class T , class String >
void StringDictionary::getOrAddBulkParallel ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 421 of file StringDictionary.cpp.

References appendToStorageBulk(), CHECK, CHECK_LT, client_no_timeout_, computeBucketFromStorageAndMemory(), fillRateIsHigh(), folder_, getOrAddBulkRemote(), hash_cache_, hashStrings(), increaseHashTableCapacityFromStorageAndMemory(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, rw_mutex_, str_count_, and string_id_string_dict_hash_table_.

Referenced by getOrAddBulk().

// getOrAddBulkParallel: bulk encode in which all input hashes are computed
// (hashStrings) before the write lock is taken. New strings are staged rather
// than written one at a time:
// - their input indices are collected in string_memory_ids;
// - ids are handed out from shadow_str_count;
// - the bytes go to storage in one appendToStorageBulk() call after the loop.
// Only after the bulk append is str_count_ advanced. Empty strings become
// inline_int_null_value<T>(). Remote dictionaries are delegated to
// getOrAddBulkRemote().
// NOTE(review): source lines 451, 463, 471, 473 and 500 are missing from this
// rendering. Judging from the References list they are presumably:
// - 451: the increaseHashTableCapacityFromStorageAndMemory( call head;
// - 463: the hash-table argument;
// - 471: the INVALID_STR_ID hit test;
// - 473: the looked-up id;
// - 500: invalidateInvertedIndex().
// Confirm against StringDictionary.cpp.
422  {
423  if (client_no_timeout_) {
424  getOrAddBulkRemote(input_strings, output_string_ids);
425  return;
426  }
427  // Compute hashes of the input strings up front, and in parallel,
428  // as the string hashing does not need to be behind the subsequent write_lock
429  std::vector<string_dict_hash_t> input_strings_hashes(input_strings.size());
430  hashStrings(input_strings, input_strings_hashes);
431 
432  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
433  size_t shadow_str_count =
434  str_count_; // Need to shadow str_count_ now with bulk add methods
435  const size_t storage_high_water_mark = shadow_str_count;
436  std::vector<size_t> string_memory_ids;
437  size_t sum_new_string_lengths = 0;
438  string_memory_ids.reserve(input_strings.size());
439  size_t input_string_idx{0};
440  for (const auto& input_string : input_strings) {
441  // Currently we make empty strings null
442  if (input_string.empty()) {
443  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
444  continue;
445  }
446  // TODO: Recover gracefully if an input string is too long
447  CHECK(input_string.size() <= MAX_STRLEN);
448 
449  if (fillRateIsHigh(shadow_str_count)) {
450  // resize when more than 50% is full
452  storage_high_water_mark,
453  input_strings,
454  string_memory_ids,
455  input_strings_hashes);
456  }
457  // Compute the hash for this input_string
458  const string_dict_hash_t input_string_hash = input_strings_hashes[input_string_idx];
459 
460  const uint32_t hash_bucket =
461  computeBucketFromStorageAndMemory(input_string_hash,
462  input_string,
464  storage_high_water_mark,
465  input_strings,
466  string_memory_ids);
467 
468  // If the hash bucket is not empty, that is our string id
469  // (computeBucketFromStorageAndMemory) already checked to ensure the input string and
470  // bucket string are equal)
472  output_string_ids[input_string_idx++] =
474  continue;
475  }
476  // Did not find string, so need to add record to dictionary
477  // First check there is room
478  if (shadow_str_count > static_cast<size_t>(max_valid_int_value<T>())) {
479  throw_encoding_error<T>(input_string, folder_);
480  }
481  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
482  << "Maximum number (" << shadow_str_count
483  << ") of Dictionary encoded Strings reached for this column, offset path "
484  "for column is "
485  << offsets_path_;
486 
// Stage the string: remember its input index and reserve the next id; its
// bytes are written by appendToStorageBulk() after the loop.
487  string_memory_ids.push_back(input_string_idx);
488  sum_new_string_lengths += input_string.size();
489  string_id_string_dict_hash_table_[hash_bucket] =
490  static_cast<int32_t>(shadow_str_count);
491  if (materialize_hashes_) {
492  hash_cache_[shadow_str_count] = input_string_hash;
493  }
494  output_string_ids[input_string_idx++] = shadow_str_count++;
495  }
496  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
497  const size_t num_strings_added = shadow_str_count - str_count_;
498  str_count_ = shadow_str_count;
499  if (num_strings_added > 0) {
501  }
502 }
uint32_t computeBucketFromStorageAndMemory(const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
const std::string folder_
mapd_shared_mutex rw_mutex_
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:213
void hashStrings(const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void increaseHashTableCapacityFromStorageAndMemory(const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:203
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN
mapd_unique_lock< mapd_shared_mutex > write_lock
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class T , class String >
template void StringDictionary::getOrAddBulkRemote ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)
private

Definition at line 521 of file StringDictionary.cpp.

References CHECK, client_no_timeout_, folder_, and i.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

// getOrAddBulkRemote: encodes string_vec through the remote client
// (client_no_timeout_) and narrows each returned id to T.
// - An id above max_valid_int_value<T>() triggers throw_encoding_error<T>()
//   for the corresponding input string.
// - The int32 null id is written out as inline_int_null_value<T>().
// NOTE(review): source line 523 is missing from this rendering (presumably
// CHECK(client_no_timeout_), per the References list) - confirm.
522  {
524  std::vector<int32_t> string_ids;
525  client_no_timeout_->get_or_add_bulk(string_ids, string_vec);
// out_idx advances exactly once per iteration, so it always equals i.
526  size_t out_idx{0};
527  for (size_t i = 0; i < string_ids.size(); ++i) {
528  const auto string_id = string_ids[i];
529  const bool invalid = string_id > max_valid_int_value<T>();
530  if (invalid || string_id == inline_int_null_value<int32_t>()) {
531  if (invalid) {
532  throw_encoding_error<T>(string_vec[i], folder_);
533  }
534  encoded_vec[out_idx++] = inline_int_null_value<T>();
535  continue;
536  }
537  encoded_vec[out_idx++] = string_id;
538  }
539 }
const std::string folder_
std::unique_ptr< StringDictionaryClient > client_no_timeout_
#define CHECK(condition)
Definition: Logger.h:203

+ Here is the caller graph for this function:

int32_t StringDictionary::getOrAddImpl ( const std::string_view &  str)
private noexcept

Definition at line 1067 of file StringDictionary.cpp.

References CHECK, CHECK_LT, and anonymous_namespace{StringDictionary.cpp}::hash_string().

// getOrAddImpl: returns the id for `str`, appending it to the dictionary if it
// is not yet present. Empty strings map to inline_int_null_value<int32_t>().
// A first lookup runs under a shared lock (its guarding test, line 1077, is not
// shown). Otherwise the code takes the exclusive lock, may grow the table,
// recomputes the bucket, and inserts.
// NOTE(review): source lines 1077, 1084, 1089, 1090 and 1101 are missing from
// this rendering. Judging from the tooltips they are presumably:
// - 1077 and 1089: the INVALID_STR_ID bucket tests;
// - 1084: increaseHashTableCapacity();
// - 1090: the CHECK_LT(str_count_, MAX_STRCOUNT) head;
// - 1101: invalidateInvertedIndex().
// Confirm against StringDictionary.cpp.
1067  {
1068  // @TODO(wei) treat empty string as NULL for now
1069  if (str.size() == 0) {
1070  return inline_int_null_value<int32_t>();
1071  }
1072  CHECK(str.size() <= MAX_STRLEN);
1073  const string_dict_hash_t hash = hash_string(str);
// Fast path: look the string up while holding only the shared lock.
1074  {
1075  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
1076  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1078  return string_id_string_dict_hash_table_[bucket];
1079  }
1080  }
1081  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
1082  if (fillRateIsHigh(str_count_)) {
1083  // resize when more than 50% is full
1085  }
1086  // need to recalculate the bucket in case it changed before
1087  // we got the lock
1088  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1091  << "Maximum number (" << str_count_
1092  << ") of Dictionary encoded Strings reached for this column, offset path "
1093  "for column is "
1094  << offsets_path_;
1095  appendToStorage(str);
1096  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
1097  if (materialize_hashes_) {
1098  hash_cache_[str_count_] = hash;
1099  }
1100  ++str_count_;
1102  }
1103  return string_id_string_dict_hash_table_[bucket];
1104 }
void increaseHashTableCapacity() noexcept
string_dict_hash_t hash_string(const std::string_view &str)
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
static constexpr size_t MAX_STRCOUNT
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
mapd_shared_mutex rw_mutex_
void appendToStorage(const String str) noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:213
mapd_shared_lock< mapd_shared_mutex > read_lock
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:203
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN
mapd_unique_lock< mapd_shared_mutex > write_lock

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getRegexpLike ( const std::string &  pattern,
const char  escape,
const size_t  generation 
) const

Definition at line 908 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_regexp_like(), regex_cache_, run_benchmark_import::result, rw_mutex_, and str_count_.

910  {
911  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
912  if (client_) {
913  return client_->get_regexp_like(pattern, escape, generation);
914  }
915  const auto cache_key = std::make_pair(pattern, escape);
916  const auto it = regex_cache_.find(cache_key);
917  if (it != regex_cache_.end()) {
918  return it->second;
919  }
920  std::vector<int32_t> result;
921  std::vector<std::thread> workers;
922  int worker_count = cpu_threads();
923  CHECK_GT(worker_count, 0);
924  std::vector<std::vector<int32_t>> worker_results(worker_count);
925  CHECK_LE(generation, str_count_);
926  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
927  workers.emplace_back([&worker_results,
928  &pattern,
929  generation,
930  escape,
931  worker_idx,
932  worker_count,
933  this]() {
934  for (size_t string_id = worker_idx; string_id < generation;
935  string_id += worker_count) {
936  const auto str = getStringUnlocked(string_id);
937  if (is_regexp_like(str, pattern, escape)) {
938  worker_results[worker_idx].push_back(string_id);
939  }
940  }
941  });
942  }
943  for (auto& worker : workers) {
944  worker.join();
945  }
946  for (const auto& worker_result : worker_results) {
947  result.insert(result.end(), worker_result.begin(), worker_result.end());
948  }
949  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
950  CHECK(it_ok.second);
951 
952  return result;
953 }
#define CHECK_GT(x, y)
Definition: Logger.h:215
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:214
#define CHECK(condition)
Definition: Logger.h:203
mapd_unique_lock< mapd_shared_mutex > write_lock
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

std::string StringDictionary::getString ( int32_t  string_id) const

Definition at line 576 of file StringDictionary.cpp.

References client_, getStringUnlocked(), and rw_mutex_.

Referenced by StringValueConverter::convertToColumnarFormatFromDict(), and populate_string_ids().

576  {
577  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
578  if (client_) {
579  std::string ret;
580  client_->get_string(ret, string_id);
581  return ret;
582  }
583  return getStringUnlocked(string_id);
584 }
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::pair< char *, size_t > StringDictionary::getStringBytes ( int32_t  string_id) const
noexcept

Definition at line 591 of file StringDictionary.cpp.

References CHECK, CHECK_LE, and CHECK_LT.

592  {
593  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
594  CHECK(!client_);
595  CHECK_LE(0, string_id);
596  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
597  return getStringBytesChecked(string_id);
598 }
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
#define CHECK_LT(x, y)
Definition: Logger.h:213
#define CHECK_LE(x, y)
Definition: Logger.h:214
mapd_shared_lock< mapd_shared_mutex > read_lock
#define CHECK(condition)
Definition: Logger.h:203
std::pair< char *, size_t > StringDictionary::getStringBytesChecked ( const int  string_id) const
private noexcept

Definition at line 1112 of file StringDictionary.cpp.

References CHECK.

1113  {
1114  const auto str_canary = getStringFromStorage(string_id);
1115  CHECK(!str_canary.canary);
1116  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1117 }
#define CHECK(condition)
Definition: Logger.h:203
PayloadString getStringFromStorage(const int string_id) const noexcept
std::string StringDictionary::getStringChecked ( const int  string_id) const
private noexcept

Definition at line 1106 of file StringDictionary.cpp.

References CHECK.

Referenced by increaseHashTableCapacity().

1106  {
1107  const auto str_canary = getStringFromStorage(string_id);
1108  CHECK(!str_canary.canary);
1109  return std::string(str_canary.c_str_ptr, str_canary.size);
1110 }
#define CHECK(condition)
Definition: Logger.h:203
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the caller graph for this function:

StringDictionary::PayloadString StringDictionary::getStringFromStorage ( const int  string_id) const
private noexcept

Definition at line 1298 of file StringDictionary.cpp.

References CHECK_GE, StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

Referenced by getCompare(), mergeSortedCache(), sortCache(), and StringDictionary().

1299  {
1300  if (!isTemp_) {
1301  CHECK_GE(payload_fd_, 0);
1302  CHECK_GE(offset_fd_, 0);
1303  }
1304  CHECK_GE(string_id, 0);
1305  const StringIdxEntry* str_meta = offset_map_ + string_id;
1306  if (str_meta->size == 0xffff) {
1307  // hit the canary
1308  return {nullptr, 0, true};
1309  }
1310  return {payload_map_ + str_meta->off, str_meta->size, false};
1311 }
StringIdxEntry * offset_map_
#define CHECK_GE(x, y)
Definition: Logger.h:216

+ Here is the caller graph for this function:

std::string_view StringDictionary::getStringFromStorageFast ( const int  string_id) const
private noexcept

Definition at line 1292 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

1293  {
1294  const StringIdxEntry* str_meta = offset_map_ + string_id;
1295  return {payload_map_ + str_meta->off, str_meta->size};
1296 }
StringIdxEntry * offset_map_
std::string StringDictionary::getStringUnlocked ( int32_t  string_id) const
private noexcept

Definition at line 586 of file StringDictionary.cpp.

References CHECK_LT.

Referenced by copyStrings(), getEquals(), getLike(), getRegexpLike(), and getString().

586  {
587  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
588  return getStringChecked(string_id);
589 }
std::string getStringChecked(const int string_id) const noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:213

+ Here is the caller graph for this function:

int32_t StringDictionary::getUnlocked ( const std::string &  str) const
private noexcept

Definition at line 569 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::hash_string().

Referenced by getIdOfString().

569  {
570  const string_dict_hash_t hash = hash_string(str);
573  return str_id;
574 }
string_dict_hash_t hash_string(const std::string_view &str)
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::hashStrings ( const std::vector< String > &  string_vec,
std::vector< string_dict_hash_t > &  hashes 
) const
private noexcept

Hashes a vector of strings in parallel. Empty strings are skipped, and their entries in hashes are left unmodified.

Parameters
string_vec - input vector of strings to be hashed
hashes - space for the output; must be pre-sized to match the size of string_vec

Definition at line 342 of file StringDictionary.cpp.

References CHECK_EQ and anonymous_namespace{StringDictionary.cpp}::hash_string().

Referenced by getOrAddBulkParallel().

344  {
345  CHECK_EQ(string_vec.size(), hashes.size());
346 
347  tbb::parallel_for(tbb::blocked_range<size_t>(0, string_vec.size()),
348  [&string_vec, &hashes](const tbb::blocked_range<size_t>& r) {
349  for (size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
350  if (string_vec[curr_id].empty()) {
351  continue;
352  }
353  hashes[curr_id] = hash_string(string_vec[curr_id]);
354  }
355  });
356 }
#define CHECK_EQ(x, y)
Definition: Logger.h:211
string_dict_hash_t hash_string(const std::string_view &str)
tuple r
Definition: test_fsi.py:16

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::increaseHashTableCapacity ( )
private noexcept

Definition at line 1011 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), getStringChecked(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), INVALID_STR_ID, materialize_hashes_, str_count_, and string_id_string_dict_hash_table_.

Referenced by getOrAddBulk().

1011  {
1012  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1013  INVALID_STR_ID);
1014 
1015  if (materialize_hashes_) {
1016  for (size_t i = 0; i != str_count_; ++i) {
1017  const string_dict_hash_t hash = hash_cache_[i];
1018  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1019  new_str_ids[bucket] = i;
1020  }
1021  hash_cache_.resize(hash_cache_.size() * 2);
1022  } else {
1023  for (size_t i = 0; i != str_count_; ++i) {
1024  const auto str = getStringChecked(i);
1025  const string_dict_hash_t hash = hash_string(str);
1026  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1027  new_str_ids[bucket] = i;
1028  }
1029  }
1030  string_id_string_dict_hash_table_.swap(new_str_ids);
1031 }
string_dict_hash_t hash_string(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
static constexpr int32_t INVALID_STR_ID
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::increaseHashTableCapacityFromStorageAndMemory ( const size_t  str_count,
const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const std::vector< string_dict_hash_t > &  input_strings_hashes 
)
private noexcept

Definition at line 1034 of file StringDictionary.cpp.

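References computeUniqueBucketWithHash(), getStringChecked(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), INVALID_STR_ID, materialize_hashes_, and string_id_string_dict_hash_table_.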

Referenced by getOrAddBulkParallel().

1040  {
1041  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1042  INVALID_STR_ID);
1043  if (materialize_hashes_) {
1044  for (size_t i = 0; i != str_count; ++i) {
1045  const string_dict_hash_t hash = hash_cache_[i];
1046  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1047  new_str_ids[bucket] = i;
1048  }
1049  hash_cache_.resize(hash_cache_.size() * 2);
1050  } else {
1051  for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1052  const auto storage_string = getStringChecked(storage_idx);
1053  const string_dict_hash_t hash = hash_string(storage_string);
1054  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1055  new_str_ids[bucket] = storage_idx;
1056  }
1057  for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1058  const size_t string_memory_id = string_memory_ids[memory_idx];
1059  const uint32_t bucket = computeUniqueBucketWithHash(
1060  input_strings_hashes[string_memory_id], new_str_ids);
1061  new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1062  }
1063  }
1064  string_id_string_dict_hash_table_.swap(new_str_ids);
1065 }
string_dict_hash_t hash_string(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
static constexpr int32_t INVALID_STR_ID
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::insertInSortedCache ( std::string  str,
int32_t  str_id 
)
private
void StringDictionary::invalidateInvertedIndex ( )
private noexcept

Definition at line 1373 of file StringDictionary.cpp.

References compare_cache_, equal_cache_, like_cache_, and regex_cache_.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

1373  {
1374  if (!like_cache_.empty()) {
1375  decltype(like_cache_)().swap(like_cache_);
1376  }
1377  if (!regex_cache_.empty()) {
1378  decltype(regex_cache_)().swap(regex_cache_);
1379  }
1380  if (!equal_cache_.empty()) {
1381  decltype(equal_cache_)().swap(equal_cache_);
1382  }
1383  compare_cache_.invalidateInvertedIndex();
1384 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::map< std::string, int32_t > equal_cache_
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::mergeSortedCache ( std::vector< int32_t > &  temp_sorted_cache)
private

Definition at line 1433 of file StringDictionary.cpp.

References getStringFromStorage(), sorted_cache, and string_lt().

Referenced by buildSortedCache().

1433  {
1434  // this method is not thread safe
1435  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1436  size_t t_idx = 0, s_idx = 0, idx = 0;
1437  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1438  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1439  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1440  const auto insert_from_temp_cache =
1441  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1442  if (insert_from_temp_cache) {
1443  updated_cache[idx] = temp_sorted_cache[t_idx++];
1444  } else {
1445  updated_cache[idx] = sorted_cache[s_idx++];
1446  }
1447  }
1448  while (t_idx < temp_sorted_cache.size()) {
1449  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1450  }
1451  while (s_idx < sorted_cache.size()) {
1452  updated_cache[idx++] = sorted_cache[s_idx++];
1453  }
1454  sorted_cache.swap(updated_cache);
1455 }
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_array_ids ( std::vector< std::vector< int32_t >> &  dest_array_ids,
StringDictionary dest_dict,
const std::vector< std::vector< int32_t >> &  source_array_ids,
const StringDictionary source_dict 
)
static

Definition at line 1484 of file StringDictionary.cpp.

References populate_string_ids().

Referenced by DictionaryValueConverter< TARGET_TYPE >::processArrayBuffer().

1488  {
1489  dest_array_ids.resize(source_array_ids.size());
1490 
1491  std::atomic<size_t> row_idx{0};
1492  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1493  int thread_id) {
1494  for (;;) {
1495  auto row = row_idx.fetch_add(1);
1496 
1497  if (row >= dest_array_ids.size()) {
1498  return;
1499  }
1500  const auto& source_ids = source_array_ids[row];
1501  auto& dest_ids = dest_array_ids[row];
1502  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1503  }
1504  };
1505 
1506  const int num_worker_threads = std::thread::hardware_concurrency();
1507 
1508  if (source_array_ids.size() / num_worker_threads > 10) {
1509  std::vector<std::future<void>> worker_threads;
1510  for (int i = 0; i < num_worker_threads; ++i) {
1511  worker_threads.push_back(std::async(std::launch::async, processor, i));
1512  }
1513 
1514  for (auto& child : worker_threads) {
1515  child.wait();
1516  }
1517  for (auto& child : worker_threads) {
1518  child.get();
1519  }
1520  } else {
1521  processor(0);
1522  }
1523 }
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
Populates the provided dest_ids vector with string ids corresponding to the given source strings.
ThreadId thread_id()
Definition: Logger.cpp:732

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_ids ( std::vector< int32_t > &  dest_ids,
StringDictionary dest_dict,
const std::vector< int32_t > &  source_ids,
const StringDictionary source_dict,
const std::map< int32_t, std::string >  transient_mapping = {} 
)
static

Populates the provided dest_ids vector with string ids corresponding to the given source strings.

Given a vector of source string ids and the corresponding source dictionary, this method populates a vector of destination string ids. For each string it either returns the id of the matching string in the destination dictionary or creates a new entry in that dictionary. Source string ids can also be transient (negative) if they were created by a function (e.g. LOWER/UPPER functions). A map of transient string ids to string values is provided to handle this use case; a negative source id that is missing from this map causes a std::runtime_error to be thrown. The null sentinel id (std::numeric_limits<int32_t>::min()) is mapped to an empty string.

Parameters
dest_ids - vector of destination string ids to be populated
dest_dict - destination dictionary
source_ids - vector of source string ids for which destination ids are needed
source_dict - source dictionary
transient_mapping - map of transient (negative) source string ids to string values

Definition at line 1457 of file StringDictionary.cpp.

References getOrAddBulk(), and getString().

Referenced by populate_string_array_ids(), and DictionaryValueConverter< TARGET_TYPE >::processBuffer().

1462  {
1463  std::vector<std::string> strings;
1464 
1465  for (const int32_t source_id : source_ids) {
1466  if (source_id == std::numeric_limits<int32_t>::min()) {
1467  strings.emplace_back("");
1468  } else if (source_id < 0) {
1469  if (auto string_itr = transient_mapping.find(source_id);
1470  string_itr != transient_mapping.end()) {
1471  strings.emplace_back(string_itr->second);
1472  } else {
1473  throw std::runtime_error("Unexpected negative source ID");
1474  }
1475  } else {
1476  strings.push_back(source_dict->getString(source_id));
1477  }
1478  }
1479 
1480  dest_ids.resize(strings.size());
1481  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1482 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
std::string getString(int32_t string_id) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::processDictionaryFutures ( std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &  dictionary_futures)
private

Definition at line 214 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), hash_cache_, materialize_hashes_, payload_file_off_, str_count_, and string_id_string_dict_hash_table_.

Referenced by StringDictionary().

216  {
217  for (auto& dictionary_future : dictionary_futures) {
218  dictionary_future.wait();
219  const auto hashVec = dictionary_future.get();
220  for (const auto& hash : hashVec) {
221  const uint32_t bucket =
223  payload_file_off_ += hash.second;
224  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
225  if (materialize_hashes_) {
226  hash_cache_[str_count_] = hash.first;
227  }
228  ++str_count_;
229  }
230  }
231  dictionary_futures.clear();
232 }
std::vector< string_dict_hash_t > hash_cache_
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::sortCache ( std::vector< int32_t > &  cache)
private

Definition at line 1420 of file StringDictionary.cpp.

References getStringFromStorage(), std::sort(), and string_lt().

Referenced by buildSortedCache().

1420  {
1421  // This method is not thread-safe.
1422 
1423  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1424  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1425 
1426  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1427  auto a_str = this->getStringFromStorage(a);
1428  auto b_str = this->getStringFromStorage(b);
1429  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1430  });
1431 }
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t StringDictionary::storageEntryCount ( ) const

Definition at line 600 of file StringDictionary.cpp.

References client_, rw_mutex_, and str_count_.

600  {
601  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
602  if (client_) {
603  return client_->storage_entry_count();
604  }
605  return str_count_;
606 }
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock

Member Data Documentation

char* StringDictionary::CANARY_BUFFER {nullptr}
private

Definition at line 232 of file StringDictionary.h.

Referenced by ~StringDictionary().

size_t StringDictionary::canary_buffer_size = 0
private

Definition at line 233 of file StringDictionary.h.

std::unique_ptr<StringDictionaryClient> StringDictionary::client_
private
std::unique_ptr<StringDictionaryClient> StringDictionary::client_no_timeout_
private

Definition at line 230 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and getOrAddBulkRemote().

size_t StringDictionary::collisions_
private

Definition at line 208 of file StringDictionary.h.

Referenced by StringDictionary().

DictionaryCache<std::string, compare_cache_value_t> StringDictionary::compare_cache_
mutable private

Definition at line 227 of file StringDictionary.h.

Referenced by getCompare(), and invalidateInvertedIndex().

std::map<std::string, int32_t> StringDictionary::equal_cache_
mutable private

Definition at line 226 of file StringDictionary.h.

Referenced by getEquals(), and invalidateInvertedIndex().

const std::string StringDictionary::folder_
private

Definition at line 206 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and getOrAddBulkRemote().

std::vector<string_dict_hash_t> StringDictionary::hash_cache_
private
std::map<std::tuple<std::string, bool, bool, char>, std::vector<int32_t> > StringDictionary::like_cache_
mutable private

Definition at line 224 of file StringDictionary.h.

Referenced by getLike(), and invalidateInvertedIndex().

bool StringDictionary::materialize_hashes_
private
constexpr size_t StringDictionary::MAX_STRCOUNT = (1U << 31) - 1
static

Definition at line 117 of file StringDictionary.h.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

int StringDictionary::offset_fd_
private
size_t StringDictionary::offset_file_size_
private
StringIdxEntry* StringDictionary::offset_map_
private
std::string StringDictionary::offsets_path_
private

Definition at line 214 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and StringDictionary().

int StringDictionary::payload_fd_
private
size_t StringDictionary::payload_file_off_
private
size_t StringDictionary::payload_file_size_
private
char* StringDictionary::payload_map_
private
std::map<std::pair<std::string, char>, std::vector<int32_t> > StringDictionary::regex_cache_
mutable private

Definition at line 225 of file StringDictionary.h.

Referenced by getRegexpLike(), and invalidateInvertedIndex().

std::vector<int32_t> StringDictionary::sorted_cache
private

Definition at line 211 of file StringDictionary.h.

Referenced by buildSortedCache(), getCompare(), and mergeSortedCache().

std::vector<int32_t> StringDictionary::string_id_string_dict_hash_table_
private
std::shared_ptr<std::vector<std::string> > StringDictionary::strings_cache_
mutable private

Definition at line 228 of file StringDictionary.h.

Referenced by copyStrings().


The documentation for this class was generated from the following files: