OmniSciDB  2e3a973ef4
ArrowCsvForeignStorage.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ArrowCsvForeignStorage.h"
18 
19 #include <arrow/api.h>
20 #include <arrow/csv/reader.h>
21 #include <arrow/io/file.h>
22 #include <arrow/util/bit_util.h>
23 #include <arrow/util/decimal.h>
24 #include <tbb/parallel_for.h>
25 #include <tbb/task_group.h>
26 #include <array>
27 #include <future>
28 
32 #include "Logger/Logger.h"
34 #include "Shared/ArrowUtil.h"
35 #include "Shared/measure.h"
36 
// Describes how one OmniSci fragment maps onto a contiguous run of Arrow
// chunks: chunks [first_chunk .. last_chunk], starting at first_chunk_offset
// elements into the first chunk and taking last_chunk_size elements from the
// last chunk. Built in registerTable() when splitting the parsed CSV.
37 struct Frag {
38  int first_chunk; // index of the first chunk assigned to the fragment
39  int first_chunk_offset; // offset from the beginning of the first chunk
40  int last_chunk; // index of the last chunk
41  int last_chunk_size; // number of elements in the last chunk
42 };
43 
// NOTE(review): the class-declaration line (original line 44, presumably
// "class ArrowCsvForeignStorage : public PersistentForeignStorageInterface")
// was lost in this dump; the members below are its public interface.
45  public:
47 
// Appending new data is unsupported: this storage is read-only (see the
// CHECK(false) in the definition below).
48  void append(const std::vector<ForeignStorageColumnBuffer>& column_buffers) override;
49 
// Copies `numBytes` of the chunk identified by `chunk_key` into `dest`,
// stitching together the arrow chunks that back the fragment.
50  void read(const ChunkKey& chunk_key,
51  const SQLTypeInfo& sql_type,
52  int8_t* dest,
53  const size_t numBytes) override;
54 
// Adjusts table metadata before creation; currently only clears hasDeletedCol.
55  void prepareTable(const int db_id,
56  const std::string& type,
57  TableDescriptor& td,
58  std::list<ColumnDescriptor>& cols) override;
// Parses the CSV file named by `type` and materializes every column of every
// fragment into m_columns, registering buffers/encoders with `mgr`.
59  void registerTable(Catalog_Namespace::Catalog* catalog,
60  std::pair<int, int> table_key,
61  const std::string& type,
62  const TableDescriptor& td,
63  const std::list<ColumnDescriptor>& cols,
64  Data_Namespace::AbstractBufferMgr* mgr) override;
65 
// Storage-type tag used in `with (storage_type=...)`; returns "CSV".
66  std::string getType() const override;
67 
// One fragment's slice of a column: `chunks` are the arrow arrays backing it,
// `offset` is the element offset into the first chunk, `sz` the total element
// count of the fragment.
68  struct ArrowFragment {
69  int64_t offset;
70  int64_t sz;
71  std::vector<std::shared_ptr<arrow::ArrayData>> chunks;
72  };
73 
// Bulk-feeds all strings of a dictionary-encoded column to `dict` and stores
// the resulting int32 index arrays per fragment.
74  void createDictionaryEncodedColumn(StringDictionary* dict,
75  const ColumnDescriptor& c,
76  std::vector<ArrowFragment>& col,
77  arrow::ChunkedArray* arr_col_chunked_array,
78  tbb::task_group& tg,
79  const std::vector<Frag>& fragments,
80  ChunkKey key,
81  Data_Namespace::AbstractBufferMgr* mgr);
82 
// Converts arrow Decimal128 chunks into the fixed-width integer encoding used
// by OmniSci; T is the target element type, ChunkType the arrow array type.
83  template <typename T, typename ChunkType>
84  void createDecimalColumn(const ColumnDescriptor& c,
85  std::vector<ArrowFragment>& col,
86  arrow::ChunkedArray* arr_col_chunked_array,
87  tbb::task_group& tg,
88  const std::vector<Frag>& fragments,
89  ChunkKey key,
90  Data_Namespace::AbstractBufferMgr* mgr);
91 
// {db_id, table_id, column_id} -> per-fragment column data.
92  std::map<std::array<int, 3>, std::vector<ArrowFragment>> m_columns;
93 };
94 
// Registration entry point (signature lines lost in this dump; the symbol
// index names it `void registerArrowCsvForeignStorage(void)`): installs this
// implementation into the persistent foreign-storage registry.
97  std::make_unique<ArrowCsvForeignStorage>());
98 }
99 
// ArrowCsvForeignStorage::append (signature line lost in this dump): this
// storage is read-only, so reaching here is a programming error and aborts.
101  const std::vector<ForeignStorageColumnBuffer>& column_buffers) {
102  CHECK(false);
103 }
104 
// Fills `count` elements of `data`, viewed as T, with the NULL sentinel used
// by the engine: numeric_limits<T>::min() for signed types (for floating
// point this is the smallest positive normal value, matching the engine's
// NULL_FLOAT/NULL_DOUBLE convention) and numeric_limits<T>::max() for
// unsigned types.
template <typename T>
void setNulls(int8_t* data, int count) {
  auto* typed = reinterpret_cast<T*>(data);
  T sentinel;
  if (std::is_signed<T>::value) {
    sentinel = std::numeric_limits<T>::min();
  } else {
    sentinel = std::numeric_limits<T>::max();
  }
  for (int i = 0; i < count; ++i) {
    typed[i] = sentinel;
  }
}
112 
113 void generateSentinelValues(int8_t* data, const SQLTypeInfo& columnType, size_t count) {
114  const size_t type_size = columnType.get_size();
115  if (columnType.is_integer()) {
116  switch (type_size) {
117  case 1:
118  setNulls<int8_t>(data, count);
119  break;
120  case 2:
121  setNulls<int16_t>(data, count);
122  break;
123  case 4:
124  setNulls<int32_t>(data, count);
125  break;
126  case 8:
127  setNulls<int64_t>(data, count);
128  break;
129  default:
130  // TODO: throw unsupported integer type exception
131  CHECK(false);
132  }
133  } else {
134  if (type_size == 4) {
135  setNulls<float>(data, count);
136  } else {
137  setNulls<double>(data, count);
138  }
139  }
140 }
141 
// ArrowCsvForeignStorage::read (signature line lost in this dump): copies the
// chunk identified by chunk_key {db, table, column, fragment[, subkey]} into
// `dest`, stitching the arrow chunks that back the fragment together. Subkey
// value 2 selects the merged string-offset buffer of a none-encoded text
// column. Chunks that are entirely null (bp stays nullptr) are filled with
// NULL sentinels instead of being copied.
143  const SQLTypeInfo& sql_type,
144  int8_t* dest,
145  const size_t numBytes) {
146  std::array<int, 3> col_key{chunk_key[0], chunk_key[1], chunk_key[2]};
147  auto& frag = m_columns.at(col_key).at(chunk_key[3]);
148 
149  CHECK(!frag.chunks.empty() || !chunk_key[3]);
150  int64_t sz = 0, copied = 0;
151  int varlen_offset = 0;
152  size_t read_size = 0;
153  for (size_t i = 0; i < frag.chunks.size(); i++) {
154  auto& array_data = frag.chunks[i];
155  int offset = (i == 0) ? frag.offset : 0;
156  size_t size = (i == frag.chunks.size() - 1) ? (frag.sz - read_size)
157  : (array_data->length - offset);
158  read_size += size;
159  arrow::Buffer* bp = nullptr;
160  if (sql_type.is_dict_encoded_string()) {
161  // array_data->buffers[1] stores dictionary indexes
162  bp = array_data->buffers[1].get();
163  } else if (sql_type.get_type() == kTEXT) {
164  CHECK_GE(array_data->buffers.size(), 3UL);
165  // array_data->buffers[2] stores string array
166  bp = array_data->buffers[2].get();
167  } else if (array_data->null_count != array_data->length) {
168  // any type except strings (none encoded strings offsets go here as well)
169  CHECK_GE(array_data->buffers.size(), 2UL);
170  bp = array_data->buffers[1].get();
171  }
172  if (bp) {
173  // offset buffer for none encoded strings need to be merged
174  if (chunk_key.size() == 5 && chunk_key[4] == 2) {
175  auto data = reinterpret_cast<const uint32_t*>(bp->data()) + offset;
176  auto dest_ui32 = reinterpret_cast<uint32_t*>(dest);
177  // as size contains count of string in chunk slice it would always be one less
178  // then offsets array size
179  sz = (size + 1) * sizeof(uint32_t);
180  if (sz > 0) {
181  if (i != 0) {
182  // We merge arrow chunks with string offsets into a single contigous fragment.
183  // Each string is represented by a pair of offsets, thus size of offset table
184  // is num strings + 1. When merging two chunks, the last number in the first
185  // chunk duplicates the first number in the second chunk, so we skip it.
186  data++;
187  sz -= sizeof(uint32_t);
188  } else {
189  // As we support cases when fragment starts with offset of arrow chunk we need
190  // to substract the first element of the first chunk from all elements in that
191  // fragment
192  varlen_offset -= data[0];
193  }
194  // We also re-calculate offsets in the second chunk as it is a continuation of
195  // the first one.
196  std::transform(data,
197  data + (sz / sizeof(uint32_t)),
198  dest_ui32,
199  [varlen_offset](uint32_t val) { return val + varlen_offset; });
200  varlen_offset += data[(sz / sizeof(uint32_t)) - 1];
201  }
202  } else {
203  auto fixed_type = dynamic_cast<arrow::FixedWidthType*>(array_data->type.get());
204  if (fixed_type) {
205  std::memcpy(
206  dest,
207  bp->data() + (array_data->offset + offset) * (fixed_type->bit_width() / 8),
208  sz = size * (fixed_type->bit_width() / 8));
209  } else {
210  auto offsets_buffer =
211  reinterpret_cast<const uint32_t*>(array_data->buffers[1]->data());
212  auto string_buffer_offset = offsets_buffer[offset + array_data->offset];
213  auto string_buffer_size =
214  offsets_buffer[offset + array_data->offset + size] - string_buffer_offset;
215  std::memcpy(dest, bp->data() + string_buffer_offset, sz = string_buffer_size);
216  }
217  }
218  } else {
219  // TODO: nullify?
// bp == nullptr means the chunk is entirely null: synthesize sentinels.
220  auto fixed_type = dynamic_cast<arrow::FixedWidthType*>(array_data->type.get());
221  if (fixed_type) {
222  sz = size * (fixed_type->bit_width() / 8);
223  generateSentinelValues(dest, sql_type, size);
224  } else {
225  CHECK(false); // TODO: what's else???
226  }
227  }
228  dest += sz;
229  copied += sz;
230  }
// Sanity check: total bytes produced must match what the caller requested.
231  CHECK_EQ(numBytes, size_t(copied));
232 }
233 
// Maps an OmniSci SQL type to the arrow DataType used when importing CSV
// data; throws std::runtime_error for types arrow import does not support.
234 // TODO: this overlaps with getArrowType() from ArrowResultSetConverter.cpp but with few
235 // differences in kTEXT and kDATE
236 static std::shared_ptr<arrow::DataType> getArrowImportType(const SQLTypeInfo type) {
237  using namespace arrow;
238  auto ktype = type.get_type();
239  if (IS_INTEGER(ktype)) {
240  switch (type.get_size()) {
241  case 1:
242  return int8();
243  case 2:
244  return int16();
245  case 4:
246  return int32();
247  case 8:
248  return int64();
249  default:
250  CHECK(false);
251  }
252  }
253  switch (ktype) {
254  case kBOOLEAN:
255  return boolean();
256  case kFLOAT:
257  return float32();
258  case kDOUBLE:
259  return float64();
260  case kCHAR:
261  case kVARCHAR:
262  case kTEXT:
263  return utf8();
264  case kDECIMAL:
265  case kNUMERIC:
266  return decimal(type.get_precision(), type.get_scale());
267  case kTIME:
268  return time32(TimeUnit::SECOND);
269  // case kDATE:
270  // TODO(wamsi) : Remove date64() once date32() support is added in cuDF. date32()
271  // Currently support for date32() is missing in cuDF.Hence, if client requests for
272  // date on GPU, return date64() for the time being, till support is added.
273  // return device_type_ == ExecutorDeviceType::GPU ? date64() : date32();
274  case kTIMESTAMP:
275  switch (type.get_precision()) {
276  case 0:
277  return timestamp(TimeUnit::SECOND);
278  case 3:
279  return timestamp(TimeUnit::MILLI);
280  case 6:
281  return timestamp(TimeUnit::MICRO);
282  case 9:
283  return timestamp(TimeUnit::NANO);
284  default:
285  throw std::runtime_error("Unsupported timestamp precision for Arrow: " +
286  std::to_string(type.get_precision()));
287  }
288  case kARRAY:
289  case kINTERVAL_DAY_TIME:
// NOTE(review): one more case label (original line 290, most likely
// kINTERVAL_YEAR_MONTH) was lost in this dump; these types fall through to
// the unsupported-type error below.
291  default:
292  throw std::runtime_error(type.get_type_name() + " is not supported in Arrow.");
293  }
294  return nullptr;
295 }
296 
// ArrowCsvForeignStorage::prepareTable (signature line lost in this dump):
// invoked before table creation; CSV-backed tables do not materialize the
// hidden deleted-rows column, so it is disabled here.
298  const std::string& type,
299  TableDescriptor& td,
300  std::list<ColumnDescriptor>& cols) {
301  td.hasDeletedCol = false;
302 }
303 
304 void getSizeAndOffset(const Frag& frag,
305  const std::shared_ptr<arrow::Array>& chunk,
306  int i,
307  int& size,
308  int& offset) {
309  offset = (i == frag.first_chunk) ? frag.first_chunk_offset : 0;
310  size = (i == frag.last_chunk) ? frag.last_chunk_size : (chunk->length() - offset);
311 }
312 
// Forward declaration; defined at the bottom of this file. Rewrites values
// flagged null in the arrow validity bitmaps with OmniSci NULL sentinels.
313 void generateNullValues(const std::vector<Frag>& fragments,
314  arrow::ChunkedArray* arr_col_chunked_array,
315  const SQLTypeInfo& columnType);
316 
// ArrowCsvForeignStorage::createDictionaryEncodedColumn (function-name line
// lost in this dump): runs asynchronously on `tg`. Pass 1 sizes a flat
// string vector spanning all fragments; pass 2 gathers string views from the
// arrow chunks in parallel; then one getOrAddBulk() call assigns dictionary
// ids into a shared int32 buffer; finally, per-fragment tasks wrap slices of
// that buffer as arrow Int32Arrays and register buffers/stats with `mgr`.
318  StringDictionary* dict,
319  const ColumnDescriptor& c,
320  std::vector<ArrowFragment>& col,
321  arrow::ChunkedArray* arr_col_chunked_array,
322  tbb::task_group& tg,
323  const std::vector<Frag>& fragments,
324  ChunkKey key,
325  Data_Namespace::AbstractBufferMgr* mgr) {
326  tg.run([dict, &c, &col, arr_col_chunked_array, &tg, &fragments, k = key, mgr]() {
327  auto key = k;
328  auto full_time = measure<>::execution([&]() {
329  // calculate offsets for every fragment in bulk
330  size_t bulk_size = 0;
331  std::vector<int> offsets(fragments.size() + 1);
332  for (size_t f = 0; f < fragments.size(); f++) {
333  offsets[f] = bulk_size;
334  for (int i = fragments[f].first_chunk, e = fragments[f].last_chunk; i <= e; i++) {
335  int size, offset;
// NOTE(review): the "getSizeAndOffset(" call opener (original line 336) was
// lost in this dump; the next line is that call's argument list.
337  fragments[f], arr_col_chunked_array->chunk(i), i, size, offset);
338  bulk_size += size;
339  }
340  }
341  offsets[fragments.size()] = bulk_size;
342  std::vector<std::string_view> bulk(bulk_size);
343 
// Gather all strings of all fragments into `bulk`, fragments in parallel.
344  tbb::parallel_for(
345  tbb::blocked_range<size_t>(0, fragments.size()),
346  [&bulk, &fragments, arr_col_chunked_array, &offsets](
347  const tbb::blocked_range<size_t>& r) {
348  for (auto f = r.begin(); f != r.end(); ++f) {
349  auto bulk_offset = offsets[f];
350 
351  size_t current_ind = 0;
352  for (int i = fragments[f].first_chunk, e = fragments[f].last_chunk; i <= e;
353  i++) {
354  int size, offset;
// NOTE(review): "getSizeAndOffset(" opener (original line 355) lost in dump.
356  fragments[f], arr_col_chunked_array->chunk(i), i, size, offset);
357 
358  auto stringArray = std::static_pointer_cast<arrow::StringArray>(
359  arr_col_chunked_array->chunk(i));
360  for (int i = offset; i < offset + size; i++) {
361  auto view = stringArray->GetView(i);
362  bulk[bulk_offset + current_ind] =
363  std::string_view(view.data(), view.length());
364  current_ind++;
365  }
366  }
367  }
368  });
369 
370  std::shared_ptr<arrow::Buffer> indices_buf;
371  ARROW_ASSIGN_OR_THROW(indices_buf,
372  arrow::AllocateBuffer(bulk_size * sizeof(int32_t)));
373  auto raw_data = reinterpret_cast<int*>(indices_buf->mutable_data());
374  auto time = measure<>::execution([&]() { dict->getOrAddBulk(bulk, raw_data); });
375 
376  VLOG(1) << "FSI dictionary for column created in: " << time
377  << "ms, strings count: " << bulk_size
378  << ", unique_count: " << dict->storageEntryCount();
379 
// Per fragment: expose the id buffer slice as Int32Array chunks and push
// chunk metadata/statistics into the buffer manager.
380  for (size_t f = 0; f < fragments.size(); f++) {
381  auto bulk_offset = offsets[f];
382  tg.run([k = key,
383  f,
384  &col,
385  mgr,
386  &c,
387  arr_col_chunked_array,
388  bulk_offset,
389  indices_buf,
390  &fragments]() {
391  auto key = k;
392  key[3] = f;
393  auto& frag = col[f];
394  frag.chunks.resize(fragments[f].last_chunk - fragments[f].first_chunk + 1);
395  auto b = mgr->createBuffer(key);
396  b->initEncoder(c.columnType);
397  size_t current_ind = 0;
398  for (int i = fragments[f].first_chunk, e = fragments[f].last_chunk; i <= e;
399  i++) {
400  int size, offset;
// NOTE(review): "getSizeAndOffset(" opener (original line 401) lost in dump.
402  fragments[f], arr_col_chunked_array->chunk(i), i, size, offset);
403  auto indexArray = std::make_shared<arrow::Int32Array>(
404  size, indices_buf, nullptr, -1, bulk_offset + current_ind);
405  frag.chunks[i - fragments[f].first_chunk] = indexArray->data();
406  frag.sz += size;
407  current_ind += size;
408  frag.offset = 0;
409  auto len = frag.chunks[i - fragments[f].first_chunk]->length;
410  auto data = frag.chunks[i - fragments[f].first_chunk]->GetValues<int32_t>(1);
411  b->getEncoder()->updateStats((const int8_t*)data, len);
412  }
413 
414  b->setSize(frag.sz * b->getSqlType().get_size());
415  b->getEncoder()->setNumElems(frag.sz);
416  });
417  }
418  });
419  VLOG(1) << "FSI: createDictionaryEncodedColumn time: " << full_time << "ms"
420  << std::endl;
421  });
422 }
423 
// ArrowCsvForeignStorage::createDecimalColumn (function-name line lost in
// this dump): converts arrow Decimal128 chunks into OmniSci's fixed-width
// integer encoding. First sizes every fragment, then allocates one
// contiguous result buffer and converts fragments in parallel, writing NULL
// sentinels for null entries and narrowing each Decimal128 through int64_t.
424 template <typename T, typename ChunkType>
426  const ColumnDescriptor& c,
427  std::vector<ArrowFragment>& col,
428  arrow::ChunkedArray* arr_col_chunked_array,
429  tbb::task_group& tg,
430  const std::vector<Frag>& fragments,
431  ChunkKey k,
432  Data_Namespace::AbstractBufferMgr* mgr) {
433  auto empty = arr_col_chunked_array->null_count() == arr_col_chunked_array->length();
434  size_t column_size = 0;
435  std::vector<int> offsets(fragments.size());
436  for (size_t f = 0; f < fragments.size(); f++) {
437  offsets[f] = column_size;
438  auto& frag = col[f];
439  for (int i = fragments[f].first_chunk, e = fragments[f].last_chunk; i <= e; i++) {
440  int size, offset;
441  getSizeAndOffset(fragments[f], arr_col_chunked_array->chunk(i), i, size, offset);
442  // as we create new buffer, offsets are handled with arrow::ArrayData::offset
443  frag.offset = 0;
444  frag.sz += size;
445  }
446  column_size += frag.sz;
447  }
448 
449  std::shared_ptr<arrow::Buffer> result_buffer;
450  ARROW_ASSIGN_OR_THROW(result_buffer,
451  arrow::AllocateBuffer(column_size * c.columnType.get_size()));
452 
453  T* buffer_data = reinterpret_cast<T*>(result_buffer->mutable_data());
454  tbb::parallel_for(
455  tbb::blocked_range(0UL, fragments.size()),
456  [k,
457  buffer_data,
458  &offsets,
459  &fragments,
460  &col,
461  arr_col_chunked_array,
462  &result_buffer,
463  mgr,
464  &c,
465  empty,
466  &tg](auto& range) {
467  auto key = k;
468  for (size_t f = range.begin(); f < range.end(); f++) {
469  T* fragment_data = buffer_data + offsets[f];
470  size_t chunk_offset = 0;
471  key[3] = f;
472  auto& frag = col[f];
473  frag.chunks.resize(fragments[f].last_chunk - fragments[f].first_chunk + 1);
474  for (int i = fragments[f].first_chunk, e = fragments[f].last_chunk; i <= e;
475  i++) {
476  T* chunk_data = fragment_data + chunk_offset;
477  int size, offset;
// NOTE(review): "getSizeAndOffset(" opener (original line 478) lost in dump.
479  fragments[f], arr_col_chunked_array->chunk(i), i, size, offset);
480 
481  auto decimalArray = std::static_pointer_cast<arrow::Decimal128Array>(
482  arr_col_chunked_array->chunk(i));
483 
484  for (int j = 0; j < size; ++j) {
485  if (empty || decimalArray->null_count() == decimalArray->length() ||
486  decimalArray->IsNull(j + offset)) {
487  chunk_data[j] = inline_int_null_value<T>();
488  } else {
489  arrow::Decimal128 val(decimalArray->GetValue(j + offset));
490  chunk_data[j] =
491  static_cast<int64_t>(val); // arrow can cast only to int64_t
492  }
493  }
494 
495  auto converted_chunk = std::make_shared<ChunkType>(size,
496  result_buffer,
497  nullptr,
498  arrow::kUnknownNullCount,
499  offsets[f] + chunk_offset);
500  frag.chunks[i - fragments[f].first_chunk] = converted_chunk->data();
501 
502  chunk_offset += size;
503  }
504 
505  auto b = mgr->createBuffer(key);
506  b->setSize(frag.sz * b->getSqlType().get_size());
507  b->initEncoder(c.columnType);
508  if (!empty) {
// Statistics can be computed asynchronously; registerTable waits on `tg`.
509  tg.run([&frag, b]() {
510  for (size_t i = 0; i < frag.chunks.size(); i++) {
511  auto& chunk = frag.chunks[i];
512  int offset = chunk->offset;
513  size_t size = chunk->length;
514  auto data = chunk->buffers[1]->data();
515  b->getEncoder()->updateStats(
516  (const int8_t*)data + offset * b->getSqlType().get_size(), size);
517  }
518  });
519  }
520  b->getEncoder()->setNumElems(frag.sz);
521  }
522  });
523 }
524 
// ArrowCsvForeignStorage::registerTable (signature line lost in this dump):
// parses the CSV file named by `info` with arrow's multithreaded CSV reader,
// splits the resulting arrow chunks into fragments of at most td.maxFragRows
// rows each, then materializes every column (dictionary-encoded strings,
// decimals, none-encoded text, fixed-width scalars) into m_columns and
// registers buffers/encoders with `mgr`. Blocks until all async stats tasks
// scheduled on the task_group have finished.
526  std::pair<int, int> table_key,
527  const std::string& info,
528  const TableDescriptor& td,
529  const std::list<ColumnDescriptor>& cols,
530  Data_Namespace::AbstractBufferMgr* mgr) {
531  const DataframeTableDescriptor* df_td =
532  dynamic_cast<const DataframeTableDescriptor*>(&td);
533  auto memory_pool = arrow::default_memory_pool();
534  auto arrow_parse_options = arrow::csv::ParseOptions::Defaults();
535  arrow_parse_options.quoting = false;
536  arrow_parse_options.escaping = false;
537  arrow_parse_options.newlines_in_values = false;
538  arrow_parse_options.delimiter = df_td ? *df_td->delimiter.c_str() : ',';
539  auto arrow_read_options = arrow::csv::ReadOptions::Defaults();
540  arrow_read_options.use_threads = true;
541 
542  arrow_read_options.block_size = 20 * 1024 * 1024;
543  arrow_read_options.autogenerate_column_names = false;
// Skip the header row (when present) in addition to the configured skipRows.
544  arrow_read_options.skip_rows =
545  df_td ? (df_td->hasHeader ? (df_td->skipRows + 1) : df_td->skipRows) : 1;
546 
547  auto arrow_convert_options = arrow::csv::ConvertOptions::Defaults();
548  arrow_convert_options.check_utf8 = false;
549  arrow_convert_options.include_columns = arrow_read_options.column_names;
550  arrow_convert_options.strings_can_be_null = true;
551 
552  for (auto& c : cols) {
553  if (c.isSystemCol) {
554  continue; // must be processed by base interface implementation
555  }
556  arrow_convert_options.column_types.emplace(c.columnName,
557  getArrowImportType(c.columnType));
558  arrow_read_options.column_names.push_back(c.columnName);
559  }
560 
561  std::shared_ptr<arrow::io::ReadableFile> inp;
562  auto file_result = arrow::io::ReadableFile::Open(info.c_str());
563  ARROW_THROW_NOT_OK(file_result.status());
564  inp = file_result.ValueOrDie();
565 
566  auto table_reader_result = arrow::csv::TableReader::Make(
567  memory_pool, inp, arrow_read_options, arrow_parse_options, arrow_convert_options);
568  ARROW_THROW_NOT_OK(table_reader_result.status());
569  auto table_reader = table_reader_result.ValueOrDie();
570 
571  std::shared_ptr<arrow::Table> arrowTable;
572  auto time = measure<>::execution([&]() {
573  auto arrow_table_result = table_reader->Read();
574  ARROW_THROW_NOT_OK(arrow_table_result.status());
575  arrowTable = arrow_table_result.ValueOrDie();
576  });
577 
578  VLOG(1) << "Read Arrow CSV file " << info << " in " << time << "ms";
579 
580  arrow::Table& table = *arrowTable.get();
581  int cln = 0, num_cols = table.num_columns();
582  int arr_frags = table.column(0)->num_chunks();
583  arrow::ChunkedArray* c0p = table.column(0).get();
584 
585  // here we split arrow chunks between omnisci fragments
586 
587  std::vector<Frag> fragments;
588  int64_t sz = 0;
589  int64_t offset = 0;
590  fragments.push_back({0, 0, 0, 0});
591 
592  for (int i = 0; i < arr_frags;) {
593  auto& chunk = *c0p->chunk(i);
594  auto& frag = *fragments.rbegin();
595  if (td.maxFragRows - sz > chunk.length() - offset) {
// Whole remainder of this chunk fits into the current fragment.
596  sz += chunk.length() - offset;
597  if (i == arr_frags - 1) {
598  fragments.rbegin()->last_chunk = arr_frags - 1;
599  fragments.rbegin()->last_chunk_size =
600  c0p->chunk(arr_frags - 1)->length() - offset;
601  }
602  offset = 0;
603  i++;
604  } else {
// Current fragment is full; the chunk continues into a new fragment.
605  frag.last_chunk = i;
606  frag.last_chunk_size = td.maxFragRows - sz;
607  offset += td.maxFragRows - sz;
608  sz = 0;
609  fragments.push_back({i, static_cast<int>(offset), 0, 0});
610  }
611  }
// NOTE(review): the first condition compares first_chunk with itself and is
// always true, so the test reduces to `last_chunk_size == 0`;
// `first_chunk == last_chunk` (or comparison against the previous fragment)
// was probably intended. Worth confirming upstream.
612  if (fragments.rbegin()->first_chunk == fragments.rbegin()->first_chunk &&
613  fragments.rbegin()->last_chunk_size == 0) {
614  // remove empty fragment at the end if any
615  fragments.pop_back();
616  }
617  // data comes like this - database_id, table_id, column_id, fragment_id
618  ChunkKey key{table_key.first, table_key.second, 0, 0};
619  std::array<int, 3> col_key{table_key.first, table_key.second, 0};
620 
621  tbb::task_group tg;
622 
623  for (auto& c : cols) {
624  if (c.isSystemCol) {
625  continue; // must be processed by base interface implementation
626  }
627 
628  if (cln >= num_cols) {
629  LOG(ERROR) << "Number of columns read from Arrow (" << num_cols
630  << ") mismatch CREATE TABLE request: " << cols.size();
631  break;
632  }
633 
634  auto ctype = c.columnType.get_type();
635  col_key[2] = key[2] = c.columnId;
636  auto& col = m_columns[col_key];
637  col.resize(fragments.size());
638  auto arr_col_chunked_array = table.column(cln++).get();
639 
640  if (c.columnType.is_dict_encoded_string()) {
641  auto dictDesc = const_cast<DictDescriptor*>(
642  catalog->getMetadataForDict(c.columnType.get_comp_param()));
643  StringDictionary* dict = dictDesc->stringDict.get();
644  createDictionaryEncodedColumn(
645  dict, c, col, arr_col_chunked_array, tg, fragments, key, mgr);
646  } else if (ctype == kDECIMAL || ctype == kNUMERIC) {
647  tg.run([this, &c, &col, arr_col_chunked_array, &tg, &fragments, key, mgr]() {
648  switch (c.columnType.get_size()) {
649  case 2:
650  createDecimalColumn<int16_t, arrow::Int16Array>(
651  c, col, arr_col_chunked_array, tg, fragments, key, mgr);
652  break;
653  case 4:
654  createDecimalColumn<int32_t, arrow::Int32Array>(
655  c, col, arr_col_chunked_array, tg, fragments, key, mgr);
656  break;
657  case 8:
658  createDecimalColumn<int64_t, arrow::Int64Array>(
659  c, col, arr_col_chunked_array, tg, fragments, key, mgr);
660  break;
661  default:
662  // TODO: throw unsupported decimal type exception
663  CHECK(false);
664  break;
665  }
666  });
667  } else {
// Fixed-width scalars and none-encoded text: reference the arrow chunks
// directly, no conversion buffer is created.
668  auto empty = arr_col_chunked_array->null_count() == arr_col_chunked_array->length();
669  for (size_t f = 0; f < fragments.size(); f++) {
670  key[3] = f;
671  auto& frag = col[f];
672  int64_t varlen = 0;
673  frag.chunks.resize(fragments[f].last_chunk - fragments[f].first_chunk + 1);
674  for (int i = fragments[f].first_chunk, e = fragments[f].last_chunk; i <= e; i++) {
675  int size, offset;
// NOTE(review): "getSizeAndOffset(" opener (original line 676) lost in dump.
677  fragments[f], arr_col_chunked_array->chunk(i), i, size, offset);
678  frag.offset += offset;
679  frag.sz += size;
680  frag.chunks[i - fragments[f].first_chunk] =
681  arr_col_chunked_array->chunk(i)->data();
682  auto& buffers = arr_col_chunked_array->chunk(i)->data()->buffers;
683  if (!empty) {
684  if (ctype == kTEXT) {
685  if (buffers.size() <= 2) {
686  LOG(FATAL) << "Type of column #" << cln
687  << " does not match between Arrow and description of "
688  << c.columnName;
689  }
690  auto offsets_buffer = reinterpret_cast<const uint32_t*>(buffers[1]->data());
691  varlen += offsets_buffer[offset + size] - offsets_buffer[offset];
692  } else if (buffers.size() != 2) {
693  LOG(FATAL) << "Type of column #" << cln
694  << " does not match between Arrow and description of "
695  << c.columnName;
696  }
697  }
698  }
699 
700  // create buffer descriptors
701  if (ctype == kTEXT) {
// Text columns get two buffers: subkey 1 for string bytes (size = varlen),
// subkey 2 for the int32 offsets table read() later merges across chunks.
702  auto k = key;
703  k.push_back(1);
704  {
705  auto b = mgr->createBuffer(k);
706  b->setSize(varlen);
707  b->initEncoder(c.columnType);
708  }
709  k[4] = 2;
710  {
711  auto b = mgr->createBuffer(k);
712  b->setSqlType(SQLTypeInfo(kINT, false));
713  b->setSize(frag.sz * b->getSqlType().get_size());
714  }
715  } else {
716  auto b = mgr->createBuffer(key);
717  b->initEncoder(c.columnType);
718  b->setSize(frag.sz * b->getSqlType().get_size());
719  if (!empty) {
720  size_t type_size = c.columnType.get_size();
721  tg.run([b, fr = &frag, type_size]() {
722  size_t sz = 0;
723  for (size_t i = 0; i < fr->chunks.size(); i++) {
724  auto& chunk = fr->chunks[i];
725  int offset = (i == 0) ? fr->offset : 0;
726  size_t size = (i == fr->chunks.size() - 1) ? (fr->sz - sz)
727  : (chunk->length - offset);
728  sz += size;
729  auto data = chunk->buffers[1]->data();
730  b->getEncoder()->updateStats((const int8_t*)data + offset * type_size,
731  size);
732  }
733  });
734  }
735  b->getEncoder()->setNumElems(frag.sz);
736  }
737  }
738  if (ctype != kDECIMAL && ctype != kNUMERIC && !c.columnType.is_string()) {
739  generateNullValues(fragments, arr_col_chunked_array, c.columnType);
740  }
741  }
742  } // each col and fragment
743 
744  // wait untill all stats have been updated
745  tg.wait();
746 
747  VLOG(1) << "Created CSV backed temporary table with " << num_cols << " columns, "
748  << arr_frags << " chunks, and " << fragments.size() << " fragments.";
749 }
750 
// In-place rewrites every value flagged NULL in a chunk's validity bitmap
// with the type's sentinel (numeric_limits<T>::min() for signed, max() for
// unsigned). Chunks that are entirely null are skipped -- read() fills those
// with sentinels lazily. Outer parallelism is over fragments, inner over the
// chunks of a fragment; whole bitmap bytes are processed 8 values at a time,
// then the ragged tail bit-by-bit.
751 template <typename T>
752 void setNullValues(const std::vector<Frag>& fragments,
753  arrow::ChunkedArray* arr_col_chunked_array) {
754  const T null_value = std::is_signed<T>::value ? std::numeric_limits<T>::min()
755  : std::numeric_limits<T>::max();
756 
757  tbb::parallel_for(
758  tbb::blocked_range<size_t>(0, fragments.size()),
759  [&](const tbb::blocked_range<size_t>& r0) {
760  for (size_t f = r0.begin(); f != r0.end(); ++f) {
761  tbb::parallel_for(
762  tbb::blocked_range<size_t>(fragments[f].first_chunk,
763  fragments[f].last_chunk + 1),
764  [&](const tbb::blocked_range<size_t>& r1) {
765  for (auto chunk_index = r1.begin(); chunk_index != r1.end();
766  ++chunk_index) {
767  auto chunk = arr_col_chunked_array->chunk(chunk_index).get();
768  if (chunk->data()->null_count == chunk->data()->length) {
769  // it means we will insert sentinel values in read function
770  continue;
771  }
772  auto data = const_cast<uint8_t*>(chunk->data()->buffers[1]->data());
773  if (data && chunk->null_bitmap()) {
774  T* dataT = reinterpret_cast<T*>(data);
775  const uint8_t* bitmap_data = chunk->null_bitmap_data();
776  const int64_t length = chunk->length();
// Process all but the last bitmap byte in groups of 8 values.
777  const int64_t bitmap_length = chunk->null_bitmap()->size() - 1;
778  for (int64_t bitmap_idx = 0; bitmap_idx < bitmap_length;
779  ++bitmap_idx) {
780  T* res = dataT + bitmap_idx * 8;
781  for (int8_t bitmap_offset = 0; bitmap_offset < 8; ++bitmap_offset) {
// In arrow validity bitmaps a 0 bit marks a null entry.
782  auto is_null = (~bitmap_data[bitmap_idx] >> bitmap_offset) & 1;
783  auto val = is_null ? null_value : res[bitmap_offset];
784  res[bitmap_offset] = val;
785  }
786  }
787 
// Tail: remaining values covered by the final (partial) bitmap byte.
788  for (int64_t j = bitmap_length * 8; j < length; ++j) {
789  auto is_null = (~bitmap_data[bitmap_length] >> (j % 8)) & 1;
790  auto val = is_null ? null_value : dataT[j];
791  dataT[j] = val;
792  }
793  }
794  }
795  });
796  }
797  });
798 }
799 
800 void generateNullValues(const std::vector<Frag>& fragments,
801  arrow::ChunkedArray* arr_col_chunked_array,
802  const SQLTypeInfo& columnType) {
803  const size_t typeSize = columnType.get_size();
804  if (columnType.is_integer()) {
805  switch (typeSize) {
806  case 1:
807  setNullValues<int8_t>(fragments, arr_col_chunked_array);
808  break;
809  case 2:
810  setNullValues<int16_t>(fragments, arr_col_chunked_array);
811  break;
812  case 4:
813  setNullValues<int32_t>(fragments, arr_col_chunked_array);
814  break;
815  case 8:
816  setNullValues<int64_t>(fragments, arr_col_chunked_array);
817  break;
818  default:
819  // TODO: throw unsupported integer type exception
820  CHECK(false);
821  }
822  } else {
823  if (typeSize == 4) {
824  setNullValues<float>(fragments, arr_col_chunked_array);
825  } else {
826  setNullValues<double>(fragments, arr_col_chunked_array);
827  }
828  }
829 }
830 
831 std::string ArrowCsvForeignStorage::getType() const {
832  LOG(INFO) << "CSV backed temporary tables has been activated. Create table `with "
833  "(storage_type='CSV:path/to/file.csv');`\n";
834  return "CSV";
835 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< std::shared_ptr< arrow::ArrayData > > chunks
int get_precision() const
Definition: sqltypes.h:262
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:97
Definition: sqltypes.h:51
void createDecimalColumn(const ColumnDescriptor &c, std::vector< ArrowFragment > &col, arrow::ChunkedArray *arr_col_chunked_array, tbb::task_group &tg, const std::vector< Frag > &fragments, ChunkKey key, Data_Namespace::AbstractBufferMgr *mgr)
bool is_integer() const
Definition: sqltypes.h:419
#define LOG(tag)
Definition: Logger.h:188
void createDictionaryEncodedColumn(StringDictionary *dict, const ColumnDescriptor &c, std::vector< ArrowFragment > &col, arrow::ChunkedArray *arr_col_chunked_array, tbb::task_group &tg, const std::vector< Frag > &fragments, ChunkKey key, Data_Namespace::AbstractBufferMgr *mgr)
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
std::string getType() const override
static std::shared_ptr< arrow::DataType > getArrowImportType(const SQLTypeInfo type)
#define CHECK_GE(x, y)
Definition: Logger.h:210
int64_t const int32_t sz
HOST DEVICE int get_size() const
Definition: sqltypes.h:269
void generateSentinelValues(int8_t *data, const SQLTypeInfo &columnType, size_t count)
HOST DEVICE int get_scale() const
Definition: sqltypes.h:264
void getSizeAndOffset(const Frag &frag, const std::shared_ptr< arrow::Array > &chunk, int i, int &size, int &offset)
std::string to_string(char const *&&v)
size_t storageEntryCount() const
void setNullValues(const std::vector< Frag > &fragments, arrow::ChunkedArray *arr_col_chunked_array)
void read(const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, int8_t *dest, const size_t numBytes) override
static void registerPersistentStorageInterface(std::unique_ptr< PersistentForeignStorageInterface > persistent_foreign_storage)
bool is_dict_encoded_string() const
Definition: sqltypes.h:444
void prepareTable(const int db_id, const std::string &type, TableDescriptor &td, std::list< ColumnDescriptor > &cols) override
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Reads the specified number of bytes from the offset position in file f into buf.
Definition: File.cpp:117
void registerArrowCsvForeignStorage(void)
size_t append(FILE *f, const size_t size, int8_t *buf)
Appends the specified number of bytes to the end of the file f from buf.
Definition: File.cpp:140
specifies the content in-memory of a row in the column metadata table
void setNulls(int8_t *data, int count)
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1451
Definition: sqltypes.h:54
Definition: Importer.h:62
std::string get_type_name() const
Definition: sqltypes.h:362
bool is_null(const T &v, const SQLTypeInfo &t)
#define IS_INTEGER(T)
Definition: sqltypes.h:168
Definition: sqltypes.h:43
void generateNullValues(const std::vector< Frag > &fragments, arrow::ChunkedArray *arr_col_chunked_array, const SQLTypeInfo &columnType)
#define CHECK(condition)
Definition: Logger.h:197
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:259
std::vector< int > ChunkKey
Definition: types.h:37
Descriptor for a dictionary for a string columne.
For unencoded strings.
static TimeT::rep execution(F func, Args &&... args)
Definition: sample.cpp:29
Definition: sqltypes.h:47
SQLTypeInfo columnType
specifies the content in-memory of a row in the table metadata table
void append(const std::vector< ForeignStorageColumnBuffer > &column_buffers) override
std::map< std::array< int, 3 >, std::vector< ArrowFragment > > m_columns
#define VLOG(n)
Definition: Logger.h:291
void registerTable(Catalog_Namespace::Catalog *catalog, std::pair< int, int > table_key, const std::string &type, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols, Data_Namespace::AbstractBufferMgr *mgr) override
specifies the content in-memory of a row in the table metadata table