OmniSciDB  ca0c39ec8f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ArrowForeignStorageBase Class Reference
+ Inheritance diagram for ArrowForeignStorageBase:
+ Collaboration diagram for ArrowForeignStorageBase:

Public Member Functions

void append (const std::vector< ForeignStorageColumnBuffer > &column_buffers) override
 
void read (const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, int8_t *dest, const size_t numBytes) override
 
int8_t * tryZeroCopy (const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, const size_t numBytes) override
 
void dropTable (const int db_id, const int table_id) override
 
void parseArrowTable (Catalog_Namespace::Catalog *catalog, std::pair< int, int > table_key, const std::string &type, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols, Data_Namespace::AbstractBufferMgr *mgr, const arrow::Table &table)
 
std::shared_ptr
< arrow::ChunkedArray > 
createDictionaryEncodedColumn (StringDictionary *dict, const ColumnDescriptor &c, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
 
std::shared_ptr
< arrow::ChunkedArray > 
convertArrowDictionary (StringDictionary *dict, const ColumnDescriptor &c, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
 
template<typename T , typename ChunkType >
std::shared_ptr
< arrow::ChunkedArray > 
createDecimalColumn (const ColumnDescriptor &c, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
 
std::shared_ptr
< arrow::ChunkedArray > 
replaceNullValues (const SQLTypeInfo &columnType, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
 
template<typename T >
std::shared_ptr
< arrow::ChunkedArray > 
replaceNullValuesImpl (std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
 
void getSizeAndOffset (const Frag &frag, const std::shared_ptr< arrow::Array > &chunk, size_t i, int &size, int &offset)
 
int64_t makeFragment (const Frag &frag, ArrowFragment &arrowFrag, const std::vector< std::shared_ptr< arrow::Array >> &chunks, bool is_varlen)
 
- Public Member Functions inherited from PersistentForeignStorageInterface
virtual ~PersistentForeignStorageInterface ()
 
virtual void prepareTable (const int, const std::string &type, TableDescriptor &, std::list< ColumnDescriptor > &)
 
virtual void registerTable (Catalog_Namespace::Catalog *catalog, std::pair< int, int > table_key, const std::string &type, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols, Data_Namespace::AbstractBufferMgr *mgr)=0
 
virtual std::string getType () const =0
 

Public Attributes

std::map< std::array< int, 3 >
, std::vector< ArrowFragment > > 
m_columns
 

Detailed Description

Definition at line 50 of file ArrowForeignStorage.cpp.

Member Function Documentation

void ArrowForeignStorageBase::append ( const std::vector< ForeignStorageColumnBuffer > &  column_buffers)
overridevirtual

Implements PersistentForeignStorageInterface.

Definition at line 498 of file ArrowForeignStorage.cpp.

References CHECK.

499  {
500  CHECK(false);
501 }
#define CHECK(condition)
Definition: Logger.h:222
std::shared_ptr< arrow::ChunkedArray > ArrowForeignStorageBase::convertArrowDictionary ( StringDictionary dict,
const ColumnDescriptor c,
std::shared_ptr< arrow::ChunkedArray >  arr_col_chunked_array 
)

Definition at line 687 of file ArrowForeignStorage.cpp.

References CHECK, StringDictionary::getOrAddBulk(), and run_benchmark_import::res.

Referenced by parseArrowTable().

690  {
691  // TODO: allocate one big array and split it by fragments as it is done in
692  // createDictionaryEncodedColumn
693  std::vector<std::shared_ptr<arrow::Array>> converted_chunks;
694  for (auto& chunk : arr_col_chunked_array->chunks()) {
695  auto dict_array = std::static_pointer_cast<arrow::DictionaryArray>(chunk);
696  auto values = std::static_pointer_cast<arrow::StringArray>(dict_array->dictionary());
697  std::vector<std::string_view> strings(values->length());
698  for (int i = 0; i < values->length(); i++) {
699  auto view = values->GetView(i);
700  strings[i] = std::string_view(view.data(), view.length());
701  }
702  auto arrow_indices =
703  std::static_pointer_cast<arrow::Int32Array>(dict_array->indices());
704  std::vector<int> indices_mapping(values->length());
705  dict->getOrAddBulk(strings, indices_mapping.data());
706 
707  // create new arrow chunk with remapped indices
708  std::shared_ptr<arrow::Buffer> dict_indices_buf;
709  auto res = arrow::AllocateBuffer(arrow_indices->length() * sizeof(int32_t));
710  CHECK(res.ok());
711  dict_indices_buf = std::move(res).ValueOrDie();
712  auto raw_data = reinterpret_cast<int32_t*>(dict_indices_buf->mutable_data());
713 
714  for (int i = 0; i < arrow_indices->length(); i++) {
715  raw_data[i] = indices_mapping[arrow_indices->Value(i)];
716  }
717 
718  converted_chunks.push_back(
719  std::make_shared<arrow::Int32Array>(arrow_indices->length(), dict_indices_buf));
720  }
721  return std::make_shared<arrow::ChunkedArray>(converted_chunks);
722 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
#define CHECK(condition)
Definition: Logger.h:222

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<typename T , typename ChunkType >
std::shared_ptr< arrow::ChunkedArray > ArrowForeignStorageBase::createDecimalColumn ( const ColumnDescriptor c,
std::shared_ptr< arrow::ChunkedArray >  arr_col_chunked_array 
)

Definition at line 725 of file ArrowForeignStorage.cpp.

References CHECK, ColumnDescriptor::columnType, SQLTypeInfo::get_size(), threading_serial::parallel_for(), run_benchmark_import::res, and heavydb.dtypes::T.

727  {
728  size_t column_size = 0;
729  std::vector<int> offsets(arr_col_chunked_array->num_chunks());
730  for (int i = 0; i < arr_col_chunked_array->num_chunks(); i++) {
731  offsets[i] = column_size;
732  column_size += arr_col_chunked_array->chunk(i)->length();
733  }
734 
735  std::shared_ptr<arrow::Buffer> result_buffer;
736  auto res = arrow::AllocateBuffer(column_size * c.columnType.get_size());
737  CHECK(res.ok());
738  result_buffer = std::move(res).ValueOrDie();
739 
740  T* buffer_data = reinterpret_cast<T*>(result_buffer->mutable_data());
741  tbb::parallel_for(
742  tbb::blocked_range(0, arr_col_chunked_array->num_chunks()),
743  [buffer_data, &offsets, arr_col_chunked_array](auto& range) {
744  for (int chunk_idx = range.begin(); chunk_idx < range.end(); chunk_idx++) {
745  auto offset = offsets[chunk_idx];
746  T* chunk_buffer = buffer_data + offset;
747 
748  auto decimalArray = std::static_pointer_cast<arrow::Decimal128Array>(
749  arr_col_chunked_array->chunk(chunk_idx));
750  auto empty =
751  arr_col_chunked_array->null_count() == arr_col_chunked_array->length();
752  for (int i = 0; i < decimalArray->length(); i++) {
753  if (empty || decimalArray->null_count() == decimalArray->length() ||
754  decimalArray->IsNull(i)) {
755  chunk_buffer[i] = inline_int_null_value<T>();
756  } else {
757  arrow::Decimal128 val(decimalArray->GetValue(i));
758  chunk_buffer[i] =
759  static_cast<int64_t>(val); // arrow can cast only to int64_t
760  }
761  }
762  }
763  });
764  auto array = std::make_shared<ChunkType>(column_size, result_buffer);
765  return std::make_shared<arrow::ChunkedArray>(array);
766 }
HOST DEVICE int get_size() const
Definition: sqltypes.h:389
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
#define CHECK(condition)
Definition: Logger.h:222
SQLTypeInfo columnType

+ Here is the call graph for this function:

std::shared_ptr< arrow::ChunkedArray > ArrowForeignStorageBase::createDictionaryEncodedColumn ( StringDictionary dict,
const ColumnDescriptor c,
std::shared_ptr< arrow::ChunkedArray >  arr_col_chunked_array 
)

Definition at line 649 of file ArrowForeignStorage.cpp.

References CHECK, StringDictionary::getOrAddBulk(), threading_serial::parallel_for(), and run_benchmark_import::res.

Referenced by parseArrowTable().

652  {
653  // calculate offsets for every fragment in bulk
654  size_t bulk_size = 0;
655  std::vector<int> offsets(arr_col_chunked_array->num_chunks());
656  for (int i = 0; i < arr_col_chunked_array->num_chunks(); i++) {
657  offsets[i] = bulk_size;
658  bulk_size += arr_col_chunked_array->chunk(i)->length();
659  }
660 
661  std::vector<std::string_view> bulk(bulk_size);
662 
663  tbb::parallel_for(
664  tbb::blocked_range<int>(0, arr_col_chunked_array->num_chunks()),
665  [&bulk, &arr_col_chunked_array, &offsets](const tbb::blocked_range<int>& r) {
666  for (int i = r.begin(); i < r.end(); i++) {
667  auto chunk = std::static_pointer_cast<arrow::StringArray>(
668  arr_col_chunked_array->chunk(i));
669  auto offset = offsets[i];
670  for (int j = 0; j < chunk->length(); j++) {
671  auto view = chunk->GetView(j);
672  bulk[offset + j] = std::string_view(view.data(), view.length());
673  }
674  }
675  });
676 
677  std::shared_ptr<arrow::Buffer> indices_buf;
678  auto res = arrow::AllocateBuffer(bulk_size * sizeof(int32_t));
679  CHECK(res.ok());
680  indices_buf = std::move(res).ValueOrDie();
681  auto raw_data = reinterpret_cast<int*>(indices_buf->mutable_data());
682  dict->getOrAddBulk(bulk, raw_data);
683  auto array = std::make_shared<arrow::Int32Array>(bulk_size, indices_buf);
684  return std::make_shared<arrow::ChunkedArray>(array);
685 }
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
#define CHECK(condition)
Definition: Logger.h:222

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ArrowForeignStorageBase::dropTable ( const int  db_id,
const int  table_id 
)
overridevirtual

Implements PersistentForeignStorageInterface.

Definition at line 641 of file ArrowForeignStorage.cpp.

References m_columns.

641  {
642  auto it = m_columns.lower_bound({db_id, table_id, 0});
643  while (it->first[0] == db_id && it->first[1] == table_id) {
644  it = m_columns.erase(it);
645  }
646 }
std::map< std::array< int, 3 >, std::vector< ArrowFragment > > m_columns
void ArrowForeignStorageBase::getSizeAndOffset ( const Frag frag,
const std::shared_ptr< arrow::Array > &  chunk,
size_t  i,
int &  size,
int &  offset 
)

Definition at line 281 of file ArrowForeignStorage.cpp.

References Frag::first_chunk, Frag::first_chunk_offset, Frag::last_chunk, and Frag::last_chunk_size.

Referenced by makeFragment().

285  {
286  offset = (i == frag.first_chunk) ? frag.first_chunk_offset : 0;
287  size = (i == frag.last_chunk) ? frag.last_chunk_size : (chunk->length() - offset);
288 }
size_t first_chunk_offset
size_t last_chunk
size_t last_chunk_size
size_t first_chunk

+ Here is the caller graph for this function:

int64_t ArrowForeignStorageBase::makeFragment ( const Frag frag,
ArrowFragment arrowFrag,
const std::vector< std::shared_ptr< arrow::Array >> &  chunks,
bool  is_varlen 
)

Definition at line 290 of file ArrowForeignStorage.cpp.

References ArrowFragment::chunks, Frag::first_chunk, getSizeAndOffset(), Frag::last_chunk, ArrowFragment::offset, and ArrowFragment::sz.

Referenced by parseArrowTable().

294  {
295  int64_t varlen = 0;
296  arrowFrag.chunks.resize(frag.last_chunk - frag.first_chunk + 1);
297  for (int i = frag.first_chunk, e = frag.last_chunk; i <= e; i++) {
298  int size, offset;
299  getSizeAndOffset(frag, chunks[i], i, size, offset);
300  arrowFrag.offset += offset;
301  arrowFrag.sz += size;
302  arrowFrag.chunks[i - frag.first_chunk] = chunks[i]->data();
303  auto& buffers = chunks[i]->data()->buffers;
304  if (is_varlen) {
305  if (buffers.size() <= 2) {
306  throw std::runtime_error(
307  "Importing fixed length arrow array as variable length column");
308  }
309  auto offsets_buffer = reinterpret_cast<const uint32_t*>(buffers[1]->data());
310  varlen += offsets_buffer[offset + size] - offsets_buffer[offset];
311  } else if (buffers.size() != 2) {
312  throw std::runtime_error(
313  "Importing varialbe length arrow array as fixed length column");
314  }
315  }
316  // return length of string buffer if array is none encoded string
317  return varlen;
318 }
std::vector< std::shared_ptr< arrow::ArrayData > > chunks
void getSizeAndOffset(const Frag &frag, const std::shared_ptr< arrow::Array > &chunk, size_t i, int &size, int &offset)
size_t last_chunk
size_t first_chunk

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ArrowForeignStorageBase::parseArrowTable ( Catalog_Namespace::Catalog catalog,
std::pair< int, int >  table_key,
const std::string &  type,
const TableDescriptor td,
const std::list< ColumnDescriptor > &  cols,
Data_Namespace::AbstractBufferMgr *  mgr,
const arrow::Table &  table 
)

Definition at line 355 of file ArrowForeignStorage.cpp.

References calculateFragmentsOffsets(), CHECK, convertArrowDictionary(), createDictionaryEncodedColumn(), logger::ERROR, anonymous_namespace{Utm.h}::f, Catalog_Namespace::Catalog::getMetadataForDict(), kDECIMAL, kINT, kNUMERIC, kTEXT, LOG, m_columns, makeFragment(), TableDescriptor::maxFragRows, threading_serial::parallel_for(), replaceNullValues(), heavydb.dtypes::STRING, and DictDescriptor::stringDict.

Referenced by ArrowForeignStorage::registerTable(), and ArrowCsvForeignStorage::registerTable().

361  {
362  std::map<std::array<int, 3>, StringDictionary*> dictionaries;
363  for (auto& c : cols) {
364  std::array<int, 3> col_key{table_key.first, table_key.second, c.columnId};
365  m_columns[col_key] = {};
366  // fsi registerTable runs under SqliteLock which does not allow invoking
367  // getMetadataForDict in other threads
368  if (c.columnType.is_dict_encoded_string()) {
369  auto dictDesc = catalog->getMetadataForDict(c.columnType.get_comp_param());
370  dictionaries[col_key] = dictDesc->stringDict.get();
371  }
372  }
373 
374  tbb::task_group tg;
375 
376  tbb::parallel_for(
377  tbb::blocked_range(0, (int)cols.size()),
378  [this, &tg, &table_key, &td, mgr, &table, &cols, &dictionaries](auto range) {
379  auto columnIter = std::next(cols.begin(), range.begin());
380  for (auto col_idx = range.begin(); col_idx != range.end(); col_idx++) {
381  auto& c = *(columnIter++);
382 
383  if (c.isSystemCol) {
384  continue; // must be processed by base interface implementation
385  }
386 
387  // data comes like this - database_id, table_id, column_id, fragment_id
388  ChunkKey key{table_key.first, table_key.second, c.columnId, 0};
389  std::array<int, 3> col_key{table_key.first, table_key.second, c.columnId};
390 
391  if (col_idx >= table.num_columns()) {
392  LOG(ERROR) << "Number of columns read from Arrow (" << table.num_columns()
393  << ") mismatch CREATE TABLE request: " << cols.size();
394  break;
395  }
396 
397  auto arr_col_chunked_array = table.column(col_idx);
398  auto column_type = c.columnType.get_type();
399 
400  if (column_type != kDECIMAL && column_type != kNUMERIC &&
401  !c.columnType.is_string()) {
402  arr_col_chunked_array = replaceNullValues(column_type, arr_col_chunked_array);
403  }
404 
405  if (c.columnType.is_dict_encoded_string()) {
406  StringDictionary* dict = dictionaries[col_key];
407 
408  switch (arr_col_chunked_array->type()->id()) {
409  case arrow::Type::STRING:
410  arr_col_chunked_array =
411  createDictionaryEncodedColumn(dict, c, arr_col_chunked_array);
412  break;
413  case arrow::Type::DICTIONARY:
414  arr_col_chunked_array =
415  convertArrowDictionary(dict, c, arr_col_chunked_array);
416  break;
417  default:
418  CHECK(false);
419  }
420  } else if (column_type == kDECIMAL || column_type == kNUMERIC) {
421  switch (c.columnType.get_size()) {
422  case 2:
423  arr_col_chunked_array = createDecimalColumn<int16_t, arrow::Int16Array>(
424  c, arr_col_chunked_array);
425  break;
426  case 4:
427  arr_col_chunked_array = createDecimalColumn<int32_t, arrow::Int32Array>(
428  c, arr_col_chunked_array);
429  break;
430  case 8:
431  arr_col_chunked_array = createDecimalColumn<int64_t, arrow::Int64Array>(
432  c, arr_col_chunked_array);
433  break;
434  default:
435  // TODO: throw unsupported decimal type exception
436  CHECK(false);
437  break;
438  }
439  }
440 
441  auto fragments =
442  calculateFragmentsOffsets(*arr_col_chunked_array, td.maxFragRows);
443 
444  auto ctype = c.columnType.get_type();
445  auto& col = m_columns[col_key];
446  col.resize(fragments.size());
447 
448  for (size_t f = 0; f < fragments.size(); f++) {
449  key[3] = f;
450  auto& frag = col[f];
451  bool is_varlen = ctype == kTEXT && !c.columnType.is_dict_encoded_string();
452  size_t varlen = makeFragment(
453  fragments[f], frag, arr_col_chunked_array->chunks(), is_varlen);
454 
455  // create buffer descriptors
456  if (ctype == kTEXT && !c.columnType.is_dict_encoded_string()) {
457  auto k = key;
458  k.push_back(1);
459  {
460  auto b = mgr->createBuffer(k);
461  b->setSize(varlen);
462  b->initEncoder(c.columnType);
463  }
464  k[4] = 2;
465  {
466  auto b = mgr->createBuffer(k);
467  b->setSqlType(SQLTypeInfo(kINT, false));
468  b->setSize(frag.sz * b->getSqlType().get_size());
469  }
470  } else {
471  auto b = mgr->createBuffer(key);
472  b->setSize(frag.sz * c.columnType.get_size());
473  b->initEncoder(c.columnType);
474  size_t type_size = c.columnType.get_size();
475  tg.run([b, fr = &frag, type_size]() {
476  size_t sz = 0;
477  for (size_t i = 0; i < fr->chunks.size(); i++) {
478  auto& chunk = fr->chunks[i];
479  int offset = (i == 0) ? fr->offset : 0;
480  size_t size = (i == fr->chunks.size() - 1) ? (fr->sz - sz)
481  : (chunk->length - offset);
482  sz += size;
483  auto data = chunk->buffers[1]->data();
484  b->getEncoder()->updateStatsEncoded(
485  (const int8_t*)data + offset * type_size, size);
486  }
487  });
488  b->getEncoder()->setNumElems(frag.sz);
489  }
490  }
491  }
492  }); // each col and fragment
493 
494  // wait until all stats have been updated
495  tg.wait();
496 }
std::shared_ptr< arrow::ChunkedArray > replaceNullValues(const SQLTypeInfo &columnType, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
std::vector< int > ChunkKey
Definition: types.h:36
#define LOG(tag)
Definition: Logger.h:216
std::shared_ptr< arrow::ChunkedArray > convertArrowDictionary(StringDictionary *dict, const ColumnDescriptor &c, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
tuple STRING
Definition: dtypes.py:31
constexpr double f
Definition: Utm.h:31
std::shared_ptr< StringDictionary > stringDict
std::map< std::array< int, 3 >, std::vector< ArrowFragment > > m_columns
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1960
Definition: sqltypes.h:67
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
std::shared_ptr< arrow::ChunkedArray > createDictionaryEncodedColumn(StringDictionary *dict, const ColumnDescriptor &c, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
std::vector< Frag > calculateFragmentsOffsets(const arrow::ChunkedArray &array, size_t maxFragRows)
#define CHECK(condition)
Definition: Logger.h:222
Definition: sqltypes.h:60
int64_t makeFragment(const Frag &frag, ArrowFragment &arrowFrag, const std::vector< std::shared_ptr< arrow::Array >> &chunks, bool is_varlen)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void ArrowForeignStorageBase::read ( const ChunkKey chunk_key,
const SQLTypeInfo sql_type,
int8_t *  dest,
const size_t  numBytes 
)
overridevirtual

Implements PersistentForeignStorageInterface.

Definition at line 503 of file ArrowForeignStorage.cpp.

References CHECK, CHECK_EQ, CHECK_GE, run_benchmark_import::dest, SQLTypeInfo::get_type(), SQLTypeInfo::is_dict_encoded_string(), kTEXT, m_columns, and shared::transform().

506  {
507  std::array<int, 3> col_key{chunk_key[0], chunk_key[1], chunk_key[2]};
508  auto& frag = m_columns.at(col_key).at(chunk_key[3]);
509 
510  CHECK(!frag.chunks.empty() || !chunk_key[3]);
511  int64_t sz = 0, copied = 0;
512  int varlen_offset = 0;
513  size_t read_size = 0;
514  for (size_t i = 0; i < frag.chunks.size(); i++) {
515  auto& array_data = frag.chunks[i];
516  int offset = (i == 0) ? frag.offset : 0;
517  size_t size = (i == frag.chunks.size() - 1) ? (frag.sz - read_size)
518  : (array_data->length - offset);
519  read_size += size;
520  arrow::Buffer* bp = nullptr;
521  if (sql_type.is_dict_encoded_string()) {
522  // array_data->buffers[1] stores dictionary indexes
523  bp = array_data->buffers[1].get();
524  } else if (sql_type.get_type() == kTEXT) {
525  CHECK_GE(array_data->buffers.size(), 3UL);
526  // array_data->buffers[2] stores string array
527  bp = array_data->buffers[2].get();
528  } else if (array_data->null_count != array_data->length) {
529  // any type except strings (none encoded strings offsets go here as well)
530  CHECK_GE(array_data->buffers.size(), 2UL);
531  bp = array_data->buffers[1].get();
532  }
533  CHECK(bp);
534  // offset buffer for none encoded strings need to be merged
535  if (chunk_key.size() == 5 && chunk_key[4] == 2) {
536  auto data = reinterpret_cast<const uint32_t*>(bp->data()) + offset;
537  auto dest_ui32 = reinterpret_cast<uint32_t*>(dest);
538  // as size contains the count of strings in the chunk slice, it is always one less
539  // than the offsets array size
540  sz = (size + 1) * sizeof(uint32_t);
541  if (sz > 0) {
542  if (i != 0) {
543  // We merge arrow chunks with string offsets into a single contiguous fragment.
544  // Each string is represented by a pair of offsets, thus size of offset table
545  // is num strings + 1. When merging two chunks, the last number in the first
546  // chunk duplicates the first number in the second chunk, so we skip it.
547  data++;
548  sz -= sizeof(uint32_t);
549  } else {
550  // As we support cases when fragment starts with offset of arrow chunk we need
551  // to subtract the first element of the first chunk from all elements in that
552  // fragment
553  varlen_offset -= data[0];
554  }
555  // We also re-calculate offsets in the second chunk as it is a continuation of
556  // the first one.
557  std::transform(data,
558  data + (sz / sizeof(uint32_t)),
559  dest_ui32,
560  [varlen_offset](uint32_t val) { return val + varlen_offset; });
561  varlen_offset += data[(sz / sizeof(uint32_t)) - 1];
562  }
563  } else {
564  auto fixed_type = dynamic_cast<arrow::FixedWidthType*>(array_data->type.get());
565  if (fixed_type) {
566  std::memcpy(
567  dest,
568  bp->data() + (array_data->offset + offset) * (fixed_type->bit_width() / 8),
569  sz = size * (fixed_type->bit_width() / 8));
570  } else {
571  auto offsets_buffer =
572  reinterpret_cast<const uint32_t*>(array_data->buffers[1]->data());
573  auto string_buffer_offset = offsets_buffer[offset + array_data->offset];
574  auto string_buffer_size =
575  offsets_buffer[offset + array_data->offset + size] - string_buffer_offset;
576  std::memcpy(dest, bp->data() + string_buffer_offset, sz = string_buffer_size);
577  }
578  }
579  dest += sz;
580  copied += sz;
581  }
582  CHECK_EQ(numBytes, size_t(copied));
583 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
#define CHECK_GE(x, y)
Definition: Logger.h:235
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:379
std::map< std::array< int, 3 >, std::vector< ArrowFragment > > m_columns
OUTPUT transform(INPUT const &input, FUNC const &func)
Definition: misc.h:296
Definition: sqltypes.h:67
#define CHECK(condition)
Definition: Logger.h:222
bool is_dict_encoded_string() const
Definition: sqltypes.h:627

+ Here is the call graph for this function:

std::shared_ptr< arrow::ChunkedArray > ArrowForeignStorageBase::replaceNullValues ( const SQLTypeInfo columnType,
std::shared_ptr< arrow::ChunkedArray >  arr_col_chunked_array 
)

Definition at line 110 of file ArrowForeignStorage.cpp.

References CHECK, SQLTypeInfo::get_size(), SQLTypeInfo::get_type(), SQLTypeInfo::is_boolean(), is_datetime(), SQLTypeInfo::is_fp(), and SQLTypeInfo::is_integer().

Referenced by parseArrowTable().

112  {
113  const size_t typeSize = columnType.get_size();
114  if (columnType.is_integer() || is_datetime(columnType.get_type())) {
115  switch (typeSize) {
116  case 1:
117  return replaceNullValuesImpl<int8_t>(arr_col_chunked_array);
118  case 2:
119  return replaceNullValuesImpl<int16_t>(arr_col_chunked_array);
120  case 4:
121  return replaceNullValuesImpl<int32_t>(arr_col_chunked_array);
122  case 8:
123  return replaceNullValuesImpl<int64_t>(arr_col_chunked_array);
124  default:
125  // TODO: throw unsupported integer type exception
126  CHECK(false);
127  }
128  } else if (columnType.is_fp()) {
129  switch (typeSize) {
130  case 4:
131  return replaceNullValuesImpl<float>(arr_col_chunked_array);
132  case 8:
133  return replaceNullValuesImpl<double>(arr_col_chunked_array);
134  }
135  } else if (columnType.is_boolean()) {
136  return replaceNullValuesImpl<bool>(arr_col_chunked_array);
137  }
138  CHECK(false);
139  return nullptr;
140 }
HOST DEVICE int get_size() const
Definition: sqltypes.h:389
bool is_fp() const
Definition: sqltypes.h:579
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:379
bool is_integer() const
Definition: sqltypes.h:577
bool is_boolean() const
Definition: sqltypes.h:582
#define CHECK(condition)
Definition: Logger.h:222
constexpr auto is_datetime(SQLTypes type)
Definition: sqltypes.h:313

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

template<typename T >
std::shared_ptr< arrow::ChunkedArray > ArrowForeignStorageBase::replaceNullValuesImpl ( std::shared_ptr< arrow::ChunkedArray >  arr_col_chunked_array)

Definition at line 199 of file ArrowForeignStorage.cpp.

References convertBoolBitmapBufferWithNulls(), convertBoolBitmapBufferWithoutNulls(), gpu_enabled::copy(), run_benchmark_import::dest, gpu_enabled::fill(), is_null(), threading_serial::parallel_for(), and heavydb.dtypes::T.

200  {
201  if ((!std::is_same_v<T, bool>)&&(arr_col_chunked_array->null_count() == 0)) {
202  // for boolean columns we still need to convert bitmaps to array
203  return arr_col_chunked_array;
204  }
205 
206  auto null_value = get_null_value<T>();
207 
208  auto resultBuf =
209  arrow::AllocateBuffer(sizeof(T) * arr_col_chunked_array->length()).ValueOrDie();
210  auto resultData = reinterpret_cast<T*>(resultBuf->mutable_data());
211 
212  tbb::parallel_for(
213  tbb::blocked_range<size_t>(0, arr_col_chunked_array->num_chunks()),
214  [&](const tbb::blocked_range<size_t>& r) {
215  for (size_t c = r.begin(); c != r.end(); ++c) {
216  size_t offset = 0;
217  for (size_t i = 0; i < c; i++) {
218  offset += arr_col_chunked_array->chunk(i)->length();
219  }
220  auto resWithOffset = resultData + offset;
221 
222  auto chunk = arr_col_chunked_array->chunk(c);
223 
224  if (chunk->null_count() == chunk->length()) {
225  std::fill(resWithOffset, resWithOffset + chunk->length(), null_value);
226  continue;
227  }
228 
229  auto chunkData = reinterpret_cast<const T*>(chunk->data()->buffers[1]->data());
230 
231  const uint8_t* bitmap_data = chunk->null_bitmap_data();
232  const int64_t length = chunk->length();
233 
234  if (chunk->null_count() == 0) {
235  if constexpr (std::is_same_v<T, bool>) {
236  convertBoolBitmapBufferWithoutNulls(
237  reinterpret_cast<int8_t*>(resWithOffset),
238  reinterpret_cast<const uint8_t*>(chunkData),
239  length);
240  } else {
241  std::copy(chunkData, chunkData + chunk->length(), resWithOffset);
242  }
243  continue;
244  }
245 
246  if constexpr (std::is_same_v<T, bool>) {
247  convertBoolBitmapBufferWithNulls(reinterpret_cast<int8_t*>(resWithOffset),
248  reinterpret_cast<const uint8_t*>(chunkData),
249  bitmap_data,
250  length,
251  null_value);
252  } else {
253  for (int64_t bitmap_idx = 0; bitmap_idx < length / 8; ++bitmap_idx) {
254  auto source = chunkData + bitmap_idx * 8;
255  auto dest = resWithOffset + bitmap_idx * 8;
256  auto inversed_bitmap = ~bitmap_data[bitmap_idx];
257  for (int8_t bitmap_offset = 0; bitmap_offset < 8; ++bitmap_offset) {
258  auto is_null = (inversed_bitmap >> bitmap_offset) & 1;
259  auto val = is_null ? null_value : source[bitmap_offset];
260  dest[bitmap_offset] = val;
261  }
262  }
263 
264  for (int64_t j = length / 8 * 8; j < length; ++j) {
265  auto is_null = (~bitmap_data[length / 8] >> (j % 8)) & 1;
266  auto val = is_null ? null_value : chunkData[j];
267  resWithOffset[j] = val;
268  }
269  }
270  }
271  });
272 
273  using ArrowType = typename arrow::CTypeTraits<T>::ArrowType;
274  using ArrayType = typename arrow::TypeTraits<ArrowType>::ArrayType;
275 
276  auto array =
277  std::make_shared<ArrayType>(arr_col_chunked_array->length(), std::move(resultBuf));
278  return std::make_shared<arrow::ChunkedArray>(array);
279 }
void convertBoolBitmapBufferWithoutNulls(int8_t *dst, const uint8_t *src, int64_t length)
DEVICE void fill(ARGS &&...args)
Definition: gpu_enabled.h:60
CONSTEXPR DEVICE bool is_null(const T &value)
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
void convertBoolBitmapBufferWithNulls(int8_t *dst, const uint8_t *src, const uint8_t *bitmap, int64_t length, int8_t null_value)
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())

+ Here is the call graph for this function:

int8_t * ArrowForeignStorageBase::tryZeroCopy ( const ChunkKey chunk_key,
const SQLTypeInfo sql_type,
const size_t  numBytes 
)
overridevirtual

Reimplemented from PersistentForeignStorageInterface.

Definition at line 585 of file ArrowForeignStorage.cpp.

References CHECK_GE, SQLTypeInfo::get_type(), SQLTypeInfo::is_dict_encoded_string(), kTEXT, and m_columns.

587  {
588  std::array<int, 3> col_key{chunk_key[0], chunk_key[1], chunk_key[2]};
589  auto& frag = m_columns.at(col_key).at(chunk_key[3]);
590 
591  // fragment should be contiguous to allow zero copy
592  if (frag.chunks.size() != 1) {
593  return nullptr;
594  }
595 
596  auto& array_data = frag.chunks[0];
597  int offset = frag.offset;
598 
599  arrow::Buffer* bp = nullptr;
600  if (sql_type.is_dict_encoded_string()) {
601  // array_data->buffers[1] stores dictionary indexes
602  bp = array_data->buffers[1].get();
603  } else if (sql_type.get_type() == kTEXT) {
604  CHECK_GE(array_data->buffers.size(), 3UL);
605  // array_data->buffers[2] stores string array
606  bp = array_data->buffers[2].get();
607  } else if (array_data->null_count != array_data->length) {
608  // any type except strings (none encoded strings offsets go here as well)
609  CHECK_GE(array_data->buffers.size(), 2UL);
610  bp = array_data->buffers[1].get();
611  }
612 
613  // arrow buffer is empty, it means we should fill fragment with null's in read function
614  if (!bp) {
615  return nullptr;
616  }
617 
618  auto data = reinterpret_cast<int8_t*>(const_cast<uint8_t*>(bp->data()));
619 
620  // if buffer is none encoded string index buffer
621  if (chunk_key.size() == 5 && chunk_key[4] == 2) {
622  // if offset != 0 we need to recalculate index buffer by adding offset to each index
623  if (offset != 0) {
624  return nullptr;
625  } else {
626  return data;
627  }
628  }
629 
630  auto fixed_type = dynamic_cast<arrow::FixedWidthType*>(array_data->type.get());
631  if (fixed_type) {
632  return data + (array_data->offset + offset) * (fixed_type->bit_width() / 8);
633  }
634  // if buffer is none encoded string data buffer
635  // then we should find its offset in offset buffer
636  auto offsets_buffer = reinterpret_cast<const uint32_t*>(array_data->buffers[1]->data());
637  auto string_buffer_offset = offsets_buffer[offset + array_data->offset];
638  return data + string_buffer_offset;
639 }
#define CHECK_GE(x, y)
Definition: Logger.h:235
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:379
std::map< std::array< int, 3 >, std::vector< ArrowFragment > > m_columns
Definition: sqltypes.h:67
bool is_dict_encoded_string() const
Definition: sqltypes.h:627

+ Here is the call graph for this function:

Member Data Documentation

std::map<std::array<int, 3>, std::vector<ArrowFragment> > ArrowForeignStorageBase::m_columns

Definition at line 107 of file ArrowForeignStorage.cpp.

Referenced by dropTable(), parseArrowTable(), read(), and tryZeroCopy().


The documentation for this class was generated from the following file: