OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
StringDictionary Class Reference

#include <StringDictionary.h>

+ Collaboration diagram for StringDictionary:

Classes

struct  compare_cache_value_t
 
struct  PayloadString
 
struct  StringIdxEntry
 

Public Member Functions

 StringDictionary (const std::string &folder, const bool isTemp, const bool recover, const bool materializeHashes=false, size_t initial_capacity=256)
 
 StringDictionary (const LeafHostInfo &host, const DictRef dict_ref)
 
 ~StringDictionary () noexcept
 
int32_t getOrAdd (const std::string &str) noexcept
 
template<class T >
void getOrAddBulk (const std::vector< std::string > &string_vec, T *encoded_vec)
 
void getOrAddBulkArray (const std::vector< std::vector< std::string >> &string_array_vec, std::vector< std::vector< int32_t >> &ids_array_vec)
 
int32_t getIdOfString (const std::string &str) const
 
std::string getString (int32_t string_id) const
 
std::pair< char *, size_t > getStringBytes (int32_t string_id) const noexcept
 
size_t storageEntryCount () const
 
std::vector< int32_t > getLike (const std::string &pattern, const bool icase, const bool is_simple, const char escape, const size_t generation) const
 
std::vector< int32_t > getCompare (const std::string &pattern, const std::string &comp_operator, const size_t generation)
 
std::vector< int32_t > getRegexpLike (const std::string &pattern, const char escape, const size_t generation) const
 
std::shared_ptr< const std::vector< std::string > > copyStrings () const
 
bool checkpoint () noexcept
 

Static Public Member Functions

static void populate_string_ids (std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
 Populates provided dest_ids vector with string ids corresponding to given source strings. More...
 
static void populate_string_array_ids (std::vector< std::vector< int32_t >> &dest_array_ids, StringDictionary *dest_dict, const std::vector< std::vector< int32_t >> &source_array_ids, const StringDictionary *source_dict)
 

Static Public Attributes

static constexpr int32_t INVALID_STR_ID = -1
 
static constexpr size_t MAX_STRLEN = (1 << 15) - 1
 
static constexpr size_t MAX_STRCOUNT = (1U << 31) - 1
 

Private Member Functions

void processDictionaryFutures (std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
 
bool fillRateIsHigh () const noexcept
 
void increaseCapacity () noexcept
 
int32_t getOrAddImpl (const std::string &str) noexcept
 
template<class T >
void getOrAddBulkRemote (const std::vector< std::string > &string_vec, T *encoded_vec)
 
int32_t getUnlocked (const std::string &str) const noexcept
 
std::string getStringUnlocked (int32_t string_id) const noexcept
 
std::string getStringChecked (const int string_id) const noexcept
 
std::pair< char *, size_t > getStringBytesChecked (const int string_id) const noexcept
 
uint32_t computeBucket (const uint32_t hash, const std::string str, const std::vector< int32_t > &data, const bool unique) const noexcept
 
uint32_t computeUniqueBucketWithHash (const uint32_t hash, const std::vector< int32_t > &data) const noexcept
 
void appendToStorage (const std::string &str) noexcept
 
PayloadString getStringFromStorage (const int string_id) const noexcept
 
void addPayloadCapacity () noexcept
 
void addOffsetCapacity () noexcept
 
size_t addStorageCapacity (int fd) noexcept
 
void * addMemoryCapacity (void *addr, size_t &mem_size) noexcept
 
void invalidateInvertedIndex () noexcept
 
std::vector< int32_t > getEquals (std::string pattern, std::string comp_operator, size_t generation)
 
void buildSortedCache ()
 
void insertInSortedCache (std::string str, int32_t str_id)
 
void sortCache (std::vector< int32_t > &cache)
 
void mergeSortedCache (std::vector< int32_t > &temp_sorted_cache)
 
compare_cache_value_t * binary_search_cache (const std::string &pattern) const
 

Private Attributes

size_t str_count_
 
std::vector< int32_t > str_ids_
 
std::vector< uint32_t > rk_hashes_
 
std::vector< int32_t > sorted_cache
 
bool isTemp_
 
bool materialize_hashes_
 
std::string offsets_path_
 
int payload_fd_
 
int offset_fd_
 
StringIdxEntry * offset_map_
 
char * payload_map_
 
size_t offset_file_size_
 
size_t payload_file_size_
 
size_t payload_file_off_
 
mapd_shared_mutex rw_mutex_
 
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
 
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
 
std::map< std::string, int32_t > equal_cache_
 
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
 
std::shared_ptr< std::vector< std::string > > strings_cache_
 
std::unique_ptr< StringDictionaryClient > client_
 
std::unique_ptr< StringDictionaryClient > client_no_timeout_
 

Static Private Attributes

static char * CANARY_BUFFER {nullptr}
 

Detailed Description

Definition at line 46 of file StringDictionary.h.

Constructor & Destructor Documentation

StringDictionary::StringDictionary ( const std::string &  folder,
const bool  isTemp,
const bool  recover,
const bool  materializeHashes = false,
size_t  initial_capacity = 256 
)

Definition at line 103 of file StringDictionary.cpp.

References addOffsetCapacity(), addPayloadCapacity(), CHECK_EQ, anonymous_namespace{StringDictionary.cpp}::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_open(), anonymous_namespace{StringDictionary.cpp}::file_size(), getStringFromStorage(), INVALID_STR_ID, isTemp_, LOG, materialize_hashes_, offset_fd_, offset_file_size_, offset_map_, offsets_path_, payload_fd_, payload_file_size_, payload_map_, processDictionaryFutures(), anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, anonymous_namespace{StringDictionary.cpp}::round_up_p2(), rw_mutex_, str_ids_, and logger::WARNING.

108  : str_count_(0)
109  , str_ids_(initial_capacity, INVALID_STR_ID)
110  , rk_hashes_(initial_capacity)
111  , isTemp_(isTemp)
112  , materialize_hashes_(materializeHashes)
113  , payload_fd_(-1)
114  , offset_fd_(-1)
115  , offset_map_(nullptr)
116  , payload_map_(nullptr)
117  , offset_file_size_(0)
118  , payload_file_size_(0)
119  , payload_file_off_(0)
120  , strings_cache_(nullptr) {
121  if (!isTemp && folder.empty()) {
122  return;
123  }
124 
125  // initial capacity must be a power of two for efficient bucket computation
126  CHECK_EQ(size_t(0), (initial_capacity & (initial_capacity - 1)));
127  if (!isTemp_) {
128  boost::filesystem::path storage_path(folder);
129  offsets_path_ = (storage_path / boost::filesystem::path("DictOffsets")).string();
130  const auto payload_path =
131  (storage_path / boost::filesystem::path("DictPayload")).string();
132  payload_fd_ = checked_open(payload_path.c_str(), recover);
133  offset_fd_ = checked_open(offsets_path_.c_str(), recover);
135  offset_file_size_ = file_size(offset_fd_);
136  }
137 
138  if (payload_file_size_ == 0) {
140  }
141  if (offset_file_size_ == 0) {
143  }
144  if (!isTemp_) { // we never mmap or recover temp dictionaries
145  payload_map_ = reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
146  offset_map_ =
147  reinterpret_cast<StringIdxEntry*>(checked_mmap(offset_fd_, offset_file_size_));
148  if (recover) {
149  const size_t bytes = file_size(offset_fd_);
150  if (bytes % sizeof(StringIdxEntry) != 0) {
151  LOG(WARNING) << "Offsets " << offsets_path_ << " file is truncated";
152  }
153  const uint64_t str_count = bytes / sizeof(StringIdxEntry);
154  // at this point we know the size of the StringDict we need to load
155  // so lets reallocate the vector to the correct size
156  const uint64_t max_entries = round_up_p2(str_count * 2 + 1);
157  std::vector<int32_t> new_str_ids(max_entries, INVALID_STR_ID);
158  str_ids_.swap(new_str_ids);
159  if (materialize_hashes_) {
160  std::vector<uint32_t> new_rk_hashes(max_entries / 2);
161  rk_hashes_.swap(new_rk_hashes);
162  }
163  unsigned string_id = 0;
164  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
165 
166  uint32_t thread_inits = 0;
167  const auto thread_count = std::thread::hardware_concurrency();
168  const uint32_t items_per_thread = std::max<uint32_t>(
169  2000, std::min<uint32_t>(200000, (str_count / thread_count) + 1));
170  std::vector<std::future<std::vector<std::pair<uint32_t, unsigned int>>>>
171  dictionary_futures;
172  for (string_id = 0; string_id < str_count; string_id += items_per_thread) {
173  dictionary_futures.emplace_back(std::async(
174  std::launch::async, [string_id, str_count, items_per_thread, this] {
175  std::vector<std::pair<uint32_t, unsigned int>> hashVec;
176  for (uint32_t curr_id = string_id;
177  curr_id < string_id + items_per_thread && curr_id < str_count;
178  curr_id++) {
179  const auto recovered = getStringFromStorage(curr_id);
180  if (recovered.canary) {
181  // hit the canary, recovery finished
182  break;
183  } else {
184  std::string temp(recovered.c_str_ptr, recovered.size);
185  hashVec.emplace_back(std::make_pair(rk_hash(temp), temp.size()));
186  }
187  }
188  return hashVec;
189  }));
190  thread_inits++;
191  if (thread_inits % thread_count == 0) {
192  processDictionaryFutures(dictionary_futures);
193  }
194  }
195  // gather last few threads
196  if (dictionary_futures.size() != 0) {
197  processDictionaryFutures(dictionary_futures);
198  }
199  }
200  }
201 }
StringIdxEntry * offset_map_
#define CHECK_EQ(x, y)
Definition: Logger.h:198
#define LOG(tag)
Definition: Logger.h:185
std::string offsets_path_
static constexpr int32_t INVALID_STR_ID
std::shared_ptr< std::vector< std::string > > strings_cache_
void addPayloadCapacity() noexcept
mapd_shared_mutex rw_mutex_
int checked_open(const char *path, const bool recover)
void addOffsetCapacity() noexcept
uint32_t rk_hash(const std::string &str)
void * checked_mmap(const int fd, const size_t sz)
void processDictionaryFutures(std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &dictionary_futures)
const uint64_t round_up_p2(const uint64_t num)
std::vector< int32_t > str_ids_
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

StringDictionary::StringDictionary ( const LeafHostInfo &  host,
const DictRef  dict_ref 
)

Definition at line 222 of file StringDictionary.cpp.

223  : strings_cache_(nullptr)
224  , client_(new StringDictionaryClient(host, dict_ref, true))
225  , client_no_timeout_(new StringDictionaryClient(host, dict_ref, false)) {}
std::unique_ptr< StringDictionaryClient > client_
std::shared_ptr< std::vector< std::string > > strings_cache_
std::unique_ptr< StringDictionaryClient > client_no_timeout_
StringDictionary::~StringDictionary ( )
noexcept

Definition at line 227 of file StringDictionary.cpp.

References CHECK(), CHECK_GE, anonymous_namespace{StringDictionary.cpp}::checked_munmap(), client_, File_Namespace::close(), isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

227  {
228  if (client_) {
229  return;
230  }
231  if (payload_map_) {
232  if (!isTemp_) {
236  CHECK_GE(payload_fd_, 0);
238  CHECK_GE(offset_fd_, 0);
239  close(offset_fd_);
240  } else {
242  free(payload_map_);
243  free(offset_map_);
244  }
245  }
246 }
StringIdxEntry * offset_map_
void checked_munmap(void *addr, size_t length)
#define CHECK_GE(x, y)
Definition: Logger.h:203
std::unique_ptr< StringDictionaryClient > client_
CHECK(cgen_state)
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:102

+ Here is the call graph for this function:

Member Function Documentation

void * StringDictionary::addMemoryCapacity ( void *  addr,
size_t &  mem_size 
)
private noexcept

Definition at line 1036 of file StringDictionary.cpp.

References CHECK(), and anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE.

Referenced by addOffsetCapacity(), and addPayloadCapacity().

1036  {
1037  static const ssize_t CANARY_BUFF_SIZE = 1024 * SYSTEM_PAGE_SIZE;
1038  if (!CANARY_BUFFER) {
1039  CANARY_BUFFER = reinterpret_cast<char*>(malloc(CANARY_BUFF_SIZE));
1041  memset(CANARY_BUFFER, 0xff, CANARY_BUFF_SIZE);
1042  }
1043  void* new_addr = realloc(addr, mem_size + CANARY_BUFF_SIZE);
1044  CHECK(new_addr);
1045  void* write_addr = reinterpret_cast<void*>(static_cast<char*>(new_addr) + mem_size);
1046  CHECK(memcpy(write_addr, CANARY_BUFFER, CANARY_BUFF_SIZE));
1047  mem_size += CANARY_BUFF_SIZE;
1048  return new_addr;
1049 }
CHECK(cgen_state)
static char * CANARY_BUFFER

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::addOffsetCapacity ( )
private noexcept

Definition at line 1015 of file StringDictionary.cpp.

References addMemoryCapacity(), addStorageCapacity(), isTemp_, offset_fd_, offset_file_size_, and offset_map_.

Referenced by StringDictionary().

1015  {
1016  if (!isTemp_) {
1018  } else {
1019  offset_map_ =
1020  static_cast<StringIdxEntry*>(addMemoryCapacity(offset_map_, offset_file_size_));
1021  }
1022 }
StringIdxEntry * offset_map_
void * addMemoryCapacity(void *addr, size_t &mem_size) noexcept
size_t addStorageCapacity(int fd) noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::addPayloadCapacity ( )
private noexcept

Definition at line 1006 of file StringDictionary.cpp.

References addMemoryCapacity(), addStorageCapacity(), isTemp_, payload_fd_, payload_file_size_, and payload_map_.

Referenced by StringDictionary().

1006  {
1007  if (!isTemp_) {
1009  } else {
1010  payload_map_ =
1011  static_cast<char*>(addMemoryCapacity(payload_map_, payload_file_size_));
1012  }
1013 }
void * addMemoryCapacity(void *addr, size_t &mem_size) noexcept
size_t addStorageCapacity(int fd) noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t StringDictionary::addStorageCapacity ( int  fd)
private noexcept

Definition at line 1024 of file StringDictionary.cpp.

References CHECK(), CHECK_NE, anonymous_namespace{StringDictionary.cpp}::SYSTEM_PAGE_SIZE, and File_Namespace::write().

Referenced by addOffsetCapacity(), and addPayloadCapacity().

1024  {
1025  static const ssize_t CANARY_BUFF_SIZE = 1024 * SYSTEM_PAGE_SIZE;
1026  if (!CANARY_BUFFER) {
1027  CANARY_BUFFER = static_cast<char*>(malloc(CANARY_BUFF_SIZE));
1029  memset(CANARY_BUFFER, 0xff, CANARY_BUFF_SIZE);
1030  }
1031  CHECK_NE(lseek(fd, 0, SEEK_END), -1);
1032  CHECK(write(fd, CANARY_BUFFER, CANARY_BUFF_SIZE) == CANARY_BUFF_SIZE);
1033  return CANARY_BUFF_SIZE;
1034 }
CHECK(cgen_state)
#define CHECK_NE(x, y)
Definition: Logger.h:199
static char * CANARY_BUFFER
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:121

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::appendToStorage ( const std::string &  str)
private noexcept

Definition at line 955 of file StringDictionary.cpp.

References CHECK(), CHECK_GE, anonymous_namespace{StringDictionary.cpp}::checked_mmap(), anonymous_namespace{StringDictionary.cpp}::checked_munmap(), and StringDictionary::StringIdxEntry::size.

Referenced by getOrAddBulk().

955  {
956  if (!isTemp_) {
957  CHECK_GE(payload_fd_, 0);
958  CHECK_GE(offset_fd_, 0);
959  }
960  // write the payload
961  if (payload_file_off_ + str.size() > payload_file_size_) {
962  if (!isTemp_) {
965  CHECK(payload_file_off_ + str.size() <= payload_file_size_);
966  payload_map_ =
967  reinterpret_cast<char*>(checked_mmap(payload_fd_, payload_file_size_));
968  } else {
970  }
971  }
972  memcpy(payload_map_ + payload_file_off_, str.c_str(), str.size());
973  // write the offset and length
974  size_t offset_file_off = str_count_ * sizeof(StringIdxEntry);
975  StringIdxEntry str_meta{static_cast<uint64_t>(payload_file_off_), str.size()};
976  payload_file_off_ += str.size();
977  if (offset_file_off + sizeof(str_meta) >= offset_file_size_) {
978  if (!isTemp_) {
981  CHECK(offset_file_off + sizeof(str_meta) <= offset_file_size_);
982  offset_map_ =
983  reinterpret_cast<StringIdxEntry*>(checked_mmap(offset_fd_, offset_file_size_));
984  } else {
986  }
987  }
988  memcpy(offset_map_ + str_count_, &str_meta, sizeof(str_meta));
989 }
StringIdxEntry * offset_map_
void checked_munmap(void *addr, size_t length)
#define CHECK_GE(x, y)
Definition: Logger.h:203
CHECK(cgen_state)
void addPayloadCapacity() noexcept
void addOffsetCapacity() noexcept
void * checked_mmap(const int fd, const size_t sz)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

compare_cache_value_t* StringDictionary::binary_search_cache ( const std::string &  pattern) const
private
void StringDictionary::buildSortedCache ( )
private

Definition at line 1083 of file StringDictionary.cpp.

References mergeSortedCache(), sortCache(), sorted_cache, and str_count_.

Referenced by getCompare().

1083  {
1084  // This method is not thread-safe.
1085  const auto cur_cache_size = sorted_cache.size();
1086  std::vector<int32_t> temp_sorted_cache;
1087  for (size_t i = cur_cache_size; i < str_count_; i++) {
1088  temp_sorted_cache.push_back(i);
1089  }
1090  sortCache(temp_sorted_cache);
1091  mergeSortedCache(temp_sorted_cache);
1092 }
void mergeSortedCache(std::vector< int32_t > &temp_sorted_cache)
void sortCache(std::vector< int32_t > &cache)
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool StringDictionary::checkpoint ( )
noexcept

Definition at line 1066 of file StringDictionary.cpp.

References CHECK(), client_, isTemp_, offset_fd_, offset_file_size_, offset_map_, payload_fd_, payload_file_size_, and payload_map_.

Referenced by Importer_NS::TypedImportBuffer::stringDictCheckpoint().

1066  {
1067  if (client_) {
1068  try {
1069  return client_->checkpoint();
1070  } catch (...) {
1071  return false;
1072  }
1073  }
1074  CHECK(!isTemp_);
1075  bool ret = true;
1076  ret = ret && (msync((void*)offset_map_, offset_file_size_, MS_SYNC) == 0);
1077  ret = ret && (msync((void*)payload_map_, payload_file_size_, MS_SYNC) == 0);
1078  ret = ret && (fsync(offset_fd_) == 0);
1079  ret = ret && (fsync(payload_fd_) == 0);
1080  return ret;
1081 }
StringIdxEntry * offset_map_
std::unique_ptr< StringDictionaryClient > client_
CHECK(cgen_state)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

uint32_t StringDictionary::computeBucket ( const uint32_t  hash,
const std::string  str,
const std::vector< int32_t > &  data,
const bool  unique 
) const
private noexcept

Definition at line 898 of file StringDictionary.cpp.

Referenced by getOrAddBulk().

901  {
902  auto bucket = hash & (data.size() - 1);
903  while (true) {
904  if (data[bucket] ==
905  INVALID_STR_ID) { // In this case it means the slot is available for use
906  break;
907  }
908  // if records are unique I don't need to do this test as I know it will not be the
909  // same
910  if (!unique) {
911  if (materialize_hashes_) {
912  if (hash == rk_hashes_[data[bucket]]) {
913  // can't be the same string if hash is different
914  const auto old_str = getStringFromStorage(data[bucket]);
915  if (str.size() == old_str.size &&
916  !memcmp(str.c_str(), old_str.c_str_ptr, str.size())) {
917  // found the string
918  break;
919  }
920  }
921  } else {
922  const auto old_str = getStringFromStorage(data[bucket]);
923  if (str.size() == old_str.size &&
924  !memcmp(str.c_str(), old_str.c_str_ptr, str.size())) {
925  // found the string
926  break;
927  }
928  }
929  }
930  // wrap around
931  if (++bucket == data.size()) {
932  bucket = 0;
933  }
934  }
935  return bucket;
936 }
static constexpr int32_t INVALID_STR_ID
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< uint32_t > rk_hashes_

+ Here is the caller graph for this function:

uint32_t StringDictionary::computeUniqueBucketWithHash ( const uint32_t  hash,
const std::vector< int32_t > &  data 
) const
private noexcept

Definition at line 938 of file StringDictionary.cpp.

Referenced by increaseCapacity(), and processDictionaryFutures().

940  {
941  auto bucket = hash & (data.size() - 1);
942  while (true) {
943  if (data[bucket] ==
944  INVALID_STR_ID) { // In this case it means the slot is available for use
945  break;
946  }
947  // wrap around
948  if (++bucket == data.size()) {
949  bucket = 0;
950  }
951  }
952  return bucket;
953 }
static constexpr int32_t INVALID_STR_ID

+ Here is the caller graph for this function:

std::shared_ptr< const std::vector< std::string > > StringDictionary::copyStrings ( ) const

Definition at line 765 of file StringDictionary.cpp.

References CHECK_EQ, CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), rw_mutex_, str_count_, and strings_cache_.

765  {
766  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
767  if (client_) {
768  // TODO(miyu): support remote string dictionary
769  throw std::runtime_error(
770  "copying dictionaries from remote server is not supported yet.");
771  }
772 
773  if (strings_cache_) {
774  return strings_cache_;
775  }
776 
777  strings_cache_ = std::make_shared<std::vector<std::string>>();
778  strings_cache_->reserve(str_count_);
779  const bool multithreaded = str_count_ > 10000;
780  const auto worker_count =
781  multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
782  CHECK_GT(worker_count, 0UL);
783  std::vector<std::vector<std::string>> worker_results(worker_count);
784  auto copy = [this](std::vector<std::string>& str_list,
785  const size_t start_id,
786  const size_t end_id) {
787  CHECK_LE(start_id, end_id);
788  str_list.reserve(end_id - start_id);
789  for (size_t string_id = start_id; string_id < end_id; ++string_id) {
790  str_list.push_back(getStringUnlocked(string_id));
791  }
792  };
793  if (multithreaded) {
794  std::vector<std::future<void>> workers;
795  const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
796  for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
797  worker_idx < worker_count && start < str_count_;
798  ++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
799  workers.push_back(std::async(
800  std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
801  }
802  for (auto& worker : workers) {
803  worker.get();
804  }
805  } else {
806  CHECK_EQ(worker_results.size(), size_t(1));
807  copy(worker_results[0], 0, str_count_);
808  }
809 
810  for (const auto& worker_result : worker_results) {
811  strings_cache_->insert(
812  strings_cache_->end(), worker_result.begin(), worker_result.end());
813  }
814  return strings_cache_;
815 }
#define CHECK_EQ(x, y)
Definition: Logger.h:198
#define CHECK_GT(x, y)
Definition: Logger.h:202
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
std::shared_ptr< std::vector< std::string > > strings_cache_
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:201
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

bool StringDictionary::fillRateIsHigh ( ) const
private noexcept

Definition at line 817 of file StringDictionary.cpp.

References str_count_, and str_ids_.

Referenced by getOrAddBulk().

817  {
818  return str_ids_.size() < str_count_ * 2;
819 }
std::vector< int32_t > str_ids_

+ Here is the caller graph for this function:

std::vector< int32_t > StringDictionary::getCompare ( const std::string &  pattern,
const std::string &  comp_operator,
const size_t  generation 
)

Definition at line 557 of file StringDictionary.cpp.

References buildSortedCache(), client_, compare_cache_, getEquals(), getStringFromStorage(), rw_mutex_, sorted_cache, str_count_, string_eq(), and string_lt().

559  {
560  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
561  if (client_) {
562  return client_->get_compare(pattern, comp_operator, generation);
563  }
564  std::vector<int32_t> ret;
565  if (str_count_ == 0) {
566  return ret;
567  }
568  if (sorted_cache.size() < str_count_) {
569  if (comp_operator == "=" || comp_operator == "<>") {
570  return getEquals(pattern, comp_operator, generation);
571  }
572 
574  }
575  auto cache_index = compare_cache_.get(pattern);
576 
577  if (!cache_index) {
578  cache_index = std::make_shared<StringDictionary::compare_cache_value_t>();
579  const auto cache_itr = std::lower_bound(
580  sorted_cache.begin(),
581  sorted_cache.end(),
582  pattern,
583  [this](decltype(sorted_cache)::value_type const& a, decltype(pattern)& b) {
584  auto a_str = this->getStringFromStorage(a);
585  return string_lt(a_str.c_str_ptr, a_str.size, b.c_str(), b.size());
586  });
587 
588  if (cache_itr == sorted_cache.end()) {
589  cache_index->index = sorted_cache.size() - 1;
590  cache_index->diff = 1;
591  } else {
592  const auto cache_str = getStringFromStorage(*cache_itr);
593  if (!string_eq(
594  cache_str.c_str_ptr, cache_str.size, pattern.c_str(), pattern.size())) {
595  cache_index->index = cache_itr - sorted_cache.begin() - 1;
596  cache_index->diff = 1;
597  } else {
598  cache_index->index = cache_itr - sorted_cache.begin();
599  cache_index->diff = 0;
600  }
601  }
602 
603  compare_cache_.put(pattern, cache_index);
604  }
605 
606  // since we have a cache in form of vector of ints which is sorted according to
607  // corresponding strings in the dictionary all we need is the index of the element
608  // which equal to the pattern that we are trying to match or the index of “biggest”
609  // element smaller than the pattern, to perform all the comparison operators over
610  // string. The search function guarantees we have such index so now it is just the
611  // matter to include all the elements in the result vector.
612 
613  // For < operator if the index that we have points to the element which is equal to
614  // the pattern that we are searching for we simply get all the elements less than the
615  // index. If the element pointed by the index is not equal to the pattern we are
616  // comparing with we also need to include that index in result vector, except when the
617  // index points to 0 and the pattern is lesser than the smallest value in the string
618  // dictionary.
619 
620  if (comp_operator == "<") {
621  size_t idx = cache_index->index;
622  if (cache_index->diff) {
623  idx = cache_index->index + 1;
624  if (cache_index->index == 0 && cache_index->diff > 0) {
625  idx = cache_index->index;
626  }
627  }
628  for (size_t i = 0; i < idx; i++) {
629  ret.push_back(sorted_cache[i]);
630  }
631 
632  // For <= operator if the index that we have points to the element which is equal to
633  // the pattern that we are searching for we want to include the element pointed by
634  // the index in the result set. If the element pointed by the index is not equal to
635  // the pattern we are comparing with we just want to include all the ids with index
636  // less than the index that is cached, except when pattern that we are searching for
637  // is smaller than the smallest string in the dictionary.
638 
639  } else if (comp_operator == "<=") {
640  size_t idx = cache_index->index + 1;
641  if (cache_index == 0 && cache_index->diff > 0) {
642  idx = cache_index->index;
643  }
644  for (size_t i = 0; i < idx; i++) {
645  ret.push_back(sorted_cache[i]);
646  }
647 
648  // For > operator we want to get all the elements with index greater than the index
649  // that we have except, when the pattern we are searching for is lesser than the
650  // smallest string in the dictionary we also want to include the id of the index
651  // that we have.
652 
653  } else if (comp_operator == ">") {
654  size_t idx = cache_index->index + 1;
655  if (cache_index->index == 0 && cache_index->diff > 0) {
656  idx = cache_index->index;
657  }
658  for (size_t i = idx; i < sorted_cache.size(); i++) {
659  ret.push_back(sorted_cache[i]);
660  }
661 
662  // For >= operator when the indexed element that we have points to element which is
663  // equal to the pattern we are searching for we want to include that in the result
664  // vector. If the index that we have does not point to the string which is equal to
665  // the pattern we are searching we don’t want to include that id into the result
666  // vector except when the index is 0.
667 
668  } else if (comp_operator == ">=") {
669  size_t idx = cache_index->index;
670  if (cache_index->diff) {
671  idx = cache_index->index + 1;
672  if (cache_index->index == 0 && cache_index->diff > 0) {
673  idx = cache_index->index;
674  }
675  }
676  for (size_t i = idx; i < sorted_cache.size(); i++) {
677  ret.push_back(sorted_cache[i]);
678  }
679  } else if (comp_operator == "=") {
680  if (!cache_index->diff) {
681  ret.push_back(sorted_cache[cache_index->index]);
682  }
683 
684  // For <> operator it is simple matter of not including id of string which is equal
685  // to pattern we are searching for.
686  } else if (comp_operator == "<>") {
687  if (!cache_index->diff) {
688  size_t idx = cache_index->index;
689  for (size_t i = 0; i < idx; i++) {
690  ret.push_back(sorted_cache[i]);
691  }
692  ++idx;
693  for (size_t i = idx; i < sorted_cache.size(); i++) {
694  ret.push_back(sorted_cache[i]);
695  }
696  } else {
697  for (size_t i = 0; i < sorted_cache.size(); i++) {
698  ret.insert(ret.begin(), sorted_cache.begin(), sorted_cache.end());
699  }
700  }
701 
702  } else {
703  std::runtime_error("Unsupported string comparison operator");
704  }
705  return ret;
706 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
DEVICE bool string_eq(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:332
std::unique_ptr< StringDictionaryClient > client_
std::vector< int32_t > getEquals(std::string pattern, std::string comp_operator, size_t generation)
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
mapd_shared_mutex rw_mutex_
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getEquals ( std::string  pattern,
std::string  comp_operator,
size_t  generation 
)
private

Definition at line 497 of file StringDictionary.cpp.

References CHECK(), CHECK_GT, CHECK_LE, cpu_threads(), equal_cache_, getStringUnlocked(), MAX_STRLEN, run_benchmark_import::result, and str_count_.

Referenced by getCompare().

499  {
500  std::vector<int32_t> result;
501  auto eq_id_itr = equal_cache_.find(pattern);
502  int32_t eq_id = MAX_STRLEN + 1;
503  int32_t cur_size = str_count_;
504  if (eq_id_itr != equal_cache_.end()) {
505  auto eq_id = eq_id_itr->second;
506  if (comp_operator == "=") {
507  result.push_back(eq_id);
508  } else {
509  for (int32_t idx = 0; idx <= cur_size; idx++) {
510  if (idx == eq_id) {
511  continue;
512  }
513  result.push_back(idx);
514  }
515  }
516  } else {
517  std::vector<std::thread> workers;
518  int worker_count = cpu_threads();
519  CHECK_GT(worker_count, 0);
520  std::vector<std::vector<int32_t>> worker_results(worker_count);
521  CHECK_LE(generation, str_count_);
522  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
523  workers.emplace_back(
524  [&worker_results, &pattern, generation, worker_idx, worker_count, this]() {
525  for (size_t string_id = worker_idx; string_id < generation;
526  string_id += worker_count) {
527  const auto str = getStringUnlocked(string_id);
528  if (str == pattern) {
529  worker_results[worker_idx].push_back(string_id);
530  }
531  }
532  });
533  }
534  for (auto& worker : workers) {
535  worker.join();
536  }
537  for (const auto& worker_result : worker_results) {
538  result.insert(result.end(), worker_result.begin(), worker_result.end());
539  }
540  if (result.size() > 0) {
541  const auto it_ok = equal_cache_.insert(std::make_pair(pattern, result[0]));
542  CHECK(it_ok.second);
543  eq_id = result[0];
544  }
545  if (comp_operator == "<>") {
546  for (int32_t idx = 0; idx <= cur_size; idx++) {
547  if (idx == eq_id) {
548  continue;
549  }
550  result.push_back(idx);
551  }
552  }
553  }
554  return result;
555 }
#define CHECK_GT(x, y)
Definition: Logger.h:202
std::string getStringUnlocked(int32_t string_id) const noexcept
CHECK(cgen_state)
std::map< std::string, int32_t > equal_cache_
#define CHECK_LE(x, y)
Definition: Logger.h:201
static constexpr size_t MAX_STRLEN
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int32_t StringDictionary::getIdOfString ( const std::string &  str) const

Definition at line 372 of file StringDictionary.cpp.

References client_, getUnlocked(), and rw_mutex_.

372  {
373  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
374  if (client_) {
375  return client_->get(str);
376  }
377  return getUnlocked(str);
378 }
int32_t getUnlocked(const std::string &str) const noexcept
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getLike ( const std::string &  pattern,
const bool  icase,
const bool  is_simple,
const char  escape,
const size_t  generation 
) const

Definition at line 444 of file StringDictionary.cpp.

References CHECK(), CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_like(), like_cache_, run_benchmark_import::result, rw_mutex_, and str_count_.

448  {
449  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
450  if (client_) {
451  return client_->get_like(pattern, icase, is_simple, escape, generation);
452  }
453  const auto cache_key = std::make_tuple(pattern, icase, is_simple, escape);
454  const auto it = like_cache_.find(cache_key);
455  if (it != like_cache_.end()) {
456  return it->second;
457  }
458  std::vector<int32_t> result;
459  std::vector<std::thread> workers;
460  int worker_count = cpu_threads();
461  CHECK_GT(worker_count, 0);
462  std::vector<std::vector<int32_t>> worker_results(worker_count);
463  CHECK_LE(generation, str_count_);
464  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
465  workers.emplace_back([&worker_results,
466  &pattern,
467  generation,
468  icase,
469  is_simple,
470  escape,
471  worker_idx,
472  worker_count,
473  this]() {
474  for (size_t string_id = worker_idx; string_id < generation;
475  string_id += worker_count) {
476  const auto str = getStringUnlocked(string_id);
477  if (is_like(str, pattern, icase, is_simple, escape)) {
478  worker_results[worker_idx].push_back(string_id);
479  }
480  }
481  });
482  }
483  for (auto& worker : workers) {
484  worker.join();
485  }
486  for (const auto& worker_result : worker_results) {
487  result.insert(result.end(), worker_result.begin(), worker_result.end());
488  }
489  // place result into cache for reuse if similar query
490  const auto it_ok = like_cache_.insert(std::make_pair(cache_key, result));
491 
492  CHECK(it_ok.second);
493 
494  return result;
495 }
#define CHECK_GT(x, y)
Definition: Logger.h:202
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
CHECK(cgen_state)
mapd_shared_mutex rw_mutex_
bool is_like(const std::string &str, const std::string &pattern, const bool icase, const bool is_simple, const char escape)
#define CHECK_LE(x, y)
Definition: Logger.h:201
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

int32_t StringDictionary::getOrAdd ( const std::string &  str)
noexcept

Definition at line 248 of file StringDictionary.cpp.

References CHECK_EQ.

248  {
249  if (client_) {
250  std::vector<int32_t> string_ids;
251  client_->get_or_add_bulk(string_ids, {str});
252  CHECK_EQ(size_t(1), string_ids.size());
253  return string_ids.front();
254  }
255  return getOrAddImpl(str);
256 }
#define CHECK_EQ(x, y)
Definition: Logger.h:198
int32_t getOrAddImpl(const std::string &str) noexcept
std::unique_ptr< StringDictionaryClient > client_
template<class T >
template void StringDictionary::getOrAddBulk ( const std::vector< std::string > &  string_vec,
T *  encoded_vec 
)

Definition at line 282 of file StringDictionary.cpp.

References appendToStorage(), CHECK(), CHECK_LT, client_no_timeout_, computeBucket(), fillRateIsHigh(), getOrAddBulkRemote(), increaseCapacity(), INVALID_STR_ID, invalidateInvertedIndex(), materialize_hashes_, MAX_STRCOUNT, MAX_STRLEN, offsets_path_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, rw_mutex_, str_count_, and str_ids_.

Referenced by Importer_NS::TypedImportBuffer::addDictEncodedString(), getOrAddBulkArray(), and populate_string_ids().

283  {
284  if (client_no_timeout_) {
285  getOrAddBulkRemote(string_vec, encoded_vec);
286  return;
287  }
288  size_t out_idx{0};
289  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
290 
291  for (const auto& str : string_vec) {
292  if (str.empty()) {
293  encoded_vec[out_idx++] = inline_int_null_value<T>();
294  continue;
295  }
296  CHECK(str.size() <= MAX_STRLEN);
297  uint32_t bucket;
298  const uint32_t hash = rk_hash(str);
299  bucket = computeBucket(hash, str, str_ids_, false);
300  if (str_ids_[bucket] != INVALID_STR_ID) {
301  encoded_vec[out_idx++] = str_ids_[bucket];
302  continue;
303  }
304  // need to add record to dictionary
305  // check there is room
306  if (str_count_ == static_cast<size_t>(max_valid_int_value<T>())) {
307  log_encoding_error<T>(str);
308  encoded_vec[out_idx++] = inline_int_null_value<T>();
309  continue;
310  }
311  if (str_ids_[bucket] == INVALID_STR_ID) {
312  CHECK_LT(str_count_, MAX_STRCOUNT)
313  << "Maximum number (" << str_count_
314  << ") of Dictionary encoded Strings reached for this column, offset path "
315  "for column is "
316  << offsets_path_;
317  if (fillRateIsHigh()) {
318  // resize when more than 50% is full
319  increaseCapacity();
320  bucket = computeBucket(hash, str, str_ids_, false);
321  }
322  appendToStorage(str);
323 
324  str_ids_[bucket] = static_cast<int32_t>(str_count_);
325  if (materialize_hashes_) {
326  rk_hashes_[str_count_] = hash;
327  }
328  ++str_count_;
329  }
330  encoded_vec[out_idx++] = str_ids_[bucket];
331  }
332  invalidateInvertedIndex();
333 }
std::string offsets_path_
void increaseCapacity() noexcept
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
CHECK(cgen_state)
mapd_shared_mutex rw_mutex_
uint32_t rk_hash(const std::string &str)
#define CHECK_LT(x, y)
Definition: Logger.h:200
std::unique_ptr< StringDictionaryClient > client_no_timeout_
void getOrAddBulkRemote(const std::vector< std::string > &string_vec, T *encoded_vec)
void invalidateInvertedIndex() noexcept
std::vector< int32_t > str_ids_
static constexpr size_t MAX_STRLEN
void appendToStorage(const std::string &str) noexcept
uint32_t computeBucket(const uint32_t hash, const std::string str, const std::vector< int32_t > &data, const bool unique) const noexcept
std::vector< uint32_t > rk_hashes_
bool fillRateIsHigh() const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::getOrAddBulkArray ( const std::vector< std::vector< std::string >> &  string_array_vec,
std::vector< std::vector< int32_t >> &  ids_array_vec 
)

Definition at line 269 of file StringDictionary.cpp.

References getOrAddBulk().

Referenced by Importer_NS::TypedImportBuffer::addDictEncodedStringArray().

271  {
272  ids_array_vec.resize(string_array_vec.size());
273  for (size_t i = 0; i < string_array_vec.size(); i++) {
274  auto& strings = string_array_vec[i];
275  auto& ids = ids_array_vec[i];
276  ids.resize(strings.size());
277  getOrAddBulk(strings, &ids[0]);
278  }
279 }
void getOrAddBulk(const std::vector< std::string > &string_vec, T *encoded_vec)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<class T >
template void StringDictionary::getOrAddBulkRemote ( const std::vector< std::string > &  string_vec,
T *  encoded_vec 
)
private

Definition at line 342 of file StringDictionary.cpp.

References CHECK(), and client_no_timeout_.

Referenced by getOrAddBulk().

343  {
344  CHECK(client_no_timeout_);
345  std::vector<int32_t> string_ids;
346  client_no_timeout_->get_or_add_bulk(string_ids, string_vec);
347  size_t out_idx{0};
348  for (size_t i = 0; i < string_ids.size(); ++i) {
349  const auto string_id = string_ids[i];
350  const bool invalid = string_id > max_valid_int_value<T>();
351  if (invalid || string_id == inline_int_null_value<int32_t>()) {
352  if (invalid) {
353  log_encoding_error<T>(string_vec[i]);
354  }
355  encoded_vec[out_idx++] = inline_int_null_value<T>();
356  continue;
357  }
358  encoded_vec[out_idx++] = string_id;
359  }
360 }
CHECK(cgen_state)
std::unique_ptr< StringDictionaryClient > client_no_timeout_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int32_t StringDictionary::getOrAddImpl ( const std::string &  str)
private noexcept

Definition at line 844 of file StringDictionary.cpp.

References CHECK(), CHECK_LT, and anonymous_namespace{StringDictionary.cpp}::rk_hash().

844  {
845  // @TODO(wei) treat empty string as NULL for now
846  if (str.size() == 0) {
847  return inline_int_null_value<int32_t>();
848  }
849  CHECK(str.size() <= MAX_STRLEN);
850  uint32_t bucket;
851  const uint32_t hash = rk_hash(str);
852  {
853  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
854  bucket = computeBucket(hash, str, str_ids_, false);
855  if (str_ids_[bucket] != INVALID_STR_ID) {
856  return str_ids_[bucket];
857  }
858  }
859  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
860  // need to recalculate the bucket in case it changed before
861  // we got the lock
862  bucket = computeBucket(hash, str, str_ids_, false);
863  if (str_ids_[bucket] == INVALID_STR_ID) {
864  CHECK_LT(str_count_, MAX_STRCOUNT)
865  << "Maximum number (" << str_count_
866  << ") of Dictionary encoded Strings reached for this column, offset path "
867  "for column is "
868  << offsets_path_;
869  if (fillRateIsHigh()) {
870  // resize when more than 50% is full
871  increaseCapacity();
872  bucket = computeBucket(hash, str, str_ids_, false);
873  }
874  appendToStorage(str);
875  str_ids_[bucket] = static_cast<int32_t>(str_count_);
876  if (materialize_hashes_) {
877  rk_hashes_[str_count_] = hash;
878  }
879  ++str_count_;
880  invalidateInvertedIndex();
881  }
882  return str_ids_[bucket];
883 }
std::string offsets_path_
void increaseCapacity() noexcept
static constexpr size_t MAX_STRCOUNT
static constexpr int32_t INVALID_STR_ID
CHECK(cgen_state)
mapd_shared_mutex rw_mutex_
uint32_t rk_hash(const std::string &str)
#define CHECK_LT(x, y)
Definition: Logger.h:200
void invalidateInvertedIndex() noexcept
std::vector< int32_t > str_ids_
static constexpr size_t MAX_STRLEN
void appendToStorage(const std::string &str) noexcept
uint32_t computeBucket(const uint32_t hash, const std::string str, const std::vector< int32_t > &data, const bool unique) const noexcept
std::vector< uint32_t > rk_hashes_
bool fillRateIsHigh() const noexcept

+ Here is the call graph for this function:

std::vector< int32_t > StringDictionary::getRegexpLike ( const std::string &  pattern,
const char  escape,
const size_t  generation 
) const

Definition at line 718 of file StringDictionary.cpp.

References CHECK(), CHECK_GT, CHECK_LE, client_, cpu_threads(), getStringUnlocked(), anonymous_namespace{StringDictionary.cpp}::is_regexp_like(), regex_cache_, run_benchmark_import::result, rw_mutex_, and str_count_.

720  {
721  mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
722  if (client_) {
723  return client_->get_regexp_like(pattern, escape, generation);
724  }
725  const auto cache_key = std::make_pair(pattern, escape);
726  const auto it = regex_cache_.find(cache_key);
727  if (it != regex_cache_.end()) {
728  return it->second;
729  }
730  std::vector<int32_t> result;
731  std::vector<std::thread> workers;
732  int worker_count = cpu_threads();
733  CHECK_GT(worker_count, 0);
734  std::vector<std::vector<int32_t>> worker_results(worker_count);
735  CHECK_LE(generation, str_count_);
736  for (int worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
737  workers.emplace_back([&worker_results,
738  &pattern,
739  generation,
740  escape,
741  worker_idx,
742  worker_count,
743  this]() {
744  for (size_t string_id = worker_idx; string_id < generation;
745  string_id += worker_count) {
746  const auto str = getStringUnlocked(string_id);
747  if (is_regexp_like(str, pattern, escape)) {
748  worker_results[worker_idx].push_back(string_id);
749  }
750  }
751  });
752  }
753  for (auto& worker : workers) {
754  worker.join();
755  }
756  for (const auto& worker_result : worker_results) {
757  result.insert(result.end(), worker_result.begin(), worker_result.end());
758  }
759  const auto it_ok = regex_cache_.insert(std::make_pair(cache_key, result));
760  CHECK(it_ok.second);
761 
762  return result;
763 }
#define CHECK_GT(x, y)
Definition: Logger.h:202
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
bool is_regexp_like(const std::string &str, const std::string &pattern, const char escape)
CHECK(cgen_state)
mapd_shared_mutex rw_mutex_
#define CHECK_LE(x, y)
Definition: Logger.h:201
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

std::string StringDictionary::getString ( int32_t  string_id) const

Definition at line 386 of file StringDictionary.cpp.

References client_, getStringUnlocked(), and rw_mutex_.

Referenced by StringValueConverter::convertToColumnarFormatFromDict(), and populate_string_ids().

386  {
387  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
388  if (client_) {
389  std::string ret;
390  client_->get_string(ret, string_id);
391  return ret;
392  }
393  return getStringUnlocked(string_id);
394 }
std::unique_ptr< StringDictionaryClient > client_
std::string getStringUnlocked(int32_t string_id) const noexcept
mapd_shared_mutex rw_mutex_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::pair< char *, size_t > StringDictionary::getStringBytes ( int32_t  string_id) const
noexcept

Definition at line 401 of file StringDictionary.cpp.

References CHECK(), CHECK_LE, and CHECK_LT.

402  {
403  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
404  CHECK(!client_);
405  CHECK_LE(0, string_id);
406  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
407  return getStringBytesChecked(string_id);
408 }
std::pair< char *, size_t > getStringBytesChecked(const int string_id) const noexcept
std::unique_ptr< StringDictionaryClient > client_
CHECK(cgen_state)
mapd_shared_mutex rw_mutex_
#define CHECK_LT(x, y)
Definition: Logger.h:200
#define CHECK_LE(x, y)
Definition: Logger.h:201

+ Here is the call graph for this function:

std::pair< char *, size_t > StringDictionary::getStringBytesChecked ( const int  string_id) const
private noexcept

Definition at line 891 of file StringDictionary.cpp.

References CHECK().

892  {
893  const auto str_canary = getStringFromStorage(string_id);
894  CHECK(!str_canary.canary);
895  return std::make_pair(str_canary.c_str_ptr, str_canary.size);
896 }
CHECK(cgen_state)
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the call graph for this function:

std::string StringDictionary::getStringChecked ( const int  string_id) const
private noexcept

Definition at line 885 of file StringDictionary.cpp.

References CHECK().

Referenced by increaseCapacity().

885  {
886  const auto str_canary = getStringFromStorage(string_id);
887  CHECK(!str_canary.canary);
888  return std::string(str_canary.c_str_ptr, str_canary.size);
889 }
CHECK(cgen_state)
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

StringDictionary::PayloadString StringDictionary::getStringFromStorage ( const int  string_id) const
private noexcept

Definition at line 991 of file StringDictionary.cpp.

References CHECK_GE, StringDictionary::StringIdxEntry::off, and StringDictionary::StringIdxEntry::size.

Referenced by getCompare(), mergeSortedCache(), sortCache(), and StringDictionary().

992  {
993  if (!isTemp_) {
994  CHECK_GE(payload_fd_, 0);
995  CHECK_GE(offset_fd_, 0);
996  }
997  CHECK_GE(string_id, 0);
998  const StringIdxEntry* str_meta = offset_map_ + string_id;
999  if (str_meta->size == 0xffff) {
1000  // hit the canary
1001  return {nullptr, 0, true};
1002  }
1003  return {payload_map_ + str_meta->off, str_meta->size, false};
1004 }
StringIdxEntry * offset_map_
#define CHECK_GE(x, y)
Definition: Logger.h:203

+ Here is the caller graph for this function:

std::string StringDictionary::getStringUnlocked ( int32_t  string_id) const
private noexcept

Definition at line 396 of file StringDictionary.cpp.

References CHECK_LT.

Referenced by copyStrings(), getEquals(), getLike(), getRegexpLike(), and getString().

396  {
397  CHECK_LT(string_id, static_cast<int32_t>(str_count_));
398  return getStringChecked(string_id);
399 }
std::string getStringChecked(const int string_id) const noexcept
#define CHECK_LT(x, y)
Definition: Logger.h:200

+ Here is the caller graph for this function:

int32_t StringDictionary::getUnlocked ( const std::string &  str) const
private noexcept

Definition at line 380 of file StringDictionary.cpp.

References anonymous_namespace{StringDictionary.cpp}::rk_hash().

Referenced by getIdOfString().

380  {
381  const uint32_t hash = rk_hash(str);
382  auto str_id = str_ids_[computeBucket(hash, str, str_ids_, false)];
383  return str_id;
384 }
uint32_t rk_hash(const std::string &str)
std::vector< int32_t > str_ids_
uint32_t computeBucket(const uint32_t hash, const std::string str, const std::vector< int32_t > &data, const bool unique) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::increaseCapacity ( )
private noexcept

Definition at line 821 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), getStringChecked(), INVALID_STR_ID, materialize_hashes_, anonymous_namespace{StringDictionary.cpp}::rk_hash(), rk_hashes_, str_count_, and str_ids_.

Referenced by getOrAddBulk().

821  {
822  std::vector<int32_t> new_str_ids(str_ids_.size() * 2, INVALID_STR_ID);
823 
824  if (materialize_hashes_) {
825  for (size_t i = 0; i < str_ids_.size(); ++i) {
826  if (str_ids_[i] != INVALID_STR_ID) {
827  const uint32_t hash = rk_hashes_[str_ids_[i]];
828  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
829  new_str_ids[bucket] = str_ids_[i];
830  }
831  }
832  rk_hashes_.resize(rk_hashes_.size() * 2);
833  } else {
834  for (size_t i = 0; i < str_count_; ++i) {
835  const auto str = getStringChecked(i);
836  const uint32_t hash = rk_hash(str);
837  uint32_t bucket = computeUniqueBucketWithHash(hash, new_str_ids);
838  new_str_ids[bucket] = i;
839  }
840  }
841  str_ids_.swap(new_str_ids);
842 }
std::string getStringChecked(const int string_id) const noexcept
static constexpr int32_t INVALID_STR_ID
uint32_t rk_hash(const std::string &str)
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) const noexcept
std::vector< int32_t > str_ids_
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::insertInSortedCache ( std::string  str,
int32_t  str_id 
)
private
void StringDictionary::invalidateInvertedIndex ( )
private noexcept

Definition at line 1051 of file StringDictionary.cpp.

References compare_cache_, equal_cache_, like_cache_, and regex_cache_.

Referenced by getOrAddBulk().

1051  {
1052  if (!like_cache_.empty()) {
1053  decltype(like_cache_)().swap(like_cache_);
1054  }
1055  if (!regex_cache_.empty()) {
1056  decltype(regex_cache_)().swap(regex_cache_);
1057  }
1058  if (!equal_cache_.empty()) {
1059  decltype(equal_cache_)().swap(equal_cache_);
1060  }
1061  compare_cache_.invalidateInvertedIndex();
1062 }
DictionaryCache< std::string, compare_cache_value_t > compare_cache_
std::map< std::pair< std::string, char >, std::vector< int32_t > > regex_cache_
std::map< std::string, int32_t > equal_cache_
std::map< std::tuple< std::string, bool, bool, char >, std::vector< int32_t > > like_cache_

+ Here is the caller graph for this function:

void StringDictionary::mergeSortedCache ( std::vector< int32_t > &  temp_sorted_cache)
private

Definition at line 1107 of file StringDictionary.cpp.

References getStringFromStorage(), sorted_cache, and string_lt().

Referenced by buildSortedCache().

1107  {
1108  // this method is not thread safe
1109  std::vector<int32_t> updated_cache(temp_sorted_cache.size() + sorted_cache.size());
1110  size_t t_idx = 0, s_idx = 0, idx = 0;
1111  for (; t_idx < temp_sorted_cache.size() && s_idx < sorted_cache.size(); idx++) {
1112  auto t_string = getStringFromStorage(temp_sorted_cache[t_idx]);
1113  auto s_string = getStringFromStorage(sorted_cache[s_idx]);
1114  const auto insert_from_temp_cache =
1115  string_lt(t_string.c_str_ptr, t_string.size, s_string.c_str_ptr, s_string.size);
1116  if (insert_from_temp_cache) {
1117  updated_cache[idx] = temp_sorted_cache[t_idx++];
1118  } else {
1119  updated_cache[idx] = sorted_cache[s_idx++];
1120  }
1121  }
1122  while (t_idx < temp_sorted_cache.size()) {
1123  updated_cache[idx++] = temp_sorted_cache[t_idx++];
1124  }
1125  while (s_idx < sorted_cache.size()) {
1126  updated_cache[idx++] = sorted_cache[s_idx++];
1127  }
1128  sorted_cache.swap(updated_cache);
1129 }
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept
std::vector< int32_t > sorted_cache

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_array_ids ( std::vector< std::vector< int32_t >> &  dest_array_ids,
StringDictionary *  dest_dict,
const std::vector< std::vector< int32_t >> &  source_array_ids,
const StringDictionary *  source_dict 
)
static

Definition at line 1158 of file StringDictionary.cpp.

References populate_string_ids().

Referenced by DictionaryValueConverter< TARGET_TYPE >::processArrayBuffer().

1162  {
1163  dest_array_ids.resize(source_array_ids.size());
1164 
1165  std::atomic<size_t> row_idx{0};
1166  auto processor = [&row_idx, &dest_array_ids, dest_dict, &source_array_ids, source_dict](
1167  int thread_id) {
1168  for (;;) {
1169  auto row = row_idx.fetch_add(1);
1170 
1171  if (row >= dest_array_ids.size()) {
1172  return;
1173  }
1174  const auto& source_ids = source_array_ids[row];
1175  auto& dest_ids = dest_array_ids[row];
1176  populate_string_ids(dest_ids, dest_dict, source_ids, source_dict);
1177  }
1178  };
1179 
1180  const int num_worker_threads = std::thread::hardware_concurrency();
1181 
1182  if (source_array_ids.size() / num_worker_threads > 10) {
1183  std::vector<std::future<void>> worker_threads;
1184  for (int i = 0; i < num_worker_threads; ++i) {
1185  worker_threads.push_back(std::async(std::launch::async, processor, i));
1186  }
1187 
1188  for (auto& child : worker_threads) {
1189  child.wait();
1190  }
1191  for (auto& child : worker_threads) {
1192  child.get();
1193  }
1194  } else {
1195  processor(0);
1196  }
1197 }
static void populate_string_ids(std::vector< int32_t > &dest_ids, StringDictionary *dest_dict, const std::vector< int32_t > &source_ids, const StringDictionary *source_dict, const std::map< int32_t, std::string > transient_mapping={})
Populates provided dest_ids vector with string ids corresponding to given source strings.

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::populate_string_ids ( std::vector< int32_t > &  dest_ids,
StringDictionary *  dest_dict,
const std::vector< int32_t > &  source_ids,
const StringDictionary *  source_dict,
const std::map< int32_t, std::string >  transient_mapping = {} 
)
static

Populates provided dest_ids vector with string ids corresponding to given source strings.

Given a vector of source string ids and the corresponding source dictionary, this method populates a vector of destination string ids by either returning the string id of the matching string in the destination dictionary or creating a new entry in that dictionary. Source string ids can also be transient if they were created by a function (e.g., the LOWER/UPPER functions). A map of transient string ids to string values is provided in order to handle this use case.

Parameters
dest_ids - vector of destination string ids to be populated
dest_dict - destination dictionary
source_ids - vector of source string ids for which destination ids are needed
source_dict - source dictionary
transient_mapping - map of transient source string ids to string values

Definition at line 1131 of file StringDictionary.cpp.

References getOrAddBulk(), and getString().

Referenced by populate_string_array_ids(), and DictionaryValueConverter< TARGET_TYPE >::processBuffer().

1136  {
1137  std::vector<std::string> strings;
1138 
1139  for (const int32_t source_id : source_ids) {
1140  if (source_id == std::numeric_limits<int32_t>::min()) {
1141  strings.emplace_back("");
1142  } else if (source_id < 0) {
1143  if (auto string_itr = transient_mapping.find(source_id);
1144  string_itr != transient_mapping.end()) {
1145  strings.emplace_back(string_itr->second);
1146  } else {
1147  throw std::runtime_error("Unexpected negative source ID");
1148  }
1149  } else {
1150  strings.push_back(source_dict->getString(source_id));
1151  }
1152  }
1153 
1154  dest_ids.resize(strings.size());
1155  dest_dict->getOrAddBulk(strings, &dest_ids[0]);
1156 }
void getOrAddBulk(const std::vector< std::string > &string_vec, T *encoded_vec)
std::string getString(int32_t string_id) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::processDictionaryFutures ( std::vector< std::future< std::vector< std::pair< uint32_t, unsigned int >>>> &  dictionary_futures)
private

Definition at line 203 of file StringDictionary.cpp.

References computeUniqueBucketWithHash(), materialize_hashes_, payload_file_off_, rk_hashes_, str_count_, and str_ids_.

Referenced by StringDictionary().

205  {
206  for (auto& dictionary_future : dictionary_futures) {
207  dictionary_future.wait();
208  auto hashVec = dictionary_future.get();
209  for (auto& hash : hashVec) {
210  uint32_t bucket = computeUniqueBucketWithHash(hash.first, str_ids_);
211  payload_file_off_ += hash.second;
212  str_ids_[bucket] = static_cast<int32_t>(str_count_);
213  if (materialize_hashes_) {
214  rk_hashes_[str_count_] = hash.first;
215  }
216  ++str_count_;
217  }
218  }
219  dictionary_futures.clear();
220 }
uint32_t computeUniqueBucketWithHash(const uint32_t hash, const std::vector< int32_t > &data) const noexcept
std::vector< int32_t > str_ids_
std::vector< uint32_t > rk_hashes_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void StringDictionary::sortCache ( std::vector< int32_t > &  cache)
private

Definition at line 1094 of file StringDictionary.cpp.

References getStringFromStorage(), and string_lt().

Referenced by buildSortedCache().

1094  {
1095  // This method is not thread-safe.
1096 
1097  // this boost sort is creating some problems when we use UTF-8 encoded strings.
1098  // TODO (vraj): investigate What is wrong with boost sort and try to mitigate it.
1099 
1100  std::sort(cache.begin(), cache.end(), [this](int32_t a, int32_t b) {
1101  auto a_str = this->getStringFromStorage(a);
1102  auto b_str = this->getStringFromStorage(b);
1103  return string_lt(a_str.c_str_ptr, a_str.size, b_str.c_str_ptr, b_str.size);
1104  });
1105 }
DEVICE bool string_lt(const char *lhs, const int32_t lhs_len, const char *rhs, const int32_t rhs_len)
Definition: StringLike.cpp:304
PayloadString getStringFromStorage(const int string_id) const noexcept

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t StringDictionary::storageEntryCount ( ) const

Definition at line 410 of file StringDictionary.cpp.

References client_, rw_mutex_, and str_count_.

410  {
411  mapd_shared_lock<mapd_shared_mutex> read_lock(rw_mutex_);
412  if (client_) {
413  return client_->storage_entry_count();
414  }
415  return str_count_;
416 }
std::unique_ptr< StringDictionaryClient > client_
mapd_shared_mutex rw_mutex_

Member Data Documentation

char * StringDictionary::CANARY_BUFFER {nullptr}
static private

Definition at line 197 of file StringDictionary.h.

std::unique_ptr<StringDictionaryClient> StringDictionary::client_
private
std::unique_ptr<StringDictionaryClient> StringDictionary::client_no_timeout_
private

Definition at line 195 of file StringDictionary.h.

Referenced by getOrAddBulk(), and getOrAddBulkRemote().

DictionaryCache<std::string, compare_cache_value_t> StringDictionary::compare_cache_
mutable private

Definition at line 192 of file StringDictionary.h.

Referenced by getCompare(), and invalidateInvertedIndex().

std::map<std::string, int32_t> StringDictionary::equal_cache_
mutable private

Definition at line 191 of file StringDictionary.h.

Referenced by getEquals(), and invalidateInvertedIndex().

bool StringDictionary::isTemp_
private
std::map<std::tuple<std::string, bool, bool, char>, std::vector<int32_t> > StringDictionary::like_cache_
mutable private

Definition at line 189 of file StringDictionary.h.

Referenced by getLike(), and invalidateInvertedIndex().

bool StringDictionary::materialize_hashes_
private
constexpr size_t StringDictionary::MAX_STRCOUNT = (1U << 31) - 1
static

Definition at line 116 of file StringDictionary.h.

Referenced by getOrAddBulk().

int StringDictionary::offset_fd_
private
size_t StringDictionary::offset_file_size_
private
StringIdxEntry* StringDictionary::offset_map_
private
std::string StringDictionary::offsets_path_
private

Definition at line 179 of file StringDictionary.h.

Referenced by getOrAddBulk(), and StringDictionary().

int StringDictionary::payload_fd_
private
size_t StringDictionary::payload_file_off_
private

Definition at line 186 of file StringDictionary.h.

Referenced by processDictionaryFutures().

size_t StringDictionary::payload_file_size_
private
char* StringDictionary::payload_map_
private
std::map<std::pair<std::string, char>, std::vector<int32_t> > StringDictionary::regex_cache_
mutable private

Definition at line 190 of file StringDictionary.h.

Referenced by getRegexpLike(), and invalidateInvertedIndex().

std::vector<uint32_t> StringDictionary::rk_hashes_
private
mapd_shared_mutex StringDictionary::rw_mutex_
mutable private
std::vector<int32_t> StringDictionary::sorted_cache
private

Definition at line 176 of file StringDictionary.h.

Referenced by buildSortedCache(), getCompare(), and mergeSortedCache().

std::vector<int32_t> StringDictionary::str_ids_
private
std::shared_ptr<std::vector<std::string> > StringDictionary::strings_cache_
mutable private

Definition at line 193 of file StringDictionary.h.

Referenced by copyStrings().


The documentation for this class was generated from the following files: