OmniSciDB  340b00dbf6
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
StringDictionary Class Reference

#include <StringDictionary.h>

+ Collaboration diagram for StringDictionary:

Classes

struct  compare_cache_value_t
 
struct  PayloadString
 
struct  StringIdxEntry
 

Public Member Functions

 StringDictionary (const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
 
 StringDictionary (const LeafHostInfo &host, const DictRef dict_ref)
 
 ~StringDictionary () noexcept
 
int32_t getOrAdd (const std::string &str) noexcept
 
template<class T , class String >
void getOrAddBulk (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class T , class String >
void getOrAddBulkParallel (const std::vector< String > &string_vec, T *encoded_vec)
 
template<class String >
void getOrAddBulkArray (const std::vector< std::vector< String >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
 
int32_t getIdOfString (const std::string &str) const
 
std::string getString (int32_t string_id) const
 
std::pair< char *, size_t > getStringBytes (int32_t string_id) const noexcept
 
size_t storageEntryCount () const
 
std::vector< int32_t > getLike (const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
 
std::vector< int32_t > getCompare (const std::string &pattern, const std::string &comp_operator, const size_t generation)
 
std::vector< int32_t > getRegexpLike (const std::string &pattern, const char escape, const size_t generation) const
 
std::shared_ptr< const std::vector< std::string > > copyStrings () const
 
bool checkpoint () noexcept
 

Static Public Member Functions

static void populate_string_ids (std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
 Populates provided dest_ids vector with string ids corresponding to given source strings. More...
 
static void populate_string_array_ids (std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
 

Static Public Attributes

static constexpr int32_t INVALID_STR_ID = -1
 
static constexpr size_t MAX_STRLEN = (1 << 15) - 1
 
static constexpr size_t MAX_STRCOUNT = (1U << 31) - 1
 

Private Member Functions

void processDictionaryFutures (std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
 
size_t getNumStringsFromStorage (const size_t storage_slots) const noexcept
 
bool fillRateIsHigh (const size_t num_strings) const noexcept
 
void increaseCapacity () noexcept
 
template<class String >
void increaseCapacityFromStorageAndMemory (const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< uint32_t > &input_strings_rk_hashes) noexcept
 
int32_t getOrAddImpl (const std::string &str) noexcept
 
template<class String >
void hashStrings (const std::vector< String > &string_vec, std::vector< uint32_t > &hashes) const noexcept
 
template<class T , class String >
void getOrAddBulkRemote (const std::vector< String > &string_vec, T *encoded_vec)
 
int32_t getUnlocked (const std::string &str) const noexcept
 
std::string getStringUnlocked (int32_t string_id) const noexcept
 
std::string getStringChecked (const int string_id) const noexcept
 
std::pair< char *, size_t > getStringBytesChecked (const int string_id) const noexcept
 
template<class String >
uint32_t computeBucket (const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept
 
template<class String >
uint32_t computeBucketFromStorageAndMemory (const uint32_t input_string_rk_hash, const String &input_string, const std::vector< int32_t > &string_id_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
 
uint32_t computeUniqueBucketWithHash (const uint32_t hash, const std::vector< int32_t > &data) noexcept
 
void checkAndConditionallyIncreasePayloadCapacity (const size_t write_length)
 
void checkAndConditionallyIncreaseOffsetCapacity (const size_t write_length)
 
template<class String >
void appendToStorage (String str) noexcept
 
template<class String >
void appendToStorageBulk (const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
 
PayloadString getStringFromStorage (const int string_id) const noexcept
 
std::string_view getStringFromStorageFast (const int string_id) const noexcept
 
void addPayloadCapacity (const size_t min_capacity_requested=0) noexcept
 
void addOffsetCapacity (const size_t min_capacity_requested=0) noexcept
 
size_t addStorageCapacity (int fd, const size_t min_capacity_requested=0) noexcept
 
void * addMemoryCapacity (void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept
 
void invalidateInvertedIndex () noexcept
 
std::vector< int32_t > getEquals (std::string pattern, std::string comp_operator, size_t generation)
 
void buildSortedCache ()
 
void insertInSortedCache (std::string str, int32_t str_id)
 
void sortCache (std::vector< int32_t > &cache)
 
void mergeSortedCache (std::vector< int32_t > &temp_sorted_cache)
 
compare_cache_value_t * binary_search_cache (const std::string &pattern) const
 

Private Attributes

size_t str_count_
 
size_t collisions_
 
std::vector< int32_t > string_id_hash_table_
 
std::vector< uint32_t > rk_hashes_
 
std::vector< int32_t > sorted_cache
 
bool isTemp_
 
bool materialize_hashes_
 
std::string offsets_path_
 
int payload_fd_
 
int offset_fd_
 
StringIdxEntry * offset_map_
 
char * payload_map_
 
size_t offset_file_size_
 
size_t payload_file_size_
 
size_t payload_file_off_
 
mapd_shared_mutex rw_mutex_
 
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
 
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
 
std::map< std::string, int32_t > equal_cache_
 
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
 
std::shared_ptr< std::vector< std::string > > strings_cache_
 
std::unique_ptr< StringDictionaryClient > client_
 
std::unique_ptr< StringDictionaryClient > client_no_timeout_
 
char * CANARY_BUFFER {nullptr}
 
size_t canary_buffer_size = 0
 

Detailed Description

Definition at line 42 of file StringDictionary.h.

Constructor & Destructor Documentation

StringDictionary::StringDictionary ( const std::string &  folder,
const bool  isTemp,
const bool  recover,
const bool  materializeHashes = false,
size_t  initial_capacity = 256 
)

Definition at line 95 of file StringDictionary.cpp.

References addOffsetCapacity(), addPayloadCapacity(), CHECK_EQ, omnisci::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_open(), collisions_, omnisci::file_size(), getNumStringsFromStorage(), getStringFromStorage(), INVALID_STR_ID, isTemp_, LOG, materialize_hashes_, offset_fd_, offset_file_size_, offset_map_, offsets_path_, payload_fd_, payload_file_size_, payload_map_, processDictionaryFutures(), anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, anonymous_namespace{StringDictionary.cpp}::round_up_p2(), rw_mutex_, str_count_, string_id_hash_table_, VLOG, and logger::WARNING.

100  : str_count_(0)
101  , string_id_hash_table_(initial_capacity, INVALID_STR_ID)
102  , rk_hashes_(initial_capacity)
103  , isTemp_(isTemp)
104  , materialize_hashes_(materializeHashes)
105  , payload_fd_(-1)
106  , offset_fd_(-1)
107  , offset_map_(nullptr)
108  , payload_map_(nullptr)
109  , offset_file_size_(0)
110  , payload_file_size_(0)
111  , payload_file_off_(0)
112  , strings_cache_(nullptr) {
113  if (!isTemp && folder.empty()) {
114  return;
115  }
116 
117  // initial capacity must be a power of two for efficient bucket computation
118  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
119  if (!isTemp_) {
120  boost::filesystem::path storage_path(folder);
121  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
122  const auto payload_path =
123  (storage_path / boost::filesystem::path("DictPayload")).string();
124  payload_fd_ = checked_open(payload_path.c_str(), recover);
125  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
128  }
129  bool storage_is_empty = false;
130  if (payload_file_size_ == 0) {
131  storage_is_empty = true;
133  }
134  if (offset_file_size_ == 0) {
136  }
137  if (!isTemp_) { // we never mmap or recover temp dictionaries
138  payload_map_ =
139  reinterpret_cast<char*>(omnisci::checked_mmap(payload_fd_, payload_file_size_));
140  offset_map_ = reinterpret_cast<StringIdxEntry*>(
142  if (recover) {
143  const size_t bytes = omnisci::file_size(offset_fd_);
144  if (bytes % sizeof(StringIdxEntry) != 0) {
145  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
146  }
147  const uint64_t str_count =
148  storage_is_empty ? 0 : getNumStringsFromStorage(bytes / sizeof(StringIdxEntry));
149  collisions_ = 0;
150  // at this point we know the size of the StringDict we need to load
151  // so lets reallocate the vector to the correct size
152  const uint64_t max_entries =
153  std::max(round_up_p2(str_count * 2 + 1),
154  round_up_p2(std::max(initial_capacity, static_cast<size_t>(1))));
155  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
156  string_id_hash_table_.swap(new_str_ids);
157  if (materialize_hashes_) {
158  std::vector<uint32_t> new_rk_hashes(max_entries / 2);
159  rk_hashes_.swap(new_rk_hashes);
160  }
161  // Bail early if we know we don't have strings to add (i.e. a new or empty
162  // dictionary)
163  if (str_count == 0) {
164  return;
165  }
166 
167  unsigned string_id = 0;
168  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
169 
170  uint32_t thread_inits = 0;
171  const auto thread_count = std::thread::hardware_concurrency();
172  const uint32_t items_per_thread = std::max<uint32_t>(
173  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
174  std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>
175  dictionary_futures;
176  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
177  dictionary_futures.emplace_back(std::async(
178  std::launch::async, [string_id, str_count, items_per_thread, this] {
179  std::vector<std::pair<uint32_t, unsigned int>> hashVec;
180  for (uint32_t curr_id = string_id;
181  curr_id < string_id + items_per_thread && curr_id < str_count;
182  curr_id++) {
183  const auto recovered = getStringFromStorage(curr_id);
184  if (recovered.canary) {
185  // hit the canary, recovery finished
186  break;
187  } else {
188  std::string temp(recovered.c_str_ptr, recovered.size);
189  hashVec.emplace_back(std::make_pair(rk_hash(temp), temp.size()));
190  }
191  }
192  return hashVec;
193  }));
194  thread_inits++;
195  if (thread_inits % thread_count == 0) {
196  processDictionaryFutures(dictionary_futures);
197  }
198  }
199  // gather last few threads
200  if (dictionary_futures.size() != 0) {
201  processDictionaryFutures(dictionary_futures);
202  }
203  VLOG(1) << "Opened string dictionary " << folder << " # Strings: " << str_count_
204  << " Hash table size: " << string_id_hash_table_.size() << " Fill rate: "
205  << static_cast<double>(str_count_) * 100.0 / string_id_hash_table_.size()
206  << "% Collisions: " << collisions_;
207  }
208  }
209 }
StringIdxEntry * offset_map_
#define CHECK_EQ(x, y)
Definition: Logger.h:205
uint32_t rk_hash(const std::string_view &str)
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38
#define LOG(tag)
Definition: Logger.h:188
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
std::string offsets_path_
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
mapd_shared_mutex rw_mutex_
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
int checked_open(const char *path, const bool recover)
std::vector< int32_t > string_id_hash_table_
size_t getNumStringsFromStorage(const size_t storage_slots) const noexcept
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
const uint64_t round_up_p2(const uint64_t num)
mapd_unique_lock< mapd_shared_mutex > write_lock
PayloadString getStringFromStorage(const int string_id) const noexcept
#define VLOG(n)
Definition: Logger.h:291
std::vector< uint32_t > rk_hashes_
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31

+ Here is the call graph for this function:

StringDictionary::StringDictionary ( const LeafHostInfo &  host,
const DictRef  dict_ref 
)

Definition at line 260 of file StringDictionary.cpp.

261  : strings_cache_(nullptr)
262  , client_(new StringDictionaryClient(host, dict_ref, true))
263  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
std::unique_ptr< StringDictionaryClient > client_
std::shared_ptr< std::vector< std::string > > strings_cache_
std::unique_ptr< StringDictionaryClient > client_no_timeout_
StringDictionary::~StringDictionary ( )
noexcept

Definition at line 265 of file StringDictionary.cpp.

References CANARY_BUFFER, CHECK, CHECK_GE, omnisci::checked_munmap(), client_, omnisci::close(), isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

265  {
266  free(CANARY_BUFFER);
267  if (client_) {
268  return;
269  }
270  if (payload_map_) {
271  if (!isTemp_) {
275  CHECK_GE(payload_fd_, 0);
277  CHECK_GE(offset_fd_, 0);
279  } else {
281  free(payload_map_);
282  free(offset_map_);
283  }
284  }
285 }
StringIdxEntry * offset_map_
#define CHECK_GE(x, y)
Definition: Logger.h:210
std::unique_ptr< StringDictionaryClient > client_
void close(const int fd)
Definition: omnisci_fs.cpp:68
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

Member Function Documentation

void * StringDictionary::addMemoryCapacity ( void *  addr,
size_t &  mem_size,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1333 of file StringDictionary.cpp.

References CHECK, and anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE.

1335  {
1336  const size_t canary_buff_size_to_add =
1337  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1338  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1339  if (canary_buffer_size != canary_buff_size_to_add) {
1340  CANARY_BUFFER =
1341  reinterpret_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1342  canary_buffer_size = canary_buff_size_to_add;
1343  }
1345  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1346  void* new_addr = realloc(addr, mem_size + canary_buff_size_to_add);
1347  CHECK(new_addr);
1348  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
1349  CHECK(memcpy(write_addr, CANARY_BUFFER, canary_buff_size_to_add));
1350  mem_size += canary_buff_size_to_add;
1351  return new_addr;
1352 }
#define CHECK(condition)
Definition: Logger.h:197
void StringDictionary::addOffsetCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1303 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreaseOffsetCapacity(), and StringDictionary().

1303  {
1304  if (!isTemp_) {
1305  offset_file_size_ += addStorageCapacity(offset_fd_, min_capacity_requested);
1306  } else {
1307  offset_map_ = static_cast<StringIdxEntry*>(
1308  addMemoryCapacity(offset_map_, offset_file_size_, min_capacity_requested));
1309  }
1310 }
StringIdxEntry * offset_map_
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

void StringDictionary::addPayloadCapacity ( const size_t  min_capacity_requested = 0)
private noexcept

Definition at line 1294 of file StringDictionary.cpp.

Referenced by checkAndConditionallyIncreasePayloadCapacity(), and StringDictionary().

1294  {
1295  if (!isTemp_) {
1296  payload_file_size_ += addStorageCapacity(payload_fd_, min_capacity_requested);
1297  } else {
1298  payload_map_ = static_cast<char*>(
1299  addMemoryCapacity(payload_map_, payload_file_size_, min_capacity_requested));
1300  }
1301 }
size_t addStorageCapacity(int fd, const size_t min_capacity_requested=0) noexcept
void * addMemoryCapacity(void *addr, size_t &mem_size, const size_t min_capacity_requested=0) noexcept

+ Here is the caller graph for this function:

size_t StringDictionary::addStorageCapacity ( int  fd,
const size_t  min_capacity_requested = 0 
)
private noexcept

Definition at line 1312 of file StringDictionary.cpp.

References CHECK, CHECK_NE, anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE, and File_Namespace::write().

1314  {
1315  const size_t canary_buff_size_to_add =
1316  std::max(static_cast<size_t>(1024 * SYSTEM_PAGE_SIZE),
1317  (min_capacity_requested / SYSTEM_PAGE_SIZE + 1) * SYSTEM_PAGE_SIZE);
1318 
1319  if (canary_buffer_size != canary_buff_size_to_add) {
1320  CANARY_BUFFER = static_cast<char*>(realloc(CANARY_BUFFER, canary_buff_size_to_add));
1321  canary_buffer_size = canary_buff_size_to_add;
1322  }
1324  memset(CANARY_BUFFER, 0xff, canary_buff_size_to_add);
1325 
1326  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1327  const auto write_return = write(fd, CANARY_BUFFER, canary_buff_size_to_add);
1328  CHECK(write_return > 0 &&
1329  (static_cast<size_t>(write_return) == canary_buff_size_to_add));
1330  return canary_buff_size_to_add;
1331 }
#define CHECK_NE(x, y)
Definition: Logger.h:206
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:126
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

template<class String >
void StringDictionary::appendToStorage ( String  str)
private noexcept

Definition at line 1239 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::size.

Referenced by getOrAddBulk().

1239  {
1240  // write the payload
1242  memcpy(payload_map_ + payload_file_off_, str.data(), str.size());
1243 
1244  // write the offset and length
1245  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
1246  payload_file_off_ += str.size(); // Need to increment after we've defined str_meta
1247 
1249  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
1250 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::appendToStorageBulk ( const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const size_t  sum_new_strings_lengths 
)
private noexcept

Definition at line 1253 of file StringDictionary.cpp.

Referenced by getOrAddBulkParallel().

1256  {
1257  const size_t num_strings = string_memory_ids.size();
1258 
1259  checkAndConditionallyIncreasePayloadCapacity(sum_new_strings_lengths);
1260  checkAndConditionallyIncreaseOffsetCapacity(sizeof(StringIdxEntry) * num_strings);
1261 
1262  for (size_t i = 0; i < num_strings; ++i) {
1263  const size_t string_idx = string_memory_ids[i];
1264  const String str = input_strings[string_idx];
1265  const size_t str_size(str.size());
1266  memcpy(payload_map_ + payload_file_off_, str.data(), str_size);
1267  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str_size};
1268  payload_file_off_ += str_size; // Need to increment after we've defined str_meta
1269  memcpy(offset_map_ + str_count_ + i, &str_meta, sizeof(str_meta));
1270  }
1271 }
StringIdxEntry * offset_map_
void checkAndConditionallyIncreasePayloadCapacity(const size_t write_length)
void checkAndConditionallyIncreaseOffsetCapacity(const size_t write_length)

+ Here is the caller graph for this function:

compare_cache_value_t* StringDictionary::binary_search_cache ( const std::string &  pattern) const
private
void StringDictionary::buildSortedCache ( )
private

Definition at line 1386 of file StringDictionary.cpp.

References mergeSortedCache(), sortCache(), sorted_cache, and str_count_.

Referenced by getCompare().

1386  {
1387  // This method is not thread-safe.
1388  const auto cur_cache_size = sorted_cache.size();
1389  std::vector<int32_t> temp_sorted_cache;
1390  for (size_t i = cur_cache_size; i < str_count_; i++) {
1391  temp_sorted_cache.push_back(i);
1392  }
1393  sortCache(temp_sorted_cache);
1394  mergeSortedCache(temp_sorted_cache);
1395 }
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
void sortCache(std::vector< int32_t > &cache)
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::checkAndConditionallyIncreaseOffsetCapacity ( const size_t  write_length)
private

Definition at line 1218 of file StringDictionary.cpp.

References addOffsetCapacity(), CHECK, CHECK_GE, omnisci::checked_mmap(), omnisci::checked_munmap(), isTemp_, offset_fd_, offset_file_size_, offset_map_, and str_count_.

1219  {
1220  size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
1221  if (offset_file_off + write_length >= offset_file_size_) {
1222  const size_t min_capacity_needed =
1223  write_length - (offset_file_size_ - offset_file_off);
1224  if (!isTemp_) {
1225  CHECK_GE(offset_fd_, 0);
1227  addOffsetCapacity(min_capacity_needed);
1228  CHECK(offset_file_off + write_length <= offset_file_size_);
1229  offset_map_ = reinterpret_cast<StringIdxEntry*>(
1231  } else {
1232  addOffsetCapacity(min_capacity_needed);
1233  CHECK(offset_file_off + write_length <= offset_file_size_);
1234  }
1235  }
1236 }
StringIdxEntry * offset_map_
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38
void addOffsetCapacity(const size_t min_capacity_requested=0) noexcept
#define CHECK_GE(x, y)
Definition: Logger.h:210
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

void StringDictionary::checkAndConditionallyIncreasePayloadCapacity ( const size_t  write_length)
private

Definition at line 1199 of file StringDictionary.cpp.

References addPayloadCapacity(), CHECK, CHECK_GE, omnisci::checked_mmap(), omnisci::checked_munmap(), isTemp_, payload_fd_, payload_file_off_, payload_file_size_, and payload_map_.

1200  {
1201  if (payload_file_off_ + write_length > payload_file_size_) {
1202  const size_t min_capacity_needed =
1203  write_length - (payload_file_size_ - payload_file_off_);
1204  if (!isTemp_) {
1205  CHECK_GE(payload_fd_, 0);
1207  addPayloadCapacity(min_capacity_needed);
1208  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1209  payload_map_ =
1210  reinterpret_cast<char*>(omnisci::checked_mmap(payload_fd_, payload_file_size_));
1211  } else {
1212  addPayloadCapacity(min_capacity_needed);
1213  CHECK(payload_file_off_ + write_length <= payload_file_size_);
1214  }
1215  }
1216 }
void * checked_mmap(const int fd, const size_t sz)
Definition: omnisci_fs.cpp:38
#define CHECK_GE(x, y)
Definition: Logger.h:210
void addPayloadCapacity(const size_t min_capacity_requested=0) noexcept
void checked_munmap(void *addr, size_t length)
Definition: omnisci_fs.cpp:51
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

bool StringDictionary::checkpoint ( )
noexcept

Definition at line 1367 of file StringDictionary.cpp.

References CHECK, client_, omnisci::fsync(), isTemp_, omnisci::msync(), offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

Referenced by import_export::TypedImportBuffer::stringDictCheckpoint().

1367  {
1368  if (client_) {
1369  try {
1370  return client_->checkpoint();
1371  } catch (...) {
1372  return false;
1373  }
1374  }
1375  CHECK(!isTemp_);
1376  bool ret = true;
1377  ret = ret &&
1378  (omnisci::msync((void*)offset_map_, offset_file_size_, /*async=*/false) == 0);
1379  ret = ret &&
1380  (omnisci::msync((void*)payload_map_, payload_file_size_, /*async=*/false) == 0);
1381  ret = ret && (omnisci::fsync(offset_fd_) == 0);
1382  ret = ret && (omnisci::fsync(payload_fd_) == 0);
1383  return ret;
1384 }
StringIdxEntry * offset_map_
std::unique_ptr< StringDictionaryClient > client_
int fsync(int fd)
Definition: omnisci_fs.cpp:60
int msync(void *addr, size_t length, bool async)
Definition: omnisci_fs.cpp:55
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucket ( const uint32_t  hash,
const String &  str,
const std::vector< int32_t > &  data 
) const
private noexcept

Definition at line 1102 of file StringDictionary.cpp.

Referenced by getOrAddBulk().

1105  {
1106  auto bucket = hash & (data.size() - 1);
1107  while (true) {
1108  const int32_t candidate_string_id = data[bucket];
1109  if (candidate_string_id ==
1110  INVALID_STR_ID) { // In this case it means the slot is available for use
1111  break;
1112  }
1113  if (!materialize_hashes_ ||
1114  (materialize_hashes_ && hash == rk_hashes_[candidate_string_id])) {
1115  const auto old_str = getStringFromStorageFast(candidate_string_id);
1116  if (str.size() == old_str.size() &&
1117  !memcmp(str.data(), old_str.data(), str.size())) {
1118  // found the string
1119  break;
1120  }
1121  }
1122  // wrap around
1123  if (++bucket == data.size()) {
1124  bucket = 0;
1125  }
1126  }
1127  return bucket;
1128 }
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
std::vector< uint32_t > rk_hashes_

+ Here is the caller graph for this function:

template<class String >
uint32_t StringDictionary::computeBucketFromStorageAndMemory ( const uint32_t  input_string_rk_hash,
const String &  input_string,
const std::vector< int32_t > &  string_id_hash_table,
const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids 
) const
private noexcept

memcmp(input_string.data(), candidate_storage_string.c_str_ptr, input_string.size())) {

Definition at line 1131 of file StringDictionary.cpp.

Referenced by getOrAddBulkParallel().

1137  {
1138  auto bucket = input_string_rk_hash & (string_id_hash_table.size() - 1);
1139  while (true) {
1140  const int32_t candidate_string_id = string_id_hash_table[bucket];
1141  if (candidate_string_id ==
1142  INVALID_STR_ID) { // In this case it means the slot is available for use
1143  break;
1144  }
1145  if (!materialize_hashes_ ||
1146  (input_string_rk_hash == rk_hashes_[candidate_string_id])) {
1147  if (candidate_string_id > 0 &&
1148  static_cast<size_t>(candidate_string_id) >= storage_high_water_mark) {
1149  // The candidate string is not in storage yet but in our string_memory_ids temp
1150  // buffer
1151  size_t memory_offset =
1152  static_cast<size_t>(candidate_string_id - storage_high_water_mark);
1153  const String candidate_string = input_strings[string_memory_ids[memory_offset]];
1154  if (input_string.size() == candidate_string.size() &&
1155  !memcmp(input_string.data(), candidate_string.data(), input_string.size())) {
1156  // found the string in the temp memory buffer
1157  break;
1158  }
1159  } else {
1160  // The candidate string is in storage, need to fetch it for comparison
1161  const auto candidate_storage_string =
1162  getStringFromStorageFast(candidate_string_id);
1163  if (input_string.size() == candidate_storage_string.size() &&
1164  !memcmp(input_string.data(),
1165  candidate_storage_string.data(),
1166  input_string.size())) {
1169  // found the string in storage
1170  break;
1171  }
1172  }
1173  }
1174  if (++bucket == string_id_hash_table.size()) {
1175  bucket = 0;
1176  }
1177  }
1178  return bucket;
1179 }
std::string_view getStringFromStorageFast(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
std::vector< uint32_t > rk_hashes_

+ Here is the caller graph for this function:

uint32_t StringDictionary::computeUniqueBucketWithHash ( const uint32_t  hash,
const std::vector< int32_t > &  data 
)
private noexcept

Definition at line 1181 of file StringDictionary.cpp.

Referenced by increaseCapacity(), and processDictionaryFutures().

1183  {
1184  auto bucket = hash & (data.size() - 1);
1185  while (true) {
1186  if (data[bucket] ==
1187  INVALID_STR_ID) { // In this case it means the slot is available for use
1188  break;
1189  }
1190  collisions_++;
1191  // wrap around
1192  if (++bucket == data.size()) {
1193  bucket = 0;
1194  }
1195  }
1196  return bucket;
1197 }
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

std::shared_ptr< const std::vector< std::string > > StringDictionary::copyStrings ( ) const

Definition at line 935 of file StringDictionary.cpp.

References CHECK_EQ, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), rw_mutex_, str_count_, and strings_cache_.

935  {
936  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
937  if (client_) {
938  // TODO(miyu): support remote string dictionary
939  throw std::runtime_error(
940  "copying dictionaries from remote server is not supported yet.");
941  }
942 
943  if (strings_cache_) {
944  return strings_cache_;
945  }
946 
947  strings_cache_ = std::make_shared<std::vector<std::string>>();
948  strings_cache_->reserve(str_count_);
949  const bool multithreaded = str_count_ > 10000;
950  const auto worker_count =
951  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
952  CHECK_GT(worker_count, 0UL);
953  std::vector<std::vector<std::string>> worker_results(worker_count);
954  auto copy = [this](std::vector<std::string>& str_list,
955  const size_t start_id,
956  const size_t end_id) {
957  CHECK_LE(start_id, end_id);
958  str_list.reserve(end_id - start_id);
959  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
960  str_list.push_back(getStringUnlocked(string_id));
961  }
962  };
963  if (multithreaded) {
964  std::vector<std::future<void>> workers;
965  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
966  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
967  worker_idx < worker_count && start < str_count_;
968  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
969  workers.push_back(std::async(
970  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
971  }
972  for (auto& worker : workers) {
973  worker.get();
974  }
975  } else {
976  CHECK_EQ(worker_results.size(), size_t(1));
977  copy(worker_results[0], 0, str_count_);
978  }
979 
980  for (const auto& worker_result : worker_results) {
981  strings_cache_->insert(
982  strings_cache_->end(), worker_result.begin(), worker_result.end());
983  }
984  return strings_cache_;
985 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
std::shared_ptr< std::vector< std::string > > strings_cache_
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:208
mapd_unique_lock< mapd_shared_mutex > write_lock
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

bool StringDictionary::fillRateIsHigh ( const size_t  num_strings) const
private noexcept

Definition at line 987 of file StringDictionary.cpp.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and getOrAddImpl().

987  {
988  return string_id_hash_table_.size() <= num_strings * 2;
989 }
std::vector< int32_t > string_id_hash_table_

+ Here is the caller graph for this function:

std::vector< int32_t > StringDictionary::getCompare ( const std::string &  pattern,
const std::string &  comp_operator,
const size_t  generation 
)

Definition at line 727 of file StringDictionary.cpp.

References buildSortedCache(), client_, compare_cache_, getEquals(), getStringFromStorage(), rw_mutex_, sorted_cache, str_count_, string_eq(), and string_lt().

729  {
730  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
731  if (client_) {
732  return client_->get_compare(pattern, comp_operator, generation);
733  }
734  std::vector<int32_t> ret;
735  if (str_count_ == 0) {
736  return ret;
737  }
738  if (sorted_cache.size() < str_count_) {
739  if (comp_operator == "=" || comp_operator == "<>") {
740  return getEquals(pattern, comp_operator, generation);
741  }
742 
744  }
745  auto cache_index = compare_cache_.get(pattern);
746 
747  if (!cache_index) {
748  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
749  const auto cache_itr = std::lower_bound(
750  sorted_cache.begin(),
751  sorted_cache.end(),
752  pattern,
753  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
754  auto a_str = this->getStringFromStorage(a);
755  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
756  });
757 
758  if (cache_itr == sorted_cache.end()) {
759  cache_index->index = sorted_cache.size() - 1;
760  cache_index->diff = 1;
761  } else {
762  const auto cache_str = getStringFromStorage(*cache_itr);
763  if (!string_eq(
764  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
765  cache_index->index = cache_itr - sorted_cache.begin() - 1;
766  cache_index->diff = 1;
767  } else {
768  cache_index->index = cache_itr - sorted_cache.begin();
769  cache_index->diff = 0;
770  }
771  }
772 
773  compare_cache_.put(pattern, cache_index);
774  }
775 
776  // since we have a cache in form of vector of ints which is sorted according to
777  // corresponding strings in the dictionary all we need is the index of the element
778  // which equal to the pattern that we are trying to match or the index of “biggest”
779  // element smaller than the pattern, to perform all the comparison operators over
780  // string. The search function guarantees we have such index so now it is just the
781  // matter to include all the elements in the result vector.
782 
783  // For < operator if the index that we have points to the element which is equal to
784  // the pattern that we are searching for we simply get all the elements less than the
785  // index. If the element pointed by the index is not equal to the pattern we are
786  // comparing with we also need to include that index in result vector, except when the
787  // index points to 0 and the pattern is lesser than the smallest value in the string
788  // dictionary.
789 
790  if (comp_operator == "<") {
791  size_t idx = cache_index->index;
792  if (cache_index->diff) {
793  idx = cache_index->index + 1;
794  if (cache_index->index == 0 && cache_index->diff > 0) {
795  idx = cache_index->index;
796  }
797  }
798  for (size_t i = 0; i < idx; i++) {
799  ret.push_back(sorted_cache[i]);
800  }
801 
802  // For <= operator if the index that we have points to the element which is equal to
803  // the pattern that we are searching for we want to include the element pointed by
804  // the index in the result set. If the element pointed by the index is not equal to
805  // the pattern we are comparing with we just want to include all the ids with index
806  // less than the index that is cached, except when pattern that we are searching for
807  // is smaller than the smallest string in the dictionary.
808 
809  } else if (comp_operator == "<=") {
810  size_t idx = cache_index->index + 1;
811  if (cache_index == 0 && cache_index->diff > 0) {
812  idx = cache_index->index;
813  }
814  for (size_t i = 0; i < idx; i++) {
815  ret.push_back(sorted_cache[i]);
816  }
817 
818  // For > operator we want to get all the elements with index greater than the index
819  // that we have except, when the pattern we are searching for is lesser than the
820  // smallest string in the dictionary we also want to include the id of the index
821  // that we have.
822 
823  } else if (comp_operator == ">") {
824  size_t idx = cache_index->index + 1;
825  if (cache_index->index == 0 && cache_index->diff > 0) {
826  idx = cache_index->index;
827  }
828  for (size_t i = idx; i < sorted_cache.size(); i++) {
829  ret.push_back(sorted_cache[i]);
830  }
831 
832  // For >= operator when the indexed element that we have points to element which is
833  // equal to the pattern we are searching for we want to include that in the result
834  // vector. If the index that we have does not point to the string which is equal to
835  // the pattern we are searching we don’t want to include that id into the result
836  // vector except when the index is 0.
837 
838  } else if (comp_operator == ">=") {
839  size_t idx = cache_index->index;
840  if (cache_index->diff) {
841  idx = cache_index->index + 1;
842  if (cache_index->index == 0 && cache_index->diff > 0) {
843  idx = cache_index->index;
844  }
845  }
846  for (size_t i = idx; i < sorted_cache.size(); i++) {
847  ret.push_back(sorted_cache[i]);
848  }
849  } else if (comp_operator == "=") {
850  if (!cache_index->diff) {
851  ret.push_back(sorted_cache[cache_index->index]);
852  }
853 
854  // For <> operator it is simple matter of not including id of string which is equal
855  // to pattern we are searching for.
856  } else if (comp_operator == "<>") {
857  if (!cache_index->diff) {
858  size_t idx = cache_index->index;
859  for (size_t i = 0; i < idx; i++) {
860  ret.push_back(sorted_cache[i]);
861  }
862  ++idx;
863  for (size_t i = idx; i < sorted_cache.size(); i++) {
864  ret.push_back(sorted_cache[i]);
865  }
866  } else {
867  for (size_t i = 0; i < sorted_cache.size(); i++) {
868  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
869  }
870  }
871 
872  } else {
873  std::runtime_error("Unsupported string comparison operator");
874  }
875  return ret;
876 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
std::unique_ptr< StringDictionaryClient > client_
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
mapd_shared_mutex rw_mutex_
mapd_unique_lock< mapd_shared_mutex > write_lock
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getEquals ( std::string  pattern,
std::string  comp_operator,
size_t  generation 
)
private

Definition at line 667 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, cpu_threads(), equal_cache_, getStringUnlocked(), MAX_STRLEN, and str_count_.

Referenced by getCompare().

669  {
670  std::vector<int32_t> result;
671  auto eq_id_itr = equal_cache_.find(pattern);
672  int32_t eq_id = MAX_STRLEN + 1;
673  int32_t cur_size = str_count_;
674  if (eq_id_itr != equal_cache_.end()) {
675  auto eq_id = eq_id_itr->second;
676  if (comp_operator == "=") {
677  result.push_back(eq_id);
678  } else {
679  for (int32_t idx = 0; idx <= cur_size; idx++) {
680  if (idx == eq_id) {
681  continue;
682  }
683  result.push_back(idx);
684  }
685  }
686  } else {
687  std::vector<std::thread> workers;
688  int worker_count = cpu_threads();
689  CHECK_GT(worker_count, 0);
690  std::vector<std::vector<int32_t>> worker_results(worker_count);
691  CHECK_LE(generation, str_count_);
692  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
693  workers.emplace_back(
694  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
695  for (size_t string_id = worker_idx; string_id < generation;
696  string_id += worker_count) {
697  const auto str = getStringUnlocked(string_id);
698  if (str == pattern) {
699  worker_results[worker_idx].push_back(string_id);
700  }
701  }
702  });
703  }
704  for (auto& worker : workers) {
705  worker.join();
706  }
707  for (const auto& worker_result : worker_results) {
708  result.insert(result.end(), worker_result.begin(), worker_result.end());
709  }
710  if (result.size() > 0) {
711  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
712  CHECK(it_ok.second);
713  eq_id = result[0];
714  }
715  if (comp_operator == "<>") {
716  for (int32_t idx = 0; idx <= cur_size; idx++) {
717  if (idx == eq_id) {
718  continue;
719  }
720  result.push_back(idx);
721  }
722  }
723  }
724  return result;
725 }
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::string getStringUnlocked(int32_t string_id) const noexcept
std::map< std::string, int32_t > equal_cache_
#define CHECK_LE(x, y)
Definition: Logger.h:208
#define CHECK(condition)
Definition: Logger.h:197
static constexpr size_t MAX_STRLEN
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int32_t StringDictionary::getIdOfString ( const std::string &  str) const

Definition at line 542 of file StringDictionary.cpp.

References client_, getUnlocked(), and rw_mutex_.

542  {
543  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
544  if (client_) {
545  return client_->get(str);
546  }
547  return getUnlocked(str);
548 }
int32_t getUnlocked(const std::string &str) const noexcept
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getLike ( const std::string &  pattern,
const bool  icase,
const bool  is_simple,
const char  escape,
const size_t  generation 
) const

Definition at line 614 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_like(), like_cache_, rw_mutex_, and str_count_.

618  {
619  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
620  if (client_) {
621  return client_->get_like(pattern, icase, is_simple, escape, generation);
622  }
623  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
624  const auto it = like_cache_.find(cache_key);
625  if (it != like_cache_.end()) {
626  return it->second;
627  }
628  std::vector<int32_t> result;
629  std::vector<std::thread> workers;
630  int worker_count = cpu_threads();
631  CHECK_GT(worker_count, 0);
632  std::vector<std::vector<int32_t>> worker_results(worker_count);
633  CHECK_LE(generation, str_count_);
634  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
635  workers.emplace_back([&worker_results,
636  &pattern,
637  generation,
638  icase,
639  is_simple,
640  escape,
641  worker_idx,
642  worker_count,
643  this]() {
644  for (size_t string_id = worker_idx; string_id < generation;
645  string_id += worker_count) {
646  const auto str = getStringUnlocked(string_id);
647  if (is_like(str, pattern, icase, is_simple, escape)) {
648  worker_results[worker_idx].push_back(string_id);
649  }
650  }
651  });
652  }
653  for (auto& worker : workers) {
654  worker.join();
655  }
656  for (const auto& worker_result : worker_results) {
657  result.insert(result.end(), worker_result.begin(), worker_result.end());
658  }
659  // place result into cache for reuse if similar query
660  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
661 
662  CHECK(it_ok.second);
663 
664  return result;
665 }
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_mutex rw_mutex_
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
#define CHECK_LE(x, y)
Definition: Logger.h:208
#define CHECK(condition)
Definition: Logger.h:197
mapd_unique_lock< mapd_shared_mutex > write_lock
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

size_t StringDictionary::getNumStringsFromStorage ( const size_t  storage_slots) const
private noexcept

Returns the number of strings in storage by binary-searching for the first canary slot, which marks the end of the stored strings.

Parameters
storage_slots  Number of storage entries to search when locating the first canary.
Returns
number of strings in storage

Definition at line 237 of file StringDictionary.cpp.

References CHECK_GE and getStringFromStorage().

Referenced by StringDictionary().

238  {
239  if (storage_slots == 0) {
240  return 0;
241  }
242  // Must use signed integers since final binary search step can wrap to max size_t value
243  // if dictionary is empty
244  int64_t min_bound = 0;
245  int64_t max_bound = storage_slots - 1;
246  int64_t guess{0};
247  while (min_bound <= max_bound) {
248  guess = (max_bound + min_bound) / 2;
249  CHECK_GE(guess, 0);
250  if (getStringFromStorage(guess).canary) {
251  max_bound = guess - 1;
252  } else {
253  min_bound = guess + 1;
254  }
255  }
256  CHECK_GE(guess + (min_bound > guess ? 1 : 0), 0);
257  return guess + (min_bound > guess ? 1 : 0);
258 }
#define CHECK_GE(x, y)
Definition: Logger.h:210
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the caller graph for this function:

int32_t StringDictionary::getOrAdd ( const std::string &  str)
noexcept

Definition at line 287 of file StringDictionary.cpp.

References CHECK_EQ, client_, and getOrAddImpl().

287  {
288  if (client_) {
289  std::vector<int32_t> string_ids;
290  client_->get_or_add_bulk(string_ids, std::vector<std::string>{str});
291  CHECK_EQ(size_t(1), string_ids.size());
292  return string_ids.front();
293  }
294  return getOrAddImpl(str);
295 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
int32_t getOrAddImpl(const std::string &str) noexcept
std::unique_ptr< StringDictionaryClient > client_
template<class T , class String >
void StringDictionary::getOrAddBulk ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 347 of file StringDictionary.cpp.

References appendToStorage(), CHECK, CHECK_LT, client_no_timeout_, computeBucket(), fillRateIsHigh(), g_enable_stringdict_parallel, getOrAddBulkParallel(), getOrAddBulkRemote(), increaseCapacity(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, rw_mutex_, str_count_, and string_id_hash_table_.

Referenced by import_export::TypedImportBuffer::addDictEncodedString(), ArrowForeignStorageBase::convertArrowDictionary(), ArrowForeignStorageBase::createDictionaryEncodedColumn(), foreign_storage::ParquetStringEncoder< V >::encodeAndCopyContiguous(), getOrAddBulkArray(), and populate_string_ids().

348  {
350  getOrAddBulkParallel(input_strings, output_string_ids);
351  return;
352  }
353  // Single-thread path.
354  if (client_no_timeout_) {
355  getOrAddBulkRemote(input_strings, output_string_ids);
356  return;
357  }
358  size_t out_idx{0};
359  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
360 
361  for (const auto& str : input_strings) {
362  if (str.empty()) {
363  output_string_ids[out_idx++] = inline_int_null_value<T>();
364  continue;
365  }
366  CHECK(str.size() <= MAX_STRLEN);
367  uint32_t bucket;
368  const uint32_t hash = rk_hash(str);
369  bucket = computeBucket(hash, str, string_id_hash_table_);
370  if (string_id_hash_table_[bucket] != INVALID_STR_ID) {
371  output_string_ids[out_idx++] = string_id_hash_table_[bucket];
372  continue;
373  }
374  // need to add record to dictionary
375  // check there is room
376  if (str_count_ == static_cast<size_t>(max_valid_int_value<T>())) {
377  log_encoding_error<T>(str);
378  output_string_ids[out_idx++] = inline_int_null_value<T>();
379  continue;
380  }
381  if (string_id_hash_table_[bucket] == INVALID_STR_ID) {
383  << "Maximum number (" << str_count_
384  << ") of Dictionary encoded Strings reached for this column, offset path "
385  "for column is "
386  << offsets_path_;
387  if (fillRateIsHigh(str_count_)) {
388  // resize when more than 50% is full
390  bucket = computeBucket(hash, str, string_id_hash_table_);
391  }
392  appendToStorage(str);
393 
394  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
395  if (materialize_hashes_) {
396  rk_hashes_[str_count_] = hash;
397  }
398  ++str_count_;
399  }
400  output_string_ids[out_idx++] = string_id_hash_table_[bucket];
401  }
403 }
uint32_t rk_hash(const std::string_view &str)
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
void increaseCapacity() noexcept
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
mapd_shared_mutex rw_mutex_
std::vector< int32_t > string_id_hash_table_
#define CHECK_LT(x, y)
Definition: Logger.h:207
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void invalidateInvertedIndex() noexcept
#define CHECK(condition)
Definition: Logger.h:197
uint32_t computeBucket(const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept
static constexpr size_t MAX_STRLEN
void getOrAddBulkParallel(const std::vector< String > &string_vec, T *encoded_vec)
mapd_unique_lock< mapd_shared_mutex > write_lock
bool g_enable_stringdict_parallel
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)
void appendToStorage(String str) noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::getOrAddBulkArray ( const std::vector< std::vector< String >> &  string_array_vec,
std::vector< std::vector< int32_t >> &  ids_array_vec 
)

Definition at line 309 of file StringDictionary.cpp.

References getOrAddBulk().

Referenced by import_export::TypedImportBuffer::addDictEncodedStringArray().

311  {
312  ids_array_vec.resize(string_array_vec.size());
313  for (size_t i = 0; i < string_array_vec.size(); i++) {
314  auto& strings = string_array_vec[i];
315  auto& ids = ids_array_vec[i];
316  ids.resize(strings.size());
317  getOrAddBulk(strings, &ids[0]);
318  }
319 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class T , class String >
void StringDictionary::getOrAddBulkParallel ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)

Definition at line 406 of file StringDictionary.cpp.

References appendToStorageBulk(), CHECK, CHECK_LT, client_no_timeout_, computeBucketFromStorageAndMemory(), fillRateIsHigh(), getOrAddBulkRemote(), hashStrings(), increaseCapacityFromStorageAndMemory(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, rk_hashes_, rw_mutex_, str_count_, and string_id_hash_table_.

Referenced by getOrAddBulk().

407  {
408  if (client_no_timeout_) {
409  getOrAddBulkRemote(input_strings, output_string_ids);
410  return;
411  }
412  // Run rk_hash on the input strings up front, and in parallel,
413  // as the string hashing does not need to be behind the subsequent write_lock
414  std::vector<uint32_t> input_strings_rk_hashes(input_strings.size());
415  hashStrings(input_strings, input_strings_rk_hashes);
416 
417  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
418  size_t shadow_str_count =
419  str_count_; // Need to shadow str_count_ now with bulk add methods
420  const size_t storage_high_water_mark = shadow_str_count;
421  std::vector<size_t> string_memory_ids;
422  size_t sum_new_string_lengths = 0;
423  string_memory_ids.reserve(input_strings.size());
424  size_t input_string_idx{0};
425  for (const auto& input_string : input_strings) {
426  // Currently we make empty strings null
427  if (input_string.empty()) {
428  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
429  continue;
430  }
431  // TODO: Recover gracefully if an input string is too long
432  CHECK(input_string.size() <= MAX_STRLEN);
433 
434  if (fillRateIsHigh(shadow_str_count)) {
435  // resize when more than 50% is full
436  increaseCapacityFromStorageAndMemory(storage_high_water_mark,
437  input_strings,
438  string_memory_ids,
439  input_strings_rk_hashes);
440  }
441  // Get the rk_hash for this input_string
442  const uint32_t input_string_rk_hash = input_strings_rk_hashes[input_string_idx];
443 
444  uint32_t hash_bucket = computeBucketFromStorageAndMemory(input_string_rk_hash,
445  input_string,
447  storage_high_water_mark,
448  input_strings,
449  string_memory_ids);
450 
451  // If the hash bucket is not empty, that is our string id
452  // (computeBucketFromStorageAndMemory) already checked to ensure the input string and
453  // bucket string are equal)
454  if (string_id_hash_table_[hash_bucket] != INVALID_STR_ID) {
455  output_string_ids[input_string_idx++] = string_id_hash_table_[hash_bucket];
456  continue;
457  }
458  // Did not find string, so need to add record to dictionary
459  // First check there is room
460  if (shadow_str_count == static_cast<size_t>(max_valid_int_value<T>())) {
461  log_encoding_error<T>(input_string);
462  output_string_ids[input_string_idx++] = inline_int_null_value<T>();
463  continue;
464  }
465  CHECK_LT(shadow_str_count, MAX_STRCOUNT)
466  << "Maximum number (" << shadow_str_count
467  << ") of Dictionary encoded Strings reached for this column, offset path "
468  "for column is "
469  << offsets_path_;
470 
471  string_memory_ids.push_back(input_string_idx);
472  sum_new_string_lengths += input_string.size();
473  string_id_hash_table_[hash_bucket] = static_cast<int32_t>(shadow_str_count);
474  if (materialize_hashes_) {
475  rk_hashes_[shadow_str_count] = input_string_rk_hash;
476  }
477  output_string_ids[input_string_idx++] = shadow_str_count++;
478  }
479  appendToStorageBulk(input_strings, string_memory_ids, sum_new_string_lengths);
480  str_count_ = shadow_str_count;
481 
483 }
void hashStrings(const std::vector< String > &string_vec, std::vector< uint32_t > &hashes) const noexcept
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
mapd_shared_mutex rw_mutex_
void appendToStorageBulk(const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const size_t sum_new_strings_lengths) noexcept
std::vector< int32_t > string_id_hash_table_
#define CHECK_LT(x, y)
Definition: Logger.h:207
void increaseCapacityFromStorageAndMemory(const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids, const std::vector< uint32_t > &input_strings_rk_hashes) noexcept
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void invalidateInvertedIndex() noexcept
#define CHECK(condition)
Definition: Logger.h:197
static constexpr size_t MAX_STRLEN
mapd_unique_lock< mapd_shared_mutex > write_lock
void getOrAddBulkRemote(const std::vector< String > &string_vec, T *encoded_vec)
uint32_t computeBucketFromStorageAndMemory(const uint32_t input_string_rk_hash, const String &input_string, const std::vector< int32_t > &string_id_hash_table, const size_t storage_high_water_mark, const std::vector< String > &input_strings, const std::vector< size_t > &string_memory_ids) const noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class T , class String >
void StringDictionary::getOrAddBulkRemote ( const std::vector< String > &  string_vec,
T *  encoded_vec 
)
private

Definition at line 502 of file StringDictionary.cpp.

References CHECK, and client_no_timeout_.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

503  {
505  std::vector<int32_t> string_ids;
506  client_no_timeout_->get_or_add_bulk(string_ids, string_vec);
507  size_t out_idx{0};
508  for (size_t i = 0; i < string_ids.size(); ++i) {
509  const auto string_id = string_ids[i];
510  const bool invalid = string_id > max_valid_int_value<T>();
511  if (invalid || string_id == inline_int_null_value<int32_t>()) {
512  if (invalid) {
513  log_encoding_error<T>(string_vec[i]);
514  }
515  encoded_vec[out_idx++] = inline_int_null_value<T>();
516  continue;
517  }
518  encoded_vec[out_idx++] = string_id;
519  }
520 }
std::unique_ptr< StringDictionaryClient > client_no_timeout_
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the caller graph for this function:

int32_t StringDictionary::getOrAddImpl ( const std::string &  str)
private noexcept

Definition at line 1047 of file StringDictionary.cpp.

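References appendToStorage(), CHECK, CHECK_LT, computeBucket(), fillRateIsHigh(), increaseCapacity(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, rw_mutex_, str_count_, and string_id_hash_table_.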

1047  {
1048  // @TODO(wei) treat empty string as NULL for now
1049  if (str.size() == 0) {
1050  return inline_int_null_value<int32_t>();
1051  }
1052  CHECK(str.size() <= MAX_STRLEN);
1053  uint32_t bucket;
1054  const uint32_t hash = rk_hash(str);
1055  {
1056  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
1057  bucket = computeBucket(hash, str, string_id_hash_table_);
1058  if (string_id_hash_table_[bucket] != INVALID_STR_ID) {
1059  return string_id_hash_table_[bucket];
1060  }
1061  }
1062  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
1063  // need to recalculate the bucket in case it changed before
1064  // we got the lock
1065  bucket = computeBucket(hash, str, string_id_hash_table_);
1066  if (string_id_hash_table_[bucket] == INVALID_STR_ID) {
1068  << "Maximum number (" << str_count_
1069  << ") of Dictionary encoded Strings reached for this column, offset path "
1070  "for column is "
1071  << offsets_path_;
1072  if (fillRateIsHigh(str_count_)) {
1073  // resize when more than 50% is full
1074  increaseCapacity();
1075  bucket = computeBucket(hash, str, string_id_hash_table_);
1076  }
1077  appendToStorage(str);
1078  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
1079  if (materialize_hashes_) {
1080  rk_hashes_[str_count_] = hash;
1081  }
1082  ++str_count_;
1084  }
1085  return string_id_hash_table_[bucket];
1086 }
uint32_t rk_hash(const std::string_view &str)
bool fillRateIsHigh(const size_t num_strings) const noexcept
std::string offsets_path_
void increaseCapacity() noexcept
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
mapd_shared_mutex rw_mutex_
std::vector< int32_t > string_id_hash_table_
#define CHECK_LT(x, y)
Definition: Logger.h:207
mapd_shared_lock< mapd_shared_mutex > read_lock
void invalidateInvertedIndex() noexcept
#define CHECK(condition)
Definition: Logger.h:197
uint32_t computeBucket(const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept
static constexpr size_t MAX_STRLEN
mapd_unique_lock< mapd_shared_mutex > write_lock
void appendToStorage(String str) noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getRegexpLike ( const std::string &  pattern,
const char  escape,
const size_t  generation 
) const

Definition at line 888 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_regexp_like(), regex_cache_, rw_mutex_, and str_count_.

890  {
891  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
892  if (client_) {
893  return client_->get_regexp_like(pattern, escape, generation);
894  }
895  const auto cache_key = std::make_pair(pattern, escape);
896  const auto it = regex_cache_.find(cache_key);
897  if (it != regex_cache_.end()) {
898  return it->second;
899  }
900  std::vector<int32_t> result;
901  std::vector<std::thread> workers;
902  int worker_count = cpu_threads();
903  CHECK_GT(worker_count, 0);
904  std::vector<std::vector<int32_t>> worker_results(worker_count);
905  CHECK_LE(generation, str_count_);
906  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
907  workers.emplace_back([&worker_results,
908  &pattern,
909  generation,
910  escape,
911  worker_idx,
912  worker_count,
913  this]() {
914  for (size_t string_id = worker_idx; string_id < generation;
915  string_id += worker_count) {
916  const auto str = getStringUnlocked(string_id);
917  if (is_regexp_like(str, pattern, escape)) {
918  worker_results[worker_idx].push_back(string_id);
919  }
920  }
921  });
922  }
923  for (auto& worker : workers) {
924  worker.join();
925  }
926  for (const auto& worker_result : worker_results) {
927  result.insert(result.end(), worker_result.begin(), worker_result.end());
928  }
929  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
930  CHECK(it_ok.second);
931 
932  return result;
933 }
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:208
#define CHECK(condition)
Definition: Logger.h:197
mapd_unique_lock< mapd_shared_mutex > write_lock
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

std::string StringDictionary::getString ( int32_t  string_id) const

Definition at line 556 of file StringDictionary.cpp.

References client_, getStringUnlocked(), and rw_mutex_.

Referenced by StringValueConverter::convertToColumnarFormatFromDict(), and populate_string_ids().

556  {
557  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
558  if (client_) {
559  std::string ret;
560  client_->get_string(ret, string_id);
561  return ret;
562  }
563  return getStringUnlocked(string_id);
564 }
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::pair< char *, size_t > StringDictionary::getStringBytes ( int32_t  string_id) const
noexcept

Definition at line 571 of file StringDictionary.cpp.

References CHECK, CHECK_LE, CHECK_LT, client_, getStringBytesChecked(), and rw_mutex_.

572  {
573  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
574  CHECK(!client_);
575  CHECK_LE(0, string_id);
576  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
577  return getStringBytesChecked(string_id);
578 }
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
#define CHECK_LT(x, y)
Definition: Logger.h:207
#define CHECK_LE(x, y)
Definition: Logger.h:208
mapd_shared_lock< mapd_shared_mutex > read_lock
#define CHECK(condition)
Definition: Logger.h:197
std::pair< char *, size_t > StringDictionary::getStringBytesChecked ( const int  string_id) const
private noexcept

Definition at line 1094 of file StringDictionary.cpp.

References CHECK.

1095  {
1096  const auto str_canary = getStringFromStorage(string_id);
1097  CHECK(!str_canary.canary);
1098  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
1099 }
#define CHECK(condition)
Definition: Logger.h:197
PayloadString getStringFromStorage(const int string_id) const noexcept
std::string StringDictionary::getStringChecked ( const int  string_id) const
private noexcept

Definition at line 1088 of file StringDictionary.cpp.

References CHECK.

Referenced by increaseCapacity().

1088  {
1089  const auto str_canary = getStringFromStorage(string_id);
1090  CHECK(!str_canary.canary);
1091  return std::string(str_canary.c_str_ptr, str_canary.size);
1092 }
#define CHECK(condition)
Definition: Logger.h:197
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the caller graph for this function:

StringDictionary::PayloadString StringDictionary::getStringFromStorage ( const int  string_id) const
private noexcept

Definition at line 1279 of file StringDictionary.cpp.

References CHECK_GE, StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

Referenced by getCompare(), mergeSortedCache(), sortCache(), and StringDictionary().

1280  {
1281  if (!isTemp_) {
1282  CHECK_GE(payload_fd_, 0);
1283  CHECK_GE(offset_fd_, 0);
1284  }
1285  CHECK_GE(string_id, 0);
1286  const StringIdxEntry* str_meta = offset_map_ + string_id;
1287  if (str_meta->size == 0xffff) {
1288  // hit the canary
1289  return {nullptr, 0, true};
1290  }
1291  return {payload_map_ + str_meta->off, str_meta->size, false};
1292 }
StringIdxEntry * offset_map_
#define CHECK_GE(x, y)
Definition: Logger.h:210

+ Here is the caller graph for this function:

std::string_view StringDictionary::getStringFromStorageFast ( const int  string_id) const
private noexcept

Definition at line 1273 of file StringDictionary.cpp.

References StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

1274  {
1275  const StringIdxEntry* str_meta = offset_map_ + string_id;
1276  return {payload_map_ + str_meta->off, str_meta->size};
1277 }
StringIdxEntry * offset_map_
std::string StringDictionary::getStringUnlocked ( int32_t  string_id) const
private noexcept

Definition at line 566 of file StringDictionary.cpp.

References CHECK_LT.

Referenced by copyStrings(), getEquals(), getLike(), getRegexpLike(), and getString().

566  {
567  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
568  return getStringChecked(string_id);
569 }
std::string getStringChecked(const int string_id) const noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:207

+ Here is the caller graph for this function:

int32_t StringDictionary::getUnlocked ( const std::string &  str) const
private noexcept

Definition at line 550 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::rk_hash().

Referenced by getIdOfString().

550  {
551  const uint32_t hash = rk_hash(str);
552  auto str_id = string_id_hash_table_[computeBucket(hash, str, string_id_hash_table_)];
553  return str_id;
554 }
uint32_t rk_hash(const std::string_view &str)
std::vector< int32_t > string_id_hash_table_
uint32_t computeBucket(const uint32_t hash, const String &str, const std::vector< int32_t > &data) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::hashStrings ( const std::vector< String > &  string_vec,
std::vector< uint32_t > &  hashes 
) const
private noexcept

Computes the rk_hash of every string in a vector, in parallel. Empty strings are skipped; their slots in hashes are left unchanged.

Parameters
string_vec - input vector of strings to be hashed
hashes - space for the output; must be pre-sized to match the size of string_vec

Definition at line 331 of file StringDictionary.cpp.

References CHECK_EQ, and anonymous_namespace{StringDictionary.cpp}::rk_hash().

Referenced by getOrAddBulkParallel().

332  {
333  CHECK_EQ(string_vec.size(), hashes.size());
334 
335  tbb::parallel_for(tbb::blocked_range<size_t>(0, string_vec.size()),
336  [&string_vec, &hashes](const tbb::blocked_range<size_t>& r) {
337  for (size_t curr_id = r.begin(); curr_id != r.end(); ++curr_id) {
338  if (string_vec[curr_id].empty()) {
339  continue;
340  }
341  hashes[curr_id] = rk_hash(string_vec[curr_id]);
342  }
343  });
344 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
uint32_t rk_hash(const std::string_view &str)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::increaseCapacity ( )
private noexcept

Definition at line 991 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), getStringChecked(), INVALID_STR_ID, materialize_hashes_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, str_count_, and string_id_hash_table_.

Referenced by getOrAddBulk().

991  {
992  std::vector<int32_t> new_str_ids(string_id_hash_table_.size() * 2, INVALID_STR_ID);
993 
994  if (materialize_hashes_) {
995  for (size_t i = 0; i < string_id_hash_table_.size(); ++i) {
996  if (string_id_hash_table_[i] != INVALID_STR_ID) {
997  const uint32_t hash = rk_hashes_[string_id_hash_table_[i]];
998  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
999  new_str_ids[bucket] = string_id_hash_table_[i];
1000  }
1001  }
1002  rk_hashes_.resize(rk_hashes_.size() * 2);
1003  } else {
1004  for (size_t i = 0; i < str_count_; ++i) {
1005  const auto str = getStringChecked(i);
1006  const uint32_t hash = rk_hash(str);
1007  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1008  new_str_ids[bucket] = i;
1009  }
1010  }
1011  string_id_hash_table_.swap(new_str_ids);
1012 }
uint32_t rk_hash(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) noexcept
std::vector< int32_t > string_id_hash_table_
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class String >
void StringDictionary::increaseCapacityFromStorageAndMemory ( const size_t  storage_high_water_mark,
const std::vector< String > &  input_strings,
const std::vector< size_t > &  string_memory_ids,
const std::vector< uint32_t > &  input_strings_rk_hashes 
)
private noexcept

Definition at line 1015 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::rk_hash().

Referenced by getOrAddBulkParallel().

1019  {
1020  std::vector<int32_t> new_str_ids(string_id_hash_table_.size() * 2, INVALID_STR_ID);
1021  if (materialize_hashes_) {
1022  for (size_t i = 0; i < string_id_hash_table_.size(); ++i) {
1023  if (string_id_hash_table_[i] != INVALID_STR_ID) {
1024  const uint32_t hash = rk_hashes_[string_id_hash_table_[i]];
1025  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1026  new_str_ids[bucket] = string_id_hash_table_[i];
1027  }
1028  }
1029  rk_hashes_.resize(rk_hashes_.size() * 2);
1030  } else {
1031  for (size_t storage_idx = 0; storage_idx != storage_high_water_mark; ++storage_idx) {
1032  const auto storage_string = getStringChecked(storage_idx);
1033  const uint32_t hash = rk_hash(storage_string);
1034  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
1035  new_str_ids[bucket] = storage_idx;
1036  }
1037  for (size_t memory_idx = 0; memory_idx != string_memory_ids.size(); ++memory_idx) {
1038  size_t string_memory_id = string_memory_ids[memory_idx];
1039  uint32_t bucket = computeUniqueBucketWithHash(
1040  input_strings_rk_hashes[string_memory_id], new_str_ids);
1041  new_str_ids[bucket] = storage_high_water_mark + memory_idx;
1042  }
1043  }
1044  string_id_hash_table_.swap(new_str_ids);
1045 }
uint32_t rk_hash(const std::string_view &str)
std::string getStringChecked(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) noexcept
std::vector< int32_t > string_id_hash_table_
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::insertInSortedCache ( std::string  str,
int32_t  str_id 
)
private
void StringDictionary::invalidateInvertedIndex ( )
private noexcept

Definition at line 1354 of file StringDictionary.cpp.

References compare_cache_, equal_cache_, like_cache_, and regex_cache_.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

1354  {
1355  if (!like_cache_.empty()) {
1356  decltype(like_cache_)().swap(like_cache_);
1357  }
1358  if (!regex_cache_.empty()) {
1359  decltype(regex_cache_)().swap(regex_cache_);
1360  }
1361  if (!equal_cache_.empty()) {
1362  decltype(equal_cache_)().swap(equal_cache_);
1363  }
1364  compare_cache_.invalidateInvertedIndex();
1365 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::map< std::string, int32_t > equal_cache_
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_

+ Here is the caller graph for this function:

void StringDictionary::mergeSortedCache ( std::vector< int32_t > &  temp_sorted_cache)
private

Definition at line 1410 of file StringDictionary.cpp.

References getStringFromStorage(), sorted_cache, and string_lt().

Referenced by buildSortedCache().

1410  {
1411  // this method is not thread safe
1412  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1413  size_t t_idx = 0, s_idx = 0, idx = 0;
1414  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1415  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1416  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1417  const auto insert_from_temp_cache =
1418  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1419  if (insert_from_temp_cache) {
1420  updated_cache[idx] = temp_sorted_cache[t_idx++];
1421  } else {
1422  updated_cache[idx] = sorted_cache[s_idx++];
1423  }
1424  }
1425  while (t_idx < temp_sorted_cache.size()) {
1426  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1427  }
1428  while (s_idx < sorted_cache.size()) {
1429  updated_cache[idx++] = sorted_cache[s_idx++];
1430  }
1431  sorted_cache.swap(updated_cache);
1432 }
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_array_ids ( std::vector< std::vector< int32_t >> &  dest_array_ids,
StringDictionary dest_dict,
const std::vector< std::vector< int32_t >> &  source_array_ids,
const StringDictionary source_dict 
)
static

Definition at line 1461 of file StringDictionary.cpp.

References populate_string_ids(), and logger::thread_id().

Referenced by DictionaryValueConverter< TARGET_TYPE >::processArrayBuffer().

1465  {
1466  dest_array_ids.resize(source_array_ids.size());
1467 
1468  std::atomic<size_t> row_idx{0};
1469  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1470  int thread_id) {
1471  for (;;) {
1472  auto row = row_idx.fetch_add(1);
1473 
1474  if (row >= dest_array_ids.size()) {
1475  return;
1476  }
1477  const auto& source_ids = source_array_ids[row];
1478  auto& dest_ids = dest_array_ids[row];
1479  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1480  }
1481  };
1482 
1483  const int num_worker_threads = std::thread::hardware_concurrency();
1484 
1485  if (source_array_ids.size() / num_worker_threads > 10) {
1486  std::vector<std::future<void>> worker_threads;
1487  for (int i = 0; i < num_worker_threads; ++i) {
1488  worker_threads.push_back(std::async(std::launch::async, processor, i));
1489  }
1490 
1491  for (auto& child : worker_threads) {
1492  child.wait();
1493  }
1494  for (auto& child : worker_threads) {
1495  child.get();
1496  }
1497  } else {
1498  processor(0);
1499  }
1500 }
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
ThreadId thread_id()
Definition: Logger.cpp:731

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_ids ( std::vector< int32_t > &  dest_ids,
StringDictionary dest_dict,
const std::vector< int32_t > &  source_ids,
const StringDictionary source_dict,
const std::map< int32_t, std::string >  transient_mapping = {} 
)
static

Populates provided dest_ids vector with string ids corresponding to given source strings.

Given a vector of source string ids and the corresponding source dictionary, this method populates a vector of destination string ids, either by returning the string id of the matching string in the destination dictionary or by creating a new entry in it. Source string ids can also be transient if they were created by a function (e.g. the LOWER/UPPER functions). A map from transient string ids to string values is provided to handle this use case.

Parameters
dest_ids - vector of destination string ids to be populated
dest_dict - destination dictionary
source_ids - vector of source string ids for which destination ids are needed
source_dict - source dictionary
transient_mapping - map of transient source string ids to string values

Definition at line 1434 of file StringDictionary.cpp.

References getOrAddBulk(), and getString().

Referenced by populate_string_array_ids(), and DictionaryValueConverter< TARGET_TYPE >::processBuffer().

1439  {
1440  std::vector<std::string> strings;
1441 
1442  for (const int32_t source_id : source_ids) {
1443  if (source_id == std::numeric_limits<int32_t>::min()) {
1444  strings.emplace_back("");
1445  } else if (source_id < 0) {
1446  if (auto string_itr = transient_mapping.find(source_id);
1447  string_itr != transient_mapping.end()) {
1448  strings.emplace_back(string_itr->second);
1449  } else {
1450  throw std::runtime_error("Unexpected negative source ID");
1451  }
1452  } else {
1453  strings.push_back(source_dict->getString(source_id));
1454  }
1455  }
1456 
1457  dest_ids.resize(strings.size());
1458  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1459 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
std::string getString(int32_t string_id) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::processDictionaryFutures ( std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &  dictionary_futures)
private

Definition at line 211 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), materialize_hashes_, payload_file_off_, rk_hashes_, str_count_, and string_id_hash_table_.

Referenced by StringDictionary().

213  {
214  for (auto& dictionary_future : dictionary_futures) {
215  dictionary_future.wait();
216  auto hashVec = dictionary_future.get();
217  for (auto& hash : hashVec) {
218  uint32_t bucket = computeUniqueBucketWithHash(hash.first, string_id_hash_table_);
219  payload_file_off_ += hash.second;
220  string_id_hash_table_[bucket] = static_cast<int32_t>(str_count_);
221  if (materialize_hashes_) {
222  rk_hashes_[str_count_] = hash.first;
223  }
224  ++str_count_;
225  }
226  }
227  dictionary_futures.clear();
228 }
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) noexcept
std::vector< int32_t > string_id_hash_table_
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::sortCache ( std::vector< int32_t > &  cache)
private

Definition at line 1397 of file StringDictionary.cpp.

References getStringFromStorage(), and string_lt().

Referenced by buildSortedCache().

1397  {
1398  // This method is not thread-safe.
1399 
1400  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1401  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1402 
1403  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1404  auto a_str = this->getStringFromStorage(a);
1405  auto b_str = this->getStringFromStorage(b);
1406  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1407  });
1408 }
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t StringDictionary::storageEntryCount ( ) const

Definition at line 580 of file StringDictionary.cpp.

References client_, rw_mutex_, and str_count_.

580  {
581  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
582  if (client_) {
583  return client_->storage_entry_count();
584  }
585  return str_count_;
586 }
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
mapd_shared_lock< mapd_shared_mutex > read_lock

Member Data Documentation

char* StringDictionary::CANARY_BUFFER {nullptr}
private

Definition at line 226 of file StringDictionary.h.

Referenced by ~StringDictionary().

size_t StringDictionary::canary_buffer_size = 0
private

Definition at line 227 of file StringDictionary.h.

std::unique_ptr<StringDictionaryClient> StringDictionary::client_
private
std::unique_ptr<StringDictionaryClient> StringDictionary::client_no_timeout_
private

Definition at line 224 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and getOrAddBulkRemote().

size_t StringDictionary::collisions_
private

Definition at line 202 of file StringDictionary.h.

Referenced by StringDictionary().

DictionaryCache<std::string, compare_cache_value_t> StringDictionary::compare_cache_
mutable private

Definition at line 221 of file StringDictionary.h.

Referenced by getCompare(), and invalidateInvertedIndex().

std::map<std::string, int32_t> StringDictionary::equal_cache_
mutable private

Definition at line 220 of file StringDictionary.h.

Referenced by getEquals(), and invalidateInvertedIndex().

std::map<std::tuple<std::string, bool, bool, char>, std::vector<int32_t> > StringDictionary::like_cache_
mutable private

Definition at line 218 of file StringDictionary.h.

Referenced by getLike(), and invalidateInvertedIndex().

bool StringDictionary::materialize_hashes_
private
constexpr size_t StringDictionary::MAX_STRCOUNT = (1U << 31) - 1
static

Definition at line 115 of file StringDictionary.h.

Referenced by getOrAddBulk(), and getOrAddBulkParallel().

int StringDictionary::offset_fd_
private
size_t StringDictionary::offset_file_size_
private
StringIdxEntry* StringDictionary::offset_map_
private
std::string StringDictionary::offsets_path_
private

Definition at line 208 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddBulkParallel(), and StringDictionary().

int StringDictionary::payload_fd_
private
size_t StringDictionary::payload_file_off_
private
size_t StringDictionary::payload_file_size_
private
char* StringDictionary::payload_map_
private
std::map<std::pair<std::string, char>, std::vector<int32_t> > StringDictionary::regex_cache_
mutable private

Definition at line 219 of file StringDictionary.h.

Referenced by getRegexpLike(), and invalidateInvertedIndex().

std::vector<uint32_t> StringDictionary::rk_hashes_
private
std::vector<int32_t> StringDictionary::sorted_cache
private

Definition at line 205 of file StringDictionary.h.

Referenced by buildSortedCache(), getCompare(), and mergeSortedCache().

std::vector<int32_t> StringDictionary::string_id_hash_table_
private
std::shared_ptr<std::vector<std::string> > StringDictionary::strings_cache_
mutable private

Definition at line 222 of file StringDictionary.h.

Referenced by copyStrings().


The documentation for this class was generated from the following files: