OmniSciDB  95562058bd
ArrowCsvForeignStorage Class Reference

Classes

struct  ArrowFragment
 

Public Member Functions

 ArrowCsvForeignStorage ()
 
void append (const std::vector< ForeignStorageColumnBuffer > &column_buffers) override
 
void read (const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, int8_t *dest, const size_t numBytes) override
 
void prepareTable (const int db_id, const std::string &type, TableDescriptor &td, std::list< ColumnDescriptor > &cols) override
 
void registerTable (Catalog_Namespace::Catalog *catalog, std::pair< int, int > table_key, const std::string &type, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols, Data_Namespace::AbstractBufferMgr *mgr) override
 
std::string getType () const override
 
void createDictionaryEncodedColumn (StringDictionary *dict, const ColumnDescriptor &c, std::vector< ArrowFragment > &col, arrow::ChunkedArray *arr_col_chunked_array, tbb::task_group &tg, const std::vector< Frag > &fragments, ChunkKey key, Data_Namespace::AbstractBufferMgr *mgr)
 
template<typename T , typename ChunkType >
void createDecimalColumn (const ColumnDescriptor &c, std::vector< ArrowFragment > &col, arrow::ChunkedArray *arr_col_chunked_array, tbb::task_group &tg, const std::vector< Frag > &fragments, ChunkKey key, Data_Namespace::AbstractBufferMgr *mgr)
 
- Public Member Functions inherited from PersistentForeignStorageInterface
virtual ~PersistentForeignStorageInterface ()
 

Public Attributes

std::map< std::array< int, 3 >, std::vector< ArrowFragment > > m_columns
 

Detailed Description

Definition at line 44 of file ArrowCsvForeignStorage.cpp.

Constructor & Destructor Documentation

ArrowCsvForeignStorage::ArrowCsvForeignStorage ( )
inline

Definition at line 46 of file ArrowCsvForeignStorage.cpp.

46 {}

Member Function Documentation

void ArrowCsvForeignStorage::append ( const std::vector< ForeignStorageColumnBuffer > &  column_buffers)
overridevirtual

Implements PersistentForeignStorageInterface.

Definition at line 100 of file ArrowCsvForeignStorage.cpp.

References CHECK.

101  {
102  CHECK(false);
103 }
#define CHECK(condition)
Definition: Logger.h:197
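Note: this backend materializes all table data when the table is registered, so append() appears intended to be unreachable; any call fails the CHECK and aborts.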
template<typename T , typename ChunkType >
void ArrowCsvForeignStorage::createDecimalColumn ( const ColumnDescriptor c,
std::vector< ArrowFragment > &  col,
arrow::ChunkedArray *  arr_col_chunked_array,
tbb::task_group &  tg,
const std::vector< Frag > &  fragments,
ChunkKey  key,
Data_Namespace::AbstractBufferMgr *  mgr 
)

Definition at line 425 of file ArrowCsvForeignStorage.cpp.

References ARROW_ASSIGN_OR_THROW, ColumnDescriptor::columnType, SQLTypeInfo::get_size(), getSizeAndOffset(), and omnisci.dtypes::T.

432  {
433  auto empty = arr_col_chunked_array->null_count() == arr_col_chunked_array->length();
434  size_t column_size = 0;
435  std::vector<int> offsets(fragments.size());
436  for (size_t f = 0; f < fragments.size(); f++) {
437  offsets[f] = column_size;
438  auto& frag = col[f];
439  for (int i = fragments[f].first_chunk, e = fragments[f].last_chunk; i <= e; i++) {
440  int size, offset;
441  getSizeAndOffset(fragments[f], arr_col_chunked_array->chunk(i), i, size, offset);
442  // as we create new buffer, offsets are handled with arrow::ArrayData::offset
443  frag.offset = 0;
444  frag.sz += size;
445  }
446  column_size += frag.sz;
447  }
448 
449  std::shared_ptr<arrow::Buffer> result_buffer;
450  ARROW_ASSIGN_OR_THROW(result_buffer,
451  arrow::AllocateBuffer(column_size * c.columnType.get_size()));
452 
453  T* buffer_data = reinterpret_cast<T*>(result_buffer->mutable_data());
454  tbb::parallel_for(
455  tbb::blocked_range(0UL, fragments.size()),
456  [k = key,
457  buffer_data,
458  &offsets,
459  &fragments,
460  &col,
461  arr_col_chunked_array,
462  &result_buffer,
463  mgr,
464  &c,
465  empty,
466  &tg](auto& range) {
467  auto key = k;
468  for (size_t f = range.begin(); f < range.end(); f++) {
469  T* fragment_data = buffer_data + offsets[f];
470  size_t chunk_offset = 0;
471  key[3] = f;
472  auto& frag = col[f];
473  frag.chunks.resize(fragments[f].last_chunk - fragments[f].first_chunk + 1);
474  for (int i = fragments[f].first_chunk, e = fragments[f].last_chunk; i <= e;
475  i++) {
476  T* chunk_data = fragment_data + chunk_offset;
477  int size, offset;
479  fragments[f], arr_col_chunked_array->chunk(i), i, size, offset);
480 
481  auto decimalArray = std::static_pointer_cast<arrow::Decimal128Array>(
482  arr_col_chunked_array->chunk(i));
483 
484  for (int j = 0; j < size; ++j) {
485  if (empty || decimalArray->null_count() == decimalArray->length() ||
486  decimalArray->IsNull(j + offset)) {
487  chunk_data[j] = inline_int_null_value<T>();
488  } else {
489  arrow::Decimal128 val(decimalArray->GetValue(j + offset));
490  chunk_data[j] =
491  static_cast<int64_t>(val); // arrow can cast only to int64_t
492  }
493  }
494 
495  auto converted_chunk = std::make_shared<ChunkType>(size,
496  result_buffer,
497  nullptr,
498  arrow::kUnknownNullCount,
499  offsets[f] + chunk_offset);
500  frag.chunks[i - fragments[f].first_chunk] = converted_chunk->data();
501 
502  chunk_offset += size;
503  }
504 
505  auto b = mgr->createBuffer(key);
506  b->setSize(frag.sz * b->getSqlType().get_size());
507  b->initEncoder(c.columnType);
508  if (!empty) {
509  tg.run([&frag, b]() {
510  for (size_t i = 0; i < frag.chunks.size(); i++) {
511  auto& chunk = frag.chunks[i];
512  int offset = chunk->offset;
513  size_t size = chunk->length;
514  auto data = chunk->buffers[1]->data();
515  b->getEncoder()->updateStats(
516  (const int8_t*)data + offset * b->getSqlType().get_size(), size);
517  }
518  });
519  }
520  b->getEncoder()->setNumElems(frag.sz);
521  }
522  });
523 }
HOST DEVICE int get_size() const
Definition: sqltypes.h:269
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
void getSizeAndOffset(const Frag &frag, const std::shared_ptr< arrow::Array > &chunk, int i, int &size, int &offset)
SQLTypeInfo columnType

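For readers skimming the listing, the inner per-value conversion can be isolated. The following is a minimal standalone sketch, not OmniSciDB code: it assumes the project's inline_int_null_value<T>() sentinel helper is visible (as it is in the listing above) and otherwise uses plain Arrow.

#include <arrow/array.h>
#include <arrow/util/decimal.h>

// Sketch: copy a slice of an arrow::Decimal128Array into a buffer of the
// column's integer storage type T, mapping nulls to the inline sentinel.
template <typename T>
void convert_decimal_slice(const arrow::Decimal128Array& arr,
                           int64_t offset,
                           int64_t size,
                           T* out) {
  for (int64_t j = 0; j < size; ++j) {
    if (arr.IsNull(j + offset)) {
      out[j] = inline_int_null_value<T>();  // OmniSci null sentinel (assumed)
    } else {
      arrow::Decimal128 val(arr.GetValue(j + offset));
      // Arrow exposes a cast to int64_t only; narrow to T afterwards.
      out[j] = static_cast<T>(static_cast<int64_t>(val));
    }
  }
}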

void ArrowCsvForeignStorage::createDictionaryEncodedColumn ( StringDictionary dict,
const ColumnDescriptor c,
std::vector< ArrowFragment > &  col,
arrow::ChunkedArray *  arr_col_chunked_array,
tbb::task_group &  tg,
const std::vector< Frag > &  fragments,
ChunkKey  key,
Data_Namespace::AbstractBufferMgr *  mgr 
)

Definition at line 317 of file ArrowCsvForeignStorage.cpp.

References ARROW_ASSIGN_OR_THROW, ColumnDescriptor::columnType, measure< TimeT >::execution(), StringDictionary::getOrAddBulk(), getSizeAndOffset(), StringDictionary::storageEntryCount(), and VLOG.

Referenced by registerTable().

325  {
326  tg.run([dict, &c, &col, arr_col_chunked_array, &tg, &fragments, k = key, mgr]() {
327  auto key = k;
328  auto full_time = measure<>::execution([&]() {
329  // calculate offsets for every fragment in bulk
330  size_t bulk_size = 0;
331  std::vector<int> offsets(fragments.size() + 1);
332  for (size_t f = 0; f < fragments.size(); f++) {
333  offsets[f] = bulk_size;
334  for (int i = fragments[f].first_chunk, e = fragments[f].last_chunk; i <= e; i++) {
335  int size, offset;
336  getSizeAndOffset(
337  fragments[f], arr_col_chunked_array->chunk(i), i, size, offset);
338  bulk_size += size;
339  }
340  }
341  offsets[fragments.size()] = bulk_size;
342  std::vector<std::string_view> bulk(bulk_size);
343 
344  tbb::parallel_for(
345  tbb::blocked_range<size_t>(0, fragments.size()),
346  [&bulk, &fragments, arr_col_chunked_array, &offsets](
347  const tbb::blocked_range<size_t>& r) {
348  for (auto f = r.begin(); f != r.end(); ++f) {
349  auto bulk_offset = offsets[f];
350 
351  size_t current_ind = 0;
352  for (int i = fragments[f].first_chunk, e = fragments[f].last_chunk; i <= e;
353  i++) {
354  int size, offset;
355  getSizeAndOffset(
356  fragments[f], arr_col_chunked_array->chunk(i), i, size, offset);
357 
358  auto stringArray = std::static_pointer_cast<arrow::StringArray>(
359  arr_col_chunked_array->chunk(i));
360  for (int i = offset; i < offset + size; i++) {
361  auto view = stringArray->GetView(i);
362  bulk[bulk_offset + current_ind] =
363  std::string_view(view.data(), view.length());
364  current_ind++;
365  }
366  }
367  }
368  });
369 
370  std::shared_ptr<arrow::Buffer> indices_buf;
371  ARROW_ASSIGN_OR_THROW(indices_buf,
372  arrow::AllocateBuffer(bulk_size * sizeof(int32_t)));
373  auto raw_data = reinterpret_cast<int*>(indices_buf->mutable_data());
374  auto time = measure<>::execution([&]() { dict->getOrAddBulk(bulk, raw_data); });
375 
376  VLOG(1) << "FSI dictionary for column created in: " << time
377  << "ms, strings count: " << bulk_size
378  << ", unique_count: " << dict->storageEntryCount();
379 
380  for (size_t f = 0; f < fragments.size(); f++) {
381  auto bulk_offset = offsets[f];
382  tg.run([k = key,
383  f,
384  &col,
385  mgr,
386  &c,
387  arr_col_chunked_array,
388  bulk_offset,
389  indices_buf,
390  &fragments]() {
391  auto key = k;
392  key[3] = f;
393  auto& frag = col[f];
394  frag.chunks.resize(fragments[f].last_chunk - fragments[f].first_chunk + 1);
395  auto b = mgr->createBuffer(key);
396  b->initEncoder(c.columnType);
397  size_t current_ind = 0;
398  for (int i = fragments[f].first_chunk, e = fragments[f].last_chunk; i <= e;
399  i++) {
400  int size, offset;
401  getSizeAndOffset(
402  fragments[f], arr_col_chunked_array->chunk(i), i, size, offset);
403  auto indexArray = std::make_shared<arrow::Int32Array>(
404  size, indices_buf, nullptr, -1, bulk_offset + current_ind);
405  frag.chunks[i - fragments[f].first_chunk] = indexArray->data();
406  frag.sz += size;
407  current_ind += size;
408  frag.offset = 0;
409  auto len = frag.chunks[i - fragments[f].first_chunk]->length;
410  auto data = frag.chunks[i - fragments[f].first_chunk]->GetValues<int32_t>(1);
411  b->getEncoder()->updateStats((const int8_t*)data, len);
412  }
413 
414  b->setSize(frag.sz * b->getSqlType().get_size());
415  b->getEncoder()->setNumElems(frag.sz);
416  });
417  }
418  });
419  VLOG(1) << "FSI: createDictionaryEncodedColumn time: " << full_time << "ms"
420  << std::endl;
421  });
422 }
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
size_t storageEntryCount() const
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
void getSizeAndOffset(const Frag &frag, const std::shared_ptr< arrow::Array > &chunk, int i, int &size, int &offset)
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
SQLTypeInfo columnType
#define VLOG(n)
Definition: Logger.h:291

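The core pattern above (gather all strings of a fragment, then resolve them with a single dictionary call) can be sketched standalone. This assumes OmniSciDB's StringDictionary header; function and variable names here are illustrative only.

#include <arrow/array.h>
#include <memory>
#include <string_view>
#include <vector>

// Sketch: encode the strings of chunks [first_chunk, last_chunk] of a
// chunked Arrow column as int32 dictionary ids with one bulk call,
// avoiding a per-string dictionary lookup.
void encode_fragment(StringDictionary* dict,
                     const arrow::ChunkedArray& chunked,
                     int first_chunk,
                     int last_chunk,
                     std::vector<int32_t>& ids_out) {
  std::vector<std::string_view> bulk;
  for (int i = first_chunk; i <= last_chunk; ++i) {
    auto str_array =
        std::static_pointer_cast<arrow::StringArray>(chunked.chunk(i));
    for (int64_t j = 0; j < str_array->length(); ++j) {
      auto view = str_array->GetView(j);
      bulk.emplace_back(view.data(), view.length());
    }
  }
  ids_out.resize(bulk.size());
  dict->getOrAddBulk(bulk, ids_out.data());
}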

std::string ArrowCsvForeignStorage::getType ( ) const
overridevirtual

Implements PersistentForeignStorageInterface.

Definition at line 831 of file ArrowCsvForeignStorage.cpp.

References logger::INFO, and LOG.

831  {
832  LOG(INFO) << "CSV backed temporary tables has been activated. Create table `with "
833  "(storage_type='CSV:path/to/file.csv');`\n";
834  return "CSV";
835 }
#define LOG(tag)
Definition: Logger.h:188
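The returned string is matched against the storage_type option supplied at table creation. Following the log message above, usage would look roughly like the statement below (the exact DDL syntax is an assumption and may differ by version):

CREATE TEMPORARY TABLE t (i INTEGER, s TEXT)
WITH (storage_type='CSV:path/to/file.csv');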
void ArrowCsvForeignStorage::prepareTable ( const int  db_id,
const std::string &  type,
TableDescriptor td,
std::list< ColumnDescriptor > &  cols 
)
overridevirtual

Reimplemented from PersistentForeignStorageInterface.

Definition at line 297 of file ArrowCsvForeignStorage.cpp.

References TableDescriptor::hasDeletedCol.

300  {
301  td.hasDeletedCol = false;
302 }
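The only preparation needed is disabling the implicit delete column; presumably this is because the CSV-backed storage is read-only (cf. append() above), so delete bookkeeping is never required.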
void ArrowCsvForeignStorage::read ( const ChunkKey chunk_key,
const SQLTypeInfo sql_type,
int8_t *  dest,
const size_t  numBytes 
)
overridevirtual

Implements PersistentForeignStorageInterface.

Definition at line 142 of file ArrowCsvForeignStorage.cpp.

References CHECK, CHECK_EQ, CHECK_GE, run_benchmark_import::dest, generateSentinelValues(), SQLTypeInfo::get_type(), SQLTypeInfo::is_dict_encoded_string(), kTEXT, and m_columns.

145  {
146  std::array<int, 3> col_key{chunk_key[0], chunk_key[1], chunk_key[2]};
147  auto& frag = m_columns.at(col_key).at(chunk_key[3]);
148 
149  CHECK(!frag.chunks.empty() || !chunk_key[3]);
150  int64_t sz = 0, copied = 0;
151  int varlen_offset = 0;
152  size_t read_size = 0;
153  for (size_t i = 0; i < frag.chunks.size(); i++) {
154  auto& array_data = frag.chunks[i];
155  int offset = (i == 0) ? frag.offset : 0;
156  size_t size = (i == frag.chunks.size() - 1) ? (frag.sz - read_size)
157  : (array_data->length - offset);
158  read_size += size;
159  arrow::Buffer* bp = nullptr;
160  if (sql_type.is_dict_encoded_string()) {
161  // array_data->buffers[1] stores dictionary indexes
162  bp = array_data->buffers[1].get();
163  } else if (sql_type.get_type() == kTEXT) {
164  CHECK_GE(array_data->buffers.size(), 3UL);
165  // array_data->buffers[2] stores string array
166  bp = array_data->buffers[2].get();
167  } else if (array_data->null_count != array_data->length) {
168  // any type except strings (none encoded strings offsets go here as well)
169  CHECK_GE(array_data->buffers.size(), 2UL);
170  bp = array_data->buffers[1].get();
171  }
172  if (bp) {
173  // offset buffers for none-encoded strings need to be merged
174  if (chunk_key.size() == 5 && chunk_key[4] == 2) {
175  auto data = reinterpret_cast<const uint32_t*>(bp->data()) + offset;
176  auto dest_ui32 = reinterpret_cast<uint32_t*>(dest);
177  // as size contains the count of strings in the chunk slice, it is always one
178  // less than the offsets array size
179  sz = (size + 1) * sizeof(uint32_t);
180  if (sz > 0) {
181  if (i != 0) {
182  // We merge arrow chunks with string offsets into a single contiguous fragment.
183  // Each string is represented by a pair of offsets, thus size of offset table
184  // is num strings + 1. When merging two chunks, the last number in the first
185  // chunk duplicates the first number in the second chunk, so we skip it.
186  data++;
187  sz -= sizeof(uint32_t);
188  } else {
189  // As we support cases where a fragment starts at an offset into an arrow
190  // chunk, we need to subtract the first element of the first chunk from all
191  // elements in that fragment
192  varlen_offset -= data[0];
193  }
194  // We also re-calculate offsets in the second chunk as it is a continuation of
195  // the first one.
196  std::transform(data,
197  data + (sz / sizeof(uint32_t)),
198  dest_ui32,
199  [varlen_offset](uint32_t val) { return val + varlen_offset; });
200  varlen_offset += data[(sz / sizeof(uint32_t)) - 1];
201  }
202  } else {
203  auto fixed_type = dynamic_cast<arrow::FixedWidthType*>(array_data->type.get());
204  if (fixed_type) {
205  std::memcpy(
206  dest,
207  bp->data() + (array_data->offset + offset) * (fixed_type->bit_width() / 8),
208  sz = size * (fixed_type->bit_width() / 8));
209  } else {
210  auto offsets_buffer =
211  reinterpret_cast<const uint32_t*>(array_data->buffers[1]->data());
212  auto string_buffer_offset = offsets_buffer[offset + array_data->offset];
213  auto string_buffer_size =
214  offsets_buffer[offset + array_data->offset + size] - string_buffer_offset;
215  std::memcpy(dest, bp->data() + string_buffer_offset, sz = string_buffer_size);
216  }
217  }
218  } else {
219  // TODO: nullify?
220  auto fixed_type = dynamic_cast<arrow::FixedWidthType*>(array_data->type.get());
221  if (fixed_type) {
222  sz = size * (fixed_type->bit_width() / 8);
223  generateSentinelValues(dest, sql_type, size);
224  } else {
225  CHECK(false); // TODO: what's else???
226  }
227  }
228  dest += sz;
229  copied += sz;
230  }
231  CHECK_EQ(numBytes, size_t(copied));
232 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
#define CHECK_GE(x, y)
Definition: Logger.h:210
void generateSentinelValues(int8_t *data, const SQLTypeInfo &columnType, size_t count)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:259
Definition: sqltypes.h:54
#define CHECK(condition)
Definition: Logger.h:197
bool is_dict_encoded_string() const
Definition: sqltypes.h:444
std::map< std::array< int, 3 >, std::vector< ArrowFragment > > m_columns

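The trickiest part of read() is stitching the per-chunk string-offset tables of none-encoded strings into one fragment-wide table (file lines 174-201 above). A standalone sketch of that rebasing follows; merge_offsets is a hypothetical helper, not part of the codebase.

#include <algorithm>
#include <cstdint>
#include <iterator>
#include <vector>

// Sketch: concatenate two Arrow string-offset tables. After shifting, the
// first entry of the second table duplicates the last entry of the first,
// so it is skipped, mirroring the merge in read().
std::vector<uint32_t> merge_offsets(const std::vector<uint32_t>& a,
                                    const std::vector<uint32_t>& b) {
  std::vector<uint32_t> out(a);
  const uint32_t shift = a.back() - b.front();
  std::transform(b.begin() + 1, b.end(), std::back_inserter(out),
                 [shift](uint32_t v) { return v + shift; });
  return out;
}

// Example: merge_offsets({0, 3, 5}, {0, 4}) == {0, 3, 5, 9}, i.e. strings
// of lengths 3 and 2 followed by one string of length 4.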

void ArrowCsvForeignStorage::registerTable ( Catalog_Namespace::Catalog catalog,
std::pair< int, int >  table_key,
const std::string &  type,
const TableDescriptor td,
const std::list< ColumnDescriptor > &  cols,
Data_Namespace::AbstractBufferMgr *  mgr 
)
overridevirtual

Implements PersistentForeignStorageInterface.

Definition at line 525 of file ArrowCsvForeignStorage.cpp.

References ARROW_THROW_NOT_OK, CHECK, createDictionaryEncodedColumn(), DataframeTableDescriptor::delimiter, logger::ERROR, measure< TimeT >::execution(), logger::FATAL, generateNullValues(), getArrowImportType(), Catalog_Namespace::Catalog::getMetadataForDict(), getSizeAndOffset(), DataframeTableDescriptor::hasHeader, kDECIMAL, kINT, kNUMERIC, kTEXT, LOG, m_columns, TableDescriptor::maxFragRows, DataframeTableDescriptor::skipRows, and VLOG.

530  {
531  const DataframeTableDescriptor* df_td =
532  dynamic_cast<const DataframeTableDescriptor*>(&td);
533  auto memory_pool = arrow::default_memory_pool();
534  auto arrow_parse_options = arrow::csv::ParseOptions::Defaults();
535  arrow_parse_options.quoting = false;
536  arrow_parse_options.escaping = false;
537  arrow_parse_options.newlines_in_values = false;
538  arrow_parse_options.delimiter = df_td ? *df_td->delimiter.c_str() : ',';
539  auto arrow_read_options = arrow::csv::ReadOptions::Defaults();
540  arrow_read_options.use_threads = true;
541 
542  arrow_read_options.block_size = 20 * 1024 * 1024;
543  arrow_read_options.autogenerate_column_names = false;
544  arrow_read_options.skip_rows =
545  df_td ? (df_td->hasHeader ? (df_td->skipRows + 1) : df_td->skipRows) : 1;
546 
547  auto arrow_convert_options = arrow::csv::ConvertOptions::Defaults();
548  arrow_convert_options.check_utf8 = false;
549  arrow_convert_options.include_columns = arrow_read_options.column_names;
550  arrow_convert_options.strings_can_be_null = true;
551 
552  for (auto& c : cols) {
553  if (c.isSystemCol) {
554  continue; // must be processed by base interface implementation
555  }
556  arrow_convert_options.column_types.emplace(c.columnName,
557  getArrowImportType(c.columnType));
558  arrow_read_options.column_names.push_back(c.columnName);
559  }
560 
561  std::shared_ptr<arrow::io::ReadableFile> inp;
562  auto file_result = arrow::io::ReadableFile::Open(info.c_str());
563  ARROW_THROW_NOT_OK(file_result.status());
564  inp = file_result.ValueOrDie();
565 
566  auto table_reader_result = arrow::csv::TableReader::Make(
567  memory_pool, inp, arrow_read_options, arrow_parse_options, arrow_convert_options);
568  ARROW_THROW_NOT_OK(table_reader_result.status());
569  auto table_reader = table_reader_result.ValueOrDie();
570 
571  std::shared_ptr<arrow::Table> arrowTable;
572  auto time = measure<>::execution([&]() {
573  auto arrow_table_result = table_reader->Read();
574  ARROW_THROW_NOT_OK(arrow_table_result.status());
575  arrowTable = arrow_table_result.ValueOrDie();
576  });
577 
578  VLOG(1) << "Read Arrow CSV file " << info << " in " << time << "ms";
579 
580  arrow::Table& table = *arrowTable.get();
581  int cln = 0, num_cols = table.num_columns();
582  int arr_frags = table.column(0)->num_chunks();
583  arrow::ChunkedArray* c0p = table.column(0).get();
584 
585  // here we split arrow chunks between omnisci fragments
586 
587  std::vector<Frag> fragments;
588  int64_t sz = 0;
589  int64_t offset = 0;
590  fragments.push_back({0, 0, 0, 0});
591 
592  for (int i = 0; i < arr_frags;) {
593  auto& chunk = *c0p->chunk(i);
594  auto& frag = *fragments.rbegin();
595  if (td.maxFragRows - sz > chunk.length() - offset) {
596  sz += chunk.length() - offset;
597  if (i == arr_frags - 1) {
598  fragments.rbegin()->last_chunk = arr_frags - 1;
599  fragments.rbegin()->last_chunk_size =
600  c0p->chunk(arr_frags - 1)->length() - offset;
601  }
602  offset = 0;
603  i++;
604  } else {
605  frag.last_chunk = i;
606  frag.last_chunk_size = td.maxFragRows - sz;
607  offset += td.maxFragRows - sz;
608  sz = 0;
609  fragments.push_back({i, static_cast<int>(offset), 0, 0});
610  }
611  }
612  if (fragments.rbegin()->first_chunk == fragments.rbegin()->last_chunk &&
613  fragments.rbegin()->last_chunk_size == 0) {
614  // remove empty fragment at the end if any
615  fragments.pop_back();
616  }
617  // data comes like this - database_id, table_id, column_id, fragment_id
618  ChunkKey key{table_key.first, table_key.second, 0, 0};
619  std::array<int, 3> col_key{table_key.first, table_key.second, 0};
620 
621  tbb::task_group tg;
622 
623  for (auto& c : cols) {
624  if (c.isSystemCol) {
625  continue; // must be processed by base interface implementation
626  }
627 
628  if (cln >= num_cols) {
629  LOG(ERROR) << "Number of columns read from Arrow (" << num_cols
630  << ") mismatch CREATE TABLE request: " << cols.size();
631  break;
632  }
633 
634  auto ctype = c.columnType.get_type();
635  col_key[2] = key[2] = c.columnId;
636  auto& col = m_columns[col_key];
637  col.resize(fragments.size());
638  auto arr_col_chunked_array = table.column(cln++).get();
639 
640  if (c.columnType.is_dict_encoded_string()) {
641  auto dictDesc = const_cast<DictDescriptor*>(
642  catalog->getMetadataForDict(c.columnType.get_comp_param()));
643  StringDictionary* dict = dictDesc->stringDict.get();
644  createDictionaryEncodedColumn(
645  dict, c, col, arr_col_chunked_array, tg, fragments, key, mgr);
646  } else if (ctype == kDECIMAL || ctype == kNUMERIC) {
647  tg.run([this, &c, &col, arr_col_chunked_array, &tg, &fragments, key, mgr]() {
648  switch (c.columnType.get_size()) {
649  case 2:
650  createDecimalColumn<int16_t, arrow::Int16Array>(
651  c, col, arr_col_chunked_array, tg, fragments, key, mgr);
652  break;
653  case 4:
654  createDecimalColumn<int32_t, arrow::Int32Array>(
655  c, col, arr_col_chunked_array, tg, fragments, key, mgr);
656  break;
657  case 8:
658  createDecimalColumn<int64_t, arrow::Int64Array>(
659  c, col, arr_col_chunked_array, tg, fragments, key, mgr);
660  break;
661  default:
662  // TODO: throw unsupported decimal type exception
663  CHECK(false);
664  break;
665  }
666  });
667  } else {
668  auto empty = arr_col_chunked_array->null_count() == arr_col_chunked_array->length();
669  for (size_t f = 0; f < fragments.size(); f++) {
670  key[3] = f;
671  auto& frag = col[f];
672  int64_t varlen = 0;
673  frag.chunks.resize(fragments[f].last_chunk - fragments[f].first_chunk + 1);
674  for (int i = fragments[f].first_chunk, e = fragments[f].last_chunk; i <= e; i++) {
675  int size, offset;
676  getSizeAndOffset(
677  fragments[f], arr_col_chunked_array->chunk(i), i, size, offset);
678  frag.offset += offset;
679  frag.sz += size;
680  frag.chunks[i - fragments[f].first_chunk] =
681  arr_col_chunked_array->chunk(i)->data();
682  auto& buffers = arr_col_chunked_array->chunk(i)->data()->buffers;
683  if (!empty) {
684  if (ctype == kTEXT) {
685  if (buffers.size() <= 2) {
686  LOG(FATAL) << "Type of column #" << cln
687  << " does not match between Arrow and description of "
688  << c.columnName;
689  }
690  auto offsets_buffer = reinterpret_cast<const uint32_t*>(buffers[1]->data());
691  varlen += offsets_buffer[offset + size] - offsets_buffer[offset];
692  } else if (buffers.size() != 2) {
693  LOG(FATAL) << "Type of column #" << cln
694  << " does not match between Arrow and description of "
695  << c.columnName;
696  }
697  }
698  }
699 
700  // create buffer descriptors
701  if (ctype == kTEXT) {
702  auto k = key;
703  k.push_back(1);
704  {
705  auto b = mgr->createBuffer(k);
706  b->setSize(varlen);
707  b->initEncoder(c.columnType);
708  }
709  k[4] = 2;
710  {
711  auto b = mgr->createBuffer(k);
712  b->setSqlType(SQLTypeInfo(kINT, false));
713  b->setSize(frag.sz * b->getSqlType().get_size());
714  }
715  } else {
716  auto b = mgr->createBuffer(key);
717  b->initEncoder(c.columnType);
718  b->setSize(frag.sz * b->getSqlType().get_size());
719  if (!empty) {
720  size_t type_size = c.columnType.get_size();
721  tg.run([b, fr = &frag, type_size]() {
722  size_t sz = 0;
723  for (size_t i = 0; i < fr->chunks.size(); i++) {
724  auto& chunk = fr->chunks[i];
725  int offset = (i == 0) ? fr->offset : 0;
726  size_t size = (i == fr->chunks.size() - 1) ? (fr->sz - sz)
727  : (chunk->length - offset);
728  sz += size;
729  auto data = chunk->buffers[1]->data();
730  b->getEncoder()->updateStats((const int8_t*)data + offset * type_size,
731  size);
732  }
733  });
734  }
735  b->getEncoder()->setNumElems(frag.sz);
736  }
737  }
738  if (ctype != kDECIMAL && ctype != kNUMERIC && !c.columnType.is_string()) {
739  generateNullValues(fragments, arr_col_chunked_array, c.columnType);
740  }
741  }
742  } // each col and fragment
743 
744  // wait until all stats have been updated
745  tg.wait();
746 
747  VLOG(1) << "Created CSV backed temporary table with " << num_cols << " columns, "
748  << arr_frags << " chunks, and " << fragments.size() << " fragments.";
749 }
std::vector< int > ChunkKey
Definition: types.h:37
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
#define LOG(tag)
Definition: Logger.h:188
void createDictionaryEncodedColumn(StringDictionary *dict, const ColumnDescriptor &c, std::vector< ArrowFragment > &col, arrow::ChunkedArray *arr_col_chunked_array, tbb::task_group &tg, const std::vector< Frag > &fragments, ChunkKey key, Data_Namespace::AbstractBufferMgr *mgr)
static std::shared_ptr< arrow::DataType > getArrowImportType(const SQLTypeInfo type)
void getSizeAndOffset(const Frag &frag, const std::shared_ptr< arrow::Array > &chunk, int i, int &size, int &offset)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1451
Definition: sqltypes.h:54
void generateNullValues(const std::vector< Frag > &fragments, arrow::ChunkedArray *arr_col_chunked_array, const SQLTypeInfo &columnType)
#define CHECK(condition)
Definition: Logger.h:197
Descriptor for a dictionary for a string column.
Definition: sqltypes.h:47
std::map< std::array< int, 3 >, std::vector< ArrowFragment > > m_columns
#define VLOG(n)
Definition: Logger.h:291
specifies the in-memory content of a row in the table metadata table

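The chunk-to-fragment assignment in the middle of registerTable() (file lines 587-616) is self-contained enough to exercise in isolation. In this sketch, FragSpan is a hypothetical stand-in for the Frag struct referenced above, with field names taken from the listing.

#include <cstdint>
#include <vector>

struct FragSpan {
  int first_chunk;
  int first_chunk_offset;
  int last_chunk;
  int last_chunk_size;
};

// Sketch: cut a sequence of Arrow chunk lengths into fragments of at most
// max_frag_rows rows, splitting a chunk whenever a fragment fills up.
std::vector<FragSpan> split_into_fragments(const std::vector<int64_t>& chunk_len,
                                           int64_t max_frag_rows) {
  std::vector<FragSpan> frags{{0, 0, 0, 0}};
  int64_t filled = 0, offset = 0;
  for (size_t i = 0; i < chunk_len.size();) {
    const int64_t avail = chunk_len[i] - offset;
    if (max_frag_rows - filled > avail) {  // remainder of the chunk fits
      filled += avail;
      if (i == chunk_len.size() - 1) {     // close the trailing fragment
        frags.back().last_chunk = static_cast<int>(i);
        frags.back().last_chunk_size = static_cast<int>(avail);
      }
      offset = 0;
      ++i;
    } else {                               // fragment is full: split the chunk
      frags.back().last_chunk = static_cast<int>(i);
      frags.back().last_chunk_size = static_cast<int>(max_frag_rows - filled);
      offset += max_frag_rows - filled;
      filled = 0;
      frags.push_back({static_cast<int>(i), static_cast<int>(offset), 0, 0});
    }
  }
  // As in registerTable(), a trailing empty fragment (first_chunk ==
  // last_chunk && last_chunk_size == 0) should be dropped by the caller.
  return frags;
}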

Member Data Documentation

std::map<std::array<int, 3>, std::vector<ArrowFragment> > ArrowCsvForeignStorage::m_columns

Definition at line 92 of file ArrowCsvForeignStorage.cpp.

Referenced by read(), and registerTable().
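The three key components are {database_id, table_id, column_id} (read() assembles them from the first three entries of the ChunkKey), and the vector is indexed by fragment id.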


The documentation for this class was generated from the following file:
ArrowCsvForeignStorage.cpp