OmniSciDB  cde582ebc3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DataBufferCache Class Reference

#include <TableFunctionsCommon.h>

Public Member Functions

bool isKeyCached (const std::string &key) const
 
bool isKeyCachedAndSameLength (const std::string &key, const size_t num_bytes) const
 
template<typename T >
void getDataForKey (const std::string &key, T *dest_buffer) const
 
template<typename T >
const T & getDataRefForKey (const std::string &key) const
 
template<typename T >
const T * getDataPtrForKey (const std::string &key) const
 
template<typename T >
void putDataForKey (const std::string &key, T *const data_buffer, const size_t num_elements)
 

Private Member Functions

void copyData (int8_t *dest, const int8_t *source, const size_t num_bytes) const
 

Private Attributes

const size_t parallel_copy_min_bytes {1 << 20}
 
std::unordered_map< std::string, std::shared_ptr< CacheDataTf > > data_cache_
 
std::shared_mutex cache_mutex_
 

Detailed Description

Definition at line 96 of file TableFunctionsCommon.h.

Member Function Documentation

void DataBufferCache::copyData ( int8_t *  dest,
const int8_t *  source,
const size_t  num_bytes 
) const
private

Definition at line 500 of file TableFunctionsCommon.cpp.

References parallel_copy_min_bytes, and threading_serial::parallel_for().

Referenced by getDataForKey(), and putDataForKey().

502  {
503  if (num_bytes < parallel_copy_min_bytes) {
504  std::memcpy(dest, source, num_bytes);
505  return;
506  }
507  const size_t max_bytes_per_thread = parallel_copy_min_bytes;
508  const size_t num_threads =
509  (num_bytes + max_bytes_per_thread - 1) / max_bytes_per_thread;
510  tbb::parallel_for(
511  tbb::blocked_range<size_t>(0, num_threads, 1),
512  [&](const tbb::blocked_range<size_t>& r) {
513  const size_t end_chunk_idx = r.end();
514  for (size_t chunk_idx = r.begin(); chunk_idx != end_chunk_idx; ++chunk_idx) {
515  const size_t start_byte = chunk_idx * max_bytes_per_thread;
516  const size_t length =
517  std::min(start_byte + max_bytes_per_thread, num_bytes) - start_byte;
518  std::memcpy(dest + start_byte, source + start_byte, length);
519  }
520  });
521 }
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
const size_t parallel_copy_min_bytes

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<typename T >
void DataBufferCache::getDataForKey ( const std::string &  key,
T *  dest_buffer 
) const

Definition at line 445 of file TableFunctionsCommon.cpp.

References cache_mutex_, copyData(), data_cache_, and DEBUG_TIMER.

445  {
446  auto timer = DEBUG_TIMER(__func__);
447  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
448  const auto& cached_data_itr = data_cache_.find(key);
449  if (cached_data_itr == data_cache_.end()) {
450  const std::string error_msg = "Data for key " + key + " not found in cache.";
451  throw std::runtime_error(error_msg);
452  }
453  copyData(reinterpret_cast<int8_t*>(dest_buffer),
454  cached_data_itr->second->data_buffer,
455  cached_data_itr->second->num_bytes);
456 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
void copyData(int8_t *dest, const int8_t *source, const size_t num_bytes) const
#define DEBUG_TIMER(name)
Definition: Logger.h:371
std::shared_mutex cache_mutex_
std::unordered_map< std::string, std::shared_ptr< CacheDataTf > > data_cache_

+ Here is the call graph for this function:

template<typename T >
const T * DataBufferCache::getDataPtrForKey ( const std::string &  key) const

Definition at line 470 of file TableFunctionsCommon.cpp.

References cache_mutex_, data_cache_, and heavydb.dtypes::T.

470  {
471  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
472  const auto& cached_data_itr = data_cache_.find(key);
473  if (cached_data_itr == data_cache_.end()) {
474  return nullptr;
475  }
476  return reinterpret_cast<const T* const>(cached_data_itr->second->data_buffer);
477 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::shared_mutex cache_mutex_
std::unordered_map< std::string, std::shared_ptr< CacheDataTf > > data_cache_
template<typename T >
const T & DataBufferCache::getDataRefForKey ( const std::string &  key) const

Definition at line 459 of file TableFunctionsCommon.cpp.

References cache_mutex_, data_cache_, and heavydb.dtypes::T.

459  {
460  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
461  const auto& cached_data_itr = data_cache_.find(key);
462  if (cached_data_itr == data_cache_.end()) {
463  const std::string error_msg{"Data for key " + key + " not found in cache."};
464  throw std::runtime_error(error_msg);
465  }
466  return *reinterpret_cast<const T*>(cached_data_itr->second->data_buffer);
467 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::shared_mutex cache_mutex_
std::unordered_map< std::string, std::shared_ptr< CacheDataTf > > data_cache_
bool DataBufferCache::isKeyCached ( const std::string &  key) const

Definition at line 429 of file TableFunctionsCommon.cpp.

References cache_mutex_, and data_cache_.

429  {
430  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
431  return data_cache_.count(key) > 0;
432 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::shared_mutex cache_mutex_
std::unordered_map< std::string, std::shared_ptr< CacheDataTf > > data_cache_
bool DataBufferCache::isKeyCachedAndSameLength ( const std::string &  key,
const size_t  num_bytes 
) const

Definition at line 434 of file TableFunctionsCommon.cpp.

References cache_mutex_, and data_cache_.

435  {
436  std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
437  const auto& cached_data_itr = data_cache_.find(key);
438  if (cached_data_itr == data_cache_.end()) {
439  return false;
440  }
441  return num_bytes == cached_data_itr->second->num_bytes;
442 }
heavyai::shared_lock< heavyai::shared_mutex > read_lock
std::shared_mutex cache_mutex_
std::unordered_map< std::string, std::shared_ptr< CacheDataTf > > data_cache_
template<typename T >
void DataBufferCache::putDataForKey ( const std::string &  key,
T *const  data_buffer,
const size_t  num_elements 
)

Definition at line 480 of file TableFunctionsCommon.cpp.

References cache_mutex_, copyData(), data_cache_, DEBUG_TIMER, and heavydb.dtypes::T.

482  {
483  auto timer = DEBUG_TIMER(__func__);
484  const size_t num_bytes(num_elements * sizeof(T));
485  auto cache_data = std::make_shared<CacheDataTf>(num_bytes);
486  copyData(cache_data->data_buffer, reinterpret_cast<int8_t*>(data_buffer), num_bytes);
487  std::unique_lock<std::shared_mutex> write_lock(cache_mutex_);
488  const auto& cached_data_itr = data_cache_.find(key);
489  if (data_cache_.find(key) != data_cache_.end()) {
490  const std::string warning_msg =
491  "Data for key " + key + " already exists in cache. Replacing.";
492  std::cout << warning_msg << std::endl;
493  cached_data_itr->second.reset();
494  cached_data_itr->second = cache_data;
495  return;
496  }
497  data_cache_.insert(std::make_pair(key, cache_data));
498 }
void copyData(int8_t *dest, const int8_t *source, const size_t num_bytes) const
heavyai::unique_lock< heavyai::shared_mutex > write_lock
#define DEBUG_TIMER(name)
Definition: Logger.h:371
std::shared_mutex cache_mutex_
std::unordered_map< std::string, std::shared_ptr< CacheDataTf > > data_cache_

+ Here is the call graph for this function:

Member Data Documentation

std::shared_mutex DataBufferCache::cache_mutex_
mutable private
std::unordered_map<std::string, std::shared_ptr<CacheDataTf> > DataBufferCache::data_cache_
private
const size_t DataBufferCache::parallel_copy_min_bytes {1 << 20}
private

Definition at line 118 of file TableFunctionsCommon.h.

Referenced by copyData().


The documentation for this class was generated from the following files: