OmniSciDB  04ee39c94c
StringDictionary Class Reference

#include <StringDictionary.h>

+ Collaboration diagram for StringDictionary:

Classes

struct  compare_cache_value_t
 
struct  PayloadString
 
struct  StringIdxEntry
 

Public Member Functions

 StringDictionary (const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
 
 StringDictionary (const LeafHostInfo &host, const DictRef dict_ref)
 
 ~StringDictionary () noexcept
 
int32_t getOrAdd (const std::string &str) noexcept
 
template<class T >
void getOrAddBulk (const std::vector< std::string > &string_vec, T *encoded_vec)
 
void getOrAddBulkArray (const std::vector< std::vector< std::string >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
 
int32_t getIdOfString (const std::string &str) const
 
std::string getString (int32_t string_id) const
 
std::pair< char *, size_t > getStringBytes (int32_t string_id) const noexcept
 
size_t storageEntryCount () const
 
std::vector< int32_t > getLike (const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
 
std::vector< int32_t > getCompare (const std::string &pattern, const std::string &comp_operator, const size_t generation)
 
std::vector< int32_t > getRegexpLike (const std::string &pattern, const char escape, const size_t generation) const
 
std::shared_ptr< const std::vector< std::string > > copyStrings () const
 
bool checkpoint () noexcept
 

Static Public Member Functions

static void populate_string_ids (std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict)
 
static void populate_string_array_ids (std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
 

Static Public Attributes

static constexpr int32_t INVALID_STR_ID = -1
 
static constexpr size_t MAX_STRLEN = (1 << 15) - 1
 
static constexpr size_t MAX_STRCOUNT = (1U << 31) - 1
 

Private Member Functions

void processDictionaryFutures (std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
 
bool fillRateIsHigh () const noexcept
 
void increaseCapacity () noexcept
 
int32_t getOrAddImpl (const std::string &str) noexcept
 
template<class T >
void getOrAddBulkRemote (const std::vector< std::string > &string_vec, T *encoded_vec)
 
int32_t getUnlocked (const std::string &str) const noexcept
 
std::string getStringUnlocked (int32_t string_id) const noexcept
 
std::string getStringChecked (const int string_id) const noexcept
 
std::pair< char *, size_t > getStringBytesChecked (const int string_id) const noexcept
 
uint32_t computeBucket (const uint32_t hash, const std::string str, const std::vector< int32_t > &data, const bool unique) const noexcept
 
uint32_t computeUniqueBucketWithHash (const uint32_t hash, const std::vector< int32_t > &data) const noexcept
 
void appendToStorage (const std::string &str) noexcept
 
PayloadString getStringFromStorage (const int string_id) const noexcept
 
void addPayloadCapacity () noexcept
 
void addOffsetCapacity () noexcept
 
size_t addStorageCapacity (int fd) noexcept
 
void * addMemoryCapacity (void *addr, size_t &mem_size) noexcept
 
void invalidateInvertedIndex () noexcept
 
std::vector< int32_t > getEquals (std::string pattern, std::string comp_operator, size_t generation)
 
void buildSortedCache ()
 
void insertInSortedCache (std::string str, int32_t str_id)
 
void sortCache (std::vector< int32_t > &cache)
 
void mergeSortedCache (std::vector< int32_t > &temp_sorted_cache)
 
compare_cache_value_t * binary_search_cache (const std::string &pattern) const
 

Private Attributes

size_t str_count_
 
std::vector< int32_t > str_ids_
 
std::vector< uint32_t > rk_hashes_
 
std::vector< int32_t > sorted_cache
 
bool isTemp_
 
bool materialize_hashes_
 
std::string offsets_path_
 
int payload_fd_
 
int offset_fd_
 
StringIdxEntry * offset_map_
 
char * payload_map_
 
size_t offset_file_size_
 
size_t payload_file_size_
 
size_t payload_file_off_
 
mapd_shared_mutex rw_mutex_
 
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
 
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
 
std::map< std::string, int32_t > equal_cache_
 
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
 
std::shared_ptr< std::vector< std::string > > strings_cache_
 
std::unique_ptr< StringDictionaryClient > client_
 
std::unique_ptr< StringDictionaryClient > client_no_timeout_
 

Static Private Attributes

static char * CANARY_BUFFER {nullptr}
 

Detailed Description

Definition at line 46 of file StringDictionary.h.

Constructor & Destructor Documentation

◆ StringDictionary() [1/2]

StringDictionary::StringDictionary ( const std::string &  folder,
const bool  isTemp,
const bool  recover,
const bool  materializeHashes = false,
size_t  initial_capacity = 256 
)

Definition at line 103 of file StringDictionary.cpp.

References addOffsetCapacity(), addPayloadCapacity(), CHECK_EQ, anonymous_namespace{StringDictionary.cpp}::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_open(), anonymous_namespace{StringDictionary.cpp}::file_size(), getStringFromStorage(), INVALID_STR_ID, isTemp_, LOG, materialize_hashes_, offset_fd_, offset_file_size_, offset_map_, offsets_path_, payload_fd_, payload_file_size_, payload_map_, processDictionaryFutures(), anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, anonymous_namespace{StringDictionary.cpp}::round_up_p2(), rw_mutex_, str_ids_, and logger::WARNING.

// On-disk / temporary dictionary constructor (the listing starts at the member-initializer
// list; the signature is shown above).
// NOTE(review): doxygen dropped source lines 134, 139 and 142 from this listing. The
// References list names addPayloadCapacity() and addOffsetCapacity(), which appear nowhere
// below, so lines 139/142 presumably call them; line 134 presumably sets payload_file_size_
// (it is tested on line 138). Confirm against StringDictionary.cpp.
108  : str_count_(0)
109  , str_ids_(initial_capacity, INVALID_STR_ID)
110  , rk_hashes_(initial_capacity)
111  , isTemp_(isTemp)
112  , materialize_hashes_(materializeHashes)
113  , payload_fd_(-1)
114  , offset_fd_(-1)
115  , offset_map_(nullptr)
116  , payload_map_(nullptr)
117  , offset_file_size_(0)
118  , payload_file_size_(0)
119  , payload_file_off_(0)
120  , strings_cache_(nullptr) {
// A persistent dictionary given an empty folder opens no files and maps no storage; it
// keeps only the in-memory defaults set above.
121  if (!isTemp && folder.empty()) {
122  return;
123  }
124 
125  // initial capacity must be a power of two for efficient bucket computation
126  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
// Persistent dictionaries keep their strings in <folder>/DictPayload and the
// (offset, size) entries in <folder>/DictOffsets.
127  if (!isTemp_) {
128  boost::filesystem::path storage_path(folder);
129  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
130  const auto payload_path =
131  (storage_path / boost::filesystem::path("DictPayload")).string();
132  payload_fd_ = checked_open(payload_path.c_str(), recover);
133  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
135  offset_file_size_ = file_size(offset_fd_);
136  }
137 
// Empty storage receives an initial capacity block (dropped lines 139 and 142).
138  if (payload_file_size_ == 0) {
140  }
141  if (offset_file_size_ == 0) {
143  }
144  if (!isTemp_) { // we never mmap or recover temp dictionaries
145  payload_map_ = reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
146  offset_map_ =
147  reinterpret_cast<StringIdxEntry*>(checked_mmap(offset_fd_, offset_file_size_));
// Recovery: the number of whole StringIdxEntry records in the offsets file bounds how many
// stored strings are re-hashed into the id table.
148  if (recover) {
149  const size_t bytes = file_size(offset_fd_);
150  if (bytes % sizeof(StringIdxEntry) != 0) {
151  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
152  }
153  const uint64_t str_count = bytes / sizeof(StringIdxEntry);
154  // at this point we know the size of the StringDict we need to load
155  // so lets reallocate the vector to the correct size
// The id table gets more than twice as many slots as strings (round_up_p2 presumably rounds
// up to a power of two, matching the CHECK on line 126); rk_hashes_ gets half as many.
156  const uint64_t max_entries = round_up_p2(str_count * 2 + 1);
157  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
158  str_ids_.swap(new_str_ids);
159  if (materialize_hashes_) {
160  std::vector<uint32_t> new_rk_hashes(max_entries / 2);
161  rk_hashes_.swap(new_rk_hashes);
162  }
163  unsigned string_id = 0;
164  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
165 
166  uint32_t thread_inits = 0;
// NOTE(review): std::thread::hardware_concurrency() may return 0 when the value is not
// computable; that would make the division on line 169 and the modulo on line 191 divide
// by zero. Consider clamping thread_count to at least 1.
167  const auto thread_count = std::thread::hardware_concurrency();
// Each async task handles up to items_per_thread consecutive ids, clamped to
// [2000, 200000].
168  const uint32_t items_per_thread = std::max<uint32_t>(
169  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
170  std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>
171  dictionary_futures;
// Each task returns (rk_hash, size) pairs for its slice of ids. Futures are handed to
// processDictionaryFutures() after every thread_count launches and once more at the end.
172  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
173  dictionary_futures.emplace_back(std::async(
174  std::launch::async, [string_id, str_count, items_per_thread, this] {
175  std::vector<std::pair<uint32_t, unsigned int>> hashVec;
176  for (uint32_t curr_id = string_id;
177  curr_id < string_id + items_per_thread && curr_id < str_count;
178  curr_id++) {
179  const auto recovered = getStringFromStorage(curr_id);
180  if (recovered.canary) {
181  // hit the canary, recovery finished
182  break;
183  } else {
184  std::string temp(recovered.c_str_ptr, recovered.size);
185  hashVec.emplace_back(std::make_pair(rk_hash(temp), temp.size()));
186  }
187  }
188  return hashVec;
189  }));
190  thread_inits++;
191  if (thread_inits % thread_count == 0) {
192  processDictionaryFutures(dictionary_futures);
193  }
194  }
195  // gather last few threads
196  if (dictionary_futures.size() != 0) {
197  processDictionaryFutures(dictionary_futures);
198  }
199  }
200  }
201 }
StringIdxEntry * offset_map_
#define CHECK_EQ(x, y)
Definition: Logger.h:195
#define LOG(tag)
Definition: Logger.h:182
std::string offsets_path_
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
void addPayloadCapacity() noexcept
mapd_shared_mutex rw_mutex_
int checked_open(const char *path, const bool recover)
void addOffsetCapacity() noexcept
uint32_t rk_hash(const std::string &str)
void * checked_mmap(const int fd, const size_t sz)
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
const uint64_t round_up_p2(const uint64_t num)
std::vector< int32_t > str_ids_
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< uint32_t > rk_hashes_
+ Here is the call graph for this function:

◆ StringDictionary() [2/2]

StringDictionary::StringDictionary ( const LeafHostInfo &  host,
const DictRef  dict_ref 
)

Definition at line 222 of file StringDictionary.cpp.

223  : strings_cache_(nullptr)
224  , client_(new StringDictionaryClient(host, dict_ref, true))
225  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
std::unique_ptr< StringDictionaryClient > client_
std::shared_ptr< std::vector< std::string > > strings_cache_
std::unique_ptr< StringDictionaryClient > client_no_timeout_

◆ ~StringDictionary()

StringDictionary::~StringDictionary ( )
noexcept

Definition at line 227 of file StringDictionary.cpp.

References CHECK, CHECK_GE, anonymous_namespace{StringDictionary.cpp}::checked_munmap(), client_, File_Namespace::close(), isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

// Destructor. Remote dictionaries (client_ set) own no local storage, so nothing is
// released for them.
// NOTE(review): doxygen dropped source lines 233-235, 237 and 241. The References list
// names checked_munmap(), payload_file_size_ and offset_file_size_, none of which appear
// below; the dropped lines presumably unmap both mappings (and likely close payload_fd_).
// Confirm against StringDictionary.cpp.
227  {
228  if (client_) {
229  return;
230  }
// Storage is released only if payload_map_ was ever set.
231  if (payload_map_) {
232  if (!isTemp_) {
236  CHECK_GE(payload_fd_, 0);
238  CHECK_GE(offset_fd_, 0);
239  close(offset_fd_);
240  } else {
// Temporary dictionaries grow these buffers with realloc() in addMemoryCapacity(), so
// they are released with free().
242  free(payload_map_);
243  free(offset_map_);
244  }
245  }
246 }
StringIdxEntry * offset_map_
void checked_munmap(void *addr, size_t length)
#define CHECK_GE(x, y)
Definition: Logger.h:200
std::unique_ptr< StringDictionaryClient > client_
#define CHECK(condition)
Definition: Logger.h:187
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:102
+ Here is the call graph for this function:

Member Function Documentation

◆ addMemoryCapacity()

void * StringDictionary::addMemoryCapacity ( void *  addr,
size_t &  mem_size 
)
private noexcept

Definition at line 1036 of file StringDictionary.cpp.

References CANARY_BUFFER, CHECK, and anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE.

Referenced by addOffsetCapacity(), and addPayloadCapacity().

// Grows the heap buffer `addr` (nullptr is accepted by realloc) by CANARY_BUFF_SIZE bytes,
// fills the new tail with 0xff canary bytes, adds CANARY_BUFF_SIZE to `mem_size`, and
// returns the possibly relocated buffer. Callers (addOffsetCapacity/addPayloadCapacity)
// use it only on their temporary-dictionary path.
1036  {
1037  static const ssize_t CANARY_BUFF_SIZE = 1024 * SYSTEM_PAGE_SIZE;
// Lazily create the shared all-0xff template block (addStorageCapacity() does the same).
// NOTE(review): no synchronization is visible around this lazy initialization.
// NOTE(review): source line 1040 was dropped from this listing (presumably a check of the
// malloc result -- confirm).
1038  if (!CANARY_BUFFER) {
1039  CANARY_BUFFER = reinterpret_cast<char*>(malloc(CANARY_BUFF_SIZE));
1041  memset(CANARY_BUFFER, 0xff, CANARY_BUFF_SIZE);
1042  }
1043  void* new_addr = realloc(addr, mem_size + CANARY_BUFF_SIZE);
1044  CHECK(new_addr);
1045  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
// memcpy returns its destination pointer, so this CHECK cannot fail; it only wraps the copy.
1046  CHECK(memcpy(write_addr, CANARY_BUFFER, CANARY_BUFF_SIZE));
1047  mem_size += CANARY_BUFF_SIZE;
1048  return new_addr;
1049 }
static char * CANARY_BUFFER
#define CHECK(condition)
Definition: Logger.h:187
+ Here is the caller graph for this function:

◆ addOffsetCapacity()

void StringDictionary::addOffsetCapacity ( )
private noexcept

Definition at line 1015 of file StringDictionary.cpp.

References addMemoryCapacity(), addStorageCapacity(), isTemp_, offset_fd_, offset_file_size_, and offset_map_.

Referenced by appendToStorage(), and StringDictionary().

// Grows the offsets storage by one canary block. Persistent dictionaries use the dropped
// source line 1017 (presumably addStorageCapacity(offset_fd_), per the References list).
// Temporary dictionaries extend the heap buffer offset_map_ via addMemoryCapacity(), which
// also adds the growth to offset_file_size_.
1015  {
1016  if (!isTemp_) {
1018  } else {
1019  offset_map_ =
1020  static_cast<StringIdxEntry*>(addMemoryCapacity(offset_map_, offset_file_size_));
1021  }
1022 }
StringIdxEntry * offset_map_
void * addMemoryCapacity(void *addr, size_t &mem_size) noexcept
size_t addStorageCapacity(int fd) noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ addPayloadCapacity()

void StringDictionary::addPayloadCapacity ( )
private noexcept

Definition at line 1006 of file StringDictionary.cpp.

References addMemoryCapacity(), addStorageCapacity(), isTemp_, payload_fd_, payload_file_size_, and payload_map_.

Referenced by appendToStorage(), and StringDictionary().

// Grows the payload storage by one canary block. Persistent dictionaries use the dropped
// source line 1008 (presumably addStorageCapacity(payload_fd_), per the References list).
// Temporary dictionaries extend the heap buffer payload_map_ via addMemoryCapacity(), which
// also adds the growth to payload_file_size_.
1006  {
1007  if (!isTemp_) {
1009  } else {
1010  payload_map_ =
1011  static_cast<char*>(addMemoryCapacity(payload_map_, payload_file_size_));
1012  }
1013 }
void * addMemoryCapacity(void *addr, size_t &mem_size) noexcept
size_t addStorageCapacity(int fd) noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ addStorageCapacity()

size_t StringDictionary::addStorageCapacity ( int  fd)
private noexcept

Definition at line 1024 of file StringDictionary.cpp.

References CANARY_BUFFER, CHECK, CHECK_NE, anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE, and File_Namespace::write().

Referenced by addOffsetCapacity(), and addPayloadCapacity().

// Appends CANARY_BUFF_SIZE bytes of 0xff canary data to the end of file `fd` and returns
// the number of bytes appended. Seek/write failures (including a short write) are reported
// through CHECK_NE/CHECK rather than retried.
// NOTE(review): the 3-argument write(fd, buf, n) here is POSIX write(2); doxygen's link to
// File_Namespace::write (a 4-argument function) looks like a mis-resolution.
1024  {
1025  static const ssize_t CANARY_BUFF_SIZE = 1024 * SYSTEM_PAGE_SIZE;
// Same lazy template-buffer initialization as addMemoryCapacity(); source line 1028 was
// dropped from this listing.
1026  if (!CANARY_BUFFER) {
1027  CANARY_BUFFER = static_cast<char*>(malloc(CANARY_BUFF_SIZE));
1029  memset(CANARY_BUFFER, 0xff, CANARY_BUFF_SIZE);
1030  }
1031  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1032  CHECK(write(fd, CANARY_BUFFER, CANARY_BUFF_SIZE) == CANARY_BUFF_SIZE);
1033  return CANARY_BUFF_SIZE;
1034 }
#define CHECK_NE(x, y)
Definition: Logger.h:196
static char * CANARY_BUFFER
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:121
#define CHECK(condition)
Definition: Logger.h:187
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ appendToStorage()

void StringDictionary::appendToStorage ( const std::string &  str)
private noexcept

Definition at line 955 of file StringDictionary.cpp.

References addOffsetCapacity(), addPayloadCapacity(), CHECK, CHECK_GE, anonymous_namespace{StringDictionary.cpp}::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_munmap(), isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_off_, payload_file_size_, payload_map_, StringDictionary::StringIdxEntry::size, and str_count_.

Referenced by getOrAddBulk(), and getOrAddImpl().

// Copies `str` to payload offset payload_file_off_ and writes its (offset, size)
// StringIdxEntry at index str_count_ of the offsets storage, growing either storage first
// when it is too small.
// NOTE(review): str_count_ is only read here; presumably callers increment it afterwards.
// NOTE(review): source lines 963-964, 969, 979-980 and 985 were dropped from this listing;
// per the References list they presumably unmap (checked_munmap) and grow storage via
// addPayloadCapacity()/addOffsetCapacity().
955  {
956  if (!isTemp_) {
957  CHECK_GE(payload_fd_, 0);
958  CHECK_GE(offset_fd_, 0);
959  }
960  // write the payload
961  if (payload_file_off_ + str.size() > payload_file_size_) {
962  if (!isTemp_) {
// Re-map the grown payload file; the CHECK asserts that the growth made enough room.
965  CHECK(payload_file_off_ + str.size() <= payload_file_size_);
966  payload_map_ =
967  reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
968  } else {
970  }
971  }
972  memcpy(payload_map_ + payload_file_off_, str.c_str(), str.size());
973  // write the offset and length
974  size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
975  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
976  payload_file_off_ += str.size();
// NOTE(review): unlike the payload check above (`>`), this uses `>=`, so the offsets
// storage also grows when the new entry would fit exactly; harmless but inconsistent.
977  if (offset_file_off + sizeof(str_meta) >= offset_file_size_) {
978  if (!isTemp_) {
981  CHECK(offset_file_off + sizeof(str_meta) <= offset_file_size_);
982  offset_map_ =
983  reinterpret_cast<StringIdxEntry*>(checked_mmap(offset_fd_, offset_file_size_));
984  } else {
986  }
987  }
988  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
989 }
StringIdxEntry * offset_map_
void checked_munmap(void *addr, size_t length)
#define CHECK_GE(x, y)
Definition: Logger.h:200
void addPayloadCapacity() noexcept
void addOffsetCapacity() noexcept
void * checked_mmap(const int fd, const size_t sz)
#define CHECK(condition)
Definition: Logger.h:187
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ binary_search_cache()

compare_cache_value_t* StringDictionary::binary_search_cache ( const std::string &  pattern) const
private

◆ buildSortedCache()

void StringDictionary::buildSortedCache ( )
private

Definition at line 1083 of file StringDictionary.cpp.

References mergeSortedCache(), sortCache(), sorted_cache, and str_count_.

Referenced by getCompare().

1083  {
1084  // This method is not thread-safe.
1085  const auto cur_cache_size = sorted_cache.size();
1086  std::vector<int32_t> temp_sorted_cache;
1087  for (size_t i = cur_cache_size; i < str_count_; i++) {
1088  temp_sorted_cache.push_back(i);
1089  }
1090  sortCache(temp_sorted_cache);
1091  mergeSortedCache(temp_sorted_cache);
1092 }
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
void sortCache(std::vector< int32_t > &cache)
std::vector< int32_t > sorted_cache
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ checkpoint()

bool StringDictionary::checkpoint ( )
noexcept

Definition at line 1066 of file StringDictionary.cpp.

References CHECK, client_, isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

1066  {
1067  if (client_) {
1068  try {
1069  return client_->checkpoint();
1070  } catch (...) {
1071  return false;
1072  }
1073  }
1074  CHECK(!isTemp_);
1075  bool ret = true;
1076  ret = ret && (msync((void*)offset_map_, offset_file_size_, MS_SYNC) == 0);
1077  ret = ret && (msync((void*)payload_map_, payload_file_size_, MS_SYNC) == 0);
1078  ret = ret && (fsync(offset_fd_) == 0);
1079  ret = ret && (fsync(payload_fd_) == 0);
1080  return ret;
1081 }
StringIdxEntry * offset_map_
std::unique_ptr< StringDictionaryClient > client_
#define CHECK(condition)
Definition: Logger.h:187

◆ computeBucket()

uint32_t StringDictionary::computeBucket ( const uint32_t  hash,
const std::string  str,
const std::vector< int32_t > &  data,
const bool  unique 
) const
private noexcept

Definition at line 898 of file StringDictionary.cpp.

References getStringFromStorage(), INVALID_STR_ID, materialize_hashes_, and rk_hashes_.

Referenced by getOrAddBulk(), getOrAddImpl(), and getUnlocked().

901  {
902  auto bucket = hash & (data.size() - 1);
903  while (true) {
904  if (data[bucket] ==
905  INVALID_STR_ID) { // In this case it means the slot is available for use
906  break;
907  }
908  // if records are unique I don't need to do this test as I know it will not be the
909  // same
910  if (!unique) {
911  if (materialize_hashes_) {
912  if (hash == rk_hashes_[data[bucket]]) {
913  // can't be the same string if hash is different
914  const auto old_str = getStringFromStorage(data[bucket]);
915  if (str.size() == old_str.size &&
916  !memcmp(str.c_str(), old_str.c_str_ptr, str.size())) {
917  // found the string
918  break;
919  }
920  }
921  } else {
922  const auto old_str = getStringFromStorage(data[bucket]);
923  if (str.size() == old_str.size &&
924  !memcmp(str.c_str(), old_str.c_str_ptr, str.size())) {
925  // found the string
926  break;
927  }
928  }
929  }
930  // wrap around
931  if (++bucket == data.size()) {
932  bucket = 0;
933  }
934  }
935  return bucket;
936 }
static constexpr int32_t INVALID_STR_ID
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< uint32_t > rk_hashes_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ computeUniqueBucketWithHash()

uint32_t StringDictionary::computeUniqueBucketWithHash ( const uint32_t  hash,
const std::vector< int32_t > &  data 
) const
private noexcept

Definition at line 938 of file StringDictionary.cpp.

References INVALID_STR_ID.

Referenced by increaseCapacity(), and processDictionaryFutures().

940  {
941  auto bucket = hash & (data.size() - 1);
942  while (true) {
943  if (data[bucket] ==
944  INVALID_STR_ID) { // In this case it means the slot is available for use
945  break;
946  }
947  // wrap around
948  if (++bucket == data.size()) {
949  bucket = 0;
950  }
951  }
952  return bucket;
953 }
static constexpr int32_t INVALID_STR_ID
+ Here is the caller graph for this function:

◆ copyStrings()

std::shared_ptr< const std::vector< std::string > > StringDictionary::copyStrings ( ) const

Definition at line 765 of file StringDictionary.cpp.

References CHECK_EQ, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), rw_mutex_, str_count_, and strings_cache_.

765  {
766  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
767  if (client_) {
768  // TODO(miyu): support remote string dictionary
769  throw std::runtime_error(
770  "copying dictionaries from remote server is not supported yet.");
771  }
772 
773  if (strings_cache_) {
774  return strings_cache_;
775  }
776 
777  strings_cache_ = std::make_shared<std::vector<std::string>>();
778  strings_cache_->reserve(str_count_);
779  const bool multithreaded = str_count_ > 10000;
780  const auto worker_count =
781  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
782  CHECK_GT(worker_count, 0UL);
783  std::vector<std::vector<std::string>> worker_results(worker_count);
784  auto copy = [this](std::vector<std::string>& str_list,
785  const size_t start_id,
786  const size_t end_id) {
787  CHECK_LE(start_id, end_id);
788  str_list.reserve(end_id - start_id);
789  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
790  str_list.push_back(getStringUnlocked(string_id));
791  }
792  };
793  if (multithreaded) {
794  std::vector<std::future<void>> workers;
795  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
796  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
797  worker_idx < worker_count && start < str_count_;
798  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
799  workers.push_back(std::async(
800  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
801  }
802  for (auto& worker : workers) {
803  worker.get();
804  }
805  } else {
806  CHECK_EQ(worker_results.size(), size_t(1));
807  copy(worker_results[0], 0, str_count_);
808  }
809 
810  for (const auto& worker_result : worker_results) {
811  strings_cache_->insert(
812  strings_cache_->end(), worker_result.begin(), worker_result.end());
813  }
814  return strings_cache_;
815 }
#define CHECK_EQ(x, y)
Definition: Logger.h:195
#define CHECK_GT(x, y)
Definition: Logger.h:199
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
std::shared_ptr< std::vector< std::string > > strings_cache_
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:198
int cpu_threads()
Definition: thread_count.h:23
+ Here is the call graph for this function:

◆ fillRateIsHigh()

bool StringDictionary::fillRateIsHigh ( ) const
private noexcept

Definition at line 817 of file StringDictionary.cpp.

References str_count_, and str_ids_.

Referenced by getOrAddBulk(), and getOrAddImpl().

817  {
818  return str_ids_.size() < str_count_ * 2;
819 }
std::vector< int32_t > str_ids_
+ Here is the caller graph for this function:

◆ getCompare()

std::vector< int32_t > StringDictionary::getCompare ( const std::string &  pattern,
const std::string &  comp_operator,
const size_t  generation 
)

Definition at line 557 of file StringDictionary.cpp.

References buildSortedCache(), client_, compare_cache_, getEquals(), getStringFromStorage(), rw_mutex_, sorted_cache, str_count_, string_eq(), and string_lt().

559  {
560  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
561  if (client_) {
562  return client_->get_compare(pattern, comp_operator, generation);
563  }
564  std::vector<int32_t> ret;
565  if (str_count_ == 0) {
566  return ret;
567  }
568  if (sorted_cache.size() < str_count_) {
569  if (comp_operator == "=" || comp_operator == "<>") {
570  return getEquals(pattern, comp_operator, generation);
571  }
572 
574  }
575  auto cache_index = compare_cache_.get(pattern);
576 
577  if (!cache_index) {
578  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
579  const auto cache_itr = std::lower_bound(
580  sorted_cache.begin(),
581  sorted_cache.end(),
582  pattern,
583  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
584  auto a_str = this->getStringFromStorage(a);
585  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
586  });
587 
588  if (cache_itr == sorted_cache.end()) {
589  cache_index->index = sorted_cache.size() - 1;
590  cache_index->diff = 1;
591  } else {
592  const auto cache_str = getStringFromStorage(*cache_itr);
593  if (!string_eq(
594  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
595  cache_index->index = cache_itr - sorted_cache.begin() - 1;
596  cache_index->diff = 1;
597  } else {
598  cache_index->index = cache_itr - sorted_cache.begin();
599  cache_index->diff = 0;
600  }
601  }
602 
603  compare_cache_.put(pattern, cache_index);
604  }
605 
606  // since we have a cache in form of vector of ints which is sorted according to
607  // corresponding strings in the dictionary all we need is the index of the element
608  // which equal to the pattern that we are trying to match or the index of “biggest”
609  // element smaller than the pattern, to perform all the comparison operators over
610  // string. The search function guarantees we have such index so now it is just the
611  // matter to include all the elements in the result vector.
612 
613  // For < operator if the index that we have points to the element which is equal to
614  // the pattern that we are searching for we simply get all the elements less than the
615  // index. If the element pointed by the index is not equal to the pattern we are
616  // comparing with we also need to include that index in result vector, except when the
617  // index points to 0 and the pattern is lesser than the smallest value in the string
618  // dictionary.
619 
620  if (comp_operator == "<") {
621  size_t idx = cache_index->index;
622  if (cache_index->diff) {
623  idx = cache_index->index + 1;
624  if (cache_index->index == 0 && cache_index->diff > 0) {
625  idx = cache_index->index;
626  }
627  }
628  for (size_t i = 0; i < idx; i++) {
629  ret.push_back(sorted_cache[i]);
630  }
631 
632  // For <= operator if the index that we have points to the element which is equal to
633  // the pattern that we are searching for we want to include the element pointed by
634  // the index in the result set. If the element pointed by the index is not equal to
635  // the pattern we are comparing with we just want to include all the ids with index
636  // less than the index that is cached, except when pattern that we are searching for
637  // is smaller than the smallest string in the dictionary.
638 
639  } else if (comp_operator == "<=") {
640  size_t idx = cache_index->index + 1;
641  if (cache_index == 0 && cache_index->diff > 0) {
642  idx = cache_index->index;
643  }
644  for (size_t i = 0; i < idx; i++) {
645  ret.push_back(sorted_cache[i]);
646  }
647 
648  // For > operator we want to get all the elements with index greater than the index
649  // that we have except, when the pattern we are searching for is lesser than the
650  // smallest string in the dictionary we also want to include the id of the index
651  // that we have.
652 
653  } else if (comp_operator == ">") {
654  size_t idx = cache_index->index + 1;
655  if (cache_index->index == 0 && cache_index->diff > 0) {
656  idx = cache_index->index;
657  }
658  for (size_t i = idx; i < sorted_cache.size(); i++) {
659  ret.push_back(sorted_cache[i]);
660  }
661 
662  // For >= operator when the indexed element that we have points to element which is
663  // equal to the pattern we are searching for we want to include that in the result
664  // vector. If the index that we have does not point to the string which is equal to
665  // the pattern we are searching we don’t want to include that id into the result
666  // vector except when the index is 0.
667 
668  } else if (comp_operator == ">=") {
669  size_t idx = cache_index->index;
670  if (cache_index->diff) {
671  idx = cache_index->index + 1;
672  if (cache_index->index == 0 && cache_index->diff > 0) {
673  idx = cache_index->index;
674  }
675  }
676  for (size_t i = idx; i < sorted_cache.size(); i++) {
677  ret.push_back(sorted_cache[i]);
678  }
679  } else if (comp_operator == "=") {
680  if (!cache_index->diff) {
681  ret.push_back(sorted_cache[cache_index->index]);
682  }
683 
684  // For <> operator it is simple matter of not including id of string which is equal
685  // to pattern we are searching for.
686  } else if (comp_operator == "<>") {
687  if (!cache_index->diff) {
688  size_t idx = cache_index->index;
689  for (size_t i = 0; i < idx; i++) {
690  ret.push_back(sorted_cache[i]);
691  }
692  ++idx;
693  for (size_t i = idx; i < sorted_cache.size(); i++) {
694  ret.push_back(sorted_cache[i]);
695  }
696  } else {
697  for (size_t i = 0; i < sorted_cache.size(); i++) {
698  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
699  }
700  }
701 
702  } else {
703  std::runtime_error("Unsupported string comparison operator");
704  }
705  return ret;
706 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
std::unique_ptr< StringDictionaryClient > client_
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
mapd_shared_mutex rw_mutex_
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache
+ Here is the call graph for this function:

◆ getEquals()

std::vector< int32_t > StringDictionary::getEquals ( std::string  pattern,
std::string  comp_operator,
size_t  generation 
)
private

Definition at line 497 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, cpu_threads(), equal_cache_, getStringUnlocked(), MAX_STRLEN, run-benchmark-import::result, and str_count_.

Referenced by getCompare().

499  {
500  std::vector<int32_t> result;
501  auto eq_id_itr = equal_cache_.find(pattern);
502  int32_t eq_id = MAX_STRLEN + 1;
503  int32_t cur_size = str_count_;
504  if (eq_id_itr != equal_cache_.end()) {
505  auto eq_id = eq_id_itr->second;
506  if (comp_operator == "=") {
507  result.push_back(eq_id);
508  } else {
509  for (int32_t idx = 0; idx <= cur_size; idx++) {
510  if (idx == eq_id) {
511  continue;
512  }
513  result.push_back(idx);
514  }
515  }
516  } else {
517  std::vector<std::thread> workers;
518  int worker_count = cpu_threads();
519  CHECK_GT(worker_count, 0);
520  std::vector<std::vector<int32_t>> worker_results(worker_count);
521  CHECK_LE(generation, str_count_);
522  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
523  workers.emplace_back(
524  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
525  for (size_t string_id = worker_idx; string_id < generation;
526  string_id += worker_count) {
527  const auto str = getStringUnlocked(string_id);
528  if (str == pattern) {
529  worker_results[worker_idx].push_back(string_id);
530  }
531  }
532  });
533  }
534  for (auto& worker : workers) {
535  worker.join();
536  }
537  for (const auto& worker_result : worker_results) {
538  result.insert(result.end(), worker_result.begin(), worker_result.end());
539  }
540  if (result.size() > 0) {
541  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
542  CHECK(it_ok.second);
543  eq_id = result[0];
544  }
545  if (comp_operator == "<>") {
546  for (int32_t idx = 0; idx <= cur_size; idx++) {
547  if (idx == eq_id) {
548  continue;
549  }
550  result.push_back(idx);
551  }
552  }
553  }
554  return result;
555 }
#define CHECK_GT(x, y)
Definition: Logger.h:199
std::string getStringUnlocked(int32_t string_id) const noexcept
std::map< std::string, int32_t > equal_cache_
#define CHECK_LE(x, y)
Definition: Logger.h:198
#define CHECK(condition)
Definition: Logger.h:187
static constexpr size_t MAX_STRLEN
int cpu_threads()
Definition: thread_count.h:23
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getIdOfString()

int32_t StringDictionary::getIdOfString ( const std::string &  str) const

Definition at line 372 of file StringDictionary.cpp.

References client_, getUnlocked(), and rw_mutex_.

Referenced by TEST().

372  {
373  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
374  if (client_) {
375  return client_->get(str);
376  }
377  return getUnlocked(str);
378 }
int32_t getUnlocked(const std::string &str) const noexcept
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getLike()

std::vector< int32_t > StringDictionary::getLike ( const std::string &  pattern,
const bool  icase,
const bool  is_simple,
const char  escape,
const size_t  generation 
) const

Definition at line 444 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_like(), like_cache_, run-benchmark-import::result, rw_mutex_, and str_count_.

448  {
449  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
450  if (client_) {
451  return client_->get_like(pattern, icase, is_simple, escape, generation);
452  }
453  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
454  const auto it = like_cache_.find(cache_key);
455  if (it != like_cache_.end()) {
456  return it->second;
457  }
458  std::vector<int32_t> result;
459  std::vector<std::thread> workers;
460  int worker_count = cpu_threads();
461  CHECK_GT(worker_count, 0);
462  std::vector<std::vector<int32_t>> worker_results(worker_count);
463  CHECK_LE(generation, str_count_);
464  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
465  workers.emplace_back([&worker_results,
466  &pattern,
467  generation,
468  icase,
469  is_simple,
470  escape,
471  worker_idx,
472  worker_count,
473  this]() {
474  for (size_t string_id = worker_idx; string_id < generation;
475  string_id += worker_count) {
476  const auto str = getStringUnlocked(string_id);
477  if (is_like(str, pattern, icase, is_simple, escape)) {
478  worker_results[worker_idx].push_back(string_id);
479  }
480  }
481  });
482  }
483  for (auto& worker : workers) {
484  worker.join();
485  }
486  for (const auto& worker_result : worker_results) {
487  result.insert(result.end(), worker_result.begin(), worker_result.end());
488  }
489  // place result into cache for reuse if similar query
490  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
491 
492  CHECK(it_ok.second);
493 
494  return result;
495 }
#define CHECK_GT(x, y)
Definition: Logger.h:199
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_mutex rw_mutex_
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
#define CHECK_LE(x, y)
Definition: Logger.h:198
#define CHECK(condition)
Definition: Logger.h:187
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
int cpu_threads()
Definition: thread_count.h:23
+ Here is the call graph for this function:

◆ getOrAdd()

int32_t StringDictionary::getOrAdd ( const std::string &  str)
noexcept

Definition at line 248 of file StringDictionary.cpp.

References CHECK_EQ, client_, and getOrAddImpl().

Referenced by TEST().

248  {
249  if (client_) {
250  std::vector<int32_t> string_ids;
251  client_->get_or_add_bulk(string_ids, {str});
252  CHECK_EQ(size_t(1), string_ids.size());
253  return string_ids.front();
254  }
255  return getOrAddImpl(str);
256 }
#define CHECK_EQ(x, y)
Definition: Logger.h:195
int32_t getOrAddImpl(const std::string &str) noexcept
std::unique_ptr< StringDictionaryClient > client_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getOrAddBulk()

template<class T >
template void StringDictionary::getOrAddBulk ( const std::vector< std::string > &  string_vec,
T *  encoded_vec 
)

Definition at line 282 of file StringDictionary.cpp.

References appendToStorage(), CHECK, CHECK_LT, client_no_timeout_, computeBucket(), fillRateIsHigh(), getOrAddBulkRemote(), increaseCapacity(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, rw_mutex_, str_count_, and str_ids_.

Referenced by getOrAddBulkArray(), and populate_string_ids().

283  {
284  if (client_no_timeout_) {
285  getOrAddBulkRemote(string_vec, encoded_vec);
286  return;
287  }
288  size_t out_idx{0};
289  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
290 
291  for (const auto& str : string_vec) {
292  if (str.empty()) {
293  encoded_vec[out_idx++] = inline_int_null_value<T>();
294  continue;
295  }
296  CHECK(str.size() <= MAX_STRLEN);
297  uint32_t bucket;
298  const uint32_t hash = rk_hash(str);
299  bucket = computeBucket(hash, str, str_ids_, false);
300  if (str_ids_[bucket] != INVALID_STR_ID) {
301  encoded_vec[out_idx++] = str_ids_[bucket];
302  continue;
303  }
304  // need to add record to dictionary
305  // check there is room
306  if (str_count_ == static_cast<size_t>(max_valid_int_value<T>())) {
307  log_encoding_error<T>(str);
308  encoded_vec[out_idx++] = inline_int_null_value<T>();
309  continue;
310  }
311  if (str_ids_[bucket] == INVALID_STR_ID) {
312  CHECK_LT(str_count_, MAX_STRCOUNT)
313  << "Maximum number (" << str_count_
314  << ") of Dictionary encoded Strings reached for this column, offset path "
315  "for column is "
316  << offsets_path_;
317  if (fillRateIsHigh()) {
318  // resize when more than 50% is full
319  increaseCapacity();
320  bucket = computeBucket(hash, str, str_ids_, false);
321  }
322  appendToStorage(str);
323 
324  str_ids_[bucket] = static_cast<int32_t>(str_count_);
325  if (materialize_hashes_) {
326  rk_hashes_[str_count_] = hash;
327  }
328  ++str_count_;
329  }
330  encoded_vec[out_idx++] = str_ids_[bucket];
331  }
332  invalidateInvertedIndex();
333 }
std::string offsets_path_
void increaseCapacity() noexcept
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
mapd_shared_mutex rw_mutex_
uint32_t rk_hash(const std::string &str)
#define CHECK_LT(x, y)
Definition: Logger.h:197
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void getOrAddBulkRemote(const std::vector< std::string > &string_vec, T *encoded_vec)
void invalidateInvertedIndex() noexcept
#define CHECK(condition)
Definition: Logger.h:187
std::vector< int32_t > str_ids_
static constexpr size_t MAX_STRLEN
void appendToStorage(const std::string &str) noexcept
uint32_t computeBucket(const uint32_t hash, const std::string str, const std::vector< int32_t > &data, const bool unique) const noexcept
std::vector< uint32_t > rk_hashes_
bool fillRateIsHigh() const noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getOrAddBulkArray()

void StringDictionary::getOrAddBulkArray ( const std::vector< std::vector< std::string >> &  string_array_vec,
std::vector< std::vector< int32_t >> &  ids_array_vec 
)

Definition at line 269 of file StringDictionary.cpp.

References getOrAddBulk().

271  {
272  ids_array_vec.resize(string_array_vec.size());
273  for (size_t i = 0; i < string_array_vec.size(); i++) {
274  auto& strings = string_array_vec[i];
275  auto& ids = ids_array_vec[i];
276  ids.resize(strings.size());
277  getOrAddBulk(strings, &ids[0]);
278  }
279 }
void getOrAddBulk(const std::vector< std::string > &string_vec, T *encoded_vec)
+ Here is the call graph for this function:

◆ getOrAddBulkRemote()

template<class T >
template void StringDictionary::getOrAddBulkRemote ( const std::vector< std::string > &  string_vec,
T *  encoded_vec 
)
private

Definition at line 342 of file StringDictionary.cpp.

References CHECK, and client_no_timeout_.

Referenced by getOrAddBulk().

343  {
344  CHECK(client_no_timeout_);
345  std::vector<int32_t> string_ids;
346  client_no_timeout_->get_or_add_bulk(string_ids, string_vec);
347  size_t out_idx{0};
348  for (size_t i = 0; i < string_ids.size(); ++i) {
349  const auto string_id = string_ids[i];
350  const bool invalid = string_id > max_valid_int_value<T>();
351  if (invalid || string_id == inline_int_null_value<int32_t>()) {
352  if (invalid) {
353  log_encoding_error<T>(string_vec[i]);
354  }
355  encoded_vec[out_idx++] = inline_int_null_value<T>();
356  continue;
357  }
358  encoded_vec[out_idx++] = string_id;
359  }
360 }
std::unique_ptr< StringDictionaryClient > client_no_timeout_
#define CHECK(condition)
Definition: Logger.h:187
+ Here is the caller graph for this function:

◆ getOrAddImpl()

int32_t StringDictionary::getOrAddImpl ( const std::string &  str)
private noexcept

Definition at line 844 of file StringDictionary.cpp.

References appendToStorage(), CHECK, CHECK_LT, computeBucket(), fillRateIsHigh(), increaseCapacity(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, rw_mutex_, str_count_, and str_ids_.

Referenced by getOrAdd().

844  {
845  // @TODO(wei) treat empty string as NULL for now
846  if (str.size() == 0) {
847  return inline_int_null_value<int32_t>();
848  }
849  CHECK(str.size() <= MAX_STRLEN);
850  uint32_t bucket;
851  const uint32_t hash = rk_hash(str);
852  {
853  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
854  bucket = computeBucket(hash, str, str_ids_, false);
855  if (str_ids_[bucket] != INVALID_STR_ID) {
856  return str_ids_[bucket];
857  }
858  }
859  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
860  // need to recalculate the bucket in case it changed before
861  // we got the lock
862  bucket = computeBucket(hash, str, str_ids_, false);
863  if (str_ids_[bucket] == INVALID_STR_ID) {
864  CHECK_LT(str_count_, MAX_STRCOUNT)
865  << "Maximum number (" << str_count_
866  << ") of Dictionary encoded Strings reached for this column, offset path "
867  "for column is "
868  << offsets_path_;
869  if (fillRateIsHigh()) {
870  // resize when more than 50% is full
871  increaseCapacity();
872  bucket = computeBucket(hash, str, str_ids_, false);
873  }
874  appendToStorage(str);
875  str_ids_[bucket] = static_cast<int32_t>(str_count_);
876  if (materialize_hashes_) {
877  rk_hashes_[str_count_] = hash;
878  }
879  ++str_count_;
880  invalidateInvertedIndex();
881  }
882  return str_ids_[bucket];
883 }
std::string offsets_path_
void increaseCapacity() noexcept
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
mapd_shared_mutex rw_mutex_
uint32_t rk_hash(const std::string &str)
#define CHECK_LT(x, y)
Definition: Logger.h:197
void invalidateInvertedIndex() noexcept
#define CHECK(condition)
Definition: Logger.h:187
std::vector< int32_t > str_ids_
static constexpr size_t MAX_STRLEN
void appendToStorage(const std::string &str) noexcept
uint32_t computeBucket(const uint32_t hash, const std::string str, const std::vector< int32_t > &data, const bool unique) const noexcept
std::vector< uint32_t > rk_hashes_
bool fillRateIsHigh() const noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getRegexpLike()

std::vector< int32_t > StringDictionary::getRegexpLike ( const std::string &  pattern,
const char  escape,
const size_t  generation 
) const

Definition at line 718 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_regexp_like(), regex_cache_, run-benchmark-import::result, rw_mutex_, and str_count_.

720  {
721  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
722  if (client_) {
723  return client_->get_regexp_like(pattern, escape, generation);
724  }
725  const auto cache_key = std::make_pair(pattern, escape);
726  const auto it = regex_cache_.find(cache_key);
727  if (it != regex_cache_.end()) {
728  return it->second;
729  }
730  std::vector<int32_t> result;
731  std::vector<std::thread> workers;
732  int worker_count = cpu_threads();
733  CHECK_GT(worker_count, 0);
734  std::vector<std::vector<int32_t>> worker_results(worker_count);
735  CHECK_LE(generation, str_count_);
736  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
737  workers.emplace_back([&worker_results,
738  &pattern,
739  generation,
740  escape,
741  worker_idx,
742  worker_count,
743  this]() {
744  for (size_t string_id = worker_idx; string_id < generation;
745  string_id += worker_count) {
746  const auto str = getStringUnlocked(string_id);
747  if (is_regexp_like(str, pattern, escape)) {
748  worker_results[worker_idx].push_back(string_id);
749  }
750  }
751  });
752  }
753  for (auto& worker : workers) {
754  worker.join();
755  }
756  for (const auto& worker_result : worker_results) {
757  result.insert(result.end(), worker_result.begin(), worker_result.end());
758  }
759  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
760  CHECK(it_ok.second);
761 
762  return result;
763 }
#define CHECK_GT(x, y)
Definition: Logger.h:199
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:198
#define CHECK(condition)
Definition: Logger.h:187
int cpu_threads()
Definition: thread_count.h:23
+ Here is the call graph for this function:

◆ getString()

std::string StringDictionary::getString ( int32_t  string_id) const

Definition at line 386 of file StringDictionary.cpp.

References client_, getStringUnlocked(), and rw_mutex_.

Referenced by StringValueConverter::convertToColumnarFormatFromDict(), populate_string_ids(), and TEST().

386  {
387  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
388  if (client_) {
389  std::string ret;
390  client_->get_string(ret, string_id);
391  return ret;
392  }
393  return getStringUnlocked(string_id);
394 }
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_mutex rw_mutex_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getStringBytes()

std::pair< char *, size_t > StringDictionary::getStringBytes ( int32_t  string_id) const
noexcept

Definition at line 401 of file StringDictionary.cpp.

References CHECK, CHECK_LE, CHECK_LT, client_, getStringBytesChecked(), rw_mutex_, and str_count_.

402  {
403  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
404  CHECK(!client_);
405  CHECK_LE(0, string_id);
406  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
407  return getStringBytesChecked(string_id);
408 }
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
#define CHECK_LT(x, y)
Definition: Logger.h:197
#define CHECK_LE(x, y)
Definition: Logger.h:198
#define CHECK(condition)
Definition: Logger.h:187
+ Here is the call graph for this function:

◆ getStringBytesChecked()

std::pair< char *, size_t > StringDictionary::getStringBytesChecked ( const int  string_id) const
private noexcept

Definition at line 891 of file StringDictionary.cpp.

References CHECK, and getStringFromStorage().

Referenced by getStringBytes().

892  {
893  const auto str_canary = getStringFromStorage(string_id);
894  CHECK(!str_canary.canary);
895  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
896 }
#define CHECK(condition)
Definition: Logger.h:187
PayloadString getStringFromStorage(const int string_id) const noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getStringChecked()

std::string StringDictionary::getStringChecked ( const int  string_id) const
private noexcept

Definition at line 885 of file StringDictionary.cpp.

References CHECK, and getStringFromStorage().

Referenced by getStringUnlocked(), and increaseCapacity().

885  {
886  const auto str_canary = getStringFromStorage(string_id);
887  CHECK(!str_canary.canary);
888  return std::string(str_canary.c_str_ptr, str_canary.size);
889 }
#define CHECK(condition)
Definition: Logger.h:187
PayloadString getStringFromStorage(const int string_id) const noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getStringFromStorage()

StringDictionary::PayloadString StringDictionary::getStringFromStorage ( const int  string_id) const
private noexcept

Definition at line 991 of file StringDictionary.cpp.

References CHECK_GE, isTemp_, StringDictionary::StringIdxEntry::off, offset_fd_, offset_map_, payload_fd_, payload_map_, and StringDictionary::StringIdxEntry::size.

Referenced by computeBucket(), getCompare(), getStringBytesChecked(), getStringChecked(), mergeSortedCache(), sortCache(), and StringDictionary().

992  {
993  if (!isTemp_) {
994  CHECK_GE(payload_fd_, 0);
995  CHECK_GE(offset_fd_, 0);
996  }
997  CHECK_GE(string_id, 0);
998  const StringIdxEntry* str_meta = offset_map_ + string_id;
999  if (str_meta->size == 0xffff) {
1000  // hit the canary
1001  return {nullptr, 0, true};
1002  }
1003  return {payload_map_ + str_meta->off, str_meta->size, false};
1004 }
StringIdxEntry * offset_map_
#define CHECK_GE(x, y)
Definition: Logger.h:200
+ Here is the caller graph for this function:

◆ getStringUnlocked()

std::string StringDictionary::getStringUnlocked ( int32_t  string_id) const
private noexcept

Definition at line 396 of file StringDictionary.cpp.

References CHECK_LT, getStringChecked(), and str_count_.

Referenced by copyStrings(), getEquals(), getLike(), getRegexpLike(), and getString().

396  {
397  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
398  return getStringChecked(string_id);
399 }
std::string getStringChecked(const int string_id) const noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:197
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getUnlocked()

int32_t StringDictionary::getUnlocked ( const std::string &  str) const
private noexcept

Definition at line 380 of file StringDictionary.cpp.

References computeBucket(), anonymous_namespace{StringDictionary.cpp}::rk_hash(), and str_ids_.

Referenced by getIdOfString().

380  {
381  const uint32_t hash = rk_hash(str);
382  auto str_id = str_ids_[computeBucket(hash, str, str_ids_, false)];
383  return str_id;
384 }
uint32_t rk_hash(const std::string &str)
std::vector< int32_t > str_ids_
uint32_t computeBucket(const uint32_t hash, const std::string str, const std::vector< int32_t > &data, const bool unique) const noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ increaseCapacity()

void StringDictionary::increaseCapacity ( )
private noexcept

Definition at line 821 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), getStringChecked(), INVALID_STR_ID, materialize_hashes_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, str_count_, and str_ids_.

Referenced by getOrAddBulk(), and getOrAddImpl().

821  {
822  std::vector<int32_t> new_str_ids(str_ids_.size() * 2, INVALID_STR_ID);
823 
824  if (materialize_hashes_) {
825  for (size_t i = 0; i < str_ids_.size(); ++i) {
826  if (str_ids_[i] != INVALID_STR_ID) {
827  const uint32_t hash = rk_hashes_[str_ids_[i]];
828  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
829  new_str_ids[bucket] = str_ids_[i];
830  }
831  }
832  rk_hashes_.resize(rk_hashes_.size() * 2);
833  } else {
834  for (size_t i = 0; i < str_count_; ++i) {
835  const auto str = getStringChecked(i);
836  const uint32_t hash = rk_hash(str);
837  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
838  new_str_ids[bucket] = i;
839  }
840  }
841  str_ids_.swap(new_str_ids);
842 }
std::string getStringChecked(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
uint32_t rk_hash(const std::string &str)
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) const noexcept
std::vector< int32_t > str_ids_
std::vector< uint32_t > rk_hashes_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ insertInSortedCache()

void StringDictionary::insertInSortedCache ( std::string  str,
int32_t  str_id 
)
private

◆ invalidateInvertedIndex()

void StringDictionary::invalidateInvertedIndex ( )
private noexcept

Definition at line 1051 of file StringDictionary.cpp.

References CANARY_BUFFER, compare_cache_, equal_cache_, like_cache_, and regex_cache_.

Referenced by getOrAddBulk(), and getOrAddImpl().

1051  {
1052  if (!like_cache_.empty()) {
1053  decltype(like_cache_)().swap(like_cache_);
1054  }
1055  if (!regex_cache_.empty()) {
1056  decltype(regex_cache_)().swap(regex_cache_);
1057  }
1058  if (!equal_cache_.empty()) {
1059  decltype(equal_cache_)().swap(equal_cache_);
1060  }
1061  compare_cache_.invalidateInvertedIndex();
1062 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::map< std::string, int32_t > equal_cache_
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
+ Here is the caller graph for this function:

◆ mergeSortedCache()

void StringDictionary::mergeSortedCache ( std::vector< int32_t > &  temp_sorted_cache)
private

Definition at line 1107 of file StringDictionary.cpp.

References getStringFromStorage(), sorted_cache, and string_lt().

Referenced by buildSortedCache().

1107  {
1108  // this method is not thread safe
1109  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1110  size_t t_idx = 0, s_idx = 0, idx = 0;
1111  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1112  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1113  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1114  const auto insert_from_temp_cache =
1115  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1116  if (insert_from_temp_cache) {
1117  updated_cache[idx] = temp_sorted_cache[t_idx++];
1118  } else {
1119  updated_cache[idx] = sorted_cache[s_idx++];
1120  }
1121  }
1122  while (t_idx < temp_sorted_cache.size()) {
1123  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1124  }
1125  while (s_idx < sorted_cache.size()) {
1126  updated_cache[idx++] = sorted_cache[s_idx++];
1127  }
1128  sorted_cache.swap(updated_cache);
1129 }
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ populate_string_array_ids()

void StringDictionary::populate_string_array_ids ( std::vector< std::vector< int32_t >> &  dest_array_ids,
StringDictionary dest_dict,
const std::vector< std::vector< int32_t >> &  source_array_ids,
const StringDictionary source_dict 
)
static

Definition at line 1151 of file StringDictionary.cpp.

References populate_string_ids().

Referenced by DictionaryValueConverter< TARGET_TYPE >::processArrayBuffer().

1155  {
1156  dest_array_ids.resize(source_array_ids.size());
1157 
1158  std::atomic<size_t> row_idx{0};
1159  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1160  int thread_id) {
1161  for (;;) {
1162  auto row = row_idx.fetch_add(1);
1163 
1164  if (row >= dest_array_ids.size()) {
1165  return;
1166  }
1167  const auto& source_ids = source_array_ids[row];
1168  auto& dest_ids = dest_array_ids[row];
1169  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1170  }
1171  };
1172 
1173  const int num_worker_threads = std::thread::hardware_concurrency();
1174 
1175  if (source_array_ids.size() / num_worker_threads > 10) {
1176  std::vector<std::future<void>> worker_threads;
1177  for (int i = 0; i < num_worker_threads; ++i) {
1178  worker_threads.push_back(std::async(std::launch::async, processor, i));
1179  }
1180 
1181  for (auto& child : worker_threads) {
1182  child.wait();
1183  }
1184  for (auto& child : worker_threads) {
1185  child.get();
1186  }
1187  } else {
1188  processor(0);
1189  }
1190 }
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ populate_string_ids()

void StringDictionary::populate_string_ids ( std::vector< int32_t > &  dest_ids,
StringDictionary dest_dict,
const std::vector< int32_t > &  source_ids,
const StringDictionary source_dict 
)
static

Definition at line 1131 of file StringDictionary.cpp.

References getOrAddBulk(), and getString().

Referenced by populate_string_array_ids(), and DictionaryValueConverter< TARGET_TYPE >::processBuffer().

1134  {
1135  std::vector<std::string> strings;
1136 
1137  for (const int32_t source_id : source_ids) {
1138  if (source_id == std::numeric_limits<int32_t>::min()) {
1139  strings.emplace_back("");
1140  } else if (source_id < 0) {
1141  throw std::runtime_error("Unexpected negative source ID");
1142  } else {
1143  strings.push_back(source_dict->getString(source_id));
1144  }
1145  }
1146 
1147  dest_ids.resize(strings.size());
1148  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1149 }
std::string getString(int32_t string_id) const
void getOrAddBulk(const std::vector< std::string > &string_vec, T *encoded_vec)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ processDictionaryFutures()

void StringDictionary::processDictionaryFutures ( std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &  dictionary_futures)
private

Definition at line 203 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), materialize_hashes_, payload_file_off_, rk_hashes_, str_count_, and str_ids_.

Referenced by StringDictionary().

205  {
206  for (auto& dictionary_future : dictionary_futures) {
207  dictionary_future.wait();
208  auto hashVec = dictionary_future.get();
209  for (auto& hash : hashVec) {
210  uint32_t bucket = computeUniqueBucketWithHash(hash.first, str_ids_);
211  payload_file_off_ += hash.second;
212  str_ids_[bucket] = static_cast<int32_t>(str_count_);
213  if (materialize_hashes_) {
214  rk_hashes_[str_count_] = hash.first;
215  }
216  ++str_count_;
217  }
218  }
219  dictionary_futures.clear();
220 }
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) const noexcept
std::vector< int32_t > str_ids_
std::vector< uint32_t > rk_hashes_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ sortCache()

void StringDictionary::sortCache ( std::vector< int32_t > &  cache)
private

Definition at line 1094 of file StringDictionary.cpp.

References getStringFromStorage(), and string_lt().

Referenced by buildSortedCache().

1094  {
1095  // This method is not thread-safe.
1096 
1097  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1098  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1099 
1100  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1101  auto a_str = this->getStringFromStorage(a);
1102  auto b_str = this->getStringFromStorage(b);
1103  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1104  });
1105 }
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ storageEntryCount()

size_t StringDictionary::storageEntryCount ( ) const

Definition at line 410 of file StringDictionary.cpp.

References client_, rw_mutex_, and str_count_.

410  {
411  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
412  if (client_) {
413  return client_->storage_entry_count();
414  }
415  return str_count_;
416 }
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_

Member Data Documentation

◆ CANARY_BUFFER

char * StringDictionary::CANARY_BUFFER {nullptr}
static private

◆ client_

std::unique_ptr<StringDictionaryClient> StringDictionary::client_

◆ client_no_timeout_

std::unique_ptr<StringDictionaryClient> StringDictionary::client_no_timeout_
private

Definition at line 176 of file StringDictionary.h.

Referenced by getOrAddBulk(), and getOrAddBulkRemote().

◆ compare_cache_

DictionaryCache<std::string, compare_cache_value_t> StringDictionary::compare_cache_
mutable private

Definition at line 173 of file StringDictionary.h.

Referenced by getCompare(), and invalidateInvertedIndex().

◆ equal_cache_

std::map<std::string, int32_t> StringDictionary::equal_cache_
mutable private

Definition at line 172 of file StringDictionary.h.

Referenced by getEquals(), and invalidateInvertedIndex().

◆ INVALID_STR_ID

constexpr int32_t StringDictionary::INVALID_STR_ID = -1
static

◆ isTemp_

bool StringDictionary::isTemp_
private

◆ like_cache_

std::map<std::tuple<std::string, bool, bool, char>, std::vector<int32_t> > StringDictionary::like_cache_
mutable private

Definition at line 170 of file StringDictionary.h.

Referenced by getLike(), and invalidateInvertedIndex().

◆ materialize_hashes_

bool StringDictionary::materialize_hashes_
private

◆ MAX_STRCOUNT

constexpr size_t StringDictionary::MAX_STRCOUNT = (1U << 31) - 1
static

◆ MAX_STRLEN

◆ offset_fd_

int StringDictionary::offset_fd_
private

◆ offset_file_size_

size_t StringDictionary::offset_file_size_
private

◆ offset_map_

StringIdxEntry* StringDictionary::offset_map_
private

◆ offsets_path_

std::string StringDictionary::offsets_path_
private

Definition at line 160 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddImpl(), and StringDictionary().

◆ payload_fd_

int StringDictionary::payload_fd_
private

◆ payload_file_off_

size_t StringDictionary::payload_file_off_
private

Definition at line 167 of file StringDictionary.h.

Referenced by appendToStorage(), and processDictionaryFutures().

◆ payload_file_size_

size_t StringDictionary::payload_file_size_
private

◆ payload_map_

char* StringDictionary::payload_map_
private

◆ regex_cache_

std::map<std::pair<std::string, char>, std::vector<int32_t> > StringDictionary::regex_cache_
mutable private

Definition at line 171 of file StringDictionary.h.

Referenced by getRegexpLike(), and invalidateInvertedIndex().

◆ rk_hashes_

std::vector<uint32_t> StringDictionary::rk_hashes_
private

◆ rw_mutex_

mapd_shared_mutex StringDictionary::rw_mutex_

◆ sorted_cache

std::vector<int32_t> StringDictionary::sorted_cache
private

Definition at line 157 of file StringDictionary.h.

Referenced by buildSortedCache(), getCompare(), and mergeSortedCache().

◆ str_count_

◆ str_ids_

std::vector<int32_t> StringDictionary::str_ids_
private

◆ strings_cache_

std::shared_ptr<std::vector<std::string> > StringDictionary::strings_cache_
mutable private

Definition at line 174 of file StringDictionary.h.

Referenced by copyStrings().


The documentation for this class was generated from the following files: