OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
StringDictionary Class Reference

#include <StringDictionary.h>

+ Collaboration diagram for StringDictionary:

Classes

struct  compare_cache_value_t
 
struct  PayloadString
 
struct  StringIdxEntry
 

Public Member Functions

 StringDictionary (const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
 
 StringDictionary (const LeafHostInfo &host, const DictRef dict_ref)
 
 ~StringDictionary () noexcept
 
int32_t getOrAdd (const std::string &str) noexcept
 
template<class T , class String >
void getOrAddBulk (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class T , class String >
void getOrAddBulkParallel (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class String >
void getOrAddBulkArray (const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
 
int32_t getIdOfString (const std::string &str) const
 
std::string getString (int32_t string_id) const
 
std::pair< char *, size_t > getStringBytes (int32_t string_id) const noexcept
 
size_t storageEntryCount () const
 
std::vector< int32_t > getLike (const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
 
std::vector< int32_t > getCompare (const std::string &pattern, const std::string &comp_operator, const size_t generation)
 
std::vector< int32_t > getRegexpLike (const std::string &pattern, const char escape, const size_t generation) const
 
std::shared_ptr< const
std::vector< std::string > > 
copyStrings () const
 
bool checkpoint () noexcept
 

Static Public Member Functions

static void populate_string_ids (std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
 Populates provided dest_ids vector with string ids corresponding to given source strings. More...
 
static void populate_string_array_ids (std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
 

Static Public Attributes

static constexpr int32_t INVALID_STR_ID = -1
 
static constexpr size_t MAX_STRLEN = (1 << 15) - 1
 
static constexpr size_t MAX_STRCOUNT = (1U << 31) - 1
 

Private Member Functions

void processDictionaryFutures (std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
 
size_t getNumStringsFromStorage (const size_t storage_slots) const noexcept
 
bool fillRateIsHigh (const size_t num_strings) const noexcept
 
void increaseHashTableCapacity () noexcept
 
template<class String >
void increaseHashTableCapacityFromStorageAndMemory (const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
 
int32_t getOrAddImpl (const std::string_view &str) noexcept
 
template<class String >
void hashStrings (const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
 
template<class T , class String >
void getOrAddBulkRemote (const std::vector< String > &string_vec, T *encoded_vec)
 
int32_t getUnlocked (const std::string &str) const noexcept
 
std::string getStringUnlocked (int32_t string_id) const noexcept
 
std::string getStringChecked (const int string_id) const noexcept
 
std::pair< char *, size_t > getStringBytesChecked (const int string_id) const noexcept
 
template<class String >
uint32_t computeBucket (const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
 
template<class String >
uint32_t computeBucketFromStorageAndMemory (const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
 
uint32_t computeUniqueBucketWithHash (const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
 
void checkAndConditionallyIncreasePayloadCapacity (const size_t write_length)
 
void checkAndConditionallyIncreaseOffsetCapacity (const size_t write_length)
 
template<class String >
void appendToStorage (const String str) noexcept
 
template<class String >
void appendToStorageBulk (const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
 
PayloadString getStringFromStorage (const int string_id) const noexcept
 
std::string_view getStringFromStorageFast (const int string_id) const noexcept
 
void addPayloadCapacity (const size_t min_capacity_requested=0) noexcept
 
void addOffsetCapacity (const size_t min_capacity_requested=0) noexcept
 
size_t addStorageCapacity (int fd, const size_t min_capacity_requested=0) noexcept
 
void * addMemoryCapacity (void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
 
void invalidateInvertedIndex () noexcept
 
std::vector< int32_t > getEquals (std::string pattern, std::string comp_operator, size_t generation)
 
void buildSortedCache ()
 
void insertInSortedCache (std::string str, int32_t str_id)
 
void sortCache (std::vector< int32_t > &cache)
 
void mergeSortedCache (std::vector< int32_t > &temp_sorted_cache)
 
compare_cache_value_t * binary_search_cache (const std::string &pattern) const
 

Private Attributes

const std::string folder_
 
size_t str_count_
 
size_t collisions_
 
std::vector< int32_t > string_id_string_dict_hash_table_
 
std::vector< string_dict_hash_t > hash_cache_
 
std::vector< int32_t > sorted_cache
 
bool isTemp_
 
bool materialize_hashes_
 
std::string offsets_path_
 
int payload_fd_
 
int offset_fd_
 
StringIdxEntry * offset_map_
 
char * payload_map_
 
size_t offset_file_size_
 
size_t payload_file_size_
 
size_t payload_file_off_
 
mapd_shared_mutex rw_mutex_
 
std::map< std::tuple
< std::string, bool, bool,
char >, std::vector< int32_t > > 
like_cache_
 
std::map< std::pair
< std::string, char >
, std::vector< int32_t > > 
regex_cache_
 
std::map< std::string, int32_t > equal_cache_
 
DictionaryCache< std::string,
compare_cache_value_t > 
compare_cache_
 
std::shared_ptr< std::vector
< std::string > > 
strings_cache_
 
std::unique_ptr
< StringDictionaryClient > 
client_
 
std::unique_ptr
< StringDictionaryClient > 
client_no_timeout_
 
char * CANARY_BUFFER {nullptr}
 
size_t canary_buffer_size = 0
 

Detailed Description

Definition at line 44 of file StringDictionary.h.

Constructor & Destructor Documentation

StringDictionary::StringDictionary ( const std::string &  folder,
const bool  isTemp,
const bool  recover,
const bool  materializeHashes = false,
size_t  initial_capacity = 256 
)

Definition at line 96 of file StringDictionary.cpp.

References addOffsetCapacity(), addPayloadCapacity(), threading_serial::async(), CHECK_EQ, omnisci::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_open(), collisions_, omnisci::file_size(), getNumStringsFromStorage(), getStringFromStorage(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), INVALID_STR_ID, isTemp_, LOG, materialize_hashes_, offset_fd_, offset_file_size_, offset_map_, offsets_path_, payload_fd_, payload_file_size_, payload_map_, processDictionaryFutures(), anonymous_namespace{StringDictionary.cpp}::round_up_p2(), rw_mutex_, str_count_, string_id_string_dict_hash_table_, VLOG, and logger::WARNING.

101  : folder_(folder)
102  , str_count_(0)
104  , hash_cache_(initial_capacity)
105  , isTemp_(isTemp)
106  , materialize_hashes_(materializeHashes)
107  , payload_fd_(-1)
108  , offset_fd_(-1)
109  , offset_map_(nullptr)
110  , payload_map_(nullptr)
111  , offset_file_size_(0)
112  , payload_file_size_(0)
113  , payload_file_off_(0)
114  , strings_cache_(nullptr) {
115  if (!isTemp && folder.empty()) {
116  return;
117  }
118 
119  // initial capacity must be a power of two for efficient bucket computation
120  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
121  if (!isTemp_) {
122  boost::filesystem::path storage_path(folder);
123  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
124  const auto payload_path =
125  (storage_path / boost::filesystem::path("DictPayload")).string();
126  payload_fd_ = checked_open(payload_path.c_str(), recover);
127  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
130  }
131  bool storage_is_empty = false;
132  if (payload_file_size_ == 0) {
133  storage_is_empty = true;
135  }
136  if (offset_file_size_ == 0) {
138  }
139  if (!isTemp_) { // we never mmap or recover temp dictionaries
140  payload_map_ =
141  reinterpret_cast<char*>(omnisci::checked_mmap(payload_fd_, payload_file_size_));
142  offset_map_ = reinterpret_cast<StringIdxEntry*>(
144  if (recover) {
145  const size_t bytes = omnisci::file_size(offset_fd_);
146  if (bytes % sizeof(StringIdxEntry) != 0) {
147  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
148  }
149  const uint64_t str_count =
150  storage_is_empty ? 0 : getNumStringsFromStorage(bytes / sizeof(StringIdxEntry));
151  collisions_ = 0;
152  // at this point we know the size of the StringDict we need to load
153  // so lets reallocate the vector to the correct size
154  const uint64_t max_entries =
155  std::max(round_up_p2(str_count * 2 + 1),
156  round_up_p2(std::max(initial_capacity, static_cast<size_t>(1))));
157  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
158  string_id_string_dict_hash_table_.swap(new_str_ids);
159  if (materialize_hashes_) {
160  std::vector<string_dict_hash_t> new_hash_cache(max_entries / 2);
161  hash_cache_.swap(new_hash_cache);
162  }
163  // Bail early if we know we don't have strings to add (i.e. a new or empty
164  // dictionary)
165  if (str_count == 0) {
166  return;
167  }
168 
169  unsigned string_id = 0;
170  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
171 
172  uint32_t thread_inits = 0;
173  const auto thread_count = std::thread::hardware_concurrency();
174  const uint32_t items_per_thread = std::max<uint32_t>(
175  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
176  std::vector<std::future<std::vector<std::pair<string_dict_hash_t, unsigned int>>>>
177  dictionary_futures;
178  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
179  dictionary_futures.emplace_back(std::async(
180  std::launch::async, [string_id, str_count, items_per_thread, this] {
181  std::vector<std::pair<string_dict_hash_t, unsigned int>> hashVec;
182  for (uint32_t curr_id = string_id;
183  curr_id < string_id + items_per_thread && curr_id < str_count;
184  curr_id++) {
185  const auto recovered = getStringFromStorage(curr_id);
186  if (recovered.canary) {
187  // hit the canary, recovery finished
188  break;
189  } else {
190  std::string_view temp(recovered.c_str_ptr, recovered.size);
191  hashVec.emplace_back(std::make_pair(hash_string(temp), temp.size()));
192  }
193  }
194  return hashVec;
195  }));
196  thread_inits++;
197  if (thread_inits % thread_count == 0) {
198  processDictionaryFutures(dictionary_futures);
199  }
200  }
201  // gather last few threads
202  if (dictionary_futures.size() != 0) {
203  processDictionaryFutures(dictionary_futures);
204  }
205  VLOG(1) << "Opened string dictionary " << folder << " # Strings: " << str_count_
206  << " Hash table size: " << string_id_string_dict_hash_table_.size()
207  << " Fill rate: "
208  << static_cast<double>(str_count_) * 100.0 /
210  << "% Collisions: " << collisions_;
211  }
212  }
213 }
StringIdxEntry * offset_map_
#define CHECK_EQ(x, y)
Definition: Logger.h:217
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38
string_dict_hash_t hash_string(const std::string_view &str)
#define LOG(tag)
Definition: Logger.h:203
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
std::vector< string_dict_hash_t > hash_cache_
std::string offsets_path_
future< Result > async(Fn &&fn, Args &&...args)
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
const std::string folder_
mapd_shared_mutex rw_mutex_
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
int checked_open(const char *path, const bool recover)
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &dictionary_futures)
size_t getNumStringsFromStorage(const size_t storage_slots) const noexcept
const uint64_t round_up_p2(const uint64_t num)
std::vector< int32_t > string_id_string_dict_hash_table_
mapd_unique_lock< mapd_shared_mutex > write_lock
PayloadString getStringFromStorage(const int string_id) const noexcept
#define VLOG(n)
Definition: Logger.h:303
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31

+ Here is the call graph for this function:

StringDictionary::StringDictionary ( const LeafHostInfo host,
const DictRef  dict_ref 
)

Definition at line 265 of file StringDictionary.cpp.

266  : folder_("DB_" + std::to_string(dict_ref.dbId) + "_DICT_" +
267  std::to_string(dict_ref.dictId))
268  , strings_cache_(nullptr)
269  , client_(new StringDictionaryClient(host, dict_ref, true))
270  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
std::string to_string(char const *&&v)
std::unique_ptr< StringDictionaryClient > client_
std::shared_ptr< std::vector< std::string > > strings_cache_
const std::string folder_
int32_t dictId
Definition: DictRef.h:10
std::unique_ptr< StringDictionaryClient > client_no_timeout_
int32_t dbId
Definition: DictRef.h:9
StringDictionary::~StringDictionary ( )
noexcept

Definition at line 272 of file StringDictionary.cpp.

References CANARY_BUFFER, CHECK, CHECK_GE, omnisci::checked_munmap(), client_, omnisci::close(), isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

272  {
273  free(CANARY_BUFFER);
274  if (client_) {
275  return;
276  }
277  if (payload_map_) {
278  if (!isTemp_) {
282  CHECK_GE(payload_fd_, 0);
284  CHECK_GE(offset_fd_, 0);
286  } else {
288  free(payload_map_);
289  free(offset_map_);
290  }
291  }
292 }
StringIdxEntry * offset_map_
#define CHECK_GE(x, y)
Definition: Logger.h:222
std::unique_ptr< StringDictionaryClient > client_
void close(const int fd)
Definition: omnisci_fs.cpp:68
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

Member Function Documentation

void * StringDictionary::addMemoryCapacity ( void *  addr,
size_t &  mem_size,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1357 of file StringDictionary.cpp.

References CHECK, and anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE.

1359  {
1360  const size_t canary_buff_size_to_add =
1361  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1362  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1363  if (canary_buffer_size < canary_buff_size_to_add) {
1364  CANARY_BUFFER =
1365  reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1366  canary_buffer_size = canary_buff_size_to_add;
1368  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1369  }
1370  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
1371  CHECK(new_addr);
1372  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
1373  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
1374  mem_size += canary_buff_size_to_add;
1375  return new_addr;
1376 }
#define CHECK(condition)
Definition: Logger.h:209
void StringDictionary::addOffsetCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1327 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreaseOffsetCapacity(), and StringDictionary().

1327  {
1328  if (!isTemp_) {
1329  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1330  } else {
1331  offset_map_ = static_cast<StringIdxEntry*>(
1332  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1333  }
1334 }
StringIdxEntry * offset_map_
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

void StringDictionary::addPayloadCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1318 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreasePayloadCapacity(), and StringDictionary().

1318  {
1319  if (!isTemp_) {
1320  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1321  } else {
1322  payload_map_ = static_cast<char*>(
1323  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1324  }
1325 }
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

size_t StringDictionary::addStorageCapacity ( int  fd,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1336 of file StringDictionary.cpp.

References CHECK, CHECK_NE, anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE, and File_Namespace::write().

1338  {
1339  const size_t canary_buff_size_to_add =
1340  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1341  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1342 
1343  if (canary_buffer_size < canary_buff_size_to_add) {
1344  CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1345  canary_buffer_size = canary_buff_size_to_add;
1347  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1348  }
1349 
1350  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1351  const auto write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
1352  CHECK(write_return > 0 &&
1353  (static_cast<size_t>(write_return) == canary_buff_size_to_add));
1354  return canary_buff_size_to_add;
1355 }
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:141
#define CHECK_NE(x, y)
Definition: Logger.h:218
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

template<class String >
void StringDictionary::appendToStorage ( const String  str)
private noexcept

Definition at line 1263 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::size.

Referenced by getOrAddBulk().

1263  {
1264  // write the payload
1266  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1267 
1268  // write the offset and length
1269  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1270  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1271 
1273  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1274 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::appendToStorageBulk ( const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const size_t  sum_new_strings_lengths 
)
private noexcept

Definition at line 1277 of file StringDictionary.cpp.

References i.

Referenced by getOrAddBulkParallel().

1280  {
1281  const size_t num_strings = string_memory_ids.size();
1282 
1283  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1284  checkAndConditionallyIncreaseOffsetCapacity(sizeof(StringIdxEntry) * num_strings);
1285 
1286  for (size_t i = 0; i < num_strings; ++i) {
1287  const size_t string_idx = string_memory_ids[i];
1288  const String str = input_strings[string_idx];
1289  const size_t str_size(str.size());
1290  memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1291  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
1292  payload_file_off_ += str_size; // Need to increment after we've defined str_meta
1293  memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
1294  }
1295 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

compare_cache_value_t* StringDictionary::binary_search_cache ( const std::string &  pattern) const
private
void StringDictionary::buildSortedCache ( )
private

Definition at line 1414 of file StringDictionary.cpp.

References i, mergeSortedCache(), sortCache(), sorted_cache, and str_count_.

Referenced by getCompare().

1414  {
1415  // This method is not thread-safe.
1416  const auto cur_cache_size = sorted_cache.size();
1417  std::vector<int32_t> temp_sorted_cache;
1418  for (size_t i = cur_cache_size; i < str_count_; i++) {
1419  temp_sorted_cache.push_back(i);
1420  }
1421  sortCache(temp_sorted_cache);
1422  mergeSortedCache(temp_sorted_cache);
1423 }
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
void sortCache(std::vector< int32_t > &cache)
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::checkAndConditionallyIncreaseOffsetCapacity ( const size_t  write_length)
private

Definition at line 1242 of file StringDictionary.cpp.

References addOffsetCapacity(), CHECK, CHECK_GE, omnisci::checked_mmap(), omnisci::checked_munmap(), isTemp_, offset_fd_, offset_file_size_, offset_map_, and str_count_.

1243  {
1244  const size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
1245  if (offset_file_off + write_length >= offset_file_size_) {
1246  const size_t min_capacity_needed =
1247  write_length - (offset_file_size_ - offset_file_off);
1248  if (!isTemp_) {
1249  CHECK_GE(offset_fd_, 0);
1251  addOffsetCapacity(min_capacity_needed);
1252  CHECK(offset_file_off + write_length <= offset_file_size_);
1253  offset_map_ = reinterpret_cast<StringIdxEntry*>(
1255  } else {
1256  addOffsetCapacity(min_capacity_needed);
1257  CHECK(offset_file_off + write_length <= offset_file_size_);
1258  }
1259  }
1260 }
StringIdxEntry * offset_map_
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:222
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

void StringDictionary::checkAndConditionallyIncreasePayloadCapacity ( const size_t  write_length)
private

Definition at line 1223 of file StringDictionary.cpp.

References addPayloadCapacity(), CHECK, CHECK_GE, omnisci::checked_mmap(), omnisci::checked_munmap(), isTemp_, payload_fd_, payload_file_off_, payload_file_size_, and payload_map_.

1224  {
1225  if (payload_file_off_ + write_length > payload_file_size_) {
1226  const size_t min_capacity_needed =
1227  write_length - (payload_file_size_ - payload_file_off_);
1228  if (!isTemp_) {
1229  CHECK_GE(payload_fd_, 0);
1231  addPayloadCapacity(min_capacity_needed);
1232  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1233  payload_map_ =
1234  reinterpret_cast<char*>(omnisci::checked_mmap(payload_fd_, payload_file_size_));
1235  } else {
1236  addPayloadCapacity(min_capacity_needed);
1237  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1238  }
1239  }
1240 }
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38
#define CHECK_GE(x, y)
Definition: Logger.h:222
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

bool StringDictionary::checkpoint ( )
noexcept

Definition at line 1395 of file StringDictionary.cpp.

References CHECK, client_, omnisci::fsync(), isTemp_, omnisci::msync(), offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

Referenced by import_export::TypedImportBuffer::stringDictCheckpoint().

1395  {
1396  if (client_) {
1397  try {
1398  return client_->checkpoint();
1399  } catch (...) {
1400  return false;
1401  }
1402  }
1403  CHECK(!isTemp_);
1404  bool ret = true;
1405  ret = ret &&
1406  (omnisci::msync((void*)offset_map_, offset_file_size_, /*async=*/false) == 0);
1407  ret = ret &&
1408  (omnisci::msync((void*)payload_map_, payload_file_size_, /*async=*/false) == 0);
1409  ret = ret && (omnisci::fsync(offset_fd_) == 0);
1410  ret = ret && (omnisci::fsync(payload_fd_) == 0);
1411  return ret;
1412 }
StringIdxEntry * offset_map_
std::unique_ptr< StringDictionaryClient > client_
int fsync(int fd)
Definition: omnisci_fs.cpp:60
int msync(void *addr, size_t length, bool async)
Definition: omnisci_fs.cpp:55
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucket ( const string_dict_hash_t  hash,
const String &  input_string,
const std::vector< int32_t > &  string_id_string_dict_hash_table 
) const
private noexcept

Definition at line 1125 of file StringDictionary.cpp.

Referenced by getOrAddBulk().

1128  {
1129  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1130  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1131  while (true) {
1132  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1133  if (candidate_string_id ==
1134  INVALID_STR_ID) { // In this case it means the slot is available for use
1135  break;
1136  }
1137  if ((materialize_hashes_ && hash == hash_cache_[candidate_string_id]) ||
1139  const auto candidate_string = getStringFromStorageFast(candidate_string_id);
1140  if (input_string.size() == candidate_string.size() &&
1141  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1142  // found the string
1143  break;
1144  }
1145  }
1146  // wrap around
1147  if (++bucket == string_dict_hash_table_size) {
1148  bucket = 0;
1149  }
1150  }
1151  return bucket;
1152 }
std::vector< string_dict_hash_t > hash_cache_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucketFromStorageAndMemory ( const string_dict_hash_t  input_string_hash,
const String &  input_string,
const std::vector< int32_t > &  string_id_string_dict_hash_table,
const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids 
) const
private noexcept

memcmp(input_string.data(), candidate_storage_string.c_str_ptr, input_string.size())) {

Definition at line 1155 of file StringDictionary.cpp.

Referenced by getOrAddBulkParallel().

1161  {
1162  uint32_t bucket = input_string_hash & (string_id_string_dict_hash_table.size() - 1);
1163  while (true) {
1164  const int32_t candidate_string_id = string_id_string_dict_hash_table[bucket];
1165  if (candidate_string_id ==
1166  INVALID_STR_ID) { // In this case it means the slot is available for use
1167  break;
1168  }
1169  if (!materialize_hashes_ || (input_string_hash == hash_cache_[candidate_string_id])) {
1170  if (candidate_string_id > 0 &&
1171  static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1172  // The candidate string is not in storage yet but in our string_memory_ids temp
1173  // buffer
1174  size_t memory_offset =
1175  static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1176  const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1177  if (input_string.size() == candidate_string.size() &&
1178  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1179  // found the string in the temp memory buffer
1180  break;
1181  }
1182  } else {
1183  // The candidate string is in storage, need to fetch it for comparison
1184  const auto candidate_storage_string =
1185  getStringFromStorageFast(candidate_string_id);
1186  if (input_string.size() == candidate_storage_string.size() &&
1187  !memcmp(input_string.data(),
1188  candidate_storage_string.data(),
1189  input_string.size())) {
1192  // found the string in storage
1193  break;
1194  }
1195  }
1196  }
1197  if (++bucket == string_id_string_dict_hash_table.size()) {
1198  bucket = 0;
1199  }
1200  }
1201  return bucket;
1202 }
std::vector< string_dict_hash_t > hash_cache_
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

uint32_t StringDictionary::computeUniqueBucketWithHash ( const string_dict_hash_t  hash,
const std::vector< int32_t > &  string_id_string_dict_hash_table 
)
private noexcept

Definition at line 1204 of file StringDictionary.cpp.

Referenced by increaseHashTableCapacity(), and processDictionaryFutures().

1206  {
1207  const size_t string_dict_hash_table_size = string_id_string_dict_hash_table.size();
1208  uint32_t bucket = hash & (string_dict_hash_table_size - 1);
1209  while (true) {
1210  if (string_id_string_dict_hash_table[bucket] ==
1211  INVALID_STR_ID) { // In this case it means the slot is available for use
1212  break;
1213  }
1214  collisions_++;
1215  // wrap around
1216  if (++bucket == string_dict_hash_table_size) {
1217  bucket = 0;
1218  }
1219  }
1220  return bucket;
1221 }
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

std::shared_ptr< const std::vector< std::string > > StringDictionary::copyStrings ( ) const

Definition at line 960 of file StringDictionary.cpp.

References threading_serial::async(), CHECK_EQ, CHECK_GT, CHECK_LE, client_, gpu_enabled::copy(), cpu_threads(), getStringUnlocked(), rw_mutex_, str_count_, and strings_cache_.

960  {
961  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
962  if (client_) {
963  // TODO(miyu): support remote string dictionary
964  throw std::runtime_error(
965  "copying dictionaries from remote server is not supported yet.");
966  }
967 
968  if (strings_cache_) {
969  return strings_cache_;
970  }
971 
972  strings_cache_ = std::make_shared<std::vector<std::string>>();
973  strings_cache_->reserve(str_count_);
974  const bool multithreaded = str_count_ > 10000;
975  const auto worker_count =
976  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
977  CHECK_GT(worker_count, 0UL);
978  std::vector<std::vector<std::string>> worker_results(worker_count);
979  auto copy = [this](std::vector<std::string>& str_list,
980  const size_t start_id,
981  const size_t end_id) {
982  CHECK_LE(start_id, end_id);
983  str_list.reserve(end_id - start_id);
984  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
985  str_list.push_back(getStringUnlocked(string_id));
986  }
987  };
988  if (multithreaded) {
989  std::vector<std::future<void>> workers;
990  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
991  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
992  worker_idx < worker_count && start < str_count_;
993  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
994  workers.push_back(std::async(
995  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
996  }
997  for (auto& worker : workers) {
998  worker.get();
999  }
1000  } else {
1001  CHECK_EQ(worker_results.size(), size_t(1));
1002  copy(worker_results[0], 0, str_count_);
1003  }
1004 
1005  for (const auto& worker_result : worker_results) {
1006  strings_cache_->insert(
1007  strings_cache_->end(), worker_result.begin(), worker_result.end());
1008  }
1009  return strings_cache_;
1010 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
#define CHECK_GT(x, y)
Definition: Logger.h:221
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
future< Result > async(Fn &&fn, Args &&...args)
std::shared_ptr< std::vector< std::string > > strings_cache_
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:220
mapd_unique_lock< mapd_shared_mutex > write_lock
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

bool StringDictionary::fillRateIsHigh ( const size_t  num_strings) const
private noexcept

Definition at line 1012 of file StringDictionary.cpp.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

1012  {
1013  return string_id_string_dict_hash_table_.size() <= num_strings * 2;
1014 }
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the caller graph for this function:

std::vector< int32_t > StringDictionary::getCompare ( const std::string &  pattern,
const std::string &  comp_operator,
const size_t  generation 
)

Definition at line 752 of file StringDictionary.cpp.

References anonymous_namespace{Utm.h}::a, buildSortedCache(), client_, compare_cache_, getEquals(), getStringFromStorage(), i, gpu_enabled::lower_bound(), rw_mutex_, sorted_cache, str_count_, string_eq(), and string_lt().

754  {
755  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
756  if (client_) {
757  return client_->get_compare(pattern, comp_operator, generation);
758  }
759  std::vector<int32_t> ret;
760  if (str_count_ == 0) {
761  return ret;
762  }
763  if (sorted_cache.size() < str_count_) {
764  if (comp_operator == "=" || comp_operator == "<>") {
765  return getEquals(pattern, comp_operator, generation);
766  }
767 
769  }
770  auto cache_index = compare_cache_.get(pattern);
771 
772  if (!cache_index) {
773  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
774  const auto cache_itr = std::lower_bound(
775  sorted_cache.begin(),
776  sorted_cache.end(),
777  pattern,
778  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
779  auto a_str = this->getStringFromStorage(a);
780  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
781  });
782 
783  if (cache_itr == sorted_cache.end()) {
784  cache_index->index = sorted_cache.size() - 1;
785  cache_index->diff = 1;
786  } else {
787  const auto cache_str = getStringFromStorage(*cache_itr);
788  if (!string_eq(
789  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
790  cache_index->index = cache_itr - sorted_cache.begin() - 1;
791  cache_index->diff = 1;
792  } else {
793  cache_index->index = cache_itr - sorted_cache.begin();
794  cache_index->diff = 0;
795  }
796  }
797 
798  compare_cache_.put(pattern, cache_index);
799  }
800 
801  // since we have a cache in form of vector of ints which is sorted according to
802  // corresponding strings in the dictionary all we need is the index of the element
803  // which equal to the pattern that we are trying to match or the index of “biggest”
804  // element smaller than the pattern, to perform all the comparison operators over
805  // string. The search function guarantees we have such index so now it is just the
806  // matter to include all the elements in the result vector.
807 
808  // For < operator if the index that we have points to the element which is equal to
809  // the pattern that we are searching for we simply get all the elements less than the
810  // index. If the element pointed by the index is not equal to the pattern we are
811  // comparing with we also need to include that index in result vector, except when the
812  // index points to 0 and the pattern is lesser than the smallest value in the string
813  // dictionary.
814 
815  if (comp_operator == "<") {
816  size_t idx = cache_index->index;
817  if (cache_index->diff) {
818  idx = cache_index->index + 1;
819  if (cache_index->index == 0 && cache_index->diff > 0) {
820  idx = cache_index->index;
821  }
822  }
823  for (size_t i = 0; i < idx; i++) {
824  ret.push_back(sorted_cache[i]);
825  }
826 
827  // For <= operator if the index that we have points to the element which is equal to
828  // the pattern that we are searching for we want to include the element pointed by
829  // the index in the result set. If the element pointed by the index is not equal to
830  // the pattern we are comparing with we just want to include all the ids with index
831  // less than the index that is cached, except when pattern that we are searching for
832  // is smaller than the smallest string in the dictionary.
833 
834  } else if (comp_operator == "<=") {
835  size_t idx = cache_index->index + 1;
836  if (cache_index == 0 && cache_index->diff > 0) {
837  idx = cache_index->index;
838  }
839  for (size_t i = 0; i < idx; i++) {
840  ret.push_back(sorted_cache[i]);
841  }
842 
843  // For > operator we want to get all the elements with index greater than the index
844  // that we have except, when the pattern we are searching for is lesser than the
845  // smallest string in the dictionary we also want to include the id of the index
846  // that we have.
847 
848  } else if (comp_operator == ">") {
849  size_t idx = cache_index->index + 1;
850  if (cache_index->index == 0 && cache_index->diff > 0) {
851  idx = cache_index->index;
852  }
853  for (size_t i = idx; i < sorted_cache.size(); i++) {
854  ret.push_back(sorted_cache[i]);
855  }
856 
857  // For >= operator when the indexed element that we have points to element which is
858  // equal to the pattern we are searching for we want to include that in the result
859  // vector. If the index that we have does not point to the string which is equal to
860  // the pattern we are searching we don’t want to include that id into the result
861  // vector except when the index is 0.
862 
863  } else if (comp_operator == ">=") {
864  size_t idx = cache_index->index;
865  if (cache_index->diff) {
866  idx = cache_index->index + 1;
867  if (cache_index->index == 0 && cache_index->diff > 0) {
868  idx = cache_index->index;
869  }
870  }
871  for (size_t i = idx; i < sorted_cache.size(); i++) {
872  ret.push_back(sorted_cache[i]);
873  }
874  } else if (comp_operator == "=") {
875  if (!cache_index->diff) {
876  ret.push_back(sorted_cache[cache_index->index]);
877  }
878 
879  // For <> operator it is simple matter of not including id of string which is equal
880  // to pattern we are searching for.
881  } else if (comp_operator == "<>") {
882  if (!cache_index->diff) {
883  size_t idx = cache_index->index;
884  for (size_t i = 0; i < idx; i++) {
885  ret.push_back(sorted_cache[i]);
886  }
887  ++idx;
888  for (size_t i = idx; i < sorted_cache.size(); i++) {
889  ret.push_back(sorted_cache[i]);
890  }
891  } else {
892  for (size_t i = 0; i < sorted_cache.size(); i++) {
893  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
894  }
895  }
896 
897  } else {
898  std::runtime_error("Unsupported string comparison operator");
899  }
900  return ret;
901 }
RUNTIME_EXPORT DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
#define const
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
std::unique_ptr< StringDictionaryClient > client_
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
constexpr double a
Definition: Utm.h:38
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
mapd_shared_mutex rw_mutex_
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
mapd_unique_lock< mapd_shared_mutex > write_lock
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getEquals ( std::string  pattern,
std::string  comp_operator,
size_t  generation 
)
private

Definition at line 692 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, cpu_threads(), equal_cache_, getStringUnlocked(), MAX_STRLEN, run_benchmark_import::result, and str_count_.

Referenced by getCompare().

694  {
695  std::vector<int32_t> result;
696  auto eq_id_itr = equal_cache_.find(pattern);
697  int32_t eq_id = MAX_STRLEN + 1;
698  int32_t cur_size = str_count_;
699  if (eq_id_itr != equal_cache_.end()) {
700  auto eq_id = eq_id_itr->second;
701  if (comp_operator == "=") {
702  result.push_back(eq_id);
703  } else {
704  for (int32_t idx = 0; idx <= cur_size; idx++) {
705  if (idx == eq_id) {
706  continue;
707  }
708  result.push_back(idx);
709  }
710  }
711  } else {
712  std::vector<std::thread> workers;
713  int worker_count = cpu_threads();
714  CHECK_GT(worker_count, 0);
715  std::vector<std::vector<int32_t>> worker_results(worker_count);
716  CHECK_LE(generation, str_count_);
717  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
718  workers.emplace_back(
719  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
720  for (size_t string_id = worker_idx; string_id < generation;
721  string_id += worker_count) {
722  const auto str = getStringUnlocked(string_id);
723  if (str == pattern) {
724  worker_results[worker_idx].push_back(string_id);
725  }
726  }
727  });
728  }
729  for (auto& worker : workers) {
730  worker.join();
731  }
732  for (const auto& worker_result : worker_results) {
733  result.insert(result.end(), worker_result.begin(), worker_result.end());
734  }
735  if (result.size() > 0) {
736  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
737  CHECK(it_ok.second);
738  eq_id = result[0];
739  }
740  if (comp_operator == "<>") {
741  for (int32_t idx = 0; idx <= cur_size; idx++) {
742  if (idx == eq_id) {
743  continue;
744  }
745  result.push_back(idx);
746  }
747  }
748  }
749  return result;
750 }
#define CHECK_GT(x, y)
Definition: Logger.h:221
std::string getStringUnlocked(int32_t string_id) const noexcept
std::map< std::string, int32_t > equal_cache_
#define CHECK_LE(x, y)
Definition: Logger.h:220
#define CHECK(condition)
Definition: Logger.h:209
static constexpr size_t MAX_STRLEN
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int32_t StringDictionary::getIdOfString ( const std::string &  str) const

Definition at line 566 of file StringDictionary.cpp.

References client_, getUnlocked(), and rw_mutex_.

566  {
567  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
568  if (client_) {
569  return client_->get(str);
570  }
571  return getUnlocked(str);
572 }
int32_t getUnlocked(const std::string &str) const noexcept
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getLike ( const std::string &  pattern,
const bool  icase,
const bool  is_simple,
const char  escape,
const size_t  generation 
) const

Definition at line 639 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_like(), like_cache_, run_benchmark_import::result, rw_mutex_, and str_count_.

643  {
644  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
645  if (client_) {
646  return client_->get_like(pattern, icase, is_simple, escape, generation);
647  }
648  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
649  const auto it = like_cache_.find(cache_key);
650  if (it != like_cache_.end()) {
651  return it->second;
652  }
653  std::vector<int32_t> result;
654  std::vector<std::thread> workers;
655  int worker_count = cpu_threads();
656  CHECK_GT(worker_count, 0);
657  std::vector<std::vector<int32_t>> worker_results(worker_count);
658  CHECK_LE(generation, str_count_);
659  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
660  workers.emplace_back([&worker_results,
661  &pattern,
662  generation,
663  icase,
664  is_simple,
665  escape,
666  worker_idx,
667  worker_count,
668  this]() {
669  for (size_t string_id = worker_idx; string_id < generation;
670  string_id += worker_count) {
671  const auto str = getStringUnlocked(string_id);
672  if (is_like(str, pattern, icase, is_simple, escape)) {
673  worker_results[worker_idx].push_back(string_id);
674  }
675  }
676  });
677  }
678  for (auto& worker : workers) {
679  worker.join();
680  }
681  for (const auto& worker_result : worker_results) {
682  result.insert(result.end(), worker_result.begin(), worker_result.end());
683  }
684  // place result into cache for reuse if similar query
685  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
686 
687  CHECK(it_ok.second);
688 
689  return result;
690 }
#define CHECK_GT(x, y)
Definition: Logger.h:221
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_mutex rw_mutex_
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
#define CHECK_LE(x, y)
Definition: Logger.h:220
#define CHECK(condition)
Definition: Logger.h:209
mapd_unique_lock< mapd_shared_mutex > write_lock
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

size_t StringDictionary::getNumStringsFromStorage ( const size_t  storage_slots) const
private noexcept

Retrieves the number of strings in storage via a binary search for the first canary entry.

Parameters
storage_slots — number of storage entries to search when locating the first (lowest-index) canary
Returns
number of strings in storage

Definition at line 242 of file StringDictionary.cpp.

References CHECK_GE.

Referenced by StringDictionary().

243  {
244  if (storage_slots == 0) {
245  return 0;
246  }
247  // Must use signed integers since final binary search step can wrap to max size_t value
248  // if dictionary is empty
249  int64_t min_bound = 0;
250  int64_t max_bound = storage_slots - 1;
251  int64_t guess{0};
252  while (min_bound <= max_bound) {
253  guess = (max_bound + min_bound) / 2;
254  CHECK_GE(guess, 0);
255  if (getStringFromStorage(guess).canary) {
256  max_bound = guess - 1;
257  } else {
258  min_bound = guess + 1;
259  }
260  }
261  CHECK_GE(guess + (min_bound > guess ? 1 : 0), 0);
262  return guess + (min_bound > guess ? 1 : 0);
263 }
#define CHECK_GE(x, y)
Definition: Logger.h:222
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the caller graph for this function:

int32_t StringDictionary::getOrAdd ( const std::string &  str)
noexcept

Definition at line 294 of file StringDictionary.cpp.

References CHECK_EQ.

294  {
295  if (client_) {
296  std::vector<int32_t> string_ids;
297  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
298  CHECK_EQ(size_t(1), string_ids.size());
299  return string_ids.front();
300  }
301  return getOrAddImpl(str);
302 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
std::unique_ptr< StringDictionaryClient > client_
int32_t getOrAddImpl(const std::string_view &str) noexcept
template<class T , class String >
template void StringDictionary::getOrAddBulk ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 364 of file StringDictionary.cpp.

References appendToStorage(), CHECK, CHECK_LT, client_no_timeout_, computeBucket(), fillRateIsHigh(), folder_, g_enable_stringdict_parallel, getOrAddBulkParallel(), getOrAddBulkRemote(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), increaseHashTableCapacity(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, rw_mutex_, str_count_, and string_id_string_dict_hash_table_.

Referenced by import_export::TypedImportBuffer::addDictEncodedString(), ArrowForeignStorageBase::convertArrowDictionary(), ArrowForeignStorageBase::createDictionaryEncodedColumn(), foreign_storage::ParquetStringEncoder< V >::encodeAndCopyContiguous(), getOrAddBulkArray(), and populate_string_ids().

365  {
367  getOrAddBulkParallel(input_strings, output_string_ids);
368  return;
369  }
370  // Single-thread path.
371  if (client_no_timeout_) {
372  getOrAddBulkRemote(input_strings, output_string_ids);
373  return;
374  }
375  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
376 
377  const size_t initial_str_count = str_count_;
378  size_t idx = 0;
379  for (const auto& input_string : input_strings) {
380  if (input_string.empty()) {
381  output_string_ids[idx++] = inline_int_null_value<T>();
382  continue;
383  }
384  CHECK(input_string.size() <= MAX_STRLEN);
385 
386  const string_dict_hash_t input_string_hash = hash_string(input_string);
387  uint32_t hash_bucket =
388  computeBucket(input_string_hash, input_string, string_id_string_dict_hash_table_);
390  output_string_ids[idx++] = string_id_string_dict_hash_table_[hash_bucket];
391  continue;
392  }
393  // need to add record to dictionary
394  // check there is room
395  if (str_count_ > static_cast<size_t>(max_valid_int_value<T>())) {
396  throw_encoding_error<T>(input_string, folder_);
397  }
399  << "Maximum number (" << str_count_
400  << ") of Dictionary encoded Strings reached for this column, offset path "
401  "for column is "
402  << offsets_path_;
403  if (fillRateIsHigh(str_count_)) {
404  // resize when more than 50% is full
406  hash_bucket = computeBucket(
407  input_string_hash, input_string, string_id_string_dict_hash_table_);
408  }
409  appendToStorage(input_string);
410 
411  if (materialize_hashes_) {
412  hash_cache_[str_count_] = input_string_hash;
413  }
414  const int32_t string_id = static_cast<int32_t>(str_count_);
415  string_id_string_dict_hash_table_[hash_bucket] = string_id;
416  output_string_ids[idx++] = string_id;
417  ++str_count_;
418  }
419  const size_t num_strings_added = str_count_ - initial_str_count;
420  if (num_strings_added > 0) {
422  }
423 }
void increaseHashTableCapacity() noexcept
string_dict_hash_t hash_string(const std::string_view &str)
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
static constexpr size_t MAX_STRCOUNT
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
const std::string folder_
mapd_shared_mutex rw_mutex_
void appendToStorage(const String str) noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:219
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:209
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
mapd_unique_lock< mapd_shared_mutex > write_lock
bool g_enable_stringdict_parallel
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
template void StringDictionary::getOrAddBulkArray ( const std::vector< std::vector< String >> &  string_array_vec,
std::vector< std::vector< int32_t >> &  ids_array_vec 
)

Definition at line 321 of file StringDictionary.cpp.

References getOrAddBulk(), and i.

Referenced by import_export::TypedImportBuffer::addDictEncodedStringArray().

323  {
324  ids_array_vec.resize(string_array_vec.size());
325  for (size_t i = 0; i < string_array_vec.size(); i++) {
326  auto& strings = string_array_vec[i];
327  auto& ids = ids_array_vec[i];
328  ids.resize(strings.size());
329  getOrAddBulk(strings, &ids[0]);
330  }
331 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class T , class String >
void StringDictionary::getOrAddBulkParallel ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 426 of file StringDictionary.cpp.

References appendToStorageBulk(), CHECK, CHECK_LT, client_no_timeout_, computeBucketFromStorageAndMemory(), fillRateIsHigh(), folder_, getOrAddBulkRemote(), hash_cache_, hashStrings(), increaseHashTableCapacityFromStorageAndMemory(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, rw_mutex_, str_count_, and string_id_string_dict_hash_table_.

Referenced by getOrAddBulk().

427  {
428  if (client_no_timeout_) {
429  getOrAddBulkRemote(input_strings, output_string_ids);
430  return;
431  }
432  // Compute hashes of the input strings up front, and in parallel,
433  // as the string hashing does not need to be behind the subsequent write_lock
434  std::vector<string_dict_hash_t> input_strings_hashes(input_strings.size());
435  hashStrings(input_strings, input_strings_hashes);
436 
437  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
438  size_t shadow_str_count =
439  str_count_; // Need to shadow str_count_ now with bulk add methods
440  const size_t storage_high_water_mark = shadow_str_count;
441  std::vector<size_t> string_memory_ids;
442  size_t sum_new_string_lengths = 0;
443  string_memory_ids.reserve(input_strings.size());
444  size_t input_string_idx{0};
445  for (const auto& input_string : input_strings) {
446  // Currently we make empty strings null
447  if (input_string.empty()) {
448  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
449  continue;
450  }
451  // TODO: Recover gracefully if an input string is too long
452  CHECK(input_string.size() <= MAX_STRLEN);
453 
454  if (fillRateIsHigh(shadow_str_count)) {
455  // resize when more than 50% is full
457  storage_high_water_mark,
458  input_strings,
459  string_memory_ids,
460  input_strings_hashes);
461  }
462  // Compute the hash for this input_string
463  const string_dict_hash_t input_string_hash = input_strings_hashes[input_string_idx];
464 
465  const uint32_t hash_bucket =
466  computeBucketFromStorageAndMemory(input_string_hash,
467  input_string,
469  storage_high_water_mark,
470  input_strings,
471  string_memory_ids);
472 
473  // If the hash bucket is not empty, that is our string id
474  // (computeBucketFromStorageAndMemory) already checked to ensure the input string and
475  // bucket string are equal)
477  output_string_ids[input_string_idx++] =
479  continue;
480  }
481  // Did not find string, so need to add record to dictionary
482  // First check there is room
483  if (shadow_str_count > static_cast<size_t>(max_valid_int_value<T>())) {
484  throw_encoding_error<T>(input_string, folder_);
485  }
486  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
487  << "Maximum number (" << shadow_str_count
488  << ") of Dictionary encoded Strings reached for this column, offset path "
489  "for column is "
490  << offsets_path_;
491 
492  string_memory_ids.push_back(input_string_idx);
493  sum_new_string_lengths += input_string.size();
494  string_id_string_dict_hash_table_[hash_bucket] =
495  static_cast<int32_t>(shadow_str_count);
496  if (materialize_hashes_) {
497  hash_cache_[shadow_str_count] = input_string_hash;
498  }
499  output_string_ids[input_string_idx++] = shadow_str_count++;
500  }
501  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
502  const size_t num_strings_added = shadow_str_count - str_count_;
503  str_count_ = shadow_str_count;
504  if (num_strings_added > 0) {
506  }
507 }
uint32_t computeBucketFromStorageAndMemory(const string_dict_hash_t input_string_hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
const std::string folder_
mapd_shared_mutex rw_mutex_
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:219
void hashStrings(const std::vector< String > &string_vec, std::vector< string_dict_hash_t > &hashes) const noexcept
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void increaseHashTableCapacityFromStorageAndMemory(const size_t str_count, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< string_dict_hash_t > &input_strings_hashes) noexcept
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:209
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN
mapd_unique_lock< mapd_shared_mutex > write_lock
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class T , class String >
template void StringDictionary::getOrAddBulkRemote ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)
private

Definition at line 526 of file StringDictionary.cpp.

References CHECK, client_no_timeout_, folder_, and i.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

527  {
529  std::vector<int32_t> string_ids;
530  client_no_timeout_->get_or_add_bulk(string_ids, string_vec);
531  size_t out_idx{0};
532  for (size_t i = 0; i < string_ids.size(); ++i) {
533  const auto string_id = string_ids[i];
534  const bool invalid = string_id > max_valid_int_value<T>();
535  if (invalid || string_id == inline_int_null_value<int32_t>()) {
536  if (invalid) {
537  throw_encoding_error<T>(string_vec[i], folder_);
538  }
539  encoded_vec[out_idx++] = inline_int_null_value<T>();
540  continue;
541  }
542  encoded_vec[out_idx++] = string_id;
543  }
544 }
const std::string folder_
std::unique_ptr< StringDictionaryClient > client_no_timeout_
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the caller graph for this function:

int32_t StringDictionary::getOrAddImpl ( const std::string_view &  str)
private noexcept

Definition at line 1072 of file StringDictionary.cpp.

References CHECK, CHECK_LT, and anonymous_namespace{StringDictionary.cpp}::hash_string().

1072  {
1073  // @TODO(wei) treat empty string as NULL for now
1074  if (str.size() == 0) {
1075  return inline_int_null_value<int32_t>();
1076  }
1077  CHECK(str.size() <= MAX_STRLEN);
1078  const string_dict_hash_t hash = hash_string(str);
1079  {
1080  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
1081  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1083  return string_id_string_dict_hash_table_[bucket];
1084  }
1085  }
1086  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
1087  if (fillRateIsHigh(str_count_)) {
1088  // resize when more than 50% is full
1090  }
1091  // need to recalculate the bucket in case it changed before
1092  // we got the lock
1093  const uint32_t bucket = computeBucket(hash, str, string_id_string_dict_hash_table_);
1096  << "Maximum number (" << str_count_
1097  << ") of Dictionary encoded Strings reached for this column, offset path "
1098  "for column is "
1099  << offsets_path_;
1100  appendToStorage(str);
1101  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
1102  if (materialize_hashes_) {
1103  hash_cache_[str_count_] = hash;
1104  }
1105  ++str_count_;
1107  }
1108  return string_id_string_dict_hash_table_[bucket];
1109 }
void increaseHashTableCapacity() noexcept
string_dict_hash_t hash_string(const std::string_view &str)
std::vector< string_dict_hash_t > hash_cache_
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
static constexpr size_t MAX_STRCOUNT
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
static constexpr int32_t INVALID_STR_ID
mapd_shared_mutex rw_mutex_
void appendToStorage(const String str) noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:219
mapd_shared_lock< mapd_shared_mutex > read_lock
void invalidateInvertedIndex() noexcept
uint32_t string_dict_hash_t
#define CHECK(condition)
Definition: Logger.h:209
std::vector< int32_t > string_id_string_dict_hash_table_
static constexpr size_t MAX_STRLEN
mapd_unique_lock< mapd_shared_mutex > write_lock

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getRegexpLike ( const std::string &  pattern,
const char  escape,
const size_t  generation 
) const

Definition at line 913 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_regexp_like(), regex_cache_, run_benchmark_import::result, rw_mutex_, and str_count_.

915  {
916  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
917  if (client_) {
918  return client_->get_regexp_like(pattern, escape, generation);
919  }
920  const auto cache_key = std::make_pair(pattern, escape);
921  const auto it = regex_cache_.find(cache_key);
922  if (it != regex_cache_.end()) {
923  return it->second;
924  }
925  std::vector<int32_t> result;
926  std::vector<std::thread> workers;
927  int worker_count = cpu_threads();
928  CHECK_GT(worker_count, 0);
929  std::vector<std::vector<int32_t>> worker_results(worker_count);
930  CHECK_LE(generation, str_count_);
931  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
932  workers.emplace_back([&worker_results,
933  &pattern,
934  generation,
935  escape,
936  worker_idx,
937  worker_count,
938  this]() {
939  for (size_t string_id = worker_idx; string_id < generation;
940  string_id += worker_count) {
941  const auto str = getStringUnlocked(string_id);
942  if (is_regexp_like(str, pattern, escape)) {
943  worker_results[worker_idx].push_back(string_id);
944  }
945  }
946  });
947  }
948  for (auto& worker : workers) {
949  worker.join();
950  }
951  for (const auto& worker_result : worker_results) {
952  result.insert(result.end(), worker_result.begin(), worker_result.end());
953  }
954  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
955  CHECK(it_ok.second);
956 
957  return result;
958 }
#define CHECK_GT(x, y)
Definition: Logger.h:221
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:220
#define CHECK(condition)
Definition: Logger.h:209
mapd_unique_lock< mapd_shared_mutex > write_lock
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

std::string StringDictionary::getString ( int32_t  string_id) const

Definition at line 581 of file StringDictionary.cpp.

References client_, getStringUnlocked(), and rw_mutex_.

Referenced by StringValueConverter::convertToColumnarFormatFromDict(), and populate_string_ids().

581  {
582  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
583  if (client_) {
584  std::string ret;
585  client_->get_string(ret, string_id);
586  return ret;
587  }
588  return getStringUnlocked(string_id);
589 }
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::pair< char *, size_t > StringDictionary::getStringBytes ( int32_t  string_id) const
noexcept

Definition at line 596 of file StringDictionary.cpp.

References CHECK, CHECK_LE, and CHECK_LT.

597  {
598  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
599  CHECK(!client_);
600  CHECK_LE(0, string_id);
601  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
602  return getStringBytesChecked(string_id);
603 }
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
#define CHECK_LT(x, y)
Definition: Logger.h:219
#define CHECK_LE(x, y)
Definition: Logger.h:220
mapd_shared_lock< mapd_shared_mutex > read_lock
#define CHECK(condition)
Definition: Logger.h:209
std::pair< char *, size_t > StringDictionary::getStringBytesChecked ( const int  string_id) const
private noexcept

Definition at line 1117 of file StringDictionary.cpp.

References CHECK.

1118  {
1119  const auto str_canary = getStringFromStorage(string_id);
1120  CHECK(!str_canary.canary);
1121  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1122 }
#define CHECK(condition)
Definition: Logger.h:209
PayloadString getStringFromStorage(const int string_id) const noexcept
std::string StringDictionary::getStringChecked ( const int  string_id) const
private noexcept

Definition at line 1111 of file StringDictionary.cpp.

References CHECK.

Referenced by increaseHashTableCapacity().

1111  {
1112  const auto str_canary = getStringFromStorage(string_id);
1113  CHECK(!str_canary.canary);
1114  return std::string(str_canary.c_str_ptr, str_canary.size);
1115 }
#define CHECK(condition)
Definition: Logger.h:209
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the caller graph for this function:

StringDictionary::PayloadString StringDictionary::getStringFromStorage ( const int  string_id) const
private noexcept

Definition at line 1303 of file StringDictionary.cpp.

References CHECK_GE, StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

Referenced by getCompare(), mergeSortedCache(), sortCache(), and StringDictionary().

1304  {
1305  if (!isTemp_) {
1306  CHECK_GE(payload_fd_, 0);
1307  CHECK_GE(offset_fd_, 0);
1308  }
1309  CHECK_GE(string_id, 0);
1310  const StringIdxEntry* str_meta = offset_map_ + string_id;
1311  if (str_meta->size == 0xffff) {
1312  // hit the canary
1313  return {nullptr, 0, true};
1314  }
1315  return {payload_map_ + str_meta->off, str_meta->size, false};
1316 }
StringIdxEntry * offset_map_
#define CHECK_GE(x, y)
Definition: Logger.h:222

+ Here is the caller graph for this function:

std::string_view StringDictionary::getStringFromStorageFast ( const int  string_id) const
private noexcept

Definition at line 1297 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

1298  {
1299  const StringIdxEntry* str_meta = offset_map_ + string_id;
1300  return {payload_map_ + str_meta->off, str_meta->size};
1301 }
StringIdxEntry * offset_map_
std::string StringDictionary::getStringUnlocked ( int32_t  string_id) const
private noexcept

Definition at line 591 of file StringDictionary.cpp.

References CHECK_LT.

Referenced by copyStrings(), getEquals(), getLike(), getRegexpLike(), and getString().

591  {
592  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
593  return getStringChecked(string_id);
594 }
std::string getStringChecked(const int string_id) const noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:219

+ Here is the caller graph for this function:

int32_t StringDictionary::getUnlocked ( const std::string &  str) const
private noexcept

Definition at line 574 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::hash_string().

Referenced by getIdOfString().

574  {
575  const string_dict_hash_t hash = hash_string(str);
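576  auto str_id = string_id_string_dict_hash_table_[computeBucket(
577  hash, str, string_id_string_dict_hash_table_)];
578  return str_id;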
579 }
string_dict_hash_t hash_string(const std::string_view &str)
uint32_t computeBucket(const string_dict_hash_t hash, const String &input_string, const std::vector< int32_t > &string_id_string_dict_hash_table) const noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::hashStrings ( const std::vector< String > &  string_vec,
std::vector< string_dict_hash_t > &  hashes 
) const
private noexcept

Method to hash a vector of strings in parallel.

Parameters
string_vec - input vector of strings to be hashed
hashes - space for the output; should be pre-sized to match the size of string_vec

Definition at line 347 of file StringDictionary.cpp.

References CHECK_EQ, anonymous_namespace{StringDictionary.cpp}::hash_string(), and threading_serial::parallel_for().

Referenced by getOrAddBulkParallel().

349  {
350  CHECK_EQ(string_vec.size(), hashes.size());
351 
352  tbb::parallel_for(tbb::blocked_range<size_t>(0, string_vec.size()),
353  [&string_vec, &hashes](const tbb::blocked_range<size_t>& r) {
354  for (size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
355  if (string_vec[curr_id].empty()) {
356  continue;
357  }
358  hashes[curr_id] = hash_string(string_vec[curr_id]);
359  }
360  });
361 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
string_dict_hash_t hash_string(const std::string_view &str)
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::increaseHashTableCapacity ( )
private noexcept

Definition at line 1016 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), getStringChecked(), hash_cache_, anonymous_namespace{StringDictionary.cpp}::hash_string(), i, INVALID_STR_ID, materialize_hashes_, str_count_, and string_id_string_dict_hash_table_.

Referenced by getOrAddBulk().

1016  {
1017  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1018  INVALID_STR_ID);
1019 
1020  if (materialize_hashes_) {
1021  for (size_t i = 0; i != str_count_; ++i) {
1022  const string_dict_hash_t hash = hash_cache_[i];
1023  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1024  new_str_ids[bucket] = i;
1025  }
1026  hash_cache_.resize(hash_cache_.size() * 2);
1027  } else {
1028  for (size_t i = 0; i != str_count_; ++i) {
1029  const auto str = getStringChecked(i);
1030  const string_dict_hash_t hash = hash_string(str);
1031  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1032  new_str_ids[bucket] = i;
1033  }
1034  }
1035  string_id_string_dict_hash_table_.swap(new_str_ids);
1036 }
string_dict_hash_t hash_string(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
static constexpr int32_t INVALID_STR_ID
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::increaseHashTableCapacityFromStorageAndMemory ( const size_t  str_count,
const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const std::vector< string_dict_hash_t > &  input_strings_hashes 
)
private noexcept

Definition at line 1039 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::hash_string(), and i.

Referenced by getOrAddBulkParallel().

1045  {
1046  std::vector<int32_t> new_str_ids(string_id_string_dict_hash_table_.size() * 2,
1047  INVALID_STR_ID);
1048  if (materialize_hashes_) {
1049  for (size_t i = 0; i != str_count; ++i) {
1050  const string_dict_hash_t hash = hash_cache_[i];
1051  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1052  new_str_ids[bucket] = i;
1053  }
1054  hash_cache_.resize(hash_cache_.size() * 2);
1055  } else {
1056  for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1057  const auto storage_string = getStringChecked(storage_idx);
1058  const string_dict_hash_t hash = hash_string(storage_string);
1059  const uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1060  new_str_ids[bucket] = storage_idx;
1061  }
1062  for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1063  const size_t string_memory_id = string_memory_ids[memory_idx];
1064  const uint32_t bucket = computeUniqueBucketWithHash(
1065  input_strings_hashes[string_memory_id], new_str_ids);
1066  new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1067  }
1068  }
1069  string_id_string_dict_hash_table_.swap(new_str_ids);
1070 }
string_dict_hash_t hash_string(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
std::vector< string_dict_hash_t > hash_cache_
static constexpr int32_t INVALID_STR_ID
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
uint32_t string_dict_hash_t
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::insertInSortedCache ( std::string  str,
int32_t  str_id 
)
private
void StringDictionary::invalidateInvertedIndex ( )
private noexcept

Definition at line 1378 of file StringDictionary.cpp.

References compare_cache_, equal_cache_, like_cache_, and regex_cache_.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

1378  {
1379  if (!like_cache_.empty()) {
1380  decltype(like_cache_)().swap(like_cache_);
1381  }
1382  if (!regex_cache_.empty()) {
1383  decltype(regex_cache_)().swap(regex_cache_);
1384  }
1385  if (!equal_cache_.empty()) {
1386  decltype(equal_cache_)().swap(equal_cache_);
1387  }
1388  compare_cache_.invalidateInvertedIndex();
1389 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::map< std::string, int32_t > equal_cache_
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::mergeSortedCache ( std::vector< int32_t > &  temp_sorted_cache)
private

Definition at line 1438 of file StringDictionary.cpp.

References getStringFromStorage(), sorted_cache, and string_lt().

Referenced by buildSortedCache().

1438  {
1439  // this method is not thread safe
1440  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1441  size_t t_idx = 0, s_idx = 0, idx = 0;
1442  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1443  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1444  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1445  const auto insert_from_temp_cache =
1446  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1447  if (insert_from_temp_cache) {
1448  updated_cache[idx] = temp_sorted_cache[t_idx++];
1449  } else {
1450  updated_cache[idx] = sorted_cache[s_idx++];
1451  }
1452  }
1453  while (t_idx < temp_sorted_cache.size()) {
1454  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1455  }
1456  while (s_idx < sorted_cache.size()) {
1457  updated_cache[idx++] = sorted_cache[s_idx++];
1458  }
1459  sorted_cache.swap(updated_cache);
1460 }
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_array_ids ( std::vector< std::vector< int32_t >> &  dest_array_ids,
StringDictionary dest_dict,
const std::vector< std::vector< int32_t >> &  source_array_ids,
const StringDictionary source_dict 
)
static

Definition at line 1489 of file StringDictionary.cpp.

References populate_string_ids() and std::async().

Referenced by DictionaryValueConverter< TARGET_TYPE >::processArrayBuffer().

1493  {
1494  dest_array_ids.resize(source_array_ids.size());
1495 
1496  std::atomic<size_t> row_idx{0};
1497  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1498  int thread_id) {
1499  for (;;) {
1500  auto row = row_idx.fetch_add(1);
1501 
1502  if (row >= dest_array_ids.size()) {
1503  return;
1504  }
1505  const auto& source_ids = source_array_ids[row];
1506  auto& dest_ids = dest_array_ids[row];
1507  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1508  }
1509  };
1510 
1511  const int num_worker_threads = std::thread::hardware_concurrency();
1512 
1513  if (source_array_ids.size() / num_worker_threads > 10) {
1514  std::vector<std::future<void>> worker_threads;
1515  for (int i = 0; i < num_worker_threads; ++i) {
1516  worker_threads.push_back(std::async(std::launch::async, processor, i));
1517  }
1518 
1519  for (auto& child : worker_threads) {
1520  child.wait();
1521  }
1522  for (auto& child : worker_threads) {
1523  child.get();
1524  }
1525  } else {
1526  processor(0);
1527  }
1528 }
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
future< Result > async(Fn &&fn, Args &&...args)
ThreadId thread_id()
Definition: Logger.cpp:791

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_ids ( std::vector< int32_t > &  dest_ids,
StringDictionary dest_dict,
const std::vector< int32_t > &  source_ids,
const StringDictionary source_dict,
const std::map< int32_t, std::string >  transient_mapping = {} 
)
static

Populates provided dest_ids vector with string ids corresponding to given source strings.

Given a vector of source string ids and corresponding source dictionary, this method populates a vector of destination string ids by either returning the string id of matching strings in the destination dictionary or creating new entries in the dictionary. Source string ids can also be transient if they were created by a function (e.g LOWER/UPPER functions). A map of transient string ids to string values is provided in order to handle this use case.

Parameters
dest_ids - vector of destination string ids to be populated
dest_dict - destination dictionary
source_ids - vector of source string ids for which destination ids are needed
source_dict - source dictionary
transient_mapping - map of transient source string ids to string values

Definition at line 1462 of file StringDictionary.cpp.

References getOrAddBulk(), and getString().

Referenced by populate_string_array_ids(), and DictionaryValueConverter< TARGET_TYPE >::processBuffer().

1467  {
1468  std::vector<std::string> strings;
1469 
1470  for (const int32_t source_id : source_ids) {
1471  if (source_id == std::numeric_limits<int32_t>::min()) {
1472  strings.emplace_back("");
1473  } else if (source_id < 0) {
1474  if (auto string_itr = transient_mapping.find(source_id);
1475  string_itr != transient_mapping.end()) {
1476  strings.emplace_back(string_itr->second);
1477  } else {
1478  throw std::runtime_error("Unexpected negative source ID");
1479  }
1480  } else {
1481  strings.push_back(source_dict->getString(source_id));
1482  }
1483  }
1484 
1485  dest_ids.resize(strings.size());
1486  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1487 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
std::string getString(int32_t string_id) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::processDictionaryFutures ( std::vector< std::future< std::vector< std::pair< string_dict_hash_t, unsigned int >>>> &  dictionary_futures)
private

Definition at line 215 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), hash_cache_, materialize_hashes_, payload_file_off_, str_count_, and string_id_string_dict_hash_table_.

Referenced by StringDictionary().

217  {
218  for (auto& dictionary_future : dictionary_futures) {
219  dictionary_future.wait();
220  const auto hashVec = dictionary_future.get();
221  for (const auto& hash : hashVec) {
222  const uint32_t bucket =
223  computeUniqueBucketWithHash(hash.first, string_id_string_dict_hash_table_);
224  payload_file_off_ += hash.second;
225  string_id_string_dict_hash_table_[bucket] = static_cast<int32_t>(str_count_);
226  if (materialize_hashes_) {
227  hash_cache_[str_count_] = hash.first;
228  }
229  ++str_count_;
230  }
231  }
232  dictionary_futures.clear();
233 }
std::vector< string_dict_hash_t > hash_cache_
uint32_t computeUniqueBucketWithHash(const string_dict_hash_t hash, const std::vector< int32_t > &string_id_string_dict_hash_table) noexcept
std::vector< int32_t > string_id_string_dict_hash_table_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::sortCache ( std::vector< int32_t > &  cache)
private

Definition at line 1425 of file StringDictionary.cpp.

References getStringFromStorage(), std::sort(), and string_lt().

Referenced by buildSortedCache().

1425  {
1426  // This method is not thread-safe.
1427 
1428  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1429  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1430 
1431  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1432  auto a_str = this->getStringFromStorage(a);
1433  auto b_str = this->getStringFromStorage(b);
1434  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1435  });
1436 }
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
RUNTIME_EXPORT DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
constexpr double a
Definition: Utm.h:38
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t StringDictionary::storageEntryCount ( ) const

Definition at line 605 of file StringDictionary.cpp.

References client_, rw_mutex_, and str_count_.

605  {
606  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
607  if (client_) {
608  return client_->storage_entry_count();
609  }
610  return str_count_;
611 }
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock

Member Data Documentation

char* StringDictionary::CANARY_BUFFER {nullptr}
private

Definition at line 232 of file StringDictionary.h.

Referenced by ~StringDictionary().

size_t StringDictionary::canary_buffer_size = 0
private

Definition at line 233 of file StringDictionary.h.

std::unique_ptr<StringDictionaryClient> StringDictionary::client_
private
std::unique_ptr<StringDictionaryClient> StringDictionary::client_no_timeout_
private

Definition at line 230 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and getOrAddBulkRemote().

size_t StringDictionary::collisions_
private

Definition at line 208 of file StringDictionary.h.

Referenced by StringDictionary().

DictionaryCache<std::string, compare_cache_value_t> StringDictionary::compare_cache_
mutable private

Definition at line 227 of file StringDictionary.h.

Referenced by getCompare(), and invalidateInvertedIndex().

std::map<std::string, int32_t> StringDictionary::equal_cache_
mutable private

Definition at line 226 of file StringDictionary.h.

Referenced by getEquals(), and invalidateInvertedIndex().

const std::string StringDictionary::folder_
private

Definition at line 206 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and getOrAddBulkRemote().

std::vector<string_dict_hash_t> StringDictionary::hash_cache_
private
std::map<std::tuple<std::string, bool, bool, char>, std::vector<int32_t> > StringDictionary::like_cache_
mutable private

Definition at line 224 of file StringDictionary.h.

Referenced by getLike(), and invalidateInvertedIndex().

bool StringDictionary::materialize_hashes_
private
constexpr size_t StringDictionary::MAX_STRCOUNT = (1U << 31) - 1
static

Definition at line 117 of file StringDictionary.h.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

int StringDictionary::offset_fd_
private
size_t StringDictionary::offset_file_size_
private
StringIdxEntry* StringDictionary::offset_map_
private
std::string StringDictionary::offsets_path_
private

Definition at line 214 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and StringDictionary().

int StringDictionary::payload_fd_
private
size_t StringDictionary::payload_file_off_
private
size_t StringDictionary::payload_file_size_
private
char* StringDictionary::payload_map_
private
std::map<std::pair<std::string, char>, std::vector<int32_t> > StringDictionary::regex_cache_
mutable private

Definition at line 225 of file StringDictionary.h.

Referenced by getRegexpLike(), and invalidateInvertedIndex().

std::vector<int32_t> StringDictionary::sorted_cache
private

Definition at line 211 of file StringDictionary.h.

Referenced by buildSortedCache(), getCompare(), and mergeSortedCache().

std::vector<int32_t> StringDictionary::string_id_string_dict_hash_table_
private
std::shared_ptr<std::vector<std::string> > StringDictionary::strings_cache_
mutable private

Definition at line 228 of file StringDictionary.h.

Referenced by copyStrings().


The documentation for this class was generated from the following files: