OmniSciDB  0fdbebe030
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ArrowCsvForeignStorage Class Reference
+ Inheritance diagram for ArrowCsvForeignStorage:
+ Collaboration diagram for ArrowCsvForeignStorage:

Classes

struct  ArrowFragment
 

Public Member Functions

 ArrowCsvForeignStorage ()
 
void append (const std::vector< ForeignStorageColumnBuffer > &column_buffers) override
 
void read (const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, int8_t *dest, const size_t numBytes) override
 
void prepareTable (const int db_id, const std::string &type, TableDescriptor &td, std::list< ColumnDescriptor > &cols) override
 
void registerTable (Catalog_Namespace::Catalog *catalog, std::pair< int, int > table_key, const std::string &type, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols, Data_Namespace::AbstractBufferMgr *mgr) override
 
std::string getType () const override
 
- Public Member Functions inherited from PersistentForeignStorageInterface
virtual ~PersistentForeignStorageInterface ()
 

Public Attributes

std::map< std::array< int, 3 >
, std::vector< ArrowFragment > > 
m_columns
 

Detailed Description

Definition at line 34 of file ArrowCsvForeignStorage.cpp.

Constructor & Destructor Documentation

ArrowCsvForeignStorage::ArrowCsvForeignStorage ( )
inline

Definition at line 36 of file ArrowCsvForeignStorage.cpp.

36 {}

Member Function Documentation

void ArrowCsvForeignStorage::append ( const std::vector< ForeignStorageColumnBuffer > &  column_buffers)
overridevirtual

Implements PersistentForeignStorageInterface.

Definition at line 71 of file ArrowCsvForeignStorage.cpp.

References CHECK().

72  {
73  CHECK(false);
74 }
CHECK(cgen_state)

+ Here is the call graph for this function:

std::string ArrowCsvForeignStorage::getType ( ) const
overridevirtual

Implements PersistentForeignStorageInterface.

Definition at line 410 of file ArrowCsvForeignStorage.cpp.

References logger::INFO, and LOG.

410  {
411  LOG(INFO) << "CSV backed temporary tables has been activated. Create table `with "
412  "(storage_type='CSV:path/to/file.csv');`\n";
413  return "CSV";
414 }
#define LOG(tag)
Definition: Logger.h:188
void ArrowCsvForeignStorage::prepareTable ( const int  db_id,
const std::string &  type,
TableDescriptor td,
std::list< ColumnDescriptor > &  cols 
)
overridevirtual

Reimplemented from PersistentForeignStorageInterface.

Definition at line 207 of file ArrowCsvForeignStorage.cpp.

References TableDescriptor::hasDeletedCol.

210  {
211  td.hasDeletedCol = false;
212 }
void ArrowCsvForeignStorage::read ( const ChunkKey chunk_key,
const SQLTypeInfo sql_type,
int8_t *  dest,
const size_t  numBytes 
)
overridevirtual

Implements PersistentForeignStorageInterface.

Definition at line 76 of file ArrowCsvForeignStorage.cpp.

References CHECK(), CHECK_EQ, CHECK_GE, run_benchmark_import::dest, SQLTypeInfo::get_type(), SQLTypeInfo::is_dict_encoded_string(), kTEXT, and m_columns.

79  {
80  std::array<int, 3> col_key{chunk_key[0], chunk_key[1], chunk_key[2]};
81  auto& frag = m_columns.at(col_key).at(chunk_key[3]);
82 
83  CHECK(!frag.chunks.empty() || !chunk_key[3]);
84  int64_t sz = 0, copied = 0;
85  arrow::ArrayData* prev_data = nullptr;
86  int varlen_offset = 0;
87 
88  for (auto array_data : frag.chunks) {
89  arrow::Buffer* bp = nullptr;
90  if (sql_type.is_dict_encoded_string()) {
91  // array_data->buffers[1] stores dictionary indexes
92  bp = array_data->buffers[1].get();
93  } else if (sql_type.get_type() == kTEXT) {
94  CHECK_GE(array_data->buffers.size(), 3UL);
95  // array_data->buffers[2] stores string array
96  bp = array_data->buffers[2].get();
97  } else if (array_data->null_count != array_data->length) {
98  // any type except strings (none encoded strings offsets go here as well)
99  CHECK_GE(array_data->buffers.size(), 2UL);
100  bp = array_data->buffers[1].get();
101  }
102  if (bp) {
103  // offset buffers for none encoded strings need to be merged as arrow chunk sizes
104  // are less than omnisci fragment sizes
105  if (chunk_key.size() == 5 && chunk_key[4] == 2) {
106  auto data = reinterpret_cast<const uint32_t*>(bp->data());
107  auto dest_ui32 = reinterpret_cast<uint32_t*>(dest);
108  sz = bp->size();
109  // We merge arrow chunks with string offsets into a single contiguous fragment.
110  // Each string is represented by a pair of offsets, thus size of offset table is
111  // num strings + 1. When merging two chunks, the last number in the first chunk
112  // duplicates the first number in the second chunk, so we skip it.
113  if (prev_data && sz > 0) {
114  data++;
115  sz -= sizeof(uint32_t);
116  }
117  if (sz > 0) {
118  // We also re-calculate offsets in the second chunk as it is a continuation of
119  // the first one.
120  std::transform(data,
121  data + (sz / sizeof(uint32_t)),
122  dest_ui32,
123  [varlen_offset](uint32_t val) { return val + varlen_offset; });
124  varlen_offset += data[(sz / sizeof(uint32_t)) - 1];
125  }
126  } else {
127  std::memcpy(dest, bp->data(), sz = bp->size());
128  }
129  } else {
130  // TODO: nullify?
131  auto fixed_type = dynamic_cast<arrow::FixedWidthType*>(array_data->type.get());
132  if (fixed_type) {
133  sz = array_data->length * fixed_type->bit_width() / 8;
134  } else
135  CHECK(false); // TODO: what's else???
136  }
137  dest += sz;
138  copied += sz;
139  prev_data = array_data.get();
140  }
141  CHECK_EQ(numBytes, size_t(copied));
142 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
#define CHECK_GE(x, y)
Definition: Logger.h:210
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:248
CHECK(cgen_state)
Definition: sqltypes.h:53
bool is_dict_encoded_string() const
Definition: sqltypes.h:425
std::map< std::array< int, 3 >, std::vector< ArrowFragment > > m_columns

+ Here is the call graph for this function:

void ArrowCsvForeignStorage::registerTable ( Catalog_Namespace::Catalog catalog,
std::pair< int, int >  table_key,
const std::string &  type,
const TableDescriptor td,
const std::list< ColumnDescriptor > &  cols,
Data_Namespace::AbstractBufferMgr *  mgr 
)
overridevirtual

Implements PersistentForeignStorageInterface.

Definition at line 214 of file ArrowCsvForeignStorage.cpp.

References ARROW_THROW_NOT_OK, CHECK(), CHECK_LT, Encoder::Create(), logger::ERROR, measure< TimeT >::execution(), logger::FATAL, TableDescriptor::fragPageSize, getArrowImportType(), Catalog_Namespace::Catalog::getMetadataForDict(), kINT, kTEXT, LOG, m_columns, Asio::start(), DictDescriptor::stringDict, and VLOG.

219  {
220  auto memory_pool = arrow::default_memory_pool();
221  auto arrow_parse_options = arrow::csv::ParseOptions::Defaults();
222  arrow_parse_options.quoting = false;
223  arrow_parse_options.escaping = false;
224  arrow_parse_options.newlines_in_values = false;
225 
226  auto arrow_read_options = arrow::csv::ReadOptions::Defaults();
227  arrow_read_options.use_threads = true;
228  arrow_read_options.block_size = 2 * 1024 * 1024;
229 
230  auto arrow_convert_options = arrow::csv::ConvertOptions::Defaults();
231  arrow_convert_options.check_utf8 = false;
232 
233  arrow_read_options.skip_rows = 0; // TODO: add a way to switch csv header on
234  arrow_read_options.autogenerate_column_names =
235  false; // read column names from first row after skip_rows
236  arrow_convert_options.include_columns = arrow_read_options.column_names;
237 
238  for (auto c : cols) {
239  if (c.isSystemCol) {
240  continue; // must be processed by base interface implementation
241  }
242  arrow_convert_options.column_types.emplace(c.columnName,
243  getArrowImportType(c.columnType));
244  }
245 
246  std::shared_ptr<arrow::io::ReadableFile> inp;
247  auto file_result = arrow::io::ReadableFile::Open(info.c_str());
248  ARROW_THROW_NOT_OK(file_result.status());
249  inp = file_result.ValueOrDie();
250 
251  auto table_reader_result = arrow::csv::TableReader::Make(
252  memory_pool, inp, arrow_read_options, arrow_parse_options, arrow_convert_options);
253  ARROW_THROW_NOT_OK(table_reader_result.status());
254  auto table_reader = table_reader_result.ValueOrDie();
255 
256  std::shared_ptr<arrow::Table> arrowTable;
257  auto time = measure<>::execution([&]() {
258  auto arrow_table_result = table_reader->Read();
259  ARROW_THROW_NOT_OK(arrow_table_result.status());
260  arrowTable = arrow_table_result.ValueOrDie();
261  });
262 
263  VLOG(1) << "Read Arrow CSV file " << info << " in " << time << "ms";
264 
265  arrow::Table& table = *arrowTable.get();
266  int cln = 0, num_cols = table.num_columns();
267  int arr_frags = table.column(0)->num_chunks();
268  arrow::ChunkedArray* c0p = table.column(0).get();
269 
270  std::vector<std::pair<int, int>> fragments;
271  int start = 0;
272  int64_t sz = c0p->chunk(0)->length();
273  // calculate size and boundaries of fragments
274  for (int i = 1; i < arr_frags; i++) {
275  if (sz > td.fragPageSize) {
276  fragments.emplace_back(start, i);
277  start = i;
278  sz = 0;
279  }
280  sz += c0p->chunk(i)->length();
281  }
282  fragments.emplace_back(start, arr_frags);
283 
284  // data comes like this - database_id, table_id, column_id, fragment_id
285  ChunkKey key{table_key.first, table_key.second, 0, 0};
286  std::array<int, 3> col_key{table_key.first, table_key.second, 0};
287 
288  auto tp = arrow::internal::GetCpuThreadPool();
289  auto tg = arrow::internal::TaskGroup::MakeThreaded(tp);
290 
291  for (auto& c : cols) {
292  if (c.isSystemCol) {
293  continue; // must be processed by base interface implementation
294  }
295 
296  if (cln >= num_cols) {
297  LOG(ERROR) << "Number of columns read from Arrow (" << num_cols
298  << ") mismatch CREATE TABLE request: " << cols.size();
299  break;
300  }
301 
302  auto ctype = c.columnType.get_type();
303  col_key[2] = key[2] = c.columnId;
304  auto& col = m_columns[col_key];
305  col.resize(fragments.size());
306  auto arr_col_chunked_array = table.column(cln++).get();
307 
308  StringDictionary* dict{nullptr};
309  if (c.columnType.is_dict_encoded_string()) {
310  auto dictDesc = const_cast<DictDescriptor*>(
311  catalog->getMetadataForDict(c.columnType.get_comp_param()));
312  dict = dictDesc->stringDict.get();
313  }
314  auto empty = arr_col_chunked_array->null_count() == arr_col_chunked_array->length();
315 
316  // fill each fragment
317  for (size_t f = 0; f < fragments.size(); f++) {
318  key[3] = f;
319  auto& frag = col[f];
320  int64_t varlen = 0;
321  // for each arrow chunk
322  for (int i = fragments[f].first, e = fragments[f].second; i < e; i++) {
323  CHECK_LT(i, arr_col_chunked_array->length());
324  if (c.columnType.is_dict_encoded_string()) {
325  arrow::Int32Builder indexBuilder;
326  auto stringArray = std::static_pointer_cast<arrow::StringArray>(
327  arr_col_chunked_array->chunk(i));
328  indexBuilder.Reserve(stringArray->length());
329  for (int i = 0; i < stringArray->length(); i++) {
330  // TODO: use arrow dictionary encoding
331  if (stringArray->IsNull(i) || empty ||
332  stringArray->null_count() == stringArray->length()) {
333  indexBuilder.Append(inline_int_null_value<int32_t>());
334  } else {
335  CHECK(dict);
336  auto curStr = stringArray->GetString(i);
337  indexBuilder.Append(dict->getOrAdd(curStr));
338  }
339  }
340  std::shared_ptr<arrow::Array> indexArray;
341  ARROW_THROW_NOT_OK(indexBuilder.Finish(&indexArray));
342  frag.chunks.emplace_back(indexArray->data());
343  frag.sz += stringArray->length();
344  } else {
345  frag.chunks.emplace_back(arr_col_chunked_array->chunk(i)->data());
346  frag.sz += arr_col_chunked_array->chunk(i)->length();
347  auto& buffers = arr_col_chunked_array->chunk(i)->data()->buffers;
348  if (!empty) {
349  if (ctype == kTEXT) {
350  if (buffers.size() <= 2) {
351  LOG(FATAL) << "Type of column #" << cln
352  << " does not match between Arrow and description of "
353  << c.columnName;
354  }
355  varlen += buffers[2]->size();
356  } else if (buffers.size() != 2) {
357  LOG(FATAL) << "Type of column #" << cln
358  << " does not match between Arrow and description of "
359  << c.columnName;
360  }
361  }
362  }
363  }
364 
365  // create buffer descriptors
366  if (ctype == kTEXT && !c.columnType.is_dict_encoded_string()) {
367  auto k = key;
368  k.push_back(1);
369  {
370  auto b = mgr->createBuffer(k);
371  b->setSize(varlen);
372  b->encoder.reset(Encoder::Create(b, c.columnType));
373  b->has_encoder = true;
374  b->sql_type = c.columnType;
375  }
376  k[4] = 2;
377  {
378  auto b = mgr->createBuffer(k);
379  b->sql_type = SQLTypeInfo(kINT, false);
380  b->setSize(frag.sz * b->sql_type.get_size());
381  }
382  } else {
383  auto b = mgr->createBuffer(key);
384  b->sql_type = c.columnType;
385  b->setSize(frag.sz * b->sql_type.get_size());
386  b->encoder.reset(Encoder::Create(b, c.columnType));
387  b->has_encoder = true;
388  if (!empty) {
389  // asynchronously update stats for incoming data
390  tg->Append([b, fr = &frag]() {
391  for (auto chunk : fr->chunks) {
392  auto len = chunk->length;
393  auto data = chunk->buffers[1]->data();
394  b->encoder->updateStats((const int8_t*)data, len);
395  }
396  return arrow::Status::OK();
397  });
398  }
399  b->encoder->setNumElems(frag.sz);
400  }
401  }
402  } // each col and fragment
403 
404  // wait until all stats have been updated
405  ARROW_THROW_NOT_OK(tg->Finish());
406  VLOG(1) << "Created CSV backed temporary table with " << num_cols << " columns, "
407  << arr_frags << " chunks, and " << fragments.size() << " fragments.";
408 }
std::vector< int > ChunkKey
Definition: types.h:35
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:37
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
#define LOG(tag)
Definition: Logger.h:188
static Encoder * Create(Data_Namespace::AbstractBuffer *buffer, const SQLTypeInfo sqlType)
Definition: Encoder.cpp:26
static std::shared_ptr< arrow::DataType > getArrowImportType(const SQLTypeInfo type)
void start()
Definition: Asio.cpp:33
std::shared_ptr< StringDictionary > stringDict
CHECK(cgen_state)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1444
#define CHECK_LT(x, y)
Definition: Logger.h:207
Definition: sqltypes.h:53
Descriptor for a dictionary for a string column.
Definition: sqltypes.h:46
std::map< std::array< int, 3 >, std::vector< ArrowFragment > > m_columns
#define VLOG(n)
Definition: Logger.h:291

+ Here is the call graph for this function:

Member Data Documentation

std::map<std::array<int, 3>, std::vector<ArrowFragment> > ArrowCsvForeignStorage::m_columns

Definition at line 63 of file ArrowCsvForeignStorage.cpp.

Referenced by read(), and registerTable().


The documentation for this class was generated from the following file: