OmniSciDB  29e35f4d58
StringDictionary Class Reference

#include <StringDictionary.h>

+ Collaboration diagram for StringDictionary:

Classes

struct  compare_cache_value_t
 
struct  PayloadString
 
struct  StringIdxEntry
 

Public Member Functions

 StringDictionary (const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
 
 StringDictionary (const LeafHostInfo &host, const DictRef dict_ref)
 
 ~StringDictionary () noexcept
 
int32_t getOrAdd (const std::string &str) noexcept
 
template<class T >
void getOrAddBulk (const std::vector< std::string > &string_vec, T *encoded_vec)
 
void getOrAddBulkArray (const std::vector< std::vector< std::string >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
 
int32_t getIdOfString (const std::string &str) const
 
std::string getString (int32_t string_id) const
 
std::pair< char *, size_t > getStringBytes (int32_t string_id) const noexcept
 
size_t storageEntryCount () const
 
std::vector< int32_t > getLike (const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
 
std::vector< int32_t > getCompare (const std::string &pattern, const std::string &comp_operator, const size_t generation)
 
std::vector< int32_t > getRegexpLike (const std::string &pattern, const char escape, const size_t generation) const
 
std::shared_ptr< const std::vector< std::string > > copyStrings () const
 
bool checkpoint () noexcept
 

Static Public Member Functions

static void populate_string_ids (std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
 Populates provided dest_ids vector with string ids corresponding to given source strings. More...
 
static void populate_string_array_ids (std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
 

Static Public Attributes

static constexpr int32_t INVALID_STR_ID = -1
 
static constexpr size_t MAX_STRLEN = (1 << 15) - 1
 
static constexpr size_t MAX_STRCOUNT = (1U << 31) - 1
 

Private Member Functions

void processDictionaryFutures (std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
 
bool fillRateIsHigh () const noexcept
 
void increaseCapacity () noexcept
 
int32_t getOrAddImpl (const std::string &str) noexcept
 
template<class T >
void getOrAddBulkRemote (const std::vector< std::string > &string_vec, T *encoded_vec)
 
int32_t getUnlocked (const std::string &str) const noexcept
 
std::string getStringUnlocked (int32_t string_id) const noexcept
 
std::string getStringChecked (const int string_id) const noexcept
 
std::pair< char *, size_t > getStringBytesChecked (const int string_id) const noexcept
 
uint32_t computeBucket (const uint32_t hash, const std::string str, const std::vector< int32_t > &data, const bool unique) const noexcept
 
uint32_t computeUniqueBucketWithHash (const uint32_t hash, const std::vector< int32_t > &data) const noexcept
 
void appendToStorage (const std::string &str) noexcept
 
PayloadString getStringFromStorage (const int string_id) const noexcept
 
void addPayloadCapacity () noexcept
 
void addOffsetCapacity () noexcept
 
size_t addStorageCapacity (int fd) noexcept
 
void * addMemoryCapacity (void *addr, size_t &mem_size) noexcept
 
void invalidateInvertedIndex () noexcept
 
std::vector< int32_t > getEquals (std::string pattern, std::string comp_operator, size_t generation)
 
void buildSortedCache ()
 
void insertInSortedCache (std::string str, int32_t str_id)
 
void sortCache (std::vector< int32_t > &cache)
 
void mergeSortedCache (std::vector< int32_t > &temp_sorted_cache)
 
compare_cache_value_t * binary_search_cache (const std::string &pattern) const
 

Private Attributes

size_t str_count_
 
std::vector< int32_t > str_ids_
 
std::vector< uint32_t > rk_hashes_
 
std::vector< int32_t > sorted_cache
 
bool isTemp_
 
bool materialize_hashes_
 
std::string offsets_path_
 
int payload_fd_
 
int offset_fd_
 
StringIdxEntry * offset_map_
 
char * payload_map_
 
size_t offset_file_size_
 
size_t payload_file_size_
 
size_t payload_file_off_
 
mapd_shared_mutex rw_mutex_
 
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
 
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
 
std::map< std::string, int32_t > equal_cache_
 
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
 
std::shared_ptr< std::vector< std::string > > strings_cache_
 
std::unique_ptr< StringDictionaryClient > client_
 
std::unique_ptr< StringDictionaryClient > client_no_timeout_
 

Static Private Attributes

static char * CANARY_BUFFER {nullptr}
 

Detailed Description

Definition at line 41 of file StringDictionary.h.

Constructor & Destructor Documentation

◆ StringDictionary() [1/2]

StringDictionary::StringDictionary ( const std::string &  folder,
const bool  isTemp,
const bool  recover,
const bool  materializeHashes = false,
size_t  initial_capacity = 256 
)

Definition at line 106 of file StringDictionary.cpp.

References addOffsetCapacity(), addPayloadCapacity(), CHECK_EQ, anonymous_namespace{StringDictionary.cpp}::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_open(), anonymous_namespace{StringDictionary.cpp}::file_size(), getStringFromStorage(), INVALID_STR_ID, isTemp_, LOG, materialize_hashes_, offset_fd_, offset_file_size_, offset_map_, offsets_path_, payload_fd_, payload_file_size_, payload_map_, processDictionaryFutures(), anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, anonymous_namespace{StringDictionary.cpp}::round_up_p2(), rw_mutex_, str_ids_, and logger::WARNING.

// StringDictionary(folder, isTemp, recover, materializeHashes, initial_capacity):
// constructs a local dictionary. Non-temp dictionaries open and mmap the
// DictPayload / DictOffsets files under `folder`; when `recover` is set, they
// rebuild the hash index (str_ids_, and rk_hashes_ if materializeHashes) from the
// stored strings.
// NOTE(review): this is a Doxygen extract. Source lines 137, 142 and 145 are
// absent from this listing (the numbering jumps). The References list
// (file_size, addPayloadCapacity, addOffsetCapacity) suggests they read the
// payload file size and add initial storage capacity -- confirm against
// StringDictionary.cpp before editing.
111  : str_count_(0)
112  , str_ids_(initial_capacity, INVALID_STR_ID)
113  , rk_hashes_(initial_capacity)
114  , isTemp_(isTemp)
115  , materialize_hashes_(materializeHashes)
116  , payload_fd_(-1)
117  , offset_fd_(-1)
118  , offset_map_(nullptr)
119  , payload_map_(nullptr)
120  , offset_file_size_(0)
121  , payload_file_size_(0)
122  , payload_file_off_(0)
123  , strings_cache_(nullptr) {
// A non-temp dictionary without a folder returns here: no files are opened and
// no storage is allocated.
124  if (!isTemp && folder.empty()) {
125  return;
126  }
127 
128  // initial capacity must be a power of two for efficient bucket computation
129  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
130  if (!isTemp_) {
131  boost::filesystem::path storage_path(folder);
132  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
133  const auto payload_path =
134  (storage_path / boost::filesystem::path("DictPayload")).string();
135  payload_fd_ = checked_open(payload_path.c_str(), recover);
136  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
138  offset_file_size_ = file_size(offset_fd_);
139  }
140 
141  if (payload_file_size_ == 0) {
143  }
144  if (offset_file_size_ == 0) {
146  }
147  if (!isTemp_) { // we never mmap or recover temp dictionaries
148  payload_map_ = reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
149  offset_map_ =
150  reinterpret_cast<StringIdxEntry*>(checked_mmap(offset_fd_, offset_file_size_));
151  if (recover) {
// Recovery: the number of stored strings is the number of whole StringIdxEntry
// records in the offsets file. The hash table is resized to
// round_up_p2(2 * count + 1) slots before re-hashing.
152  const size_t bytes = file_size(offset_fd_);
153  if (bytes % sizeof(StringIdxEntry) != 0) {
154  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
155  }
156  const uint64_t str_count = bytes / sizeof(StringIdxEntry);
157  // at this point we know the size of the StringDict we need to load
158  // so lets reallocate the vector to the correct size
159  const uint64_t max_entries = round_up_p2(str_count * 2 + 1);
160  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
161  str_ids_.swap(new_str_ids);
162  if (materialize_hashes_) {
163  std::vector<uint32_t> new_rk_hashes(max_entries / 2);
164  rk_hashes_.swap(new_rk_hashes);
165  }
166  unsigned string_id = 0;
167  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
168 
169  uint32_t thread_inits = 0;
// NOTE(review): std::thread::hardware_concurrency() may return 0. In that case
// both `str_count / thread_count` below and `thread_inits % thread_count`
// divide by zero.
170  const auto thread_count = std::thread::hardware_concurrency();
171  const uint32_t items_per_thread = std::max<uint32_t>(
172  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
173  std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>
174  dictionary_futures;
// Each async task hashes up to items_per_thread consecutive ids and returns
// (rk_hash, size) pairs. A task stops early at the first canary entry.
175  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
176  dictionary_futures.emplace_back(std::async(
177  std::launch::async, [string_id, str_count, items_per_thread, this] {
178  std::vector<std::pair<uint32_t, unsigned int>> hashVec;
179  for (uint32_t curr_id = string_id;
180  curr_id < string_id + items_per_thread && curr_id < str_count;
181  curr_id++) {
182  const auto recovered = getStringFromStorage(curr_id);
183  if (recovered.canary) {
184  // hit the canary, recovery finished
185  break;
186  } else {
187  std::string temp(recovered.c_str_ptr, recovered.size);
188  hashVec.emplace_back(std::make_pair(rk_hash(temp), temp.size()));
189  }
190  }
191  return hashVec;
192  }));
193  thread_inits++;
// Drain outstanding futures after every thread_count launches.
194  if (thread_inits % thread_count == 0) {
195  processDictionaryFutures(dictionary_futures);
196  }
197  }
198  // gather last few threads
199  if (dictionary_futures.size() != 0) {
200  processDictionaryFutures(dictionary_futures);
201  }
202  }
203  }
204 }
StringIdxEntry * offset_map_
#define CHECK_EQ(x, y)
Definition: Logger.h:201
#define LOG(tag)
Definition: Logger.h:188
std::string offsets_path_
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
void addPayloadCapacity() noexcept
mapd_shared_mutex rw_mutex_
int checked_open(const char *path, const bool recover)
void addOffsetCapacity() noexcept
uint32_t rk_hash(const std::string &str)
void * checked_mmap(const int fd, const size_t sz)
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
const uint64_t round_up_p2(const uint64_t num)
std::vector< int32_t > str_ids_
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< uint32_t > rk_hashes_
+ Here is the call graph for this function:

◆ StringDictionary() [2/2]

StringDictionary::StringDictionary ( const LeafHostInfo &  host,
const DictRef  dict_ref 
)

Definition at line 225 of file StringDictionary.cpp.

226  : strings_cache_(nullptr)
227  , client_(new StringDictionaryClient(host, dict_ref, true))
228  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
std::unique_ptr< StringDictionaryClient > client_
std::shared_ptr< std::vector< std::string > > strings_cache_
std::unique_ptr< StringDictionaryClient > client_no_timeout_

◆ ~StringDictionary()

StringDictionary::~StringDictionary ( )
noexcept

Definition at line 230 of file StringDictionary.cpp.

References CHECK, CHECK_GE, anonymous_namespace{StringDictionary.cpp}::checked_munmap(), client_, File_Namespace::close(), isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

// ~StringDictionary(): releases local storage. Remote (client_-backed)
// dictionaries return immediately. Nothing is released unless payload_map_ is
// set.
// NOTE(review): this is a Doxygen extract. Source lines 236-238, 240 and 244 are
// absent from this listing. Going by the References list (checked_munmap,
// close, payload_file_size_, offset_file_size_), they presumably unmap both
// mappings and close payload_fd_ -- confirm against StringDictionary.cpp.
230  {
231  if (client_) {
232  return;
233  }
234  if (payload_map_) {
235  if (!isTemp_) {
239  CHECK_GE(payload_fd_, 0);
241  CHECK_GE(offset_fd_, 0);
242  close(offset_fd_);
243  } else {
// Temp dictionaries keep their data in heap buffers grown with realloc() by
// addMemoryCapacity(), so they are released with free().
245  free(payload_map_);
246  free(offset_map_);
247  }
248  }
249 }
StringIdxEntry * offset_map_
void checked_munmap(void *addr, size_t length)
#define CHECK_GE(x, y)
Definition: Logger.h:206
std::unique_ptr< StringDictionaryClient > client_
#define CHECK(condition)
Definition: Logger.h:193
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:101
+ Here is the call graph for this function:

Member Function Documentation

◆ addMemoryCapacity()

void * StringDictionary::addMemoryCapacity ( void *  addr,
size_t &  mem_size 
)
private, noexcept

Definition at line 1039 of file StringDictionary.cpp.

References CANARY_BUFFER, CHECK, and anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE.

Referenced by addOffsetCapacity(), and addPayloadCapacity().

// addMemoryCapacity(addr, mem_size): grows the heap buffer `addr` by
// CANARY_BUFF_SIZE bytes with realloc(). It fills the new tail with the 0xff
// canary pattern, adds CANARY_BUFF_SIZE to mem_size, and returns the (possibly
// moved) buffer. Its callers use it for the temp (in-memory) branch.
// NOTE(review): source line 1043 is absent from this Doxygen listing. It is
// presumably a check on the malloc() result -- confirm.
1039  {
// The canary block lives in the static CANARY_BUFFER member, shared with
// addStorageCapacity(), and is allocated on first use.
1040  static const ssize_t CANARY_BUFF_SIZE = 1024 * SYSTEM_PAGE_SIZE;
1041  if (!CANARY_BUFFER) {
1042  CANARY_BUFFER = reinterpret_cast<char*>(malloc(CANARY_BUFF_SIZE));
1044  memset(CANARY_BUFFER, 0xff, CANARY_BUFF_SIZE);
1045  }
1046  void* new_addr = realloc(addr, mem_size + CANARY_BUFF_SIZE);
1047  CHECK(new_addr);
1048  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
// NOTE(review): memcpy returns its destination, so this CHECK only re-tests
// write_addr. It cannot detect a failed copy.
1049  CHECK(memcpy(write_addr, CANARY_BUFFER, CANARY_BUFF_SIZE));
1050  mem_size += CANARY_BUFF_SIZE;
1051  return new_addr;
1052 }
static char * CANARY_BUFFER
#define CHECK(condition)
Definition: Logger.h:193
+ Here is the caller graph for this function:

◆ addOffsetCapacity()

void StringDictionary::addOffsetCapacity ( )
private, noexcept

Definition at line 1018 of file StringDictionary.cpp.

References addMemoryCapacity(), addStorageCapacity(), isTemp_, offset_fd_, offset_file_size_, and offset_map_.

Referenced by appendToStorage(), and StringDictionary().

// addOffsetCapacity(): grows the offsets storage. For temp dictionaries,
// offset_map_ is grown in memory via addMemoryCapacity(), which also adds the
// growth to offset_file_size_.
// NOTE(review): the on-disk branch (source line 1020) is absent from this
// Doxygen listing. The References list suggests it extends offset_fd_ via
// addStorageCapacity() -- confirm.
1018  {
1019  if (!isTemp_) {
1021  } else {
1022  offset_map_ =
1023  static_cast<StringIdxEntry*>(addMemoryCapacity(offset_map_, offset_file_size_));
1024  }
1025 }
StringIdxEntry * offset_map_
void * addMemoryCapacity(void *addr, size_t &mem_size) noexcept
size_t addStorageCapacity(int fd) noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ addPayloadCapacity()

void StringDictionary::addPayloadCapacity ( )
private, noexcept

Definition at line 1009 of file StringDictionary.cpp.

References addMemoryCapacity(), addStorageCapacity(), isTemp_, payload_fd_, payload_file_size_, and payload_map_.

Referenced by appendToStorage(), and StringDictionary().

// addPayloadCapacity(): grows the payload storage. For temp dictionaries,
// payload_map_ is grown in memory via addMemoryCapacity(), which also adds the
// growth to payload_file_size_.
// NOTE(review): the on-disk branch (source line 1011) is absent from this
// Doxygen listing. The References list suggests it extends payload_fd_ via
// addStorageCapacity() -- confirm.
1009  {
1010  if (!isTemp_) {
1012  } else {
1013  payload_map_ =
1014  static_cast<char*>(addMemoryCapacity(payload_map_, payload_file_size_));
1015  }
1016 }
void * addMemoryCapacity(void *addr, size_t &mem_size) noexcept
size_t addStorageCapacity(int fd) noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ addStorageCapacity()

size_t StringDictionary::addStorageCapacity ( int  fd)
private, noexcept

Definition at line 1027 of file StringDictionary.cpp.

References CANARY_BUFFER, CHECK, CHECK_NE, anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE, and File_Namespace::write().

Referenced by addOffsetCapacity(), and addPayloadCapacity().

// addStorageCapacity(fd): appends CANARY_BUFF_SIZE bytes of the 0xff canary
// pattern to the end of file `fd` and returns the number of bytes appended.
// The lazy CANARY_BUFFER setup duplicates the one in addMemoryCapacity().
// NOTE(review): source line 1031 is absent from this Doxygen listing. It is
// presumably a check on the malloc() result -- confirm.
// NOTE(review): the References list resolves `write` to File_Namespace::write(),
// whose listed signature takes a FILE*. The (fd, buffer, size) call shape matches
// POSIX write(2) instead -- confirm which function is actually called.
1027  {
1028  static const ssize_t CANARY_BUFF_SIZE = 1024 * SYSTEM_PAGE_SIZE;
1029  if (!CANARY_BUFFER) {
1030  CANARY_BUFFER = static_cast<char*>(malloc(CANARY_BUFF_SIZE));
1032  memset(CANARY_BUFFER, 0xff, CANARY_BUFF_SIZE);
1033  }
1034  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1035  CHECK(write(fd, CANARY_BUFFER, CANARY_BUFF_SIZE) == CANARY_BUFF_SIZE);
1036  return CANARY_BUFF_SIZE;
1037 }
#define CHECK_NE(x, y)
Definition: Logger.h:202
static char * CANARY_BUFFER
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:120
#define CHECK(condition)
Definition: Logger.h:193
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ appendToStorage()

void StringDictionary::appendToStorage ( const std::string &  str)
private, noexcept

Definition at line 958 of file StringDictionary.cpp.

References addOffsetCapacity(), addPayloadCapacity(), CHECK, CHECK_GE, anonymous_namespace{StringDictionary.cpp}::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_munmap(), isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_off_, payload_file_size_, payload_map_, StringDictionary::StringIdxEntry::size, and str_count_.

Referenced by getOrAddBulk(), and getOrAddImpl().

// appendToStorage(str): copies str's bytes into the payload at
// payload_file_off_. It then writes a StringIdxEntry {payload_file_off_,
// str.size()} into slot str_count_ of the offset map. When either storage is
// too small, the (partly elided) branches grow it, and on-disk dictionaries
// re-mmap it via checked_mmap(). This function advances payload_file_off_ but
// does not modify str_count_.
// NOTE(review): source lines 966-967, 972, 982-983 and 988 are absent from this
// Doxygen listing. The References list (checked_munmap, addPayloadCapacity,
// addOffsetCapacity) suggests they unmap the old mapping and add capacity --
// confirm against StringDictionary.cpp.
958  {
959  if (!isTemp_) {
960  CHECK_GE(payload_fd_, 0);
961  CHECK_GE(offset_fd_, 0);
962  }
963  // write the payload
964  if (payload_file_off_ + str.size() > payload_file_size_) {
965  if (!isTemp_) {
968  CHECK(payload_file_off_ + str.size() <= payload_file_size_);
969  payload_map_ =
970  reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
971  } else {
973  }
974  }
975  memcpy(payload_map_ + payload_file_off_, str.c_str(), str.size());
976  // write the offset and length
977  size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
978  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
979  payload_file_off_ += str.size();
// NOTE(review): unlike the payload check above (`>`), this test uses `>=`, so
// the offset storage also grows when exactly one entry's room is left.
980  if (offset_file_off + sizeof(str_meta) >= offset_file_size_) {
981  if (!isTemp_) {
984  CHECK(offset_file_off + sizeof(str_meta) <= offset_file_size_);
985  offset_map_ =
986  reinterpret_cast<StringIdxEntry*>(checked_mmap(offset_fd_, offset_file_size_));
987  } else {
989  }
990  }
991  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
992 }
StringIdxEntry * offset_map_
void checked_munmap(void *addr, size_t length)
#define CHECK_GE(x, y)
Definition: Logger.h:206
void addPayloadCapacity() noexcept
void addOffsetCapacity() noexcept
void * checked_mmap(const int fd, const size_t sz)
#define CHECK(condition)
Definition: Logger.h:193
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ binary_search_cache()

compare_cache_value_t* StringDictionary::binary_search_cache ( const std::string &  pattern) const
private

◆ buildSortedCache()

void StringDictionary::buildSortedCache ( )
private

Definition at line 1086 of file StringDictionary.cpp.

References mergeSortedCache(), sortCache(), sorted_cache, and str_count_.

Referenced by getCompare().

1086  {
1087  // This method is not thread-safe.
1088  const auto cur_cache_size = sorted_cache.size();
1089  std::vector<int32_t> temp_sorted_cache;
1090  for (size_t i = cur_cache_size; i < str_count_; i++) {
1091  temp_sorted_cache.push_back(i);
1092  }
1093  sortCache(temp_sorted_cache);
1094  mergeSortedCache(temp_sorted_cache);
1095 }
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
void sortCache(std::vector< int32_t > &cache)
std::vector< int32_t > sorted_cache
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ checkpoint()

bool StringDictionary::checkpoint ( )
noexcept

Definition at line 1069 of file StringDictionary.cpp.

References CHECK, client_, isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

1069  {
1070  if (client_) {
1071  try {
1072  return client_->checkpoint();
1073  } catch (...) {
1074  return false;
1075  }
1076  }
1077  CHECK(!isTemp_);
1078  bool ret = true;
1079  ret = ret && (msync((void*)offset_map_, offset_file_size_, MS_SYNC) == 0);
1080  ret = ret && (msync((void*)payload_map_, payload_file_size_, MS_SYNC) == 0);
1081  ret = ret && (fsync(offset_fd_) == 0);
1082  ret = ret && (fsync(payload_fd_) == 0);
1083  return ret;
1084 }
StringIdxEntry * offset_map_
std::unique_ptr< StringDictionaryClient > client_
#define CHECK(condition)
Definition: Logger.h:193

◆ computeBucket()

uint32_t StringDictionary::computeBucket ( const uint32_t  hash,
const std::string  str,
const std::vector< int32_t > &  data,
const bool  unique 
) const
private, noexcept

Definition at line 901 of file StringDictionary.cpp.

References getStringFromStorage(), INVALID_STR_ID, materialize_hashes_, and rk_hashes_.

Referenced by getOrAddBulk(), getOrAddImpl(), and getUnlocked().

904  {
905  auto bucket = hash & (data.size() - 1);
906  while (true) {
907  if (data[bucket] ==
908  INVALID_STR_ID) { // In this case it means the slot is available for use
909  break;
910  }
911  // if records are unique I don't need to do this test as I know it will not be the
912  // same
913  if (!unique) {
914  if (materialize_hashes_) {
915  if (hash == rk_hashes_[data[bucket]]) {
916  // can't be the same string if hash is different
917  const auto old_str = getStringFromStorage(data[bucket]);
918  if (str.size() == old_str.size &&
919  !memcmp(str.c_str(), old_str.c_str_ptr, str.size())) {
920  // found the string
921  break;
922  }
923  }
924  } else {
925  const auto old_str = getStringFromStorage(data[bucket]);
926  if (str.size() == old_str.size &&
927  !memcmp(str.c_str(), old_str.c_str_ptr, str.size())) {
928  // found the string
929  break;
930  }
931  }
932  }
933  // wrap around
934  if (++bucket == data.size()) {
935  bucket = 0;
936  }
937  }
938  return bucket;
939 }
static constexpr int32_t INVALID_STR_ID
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< uint32_t > rk_hashes_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ computeUniqueBucketWithHash()

uint32_t StringDictionary::computeUniqueBucketWithHash ( const uint32_t  hash,
const std::vector< int32_t > &  data 
) const
private, noexcept

Definition at line 941 of file StringDictionary.cpp.

References INVALID_STR_ID.

Referenced by increaseCapacity(), and processDictionaryFutures().

943  {
944  auto bucket = hash & (data.size() - 1);
945  while (true) {
946  if (data[bucket] ==
947  INVALID_STR_ID) { // In this case it means the slot is available for use
948  break;
949  }
950  // wrap around
951  if (++bucket == data.size()) {
952  bucket = 0;
953  }
954  }
955  return bucket;
956 }
static constexpr int32_t INVALID_STR_ID
+ Here is the caller graph for this function:

◆ copyStrings()

std::shared_ptr< const std::vector< std::string > > StringDictionary::copyStrings ( ) const

Definition at line 768 of file StringDictionary.cpp.

References CHECK_EQ, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), rw_mutex_, str_count_, and strings_cache_.

768  {
769  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
770  if (client_) {
771  // TODO(miyu): support remote string dictionary
772  throw std::runtime_error(
773  "copying dictionaries from remote server is not supported yet.");
774  }
775 
776  if (strings_cache_) {
777  return strings_cache_;
778  }
779 
780  strings_cache_ = std::make_shared<std::vector<std::string>>();
781  strings_cache_->reserve(str_count_);
782  const bool multithreaded = str_count_ > 10000;
783  const auto worker_count =
784  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
785  CHECK_GT(worker_count, 0UL);
786  std::vector<std::vector<std::string>> worker_results(worker_count);
787  auto copy = [this](std::vector<std::string>& str_list,
788  const size_t start_id,
789  const size_t end_id) {
790  CHECK_LE(start_id, end_id);
791  str_list.reserve(end_id - start_id);
792  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
793  str_list.push_back(getStringUnlocked(string_id));
794  }
795  };
796  if (multithreaded) {
797  std::vector<std::future<void>> workers;
798  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
799  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
800  worker_idx < worker_count && start < str_count_;
801  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
802  workers.push_back(std::async(
803  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
804  }
805  for (auto& worker : workers) {
806  worker.get();
807  }
808  } else {
809  CHECK_EQ(worker_results.size(), size_t(1));
810  copy(worker_results[0], 0, str_count_);
811  }
812 
813  for (const auto& worker_result : worker_results) {
814  strings_cache_->insert(
815  strings_cache_->end(), worker_result.begin(), worker_result.end());
816  }
817  return strings_cache_;
818 }
#define CHECK_EQ(x, y)
Definition: Logger.h:201
#define CHECK_GT(x, y)
Definition: Logger.h:205
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
std::shared_ptr< std::vector< std::string > > strings_cache_
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:204
int cpu_threads()
Definition: thread_count.h:25
+ Here is the call graph for this function:

◆ fillRateIsHigh()

bool StringDictionary::fillRateIsHigh ( ) const
private, noexcept

Definition at line 820 of file StringDictionary.cpp.

References str_count_, and str_ids_.

Referenced by getOrAddBulk(), and getOrAddImpl().

820  {
821  return str_ids_.size() < str_count_ * 2;
822 }
std::vector< int32_t > str_ids_
+ Here is the caller graph for this function:

◆ getCompare()

std::vector< int32_t > StringDictionary::getCompare ( const std::string &  pattern,
const std::string &  comp_operator,
const size_t  generation 
)

Definition at line 560 of file StringDictionary.cpp.

References buildSortedCache(), client_, compare_cache_, getEquals(), getStringFromStorage(), rw_mutex_, sorted_cache, str_count_, string_eq(), and string_lt().

562  {
563  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
564  if (client_) {
565  return client_->get_compare(pattern, comp_operator, generation);
566  }
567  std::vector<int32_t> ret;
568  if (str_count_ == 0) {
569  return ret;
570  }
571  if (sorted_cache.size() < str_count_) {
572  if (comp_operator == "=" || comp_operator == "<>") {
573  return getEquals(pattern, comp_operator, generation);
574  }
575 
577  }
578  auto cache_index = compare_cache_.get(pattern);
579 
580  if (!cache_index) {
581  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
582  const auto cache_itr = std::lower_bound(
583  sorted_cache.begin(),
584  sorted_cache.end(),
585  pattern,
586  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
587  auto a_str = this->getStringFromStorage(a);
588  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
589  });
590 
591  if (cache_itr == sorted_cache.end()) {
592  cache_index->index = sorted_cache.size() - 1;
593  cache_index->diff = 1;
594  } else {
595  const auto cache_str = getStringFromStorage(*cache_itr);
596  if (!string_eq(
597  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
598  cache_index->index = cache_itr - sorted_cache.begin() - 1;
599  cache_index->diff = 1;
600  } else {
601  cache_index->index = cache_itr - sorted_cache.begin();
602  cache_index->diff = 0;
603  }
604  }
605 
606  compare_cache_.put(pattern, cache_index);
607  }
608 
609  // since we have a cache in form of vector of ints which is sorted according to
610  // corresponding strings in the dictionary all we need is the index of the element
611  // which equal to the pattern that we are trying to match or the index of “biggest”
612  // element smaller than the pattern, to perform all the comparison operators over
613  // string. The search function guarantees we have such index so now it is just the
614  // matter to include all the elements in the result vector.
615 
616  // For < operator if the index that we have points to the element which is equal to
617  // the pattern that we are searching for we simply get all the elements less than the
618  // index. If the element pointed by the index is not equal to the pattern we are
619  // comparing with we also need to include that index in result vector, except when the
620  // index points to 0 and the pattern is lesser than the smallest value in the string
621  // dictionary.
622 
623  if (comp_operator == "<") {
624  size_t idx = cache_index->index;
625  if (cache_index->diff) {
626  idx = cache_index->index + 1;
627  if (cache_index->index == 0 && cache_index->diff > 0) {
628  idx = cache_index->index;
629  }
630  }
631  for (size_t i = 0; i < idx; i++) {
632  ret.push_back(sorted_cache[i]);
633  }
634 
635  // For <= operator if the index that we have points to the element which is equal to
636  // the pattern that we are searching for we want to include the element pointed by
637  // the index in the result set. If the element pointed by the index is not equal to
638  // the pattern we are comparing with we just want to include all the ids with index
639  // less than the index that is cached, except when pattern that we are searching for
640  // is smaller than the smallest string in the dictionary.
641 
642  } else if (comp_operator == "<=") {
643  size_t idx = cache_index->index + 1;
644  if (cache_index == 0 && cache_index->diff > 0) {
645  idx = cache_index->index;
646  }
647  for (size_t i = 0; i < idx; i++) {
648  ret.push_back(sorted_cache[i]);
649  }
650 
651  // For > operator we want to get all the elements with index greater than the index
652  // that we have except, when the pattern we are searching for is lesser than the
653  // smallest string in the dictionary we also want to include the id of the index
654  // that we have.
655 
656  } else if (comp_operator == ">") {
657  size_t idx = cache_index->index + 1;
658  if (cache_index->index == 0 && cache_index->diff > 0) {
659  idx = cache_index->index;
660  }
661  for (size_t i = idx; i < sorted_cache.size(); i++) {
662  ret.push_back(sorted_cache[i]);
663  }
664 
665  // For >= operator when the indexed element that we have points to element which is
666  // equal to the pattern we are searching for we want to include that in the result
667  // vector. If the index that we have does not point to the string which is equal to
668  // the pattern we are searching we don’t want to include that id into the result
669  // vector except when the index is 0.
670 
671  } else if (comp_operator == ">=") {
672  size_t idx = cache_index->index;
673  if (cache_index->diff) {
674  idx = cache_index->index + 1;
675  if (cache_index->index == 0 && cache_index->diff > 0) {
676  idx = cache_index->index;
677  }
678  }
679  for (size_t i = idx; i < sorted_cache.size(); i++) {
680  ret.push_back(sorted_cache[i]);
681  }
682  } else if (comp_operator == "=") {
683  if (!cache_index->diff) {
684  ret.push_back(sorted_cache[cache_index->index]);
685  }
686 
687  // For <> operator it is simple matter of not including id of string which is equal
688  // to pattern we are searching for.
689  } else if (comp_operator == "<>") {
690  if (!cache_index->diff) {
691  size_t idx = cache_index->index;
692  for (size_t i = 0; i < idx; i++) {
693  ret.push_back(sorted_cache[i]);
694  }
695  ++idx;
696  for (size_t i = idx; i < sorted_cache.size(); i++) {
697  ret.push_back(sorted_cache[i]);
698  }
699  } else {
700  for (size_t i = 0; i < sorted_cache.size(); i++) {
701  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
702  }
703  }
704 
705  } else {
706  std::runtime_error("Unsupported string comparison operator");
707  }
708  return ret;
709 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
std::unique_ptr< StringDictionaryClient > client_
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
mapd_shared_mutex rw_mutex_
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache
+ Here is the call graph for this function:

◆ getEquals()

std::vector< int32_t > StringDictionary::getEquals ( std::string  pattern,
std::string  comp_operator,
size_t  generation 
)
private

Definition at line 500 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, cpu_threads(), equal_cache_, getStringUnlocked(), MAX_STRLEN, run_benchmark_import::result, and str_count_.

Referenced by getCompare().

502  {
503  std::vector<int32_t> result;
504  auto eq_id_itr = equal_cache_.find(pattern);
505  int32_t eq_id = MAX_STRLEN + 1;
506  int32_t cur_size = str_count_;
507  if (eq_id_itr != equal_cache_.end()) {
508  auto eq_id = eq_id_itr->second;
509  if (comp_operator == "=") {
510  result.push_back(eq_id);
511  } else {
512  for (int32_t idx = 0; idx <= cur_size; idx++) {
513  if (idx == eq_id) {
514  continue;
515  }
516  result.push_back(idx);
517  }
518  }
519  } else {
520  std::vector<std::thread> workers;
521  int worker_count = cpu_threads();
522  CHECK_GT(worker_count, 0);
523  std::vector<std::vector<int32_t>> worker_results(worker_count);
524  CHECK_LE(generation, str_count_);
525  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
526  workers.emplace_back(
527  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
528  for (size_t string_id = worker_idx; string_id < generation;
529  string_id += worker_count) {
530  const auto str = getStringUnlocked(string_id);
531  if (str == pattern) {
532  worker_results[worker_idx].push_back(string_id);
533  }
534  }
535  });
536  }
537  for (auto& worker : workers) {
538  worker.join();
539  }
540  for (const auto& worker_result : worker_results) {
541  result.insert(result.end(), worker_result.begin(), worker_result.end());
542  }
543  if (result.size() > 0) {
544  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
545  CHECK(it_ok.second);
546  eq_id = result[0];
547  }
548  if (comp_operator == "<>") {
549  for (int32_t idx = 0; idx <= cur_size; idx++) {
550  if (idx == eq_id) {
551  continue;
552  }
553  result.push_back(idx);
554  }
555  }
556  }
557  return result;
558 }
#define CHECK_GT(x, y)
Definition: Logger.h:205
std::string getStringUnlocked(int32_t string_id) const noexcept
std::map< std::string, int32_t > equal_cache_
#define CHECK_LE(x, y)
Definition: Logger.h:204
#define CHECK(condition)
Definition: Logger.h:193
static constexpr size_t MAX_STRLEN
int cpu_threads()
Definition: thread_count.h:25
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getIdOfString()

int32_t StringDictionary::getIdOfString ( const std::string &  str) const

Definition at line 375 of file StringDictionary.cpp.

References client_, getUnlocked(), and rw_mutex_.

375  {
376  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
377  if (client_) {
378  return client_->get(str);
379  }
380  return getUnlocked(str);
381 }
int32_t getUnlocked(const std::string &str) const noexcept
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
+ Here is the call graph for this function:

◆ getLike()

std::vector< int32_t > StringDictionary::getLike ( const std::string &  pattern,
const bool  icase,
const bool  is_simple,
const char  escape,
const size_t  generation 
) const

Definition at line 447 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_like(), like_cache_, run_benchmark_import::result, rw_mutex_, and str_count_.

451  {
452  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
453  if (client_) {
454  return client_->get_like(pattern, icase, is_simple, escape, generation);
455  }
456  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
457  const auto it = like_cache_.find(cache_key);
458  if (it != like_cache_.end()) {
459  return it->second;
460  }
461  std::vector<int32_t> result;
462  std::vector<std::thread> workers;
463  int worker_count = cpu_threads();
464  CHECK_GT(worker_count, 0);
465  std::vector<std::vector<int32_t>> worker_results(worker_count);
466  CHECK_LE(generation, str_count_);
467  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
468  workers.emplace_back([&worker_results,
469  &pattern,
470  generation,
471  icase,
472  is_simple,
473  escape,
474  worker_idx,
475  worker_count,
476  this]() {
477  for (size_t string_id = worker_idx; string_id < generation;
478  string_id += worker_count) {
479  const auto str = getStringUnlocked(string_id);
480  if (is_like(str, pattern, icase, is_simple, escape)) {
481  worker_results[worker_idx].push_back(string_id);
482  }
483  }
484  });
485  }
486  for (auto& worker : workers) {
487  worker.join();
488  }
489  for (const auto& worker_result : worker_results) {
490  result.insert(result.end(), worker_result.begin(), worker_result.end());
491  }
492  // place result into cache for reuse if similar query
493  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
494 
495  CHECK(it_ok.second);
496 
497  return result;
498 }
#define CHECK_GT(x, y)
Definition: Logger.h:205
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_mutex rw_mutex_
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
#define CHECK_LE(x, y)
Definition: Logger.h:204
#define CHECK(condition)
Definition: Logger.h:193
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
int cpu_threads()
Definition: thread_count.h:25
+ Here is the call graph for this function:

◆ getOrAdd()

int32_t StringDictionary::getOrAdd ( const std::string &  str)
noexcept

Definition at line 251 of file StringDictionary.cpp.

References CHECK_EQ, client_, and getOrAddImpl().

251  {
252  if (client_) {
253  std::vector<int32_t> string_ids;
254  client_->get_or_add_bulk(string_ids, {str});
255  CHECK_EQ(size_t(1), string_ids.size());
256  return string_ids.front();
257  }
258  return getOrAddImpl(str);
259 }
#define CHECK_EQ(x, y)
Definition: Logger.h:201
int32_t getOrAddImpl(const std::string &str) noexcept
std::unique_ptr< StringDictionaryClient > client_
+ Here is the call graph for this function:

◆ getOrAddBulk()

template<class T >
void StringDictionary::getOrAddBulk ( const std::vector< std::string > &  string_vec,
T *  encoded_vec 
)

Definition at line 285 of file StringDictionary.cpp.

References appendToStorage(), CHECK, CHECK_LT, client_no_timeout_, computeBucket(), fillRateIsHigh(), getOrAddBulkRemote(), increaseCapacity(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, rw_mutex_, str_count_, and str_ids_.

Referenced by getOrAddBulkArray(), and populate_string_ids().

286  {
287  if (client_no_timeout_) {
288  getOrAddBulkRemote(string_vec, encoded_vec);
289  return;
290  }
291  size_t out_idx{0};
292  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
293 
294  for (const auto& str : string_vec) {
295  if (str.empty()) {
296  encoded_vec[out_idx++] = inline_int_null_value<T>();
297  continue;
298  }
299  CHECK(str.size() <= MAX_STRLEN);
300  uint32_t bucket;
301  const uint32_t hash = rk_hash(str);
302  bucket = computeBucket(hash, str, str_ids_, false);
303  if (str_ids_[bucket] != INVALID_STR_ID) {
304  encoded_vec[out_idx++] = str_ids_[bucket];
305  continue;
306  }
307  // need to add record to dictionary
308  // check there is room
309  if (str_count_ == static_cast<size_t>(max_valid_int_value<T>())) {
310  log_encoding_error<T>(str);
311  encoded_vec[out_idx++] = inline_int_null_value<T>();
312  continue;
313  }
314  if (str_ids_[bucket] == INVALID_STR_ID) {
315  CHECK_LT(str_count_, MAX_STRCOUNT)
316  << "Maximum number (" << str_count_
317  << ") of Dictionary encoded Strings reached for this column, offset path "
318  "for column is "
319  << offsets_path_;
320  if (fillRateIsHigh()) {
321  // resize when more than 50% is full
322  increaseCapacity();
323  bucket = computeBucket(hash, str, str_ids_, false);
324  }
325  appendToStorage(str);
326 
327  str_ids_[bucket] = static_cast<int32_t>(str_count_);
328  if (materialize_hashes_) {
329  rk_hashes_[str_count_] = hash;
330  }
331  ++str_count_;
332  }
333  encoded_vec[out_idx++] = str_ids_[bucket];
334  }
335  invalidateInvertedIndex();
336 }
std::string offsets_path_
void increaseCapacity() noexcept
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
mapd_shared_mutex rw_mutex_
uint32_t rk_hash(const std::string &str)
#define CHECK_LT(x, y)
Definition: Logger.h:203
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void getOrAddBulkRemote(const std::vector< std::string > &string_vec, T *encoded_vec)
void invalidateInvertedIndex() noexcept
#define CHECK(condition)
Definition: Logger.h:193
std::vector< int32_t > str_ids_
static constexpr size_t MAX_STRLEN
void appendToStorage(const std::string &str) noexcept
uint32_t computeBucket(const uint32_t hash, const std::string str, const std::vector< int32_t > &data, const bool unique) const noexcept
std::vector< uint32_t > rk_hashes_
bool fillRateIsHigh() const noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getOrAddBulkArray()

void StringDictionary::getOrAddBulkArray ( const std::vector< std::vector< std::string >> &  string_array_vec,
std::vector< std::vector< int32_t >> &  ids_array_vec 
)

Definition at line 272 of file StringDictionary.cpp.

References getOrAddBulk().

274  {
275  ids_array_vec.resize(string_array_vec.size());
276  for (size_t i = 0; i < string_array_vec.size(); i++) {
277  auto& strings = string_array_vec[i];
278  auto& ids = ids_array_vec[i];
279  ids.resize(strings.size());
280  getOrAddBulk(strings, &ids[0]);
281  }
282 }
void getOrAddBulk(const std::vector< std::string > &string_vec, T *encoded_vec)
+ Here is the call graph for this function:

◆ getOrAddBulkRemote()

template<class T >
void StringDictionary::getOrAddBulkRemote ( const std::vector< std::string > &  string_vec,
T *  encoded_vec 
)
private

Definition at line 345 of file StringDictionary.cpp.

References CHECK, and client_no_timeout_.

Referenced by getOrAddBulk().

346  {
347  CHECK(client_no_timeout_);
348  std::vector<int32_t> string_ids;
349  client_no_timeout_->get_or_add_bulk(string_ids, string_vec);
350  size_t out_idx{0};
351  for (size_t i = 0; i < string_ids.size(); ++i) {
352  const auto string_id = string_ids[i];
353  const bool invalid = string_id > max_valid_int_value<T>();
354  if (invalid || string_id == inline_int_null_value<int32_t>()) {
355  if (invalid) {
356  log_encoding_error<T>(string_vec[i]);
357  }
358  encoded_vec[out_idx++] = inline_int_null_value<T>();
359  continue;
360  }
361  encoded_vec[out_idx++] = string_id;
362  }
363 }
std::unique_ptr< StringDictionaryClient > client_no_timeout_
#define CHECK(condition)
Definition: Logger.h:193
+ Here is the caller graph for this function:

◆ getOrAddImpl()

int32_t StringDictionary::getOrAddImpl ( const std::string &  str)
private noexcept

Definition at line 847 of file StringDictionary.cpp.

References appendToStorage(), CHECK, CHECK_LT, computeBucket(), fillRateIsHigh(), increaseCapacity(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, rw_mutex_, str_count_, and str_ids_.

Referenced by getOrAdd().

847  {
848  // @TODO(wei) treat empty string as NULL for now
849  if (str.size() == 0) {
850  return inline_int_null_value<int32_t>();
851  }
852  CHECK(str.size() <= MAX_STRLEN);
853  uint32_t bucket;
854  const uint32_t hash = rk_hash(str);
855  {
856  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
857  bucket = computeBucket(hash, str, str_ids_, false);
858  if (str_ids_[bucket] != INVALID_STR_ID) {
859  return str_ids_[bucket];
860  }
861  }
862  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
863  // need to recalculate the bucket in case it changed before
864  // we got the lock
865  bucket = computeBucket(hash, str, str_ids_, false);
866  if (str_ids_[bucket] == INVALID_STR_ID) {
867  CHECK_LT(str_count_, MAX_STRCOUNT)
868  << "Maximum number (" << str_count_
869  << ") of Dictionary encoded Strings reached for this column, offset path "
870  "for column is "
871  << offsets_path_;
872  if (fillRateIsHigh()) {
873  // resize when more than 50% is full
874  increaseCapacity();
875  bucket = computeBucket(hash, str, str_ids_, false);
876  }
877  appendToStorage(str);
878  str_ids_[bucket] = static_cast<int32_t>(str_count_);
879  if (materialize_hashes_) {
880  rk_hashes_[str_count_] = hash;
881  }
882  ++str_count_;
883  invalidateInvertedIndex();
884  }
885  return str_ids_[bucket];
886 }
std::string offsets_path_
void increaseCapacity() noexcept
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
mapd_shared_mutex rw_mutex_
uint32_t rk_hash(const std::string &str)
#define CHECK_LT(x, y)
Definition: Logger.h:203
void invalidateInvertedIndex() noexcept
#define CHECK(condition)
Definition: Logger.h:193
std::vector< int32_t > str_ids_
static constexpr size_t MAX_STRLEN
void appendToStorage(const std::string &str) noexcept
uint32_t computeBucket(const uint32_t hash, const std::string str, const std::vector< int32_t > &data, const bool unique) const noexcept
std::vector< uint32_t > rk_hashes_
bool fillRateIsHigh() const noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getRegexpLike()

std::vector< int32_t > StringDictionary::getRegexpLike ( const std::string &  pattern,
const char  escape,
const size_t  generation 
) const

Definition at line 721 of file StringDictionary.cpp.

References CHECK, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_regexp_like(), regex_cache_, run_benchmark_import::result, rw_mutex_, and str_count_.

723  {
724  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
725  if (client_) {
726  return client_->get_regexp_like(pattern, escape, generation);
727  }
728  const auto cache_key = std::make_pair(pattern, escape);
729  const auto it = regex_cache_.find(cache_key);
730  if (it != regex_cache_.end()) {
731  return it->second;
732  }
733  std::vector<int32_t> result;
734  std::vector<std::thread> workers;
735  int worker_count = cpu_threads();
736  CHECK_GT(worker_count, 0);
737  std::vector<std::vector<int32_t>> worker_results(worker_count);
738  CHECK_LE(generation, str_count_);
739  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
740  workers.emplace_back([&worker_results,
741  &pattern,
742  generation,
743  escape,
744  worker_idx,
745  worker_count,
746  this]() {
747  for (size_t string_id = worker_idx; string_id < generation;
748  string_id += worker_count) {
749  const auto str = getStringUnlocked(string_id);
750  if (is_regexp_like(str, pattern, escape)) {
751  worker_results[worker_idx].push_back(string_id);
752  }
753  }
754  });
755  }
756  for (auto& worker : workers) {
757  worker.join();
758  }
759  for (const auto& worker_result : worker_results) {
760  result.insert(result.end(), worker_result.begin(), worker_result.end());
761  }
762  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
763  CHECK(it_ok.second);
764 
765  return result;
766 }
#define CHECK_GT(x, y)
Definition: Logger.h:205
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:204
#define CHECK(condition)
Definition: Logger.h:193
int cpu_threads()
Definition: thread_count.h:25
+ Here is the call graph for this function:

◆ getString()

std::string StringDictionary::getString ( int32_t  string_id) const

Definition at line 389 of file StringDictionary.cpp.

References client_, getStringUnlocked(), and rw_mutex_.

Referenced by StringValueConverter::convertToColumnarFormatFromDict(), and populate_string_ids().

389  {
390  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
391  if (client_) {
392  std::string ret;
393  client_->get_string(ret, string_id);
394  return ret;
395  }
396  return getStringUnlocked(string_id);
397 }
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_mutex rw_mutex_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getStringBytes()

std::pair< char *, size_t > StringDictionary::getStringBytes ( int32_t  string_id) const
noexcept

Definition at line 404 of file StringDictionary.cpp.

References CHECK, CHECK_LE, CHECK_LT, client_, getStringBytesChecked(), rw_mutex_, and str_count_.

405  {
406  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
407  CHECK(!client_);
408  CHECK_LE(0, string_id);
409  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
410  return getStringBytesChecked(string_id);
411 }
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_
#define CHECK_LT(x, y)
Definition: Logger.h:203
#define CHECK_LE(x, y)
Definition: Logger.h:204
#define CHECK(condition)
Definition: Logger.h:193
+ Here is the call graph for this function:

◆ getStringBytesChecked()

std::pair< char *, size_t > StringDictionary::getStringBytesChecked ( const int  string_id) const
private noexcept

Definition at line 894 of file StringDictionary.cpp.

References CHECK, and getStringFromStorage().

Referenced by getStringBytes().

895  {
896  const auto str_canary = getStringFromStorage(string_id);
897  CHECK(!str_canary.canary);
898  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
899 }
#define CHECK(condition)
Definition: Logger.h:193
PayloadString getStringFromStorage(const int string_id) const noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getStringChecked()

std::string StringDictionary::getStringChecked ( const int  string_id) const
private noexcept

Definition at line 888 of file StringDictionary.cpp.

References CHECK, and getStringFromStorage().

Referenced by getStringUnlocked(), and increaseCapacity().

888  {
889  const auto str_canary = getStringFromStorage(string_id);
890  CHECK(!str_canary.canary);
891  return std::string(str_canary.c_str_ptr, str_canary.size);
892 }
#define CHECK(condition)
Definition: Logger.h:193
PayloadString getStringFromStorage(const int string_id) const noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getStringFromStorage()

StringDictionary::PayloadString StringDictionary::getStringFromStorage ( const int  string_id) const
private noexcept

Definition at line 994 of file StringDictionary.cpp.

References CHECK_GE, isTemp_, StringDictionary::StringIdxEntry::off, offset_fd_, offset_map_, payload_fd_, payload_map_, and StringDictionary::StringIdxEntry::size.

Referenced by computeBucket(), getCompare(), getStringBytesChecked(), getStringChecked(), mergeSortedCache(), sortCache(), and StringDictionary().

995  {
996  if (!isTemp_) {
997  CHECK_GE(payload_fd_, 0);
998  CHECK_GE(offset_fd_, 0);
999  }
1000  CHECK_GE(string_id, 0);
1001  const StringIdxEntry* str_meta = offset_map_ + string_id;
1002  if (str_meta->size == 0xffff) {
1003  // hit the canary
1004  return {nullptr, 0, true};
1005  }
1006  return {payload_map_ + str_meta->off, str_meta->size, false};
1007 }
StringIdxEntry * offset_map_
#define CHECK_GE(x, y)
Definition: Logger.h:206
+ Here is the caller graph for this function:

◆ getStringUnlocked()

std::string StringDictionary::getStringUnlocked ( int32_t  string_id) const
private noexcept

Definition at line 399 of file StringDictionary.cpp.

References CHECK_LT, getStringChecked(), and str_count_.

Referenced by copyStrings(), getEquals(), getLike(), getRegexpLike(), and getString().

399  {
400  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
401  return getStringChecked(string_id);
402 }
std::string getStringChecked(const int string_id) const noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:203
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getUnlocked()

int32_t StringDictionary::getUnlocked ( const std::string &  str) const
private noexcept

Definition at line 383 of file StringDictionary.cpp.

References computeBucket(), anonymous_namespace{StringDictionary.cpp}::rk_hash(), and str_ids_.

Referenced by getIdOfString().

383  {
384  const uint32_t hash = rk_hash(str);
385  auto str_id = str_ids_[computeBucket(hash, str, str_ids_, false)];
386  return str_id;
387 }
uint32_t rk_hash(const std::string &str)
std::vector< int32_t > str_ids_
uint32_t computeBucket(const uint32_t hash, const std::string str, const std::vector< int32_t > &data, const bool unique) const noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ increaseCapacity()

void StringDictionary::increaseCapacity ( )
private noexcept

Definition at line 824 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), getStringChecked(), INVALID_STR_ID, materialize_hashes_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, str_count_, and str_ids_.

Referenced by getOrAddBulk(), and getOrAddImpl().

824  {
825  std::vector<int32_t> new_str_ids(str_ids_.size() * 2, INVALID_STR_ID);
826 
827  if (materialize_hashes_) {
828  for (size_t i = 0; i < str_ids_.size(); ++i) {
829  if (str_ids_[i] != INVALID_STR_ID) {
830  const uint32_t hash = rk_hashes_[str_ids_[i]];
831  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
832  new_str_ids[bucket] = str_ids_[i];
833  }
834  }
835  rk_hashes_.resize(rk_hashes_.size() * 2);
836  } else {
837  for (size_t i = 0; i < str_count_; ++i) {
838  const auto str = getStringChecked(i);
839  const uint32_t hash = rk_hash(str);
840  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
841  new_str_ids[bucket] = i;
842  }
843  }
844  str_ids_.swap(new_str_ids);
845 }
std::string getStringChecked(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
uint32_t rk_hash(const std::string &str)
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) const noexcept
std::vector< int32_t > str_ids_
std::vector< uint32_t > rk_hashes_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ insertInSortedCache()

void StringDictionary::insertInSortedCache ( std::string  str,
int32_t  str_id 
)
private

◆ invalidateInvertedIndex()

void StringDictionary::invalidateInvertedIndex ( )
private noexcept

Definition at line 1054 of file StringDictionary.cpp.

References CANARY_BUFFER, compare_cache_, equal_cache_, like_cache_, and regex_cache_.

Referenced by getOrAddBulk(), and getOrAddImpl().

1054  {
1055  if (!like_cache_.empty()) {
1056  decltype(like_cache_)().swap(like_cache_);
1057  }
1058  if (!regex_cache_.empty()) {
1059  decltype(regex_cache_)().swap(regex_cache_);
1060  }
1061  if (!equal_cache_.empty()) {
1062  decltype(equal_cache_)().swap(equal_cache_);
1063  }
1064  compare_cache_.invalidateInvertedIndex();
1065 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::map< std::string, int32_t > equal_cache_
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
+ Here is the caller graph for this function:

◆ mergeSortedCache()

void StringDictionary::mergeSortedCache ( std::vector< int32_t > &  temp_sorted_cache)
private

Definition at line 1110 of file StringDictionary.cpp.

References getStringFromStorage(), sorted_cache, and string_lt().

Referenced by buildSortedCache().

1110  {
1111  // this method is not thread safe
1112  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1113  size_t t_idx = 0, s_idx = 0, idx = 0;
1114  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1115  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1116  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1117  const auto insert_from_temp_cache =
1118  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1119  if (insert_from_temp_cache) {
1120  updated_cache[idx] = temp_sorted_cache[t_idx++];
1121  } else {
1122  updated_cache[idx] = sorted_cache[s_idx++];
1123  }
1124  }
1125  while (t_idx < temp_sorted_cache.size()) {
1126  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1127  }
1128  while (s_idx < sorted_cache.size()) {
1129  updated_cache[idx++] = sorted_cache[s_idx++];
1130  }
1131  sorted_cache.swap(updated_cache);
1132 }
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ populate_string_array_ids()

void StringDictionary::populate_string_array_ids ( std::vector< std::vector< int32_t >> &  dest_array_ids,
StringDictionary *  dest_dict,
const std::vector< std::vector< int32_t >> &  source_array_ids,
const StringDictionary *  source_dict 
)
static

Definition at line 1161 of file StringDictionary.cpp.

References populate_string_ids().

Referenced by DictionaryValueConverter< TARGET_TYPE >::processArrayBuffer().

1165  {
1166  dest_array_ids.resize(source_array_ids.size());
1167 
1168  std::atomic<size_t> row_idx{0};
1169  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1170  int thread_id) {
1171  for (;;) {
1172  auto row = row_idx.fetch_add(1);
1173 
1174  if (row >= dest_array_ids.size()) {
1175  return;
1176  }
1177  const auto& source_ids = source_array_ids[row];
1178  auto& dest_ids = dest_array_ids[row];
1179  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1180  }
1181  };
1182 
1183  const int num_worker_threads = std::thread::hardware_concurrency();
1184 
1185  if (source_array_ids.size() / num_worker_threads > 10) {
1186  std::vector<std::future<void>> worker_threads;
1187  for (int i = 0; i < num_worker_threads; ++i) {
1188  worker_threads.push_back(std::async(std::launch::async, processor, i));
1189  }
1190 
1191  for (auto& child : worker_threads) {
1192  child.wait();
1193  }
1194  for (auto& child : worker_threads) {
1195  child.get();
1196  }
1197  } else {
1198  processor(0);
1199  }
1200 }
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
Populates provided dest_ids vector with string ids corresponding to given source strings.
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ populate_string_ids()

void StringDictionary::populate_string_ids ( std::vector< int32_t > &  dest_ids,
StringDictionary *  dest_dict,
const std::vector< int32_t > &  source_ids,
const StringDictionary *  source_dict,
const std::map< int32_t, std::string >  transient_mapping = {} 
)
static

Populates provided dest_ids vector with string ids corresponding to given source strings.

Given a vector of source string ids and corresponding source dictionary, this method populates a vector of destination string ids by either returning the string id of matching strings in the destination dictionary or creating new entries in the dictionary. Source string ids can also be transient if they were created by a function (e.g., LOWER/UPPER functions). A map of transient string ids to string values is provided in order to handle this use case.

Parameters
dest_ids - vector of destination string ids to be populated
dest_dict - destination dictionary
source_ids - vector of source string ids for which destination ids are needed
source_dict - source dictionary
transient_mapping - map of transient source string ids to string values

Definition at line 1134 of file StringDictionary.cpp.

References getOrAddBulk(), and getString().

Referenced by populate_string_array_ids(), and DictionaryValueConverter< TARGET_TYPE >::processBuffer().

1139  {
1140  std::vector<std::string> strings;
1141 
1142  for (const int32_t source_id : source_ids) {
1143  if (source_id == std::numeric_limits<int32_t>::min()) {
1144  strings.emplace_back("");
1145  } else if (source_id < 0) {
1146  if (auto string_itr = transient_mapping.find(source_id);
1147  string_itr != transient_mapping.end()) {
1148  strings.emplace_back(string_itr->second);
1149  } else {
1150  throw std::runtime_error("Unexpected negative source ID");
1151  }
1152  } else {
1153  strings.push_back(source_dict->getString(source_id));
1154  }
1155  }
1156 
1157  dest_ids.resize(strings.size());
1158  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1159 }
std::string getString(int32_t string_id) const
void getOrAddBulk(const std::vector< std::string > &string_vec, T *encoded_vec)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ processDictionaryFutures()

void StringDictionary::processDictionaryFutures ( std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &  dictionary_futures)
private

Definition at line 206 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), materialize_hashes_, payload_file_off_, rk_hashes_, str_count_, and str_ids_.

Referenced by StringDictionary().

208  {
209  for (auto& dictionary_future : dictionary_futures) {
210  dictionary_future.wait();
211  auto hashVec = dictionary_future.get();
212  for (auto& hash : hashVec) {
213  uint32_t bucket = computeUniqueBucketWithHash(hash.first, str_ids_);
214  payload_file_off_ += hash.second;
215  str_ids_[bucket] = static_cast<int32_t>(str_count_);
216  if (materialize_hashes_) {
217  rk_hashes_[str_count_] = hash.first;
218  }
219  ++str_count_;
220  }
221  }
222  dictionary_futures.clear();
223 }
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) const noexcept
std::vector< int32_t > str_ids_
std::vector< uint32_t > rk_hashes_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ sortCache()

void StringDictionary::sortCache ( std::vector< int32_t > &  cache)
private

Definition at line 1097 of file StringDictionary.cpp.

References getStringFromStorage(), and string_lt().

Referenced by buildSortedCache().

1097  {
1098  // This method is not thread-safe.
1099 
1100  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1101  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1102 
1103  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1104  auto a_str = this->getStringFromStorage(a);
1105  auto b_str = this->getStringFromStorage(b);
1106  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1107  });
1108 }
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ storageEntryCount()

size_t StringDictionary::storageEntryCount ( ) const

Definition at line 413 of file StringDictionary.cpp.

References client_, rw_mutex_, and str_count_.

413  {
414  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
415  if (client_) {
416  return client_->storage_entry_count();
417  }
418  return str_count_;
419 }
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_

Member Data Documentation

◆ CANARY_BUFFER

char * StringDictionary::CANARY_BUFFER {nullptr}
static private

◆ client_

std::unique_ptr<StringDictionaryClient> StringDictionary::client_

◆ client_no_timeout_

std::unique_ptr<StringDictionaryClient> StringDictionary::client_no_timeout_
private

Definition at line 190 of file StringDictionary.h.

Referenced by getOrAddBulk(), and getOrAddBulkRemote().

◆ compare_cache_

DictionaryCache<std::string, compare_cache_value_t> StringDictionary::compare_cache_
mutable private

Definition at line 187 of file StringDictionary.h.

Referenced by getCompare(), and invalidateInvertedIndex().

◆ equal_cache_

std::map<std::string, int32_t> StringDictionary::equal_cache_
mutable private

Definition at line 186 of file StringDictionary.h.

Referenced by getEquals(), and invalidateInvertedIndex().

◆ INVALID_STR_ID

constexpr int32_t StringDictionary::INVALID_STR_ID
static

◆ isTemp_

bool StringDictionary::isTemp_
private

◆ like_cache_

std::map<std::tuple<std::string, bool, bool, char>, std::vector<int32_t> > StringDictionary::like_cache_
mutable private

Definition at line 184 of file StringDictionary.h.

Referenced by getLike(), and invalidateInvertedIndex().

◆ materialize_hashes_

bool StringDictionary::materialize_hashes_
private

◆ MAX_STRCOUNT

constexpr size_t StringDictionary::MAX_STRCOUNT = (1U << 31) - 1
static

◆ MAX_STRLEN

constexpr size_t StringDictionary::MAX_STRLEN
static

◆ offset_fd_

int StringDictionary::offset_fd_
private

◆ offset_file_size_

size_t StringDictionary::offset_file_size_
private

◆ offset_map_

StringIdxEntry* StringDictionary::offset_map_
private

◆ offsets_path_

std::string StringDictionary::offsets_path_
private

Definition at line 174 of file StringDictionary.h.

Referenced by getOrAddBulk(), getOrAddImpl(), and StringDictionary().

◆ payload_fd_

int StringDictionary::payload_fd_
private

◆ payload_file_off_

size_t StringDictionary::payload_file_off_
private

Definition at line 181 of file StringDictionary.h.

Referenced by appendToStorage(), and processDictionaryFutures().

◆ payload_file_size_

size_t StringDictionary::payload_file_size_
private

◆ payload_map_

char* StringDictionary::payload_map_
private

◆ regex_cache_

std::map<std::pair<std::string, char>, std::vector<int32_t> > StringDictionary::regex_cache_
mutable private

Definition at line 185 of file StringDictionary.h.

Referenced by getRegexpLike(), and invalidateInvertedIndex().

◆ rk_hashes_

std::vector<uint32_t> StringDictionary::rk_hashes_
private

◆ rw_mutex_

mapd_shared_mutex StringDictionary::rw_mutex_

◆ sorted_cache

std::vector<int32_t> StringDictionary::sorted_cache
private

Definition at line 171 of file StringDictionary.h.

Referenced by buildSortedCache(), getCompare(), and mergeSortedCache().

◆ str_count_

◆ str_ids_

std::vector<int32_t> StringDictionary::str_ids_
private

◆ strings_cache_

std::shared_ptr<std::vector<std::string> > StringDictionary::strings_cache_
mutable private

Definition at line 188 of file StringDictionary.h.

Referenced by copyStrings().


The documentation for this class was generated from the following files: