OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DataBufferCache Class Reference

#include <TableFunctionsCommon.hpp>

Public Member Functions

bool isKeyCached (const std::string &key) const
 
bool isKeyCachedAndSameLength (const std::string &key, const size_t num_bytes) const
 
template<typename T >
void getDataForKey (const std::string &key, T *dest_buffer) const
 
template<typename T >
const T & getDataRefForKey (const std::string &key) const
 
template<typename T >
const T * getDataPtrForKey (const std::string &key) const
 
template<typename T >
void putDataForKey (const std::string &key, T *const data_buffer, const size_t num_elements)
 

Private Member Functions

void copyData (int8_t *dest, const int8_t *source, const size_t num_bytes) const
 

Private Attributes

const size_t parallel_copy_min_bytes {1 << 20}
 
std::unordered_map< std::string, std::shared_ptr< CacheDataTf > > data_cache_
 
std::shared_mutex cache_mutex_
 

Detailed Description

Definition at line 96 of file TableFunctionsCommon.hpp.

Member Function Documentation

void DataBufferCache::copyData ( int8_t *  dest,
const int8_t *  source,
const size_t  num_bytes 
) const
private

Definition at line 507 of file TableFunctionsCommon.cpp.

References parallel_copy_min_bytes, and threading_serial::parallel_for().

Referenced by getDataForKey(), and putDataForKey().

509  {
510  if (num_bytes < parallel_copy_min_bytes) {
511  std::memcpy(dest, source, num_bytes);
512  return;
513  }
514  const size_t max_bytes_per_thread = parallel_copy_min_bytes;
515  const size_t num_threads =
516  (num_bytes + max_bytes_per_thread - 1) / max_bytes_per_thread;
517  tbb::parallel_for(
518  tbb::blocked_range<size_t>(0, num_threads, 1),
519  [&](const tbb::blocked_range<size_t>& r) {
520  const size_t end_chunk_idx = r.end();
521  for (size_t chunk_idx = r.begin(); chunk_idx != end_chunk_idx; ++chunk_idx) {
522  const size_t start_byte = chunk_idx * max_bytes_per_thread;
523  const size_t length =
524  std::min(start_byte + max_bytes_per_thread, num_bytes) - start_byte;
525  std::memcpy(dest + start_byte, source + start_byte, length);
526  }
527  });
528 }
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
const size_t parallel_copy_min_bytes

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<typename T >
void DataBufferCache::getDataForKey ( const std::string &  key,
T *  dest_buffer 
) const

Definition at line 452 of file TableFunctionsCommon.cpp.

References cache_mutex_, copyData(), data_cache_, and DEBUG_TIMER.

452  {
453  auto timer = DEBUG_TIMER(__func__);
454  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
455  const auto& cached_data_itr = data_cache_.find(key);
456  if (cached_data_itr == data_cache_.end()) {
457  const std::string error_msg = "Data for key " + key + " not found in cache.";
458  throw std::runtime_error(error_msg);
459  }
460  copyData(reinterpret_cast<int8_t*>(dest_buffer),
461  cached_data_itr->second->data_buffer,
462  cached_data_itr->second->num_bytes);
463 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
void copyData(int8_t *dest, const int8_t *source, const size_t num_bytes) const
#define DEBUG_TIMER(name)
Definition: Logger.h:371
std::shared_mutex cache_mutex_
std::unordered_map< std::string, std::shared_ptr< CacheDataTf > > data_cache_

+ Here is the call graph for this function:

template<typename T >
const T * DataBufferCache::getDataPtrForKey ( const std::string &  key) const

Definition at line 477 of file TableFunctionsCommon.cpp.

References cache_mutex_, data_cache_, and heavydb.dtypes::T.

477  {
478  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
479  const auto& cached_data_itr = data_cache_.find(key);
480  if (cached_data_itr == data_cache_.end()) {
481  return nullptr;
482  }
483  return reinterpret_cast<const T* const>(cached_data_itr->second->data_buffer);
484 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::shared_mutex cache_mutex_
std::unordered_map< std::string, std::shared_ptr< CacheDataTf > > data_cache_
template<typename T >
const T & DataBufferCache::getDataRefForKey ( const std::string &  key) const

Definition at line 466 of file TableFunctionsCommon.cpp.

References cache_mutex_, data_cache_, and heavydb.dtypes::T.

466  {
467  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
468  const auto& cached_data_itr = data_cache_.find(key);
469  if (cached_data_itr == data_cache_.end()) {
470  const std::string error_msg{"Data for key " + key + " not found in cache."};
471  throw std::runtime_error(error_msg);
472  }
473  return *reinterpret_cast<const T*>(cached_data_itr->second->data_buffer);
474 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::shared_mutex cache_mutex_
std::unordered_map< std::string, std::shared_ptr< CacheDataTf > > data_cache_
bool DataBufferCache::isKeyCached ( const std::string &  key) const

Definition at line 436 of file TableFunctionsCommon.cpp.

References cache_mutex_, and data_cache_.

436  {
437  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
438  return data_cache_.count(key) > 0;
439 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::shared_mutex cache_mutex_
std::unordered_map< std::string, std::shared_ptr< CacheDataTf > > data_cache_
bool DataBufferCache::isKeyCachedAndSameLength ( const std::string &  key,
const size_t  num_bytes 
) const

Definition at line 441 of file TableFunctionsCommon.cpp.

References cache_mutex_, and data_cache_.

442  {
443  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
444  const auto& cached_data_itr = data_cache_.find(key);
445  if (cached_data_itr == data_cache_.end()) {
446  return false;
447  }
448  return num_bytes == cached_data_itr->second->num_bytes;
449 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::shared_mutex cache_mutex_
std::unordered_map< std::string, std::shared_ptr< CacheDataTf > > data_cache_
template<typename T >
void DataBufferCache::putDataForKey ( const std::string &  key,
T *const  data_buffer,
const size_t  num_elements 
)

Definition at line 487 of file TableFunctionsCommon.cpp.

References cache_mutex_, copyData(), data_cache_, DEBUG_TIMER, and heavydb.dtypes::T.

489  {
490  auto timer = DEBUG_TIMER(__func__);
491  const size_t num_bytes(num_elements * sizeof(T));
492  auto cache_data = std::make_shared<CacheDataTf>(num_bytes);
493  copyData(cache_data->data_buffer, reinterpret_cast<int8_t*>(data_buffer), num_bytes);
494  std::unique_lock<std::shared_mutex> write_lock(cache_mutex_);
495  const auto& cached_data_itr = data_cache_.find(key);
496  if (data_cache_.find(key) != data_cache_.end()) {
497  const std::string warning_msg =
498  "Data for key " + key + " already exists in cache. Replacing.";
499  std::cout << warning_msg << std::endl;
500  cached_data_itr->second.reset();
501  cached_data_itr->second = cache_data;
502  return;
503  }
504  data_cache_.insert(std::make_pair(key, cache_data));
505 }
void copyData(int8_t *dest, const int8_t *source, const size_t num_bytes) const
heavyai::unique_lock< heavyai::shared_mutex > write_lock
#define DEBUG_TIMER(name)
Definition: Logger.h:371
std::shared_mutex cache_mutex_
std::unordered_map< std::string, std::shared_ptr< CacheDataTf > > data_cache_

+ Here is the call graph for this function:

Member Data Documentation

std::shared_mutex DataBufferCache::cache_mutex_
mutable private
std::unordered_map<std::string, std::shared_ptr<CacheDataTf> > DataBufferCache::data_cache_
private
const size_t DataBufferCache::parallel_copy_min_bytes {1 << 20}
private

Definition at line 118 of file TableFunctionsCommon.hpp.

Referenced by copyData().


The documentation for this class was generated from the following files: