OmniSciDB  85c2d10cdc
ArrowForeignStorage.cpp
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ArrowForeignStorage.h"
18 
19 #include <arrow/api.h>
20 #include <arrow/csv/reader.h>
21 #include <arrow/io/file.h>
22 #include <arrow/util/decimal.h>
23 #include <tbb/parallel_for.h>
24 #include <tbb/task_group.h>
25 #include <array>
26 #include <future>
27 #include <vector>
28 
32 #include "Logger/Logger.h"
34 #include "Shared/ArrowUtil.h"
35 #include "Shared/measure.h"
36 
37 struct Frag {
38  size_t first_chunk; // index of the first chunk assigned to the fragment
39  size_t first_chunk_offset; // offset from the beginning of the first chunk
40  size_t last_chunk; // index of the last chunk
41  size_t last_chunk_size; // number of elements in the last chunk
42 };
43 
44 struct ArrowFragment {
45  int64_t offset{0};
46  int64_t sz{0};
47  std::vector<std::shared_ptr<arrow::ArrayData>> chunks;
48 };
49 
50 class ArrowForeignStorageBase : public PersistentForeignStorageInterface {
51  public:
52  void append(const std::vector<ForeignStorageColumnBuffer>& column_buffers) override;
53 
54  void read(const ChunkKey& chunk_key,
55  const SQLTypeInfo& sql_type,
56  int8_t* dest,
57  const size_t numBytes) override;
58 
59  int8_t* tryZeroCopy(const ChunkKey& chunk_key,
60  const SQLTypeInfo& sql_type,
61  const size_t numBytes) override;
62 
63  void parseArrowTable(Catalog_Namespace::Catalog* catalog,
64  std::pair<int, int> table_key,
65  const std::string& type,
66  const TableDescriptor& td,
67  const std::list<ColumnDescriptor>& cols,
68  Data_Namespace::AbstractBufferMgr* mgr,
69  const arrow::Table& table);
70 
71  std::shared_ptr<arrow::ChunkedArray> createDictionaryEncodedColumn(
72  StringDictionary* dict,
73  const ColumnDescriptor& c,
74  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
75 
76  std::shared_ptr<arrow::ChunkedArray> convertArrowDictionary(
77  StringDictionary* dict,
78  const ColumnDescriptor& c,
79  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
80 
81  template <typename T, typename ChunkType>
82  std::shared_ptr<arrow::ChunkedArray> createDecimalColumn(
83  const ColumnDescriptor& c,
84  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
85 
86  void generateNullValues(const std::vector<Frag>& fragments,
87  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array,
88  const SQLTypeInfo& columnType);
89 
90  template <typename T>
91  void setNullValues(const std::vector<Frag>& fragments,
92  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
93 
94  template <typename T>
95  void setNulls(int8_t* data, int count);
96 
97  void generateSentinelValues(int8_t* data, const SQLTypeInfo& columnType, size_t count);
98 
99  void getSizeAndOffset(const Frag& frag,
100  const std::shared_ptr<arrow::Array>& chunk,
101  size_t i,
102  int& size,
103  int& offset);
104 
105  int64_t makeFragment(const Frag& frag,
106  ArrowFragment& arrowFrag,
107  const std::vector<std::shared_ptr<arrow::Array>>& chunks,
108  bool is_varlen,
109  bool is_empty);
110 
111  std::map<std::array<int, 3>, std::vector<ArrowFragment>> m_columns;
112 };
113 
114 void ArrowForeignStorageBase::generateNullValues(
115  const std::vector<Frag>& fragments,
116  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array,
117  const SQLTypeInfo& columnType) {
118  const size_t typeSize = columnType.get_size();
119  if (columnType.is_integer() || is_datetime(columnType.get_type())) {
120  switch (typeSize) {
121  case 1:
122  setNullValues<int8_t>(fragments, arr_col_chunked_array);
123  break;
124  case 2:
125  setNullValues<int16_t>(fragments, arr_col_chunked_array);
126  break;
127  case 4:
128  setNullValues<int32_t>(fragments, arr_col_chunked_array);
129  break;
130  case 8:
131  setNullValues<int64_t>(fragments, arr_col_chunked_array);
132  break;
133  default:
134  // TODO: throw unsupported integer type exception
135  CHECK(false);
136  }
137  } else {
138  if (typeSize == 4) {
139  setNullValues<float>(fragments, arr_col_chunked_array);
140  } else {
141  setNullValues<double>(fragments, arr_col_chunked_array);
142  }
143  }
144 }
145 
146 template <typename T>
147 void ArrowForeignStorageBase::setNullValues(
148  const std::vector<Frag>& fragments,
149  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
150  const T null_value = std::is_signed<T>::value ? std::numeric_limits<T>::min()
151  : std::numeric_limits<T>::max();
152 
153  tbb::parallel_for(
154  tbb::blocked_range<size_t>(0, fragments.size()),
155  [&](const tbb::blocked_range<size_t>& r0) {
156  for (size_t f = r0.begin(); f != r0.end(); ++f) {
157  tbb::parallel_for(
158  tbb::blocked_range<size_t>(fragments[f].first_chunk,
159  fragments[f].last_chunk + 1),
160  [&](const tbb::blocked_range<size_t>& r1) {
161  for (auto chunk_index = r1.begin(); chunk_index != r1.end();
162  ++chunk_index) {
163  auto chunk = arr_col_chunked_array->chunk(chunk_index).get();
164  if (chunk->data()->null_count == chunk->data()->length) {
165  // it means we will insert sentinel values in the read function
166  continue;
167  }
168  // We cannot use mutable_data in case of shared access
169  // This is not really safe, but it is the only way to do this without
170  // copying
171  // TODO: add support for sentinel values to read_csv
172  auto data = const_cast<uint8_t*>(chunk->data()->buffers[1]->data());
173  if (data && chunk->null_bitmap()) { // TODO: to be checked and possibly
174  // reimplemented
175  // CHECK(data) << " is null";
176  T* dataT = reinterpret_cast<T*>(data);
177  const uint8_t* bitmap_data = chunk->null_bitmap_data();
178  const int64_t length = chunk->length();
179  const int64_t bitmap_length = chunk->null_bitmap()->size() - 1;
180 
181  for (int64_t bitmap_idx = 0; bitmap_idx < bitmap_length;
182  ++bitmap_idx) {
183  T* res = dataT + bitmap_idx * 8;
184  for (int8_t bitmap_offset = 0; bitmap_offset < 8; ++bitmap_offset) {
185  auto is_null = (~bitmap_data[bitmap_idx] >> bitmap_offset) & 1;
186  auto val = is_null ? null_value : res[bitmap_offset];
187  res[bitmap_offset] = val;
188  }
189  }
190 
191  for (int64_t j = bitmap_length * 8; j < length; ++j) {
192  auto is_null = (~bitmap_data[bitmap_length] >> (j % 8)) & 1;
193  auto val = is_null ? null_value : dataT[j];
194  dataT[j] = val;
195  }
196  }
197  }
198  });
199  }
200  });
201 }
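// Added note (illustrative, not in the original source): setNullValues() rewrites the
// Arrow data buffer in place so that every element whose validity bit is clear holds the
// sentinel value computed above. For example, with T = int32_t the sentinel is
// std::numeric_limits<int32_t>::min(); given a validity byte 0b11111010, bits 0 and 2
// are clear, so elements 0 and 2 of that group of eight are overwritten with INT32_MIN
// while the remaining six values are left untouched.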
202 
203 template <typename T>
204 void ArrowForeignStorageBase::setNulls(int8_t* data, int count) {
205  T* dataT = reinterpret_cast<T*>(data);
206  const T null_value = std::is_signed<T>::value ? std::numeric_limits<T>::min()
207  : std::numeric_limits<T>::max();
208  std::fill(dataT, dataT + count, null_value);
209 }
210 
211 void ArrowForeignStorageBase::generateSentinelValues(int8_t* data,
212  const SQLTypeInfo& columnType,
213  size_t count) {
214  const size_t type_size = columnType.get_size();
215  if (columnType.is_integer() || is_datetime(columnType.get_type())) {
216  switch (type_size) {
217  case 1:
218  setNulls<int8_t>(data, count);
219  break;
220  case 2:
221  setNulls<int16_t>(data, count);
222  break;
223  case 4:
224  setNulls<int32_t>(data, count);
225  break;
226  case 8:
227  setNulls<int64_t>(data, count);
228  break;
229  default:
230  // TODO: throw unsupported integer type exception
231  CHECK(false);
232  }
233  } else {
234  if (type_size == 4) {
235  setNulls<float>(data, count);
236  } else {
237  setNulls<double>(data, count);
238  }
239  }
240 }
241 
242 void ArrowForeignStorageBase::getSizeAndOffset(const Frag& frag,
243  const std::shared_ptr<arrow::Array>& chunk,
244  size_t i,
245  int& size,
246  int& offset) {
247  offset = (i == frag.first_chunk) ? frag.first_chunk_offset : 0;
248  size = (i == frag.last_chunk) ? frag.last_chunk_size : (chunk->length() - offset);
249 }
250 
251 int64_t ArrowForeignStorageBase::makeFragment(
252  const Frag& frag,
253  ArrowFragment& arrowFrag,
254  const std::vector<std::shared_ptr<arrow::Array>>& chunks,
255  bool is_varlen,
256  bool is_empty) {
257  int64_t varlen = 0;
258  arrowFrag.chunks.resize(frag.last_chunk - frag.first_chunk + 1);
259  for (int i = frag.first_chunk, e = frag.last_chunk; i <= e; i++) {
260  int size, offset;
261  getSizeAndOffset(frag, chunks[i], i, size, offset);
262  arrowFrag.offset += offset;
263  arrowFrag.sz += size;
264  arrowFrag.chunks[i - frag.first_chunk] = chunks[i]->data();
265  auto& buffers = chunks[i]->data()->buffers;
266  if (!is_empty) {
267  if (is_varlen) {
268  if (buffers.size() <= 2) {
269  throw std::runtime_error(
270  "Importing fixed length arrow array as variable length column");
271  }
272  auto offsets_buffer = reinterpret_cast<const uint32_t*>(buffers[1]->data());
273  varlen += offsets_buffer[offset + size] - offsets_buffer[offset];
274  } else if (buffers.size() != 2) {
275  throw std::runtime_error(
276  "Importing variable length arrow array as fixed length column");
277  }
278  }
279  }
280  // return the length of the string buffer if the array is a none encoded string
281  return varlen;
282 }
283 
284 std::vector<Frag> calculateFragmentsOffsets(const arrow::ChunkedArray& array,
285  size_t maxFragRows) {
286  std::vector<Frag> fragments;
287  size_t sz = 0;
288  size_t offset = 0;
289  fragments.push_back({0, 0, 0, 0});
290  size_t num_chunks = (size_t)array.num_chunks();
291  for (size_t i = 0; i < num_chunks;) {
292  auto& chunk = *array.chunk(i);
293  auto& frag = *fragments.rbegin();
294  if (maxFragRows - sz > chunk.length() - offset) {
295  sz += chunk.length() - offset;
296  if (i == num_chunks - 1) {
297  fragments.rbegin()->last_chunk = num_chunks - 1;
298  fragments.rbegin()->last_chunk_size =
299  array.chunk((int)num_chunks - 1)->length() - offset;
300  }
301  offset = 0;
302  i++;
303  } else {
304  frag.last_chunk = i;
305  frag.last_chunk_size = maxFragRows - sz;
306  offset += maxFragRows - sz;
307  sz = 0;
308  fragments.push_back({i, offset, 0, 0});
309  }
310  }
311  if (fragments.rbegin()->first_chunk == fragments.rbegin()->last_chunk &&
312  fragments.rbegin()->last_chunk_size == 0) {
313  // remove empty fragment at the end if any
314  fragments.pop_back();
315  }
316  return fragments;
317 }
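// Worked example (added comment): with maxFragRows = 4 and chunk lengths {3, 3, 2}
// (8 rows in total), calculateFragmentsOffsets() yields two fragments:
//   {first_chunk = 0, first_chunk_offset = 0, last_chunk = 1, last_chunk_size = 1}
//   {first_chunk = 1, first_chunk_offset = 1, last_chunk = 2, last_chunk_size = 2}
// i.e. rows 0-3 and rows 4-7; the empty fragment pushed at the very end is removed by
// the check above.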
318 
319 void ArrowForeignStorageBase::parseArrowTable(Catalog_Namespace::Catalog* catalog,
320  std::pair<int, int> table_key,
321  const std::string& type,
322  const TableDescriptor& td,
323  const std::list<ColumnDescriptor>& cols,
324  Data_Namespace::AbstractBufferMgr* mgr,
325  const arrow::Table& table) {
326  std::map<std::array<int, 3>, StringDictionary*> dictionaries;
327  for (auto& c : cols) {
328  std::array<int, 3> col_key{table_key.first, table_key.second, c.columnId};
329  m_columns[col_key] = {};
330  // fsi registerTable runs under SqliteLock which does not allow invoking
331  // getMetadataForDict in other threads
332  if (c.columnType.is_dict_encoded_string()) {
333  auto dictDesc = catalog->getMetadataForDict(c.columnType.get_comp_param());
334  dictionaries[col_key] = dictDesc->stringDict.get();
335  }
336  }
337 
338  tbb::task_group tg;
339 
340  tbb::parallel_for(
341  tbb::blocked_range(0, (int)cols.size()),
342  [this, &tg, &table_key, &td, mgr, &table, &cols, &dictionaries](auto range) {
343  auto columnIter = std::next(cols.begin(), range.begin());
344  for (auto col_idx = range.begin(); col_idx != range.end(); col_idx++) {
345  auto& c = *(columnIter++);
346 
347  if (c.isSystemCol) {
348  continue; // must be processed by base interface implementation
349  }
350 
351  // chunk keys have the form: database_id, table_id, column_id, fragment_id
352  ChunkKey key{table_key.first, table_key.second, c.columnId, 0};
353  std::array<int, 3> col_key{table_key.first, table_key.second, c.columnId};
354 
355  if (col_idx >= table.num_columns()) {
356  LOG(ERROR) << "Number of columns read from Arrow (" << table.num_columns()
357  << ") does not match CREATE TABLE request: " << cols.size();
358  break;
359  }
360 
361  auto arr_col_chunked_array = table.column(col_idx);
362  auto column_type = c.columnType.get_type();
363 
364  if (c.columnType.is_dict_encoded_string()) {
365  StringDictionary* dict = dictionaries[col_key];
366 
367  switch (arr_col_chunked_array->type()->id()) {
368  case arrow::Type::STRING:
369  arr_col_chunked_array =
370  createDictionaryEncodedColumn(dict, c, arr_col_chunked_array);
371  break;
372  case arrow::Type::DICTIONARY:
373  arr_col_chunked_array =
374  convertArrowDictionary(dict, c, arr_col_chunked_array);
375  break;
376  default:
377  CHECK(false);
378  }
379  } else if (column_type == kDECIMAL || column_type == kNUMERIC) {
380  switch (c.columnType.get_size()) {
381  case 2:
382  arr_col_chunked_array = createDecimalColumn<int16_t, arrow::Int16Array>(
383  c, arr_col_chunked_array);
384  break;
385  case 4:
386  arr_col_chunked_array = createDecimalColumn<int32_t, arrow::Int32Array>(
387  c, arr_col_chunked_array);
388  break;
389  case 8:
390  arr_col_chunked_array = createDecimalColumn<int64_t, arrow::Int64Array>(
391  c, arr_col_chunked_array);
392  break;
393  default:
394  // TODO: throw unsupported decimal type exception
395  CHECK(false);
396  break;
397  }
398  }
399  auto empty =
400  arr_col_chunked_array->null_count() == arr_col_chunked_array->length();
401 
402  auto fragments =
403  calculateFragmentsOffsets(*arr_col_chunked_array, td.maxFragRows);
404 
405  auto ctype = c.columnType.get_type();
406  auto& col = m_columns[col_key];
407  col.resize(fragments.size());
408 
409  for (size_t f = 0; f < fragments.size(); f++) {
410  key[3] = f;
411  auto& frag = col[f];
412  bool is_varlen = ctype == kTEXT && !c.columnType.is_dict_encoded_string();
413  size_t varlen = makeFragment(
414  fragments[f], frag, arr_col_chunked_array->chunks(), is_varlen, empty);
415 
416  // create buffer descriptors
417  if (ctype == kTEXT && !c.columnType.is_dict_encoded_string()) {
418  auto k = key;
419  k.push_back(1);
420  {
421  auto b = mgr->createBuffer(k);
422  b->setSize(varlen);
423  b->initEncoder(c.columnType);
424  }
425  k[4] = 2;
426  {
427  auto b = mgr->createBuffer(k);
428  b->setSqlType(SQLTypeInfo(kINT, false));
429  b->setSize(frag.sz * b->getSqlType().get_size());
430  }
431  } else {
432  auto b = mgr->createBuffer(key);
433  b->setSize(frag.sz * c.columnType.get_size());
434  b->initEncoder(c.columnType);
435  if (!empty) {
436  size_t type_size = c.columnType.get_size();
437  tg.run([b, fr = &frag, type_size]() {
438  size_t sz = 0;
439  for (size_t i = 0; i < fr->chunks.size(); i++) {
440  auto& chunk = fr->chunks[i];
441  int offset = (i == 0) ? fr->offset : 0;
442  size_t size = (i == fr->chunks.size() - 1) ? (fr->sz - sz)
443  : (chunk->length - offset);
444  sz += size;
445  auto data = chunk->buffers[1]->data();
446  b->getEncoder()->updateStatsEncoded(
447  (const int8_t*)data + offset * type_size, size);
448  }
449  });
450  }
451  b->getEncoder()->setNumElems(frag.sz);
452  }
453  }
454  if (column_type != kDECIMAL && column_type != kNUMERIC &&
455  !c.columnType.is_string()) {
456  generateNullValues(fragments, arr_col_chunked_array, c.columnType);
457  }
458  }
459  }); // each col and fragment
460 
461  // wait until all stats have been updated
462  tg.wait();
463 }
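// Added note on the buffer layout produced above: fixed-width and dictionary-encoded
// columns get a single buffer per fragment keyed by {db_id, table_id, column_id,
// fragment_id}. None encoded text columns get two buffers distinguished by a fifth key
// component: 1 for the string payload (sized by the varlen value returned from
// makeFragment) and 2 for the 32-bit offsets, matching what read() and tryZeroCopy()
// expect below.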
464 
465 void ArrowForeignStorageBase::append(
466  const std::vector<ForeignStorageColumnBuffer>& column_buffers) {
467  CHECK(false);
468 }
469 
470 void ArrowForeignStorageBase::read(const ChunkKey& chunk_key,
471  const SQLTypeInfo& sql_type,
472  int8_t* dest,
473  const size_t numBytes) {
474  std::array<int, 3> col_key{chunk_key[0], chunk_key[1], chunk_key[2]};
475  auto& frag = m_columns.at(col_key).at(chunk_key[3]);
476 
477  CHECK(!frag.chunks.empty() || !chunk_key[3]);
478  int64_t sz = 0, copied = 0;
479  int varlen_offset = 0;
480  size_t read_size = 0;
481  for (size_t i = 0; i < frag.chunks.size(); i++) {
482  auto& array_data = frag.chunks[i];
483  int offset = (i == 0) ? frag.offset : 0;
484  size_t size = (i == frag.chunks.size() - 1) ? (frag.sz - read_size)
485  : (array_data->length - offset);
486  read_size += size;
487  arrow::Buffer* bp = nullptr;
488  if (sql_type.is_dict_encoded_string()) {
489  // array_data->buffers[1] stores dictionary indexes
490  bp = array_data->buffers[1].get();
491  } else if (sql_type.get_type() == kTEXT) {
492  CHECK_GE(array_data->buffers.size(), 3UL);
493  // array_data->buffers[2] stores string array
494  bp = array_data->buffers[2].get();
495  } else if (array_data->null_count != array_data->length) {
496  // any type except strings (none encoded strings offsets go here as well)
497  CHECK_GE(array_data->buffers.size(), 2UL);
498  bp = array_data->buffers[1].get();
499  }
500  if (bp) {
501  // offset buffers for none encoded strings need to be merged
502  if (chunk_key.size() == 5 && chunk_key[4] == 2) {
503  auto data = reinterpret_cast<const uint32_t*>(bp->data()) + offset;
504  auto dest_ui32 = reinterpret_cast<uint32_t*>(dest);
505  // as size contains the count of strings in the chunk slice, it is always one less
506  // than the offsets array size
507  sz = (size + 1) * sizeof(uint32_t);
508  if (sz > 0) {
509  if (i != 0) {
510  // We merge arrow chunks with string offsets into a single contiguous fragment.
511  // Each string is represented by a pair of offsets, so the size of the offset
512  // table is the number of strings + 1. When merging two chunks, the last number in
513  // the first chunk duplicates the first number in the second chunk, so we skip it.
514  data++;
515  sz -= sizeof(uint32_t);
516  } else {
517  // As we support cases where a fragment starts at an offset into an arrow chunk,
518  // we need to subtract the first element of the first chunk from all elements in
519  // that fragment
520  varlen_offset -= data[0];
521  }
522  // We also re-calculate offsets in the second chunk as it is a continuation of
523  // the first one.
524  std::transform(data,
525  data + (sz / sizeof(uint32_t)),
526  dest_ui32,
527  [varlen_offset](uint32_t val) { return val + varlen_offset; });
528  varlen_offset += data[(sz / sizeof(uint32_t)) - 1];
529  }
530  } else {
531  auto fixed_type = dynamic_cast<arrow::FixedWidthType*>(array_data->type.get());
532  if (fixed_type) {
533  std::memcpy(
534  dest,
535  bp->data() + (array_data->offset + offset) * (fixed_type->bit_width() / 8),
536  sz = size * (fixed_type->bit_width() / 8));
537  } else {
538  auto offsets_buffer =
539  reinterpret_cast<const uint32_t*>(array_data->buffers[1]->data());
540  auto string_buffer_offset = offsets_buffer[offset + array_data->offset];
541  auto string_buffer_size =
542  offsets_buffer[offset + array_data->offset + size] - string_buffer_offset;
543  std::memcpy(dest, bp->data() + string_buffer_offset, sz = string_buffer_size);
544  }
545  }
546  } else {
547  // TODO: nullify?
548  auto fixed_type = dynamic_cast<arrow::FixedWidthType*>(array_data->type.get());
549  if (fixed_type) {
550  sz = size * (fixed_type->bit_width() / 8);
551  generateSentinelValues(dest, sql_type, size);
552  } else {
553  CHECK(false); // TODO: what other cases can occur here?
554  }
555  }
556  dest += sz;
557  copied += sz;
558  }
559  CHECK_EQ(numBytes, size_t(copied));
560 }
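// Worked example for the offset-merging branch above (added comment): suppose a fragment
// spans two arrow chunks whose offset buffers are [0, 3, 5] and [0, 2, 6]. The first
// chunk is copied as is and varlen_offset becomes 5; for the second chunk the leading 0
// is skipped and 5 is added to the remaining entries, so the destination holds a single
// contiguous offset table [0, 3, 5, 7, 11].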
561 
562 int8_t* ArrowForeignStorageBase::tryZeroCopy(const ChunkKey& chunk_key,
563  const SQLTypeInfo& sql_type,
564  const size_t numBytes) {
565  std::array<int, 3> col_key{chunk_key[0], chunk_key[1], chunk_key[2]};
566  auto& frag = m_columns.at(col_key).at(chunk_key[3]);
567 
568  // fragment should be contiguous to allow zero copy
569  if (frag.chunks.size() != 1) {
570  return nullptr;
571  }
572 
573  auto& array_data = frag.chunks[0];
574  int offset = frag.offset;
575 
576  arrow::Buffer* bp = nullptr;
577  if (sql_type.is_dict_encoded_string()) {
578  // array_data->buffers[1] stores dictionary indexes
579  bp = array_data->buffers[1].get();
580  } else if (sql_type.get_type() == kTEXT) {
581  CHECK_GE(array_data->buffers.size(), 3UL);
582  // array_data->buffers[2] stores string array
583  bp = array_data->buffers[2].get();
584  } else if (array_data->null_count != array_data->length) {
585  // any type except strings (none encoded strings offsets go here as well)
586  CHECK_GE(array_data->buffers.size(), 2UL);
587  bp = array_data->buffers[1].get();
588  }
589 
590  // arrow buffer is empty; it means we should fill the fragment with nulls in the read function
591  if (!bp) {
592  return nullptr;
593  }
594 
595  auto data = reinterpret_cast<int8_t*>(const_cast<uint8_t*>(bp->data()));
596 
597  // if the buffer is a none encoded string index buffer
598  if (chunk_key.size() == 5 && chunk_key[4] == 2) {
599  // if offset != 0 we need to recalculate index buffer by adding offset to each index
600  if (offset != 0) {
601  return nullptr;
602  } else {
603  return data;
604  }
605  }
606 
607  auto fixed_type = dynamic_cast<arrow::FixedWidthType*>(array_data->type.get());
608  if (fixed_type) {
609  return data + (array_data->offset + offset) * (fixed_type->bit_width() / 8);
610  }
611  // if the buffer is a none encoded string data buffer
612  // then we should find its offset in the offset buffer
613  auto offsets_buffer = reinterpret_cast<const uint32_t*>(array_data->buffers[1]->data());
614  auto string_buffer_offset = offsets_buffer[offset + array_data->offset];
615  return data + string_buffer_offset;
616 }
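// Added note: tryZeroCopy() only returns a pointer when the fragment maps onto exactly
// one arrow chunk and no per-element rewriting (offset adjustment or null filling) is
// needed; otherwise it returns nullptr, presumably so the caller falls back to the
// copying read() path above.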
617 
618 std::shared_ptr<arrow::ChunkedArray>
619 ArrowForeignStorageBase::createDictionaryEncodedColumn(
620  StringDictionary* dict,
621  const ColumnDescriptor& c,
622  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
623  // calculate each chunk's offset into the bulk string vector
624  size_t bulk_size = 0;
625  std::vector<int> offsets(arr_col_chunked_array->num_chunks());
626  for (int i = 0; i < arr_col_chunked_array->num_chunks(); i++) {
627  offsets[i] = bulk_size;
628  bulk_size += arr_col_chunked_array->chunk(i)->length();
629  }
630 
631  std::vector<std::string_view> bulk(bulk_size);
632 
633  tbb::parallel_for(
634  tbb::blocked_range<int>(0, arr_col_chunked_array->num_chunks()),
635  [&bulk, &arr_col_chunked_array, &offsets](const tbb::blocked_range<int>& r) {
636  for (int i = r.begin(); i < r.end(); i++) {
637  auto chunk = std::static_pointer_cast<arrow::StringArray>(
638  arr_col_chunked_array->chunk(i));
639  auto offset = offsets[i];
640  for (int j = 0; j < chunk->length(); j++) {
641  auto view = chunk->GetView(j);
642  bulk[offset + j] = std::string_view(view.data(), view.length());
643  }
644  }
645  });
646 
647  std::shared_ptr<arrow::Buffer> indices_buf;
648  auto res = arrow::AllocateBuffer(bulk_size * sizeof(int32_t));
649  CHECK(res.ok());
650  indices_buf = std::move(res).ValueOrDie();
651  auto raw_data = reinterpret_cast<int*>(indices_buf->mutable_data());
652  dict->getOrAddBulk(bulk, raw_data);
653  auto array = std::make_shared<arrow::Int32Array>(bulk_size, indices_buf);
654  return std::make_shared<arrow::ChunkedArray>(array);
655 }
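// Added note: the result above is a ChunkedArray with a single Int32Array chunk holding
// StringDictionary ids for every row of the original (possibly multi-chunk) string
// column, so the rest of the code can treat it like any other fixed-width int32 column.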
656 
657 std::shared_ptr<arrow::ChunkedArray> ArrowForeignStorageBase::convertArrowDictionary(
658  StringDictionary* dict,
659  const ColumnDescriptor& c,
660  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
661  // TODO: allocate one big array and split it by fragments as it is done in
662  // createDictionaryEncodedColumn
663  std::vector<std::shared_ptr<arrow::Array>> converted_chunks;
664  for (auto& chunk : arr_col_chunked_array->chunks()) {
665  auto dict_array = std::static_pointer_cast<arrow::DictionaryArray>(chunk);
666  auto values = std::static_pointer_cast<arrow::StringArray>(dict_array->dictionary());
667  std::vector<std::string_view> strings(values->length());
668  for (int i = 0; i < values->length(); i++) {
669  auto view = values->GetView(i);
670  strings[i] = std::string_view(view.data(), view.length());
671  }
672  auto arrow_indices =
673  std::static_pointer_cast<arrow::Int32Array>(dict_array->indices());
674  std::vector<int> indices_mapping(values->length());
675  dict->getOrAddBulk(strings, indices_mapping.data());
676 
677  // create new arrow chunk with remapped indices
678  std::shared_ptr<arrow::Buffer> dict_indices_buf;
679  auto res = arrow::AllocateBuffer(arrow_indices->length() * sizeof(int32_t));
680  CHECK(res.ok());
681  dict_indices_buf = std::move(res).ValueOrDie();
682  auto raw_data = reinterpret_cast<int32_t*>(dict_indices_buf->mutable_data());
683 
684  for (int i = 0; i < arrow_indices->length(); i++) {
685  raw_data[i] = indices_mapping[arrow_indices->Value(i)];
686  }
687 
688  converted_chunks.push_back(
689  std::make_shared<arrow::Int32Array>(arrow_indices->length(), dict_indices_buf));
690  }
691  return std::make_shared<arrow::ChunkedArray>(converted_chunks);
692 }
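// Illustrative example (added comment, dictionary ids are hypothetical): for an arrow
// dictionary chunk with dictionary ["b", "a"] and indices [1, 0, 1], getOrAddBulk()
// might assign "b" -> 7 and "a" -> 3 in the table-wide StringDictionary, producing the
// remapped Int32Array [3, 7, 3].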
693 
694 template <typename T, typename ChunkType>
695 std::shared_ptr<arrow::ChunkedArray> ArrowForeignStorageBase::createDecimalColumn(
696  const ColumnDescriptor& c,
697  std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
698  size_t column_size = 0;
699  std::vector<int> offsets(arr_col_chunked_array->num_chunks());
700  for (int i = 0; i < arr_col_chunked_array->num_chunks(); i++) {
701  offsets[i] = column_size;
702  column_size += arr_col_chunked_array->chunk(i)->length();
703  }
704 
705  std::shared_ptr<arrow::Buffer> result_buffer;
706  auto res = arrow::AllocateBuffer(column_size * c.columnType.get_size());
707  CHECK(res.ok());
708  result_buffer = std::move(res).ValueOrDie();
709 
710  T* buffer_data = reinterpret_cast<T*>(result_buffer->mutable_data());
711  tbb::parallel_for(
712  tbb::blocked_range(0, arr_col_chunked_array->num_chunks()),
713  [buffer_data, &offsets, arr_col_chunked_array](auto& range) {
714  for (int chunk_idx = range.begin(); chunk_idx < range.end(); chunk_idx++) {
715  auto offset = offsets[chunk_idx];
716  T* chunk_buffer = buffer_data + offset;
717 
718  auto decimalArray = std::static_pointer_cast<arrow::Decimal128Array>(
719  arr_col_chunked_array->chunk(chunk_idx));
720  auto empty =
721  arr_col_chunked_array->null_count() == arr_col_chunked_array->length();
722  for (int i = 0; i < decimalArray->length(); i++) {
723  if (empty || decimalArray->null_count() == decimalArray->length() ||
724  decimalArray->IsNull(i)) {
725  chunk_buffer[i] = inline_int_null_value<T>();
726  } else {
727  arrow::Decimal128 val(decimalArray->GetValue(i));
728  chunk_buffer[i] =
729  static_cast<int64_t>(val); // arrow can cast only to int64_t
730  }
731  }
732  }
733  });
734  auto array = std::make_shared<ChunkType>(column_size, result_buffer);
735  return std::make_shared<arrow::ChunkedArray>(array);
736 }
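// Illustrative example (added comment): for a DECIMAL(10, 2) column materialized with
// T = int64_t, the arrow Decimal128 value representing 123.45 already stores the scaled
// integer 12345, so it is narrowed to int64_t and written as is; null slots are written
// as inline_int_null_value<int64_t>() instead.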
737 
738 class ArrowForeignStorage : public ArrowForeignStorageBase {
739  public:
741 
742  void prepareTable(const int db_id,
743  const std::string& type,
744  TableDescriptor& td,
745  std::list<ColumnDescriptor>& cols) override;
746  void registerTable(Catalog_Namespace::Catalog* catalog,
747  std::pair<int, int> table_key,
748  const std::string& type,
749  const TableDescriptor& td,
750  const std::list<ColumnDescriptor>& cols,
751  Data_Namespace::AbstractBufferMgr* mgr) override;
752 
753  std::string getType() const override;
754 
755  std::string name;
756 
757  static std::map<std::string, std::shared_ptr<arrow::Table>> tables;
758 };
759 
760 std::map<std::string, std::shared_ptr<arrow::Table>> ArrowForeignStorage::tables =
761  std::map<std::string, std::shared_ptr<arrow::Table>>();
762 
763 static SQLTypeInfo getOmnisciType(const arrow::DataType& type) {
764  using namespace arrow;
765  switch (type.id()) {
766  case Type::INT8:
767  return SQLTypeInfo(kTINYINT, false);
768  case Type::INT16:
769  return SQLTypeInfo(kSMALLINT, false);
770  case Type::INT32:
771  return SQLTypeInfo(kINT, false);
772  case Type::INT64:
773  return SQLTypeInfo(kBIGINT, false);
774  case Type::BOOL:
775  return SQLTypeInfo(kBOOLEAN, false);
776  case Type::FLOAT:
777  return SQLTypeInfo(kFLOAT, false);
778  case Type::DOUBLE:
779  return SQLTypeInfo(kDOUBLE, false);
780  // uncomment when arrow 2.0 is released and modin support for dictionary types
781  // in read_csv is implemented
782 
783  // case Type::DICTIONARY: {
784  // auto type = SQLTypeInfo(kTEXT, false, kENCODING_DICT);
785  // // this is needed because createTable forces type.size to be equal to
786  // // comp_param / 8, no matter what type.size you set here
787  // type.set_comp_param(sizeof(uint32_t) * 8);
788  // return type;
789  // }
790  // case Type::STRING:
791  // return SQLTypeInfo(kTEXT, false, kENCODING_NONE);
792 
793  case Type::STRING: {
794  auto type = SQLTypeInfo(kTEXT, false, kENCODING_DICT);
795  // this is needed because createTable forces type.size to be equal to
796  // comp_param / 8, no matter what type.size you set here
797  type.set_comp_param(sizeof(uint32_t) * 8);
798  return type;
799  }
800  case Type::DECIMAL: {
801  const auto& decimal_type = static_cast<const arrow::DecimalType&>(type);
802  return SQLTypeInfo(kDECIMAL, decimal_type.precision(), decimal_type.scale(), false);
803  }
804  case Type::TIME32:
805  return SQLTypeInfo(kTIME, false);
806  case Type::TIMESTAMP:
807  switch (static_cast<const arrow::TimestampType&>(type).unit()) {
808  case TimeUnit::SECOND:
809  return SQLTypeInfo(kTIMESTAMP, 0, 0);
810  case TimeUnit::MILLI:
811  return SQLTypeInfo(kTIMESTAMP, 3, 0);
812  case TimeUnit::MICRO:
813  return SQLTypeInfo(kTIMESTAMP, 6, 0);
814  case TimeUnit::NANO:
815  return SQLTypeInfo(kTIMESTAMP, 9, 0);
816  }
817  default:
818  throw std::runtime_error(type.ToString() + " is not yet supported.");
819  }
820 }
821 
822 void ArrowForeignStorage::prepareTable(const int db_id,
823  const std::string& name,
824  TableDescriptor& td,
825  std::list<ColumnDescriptor>& cols) {
826  td.hasDeletedCol = false;
827  this->name = name;
828  auto table = tables[name];
829  for (auto& field : table->schema()->fields()) {
830  ColumnDescriptor cd;
831  cd.columnName = field->name();
832  cd.columnType = getOmnisciType(*field->type());
833  cols.push_back(cd);
834  }
835 }
836 
837 void ArrowForeignStorage::registerTable(Catalog_Namespace::Catalog* catalog,
838  std::pair<int, int> table_key,
839  const std::string& info,
840  const TableDescriptor& td,
841  const std::list<ColumnDescriptor>& cols,
842  Data_Namespace::AbstractBufferMgr* mgr) {
843  parseArrowTable(catalog, table_key, info, td, cols, mgr, *(tables[name].get()));
844 }
845 
846 std::string ArrowForeignStorage::getType() const {
847  LOG(INFO) << "CSV backed temporary tables have been activated. Create table `with "
848  "(storage_type='CSV:path/to/file.csv');`\n";
849  return "ARROW";
850 }
851 
852 void setArrowTable(std::string name, std::shared_ptr<arrow::Table> table) {
853  ArrowForeignStorage::tables[name] = table;
854 }
855 
856 void releaseArrowTable(std::string name) {
857  ArrowForeignStorage::tables.erase(name);
858 }
859 
860 void registerArrowForeignStorage(std::shared_ptr<ForeignStorageInterface> fsi) {
861  fsi->registerPersistentStorageInterface(std::make_unique<ArrowForeignStorage>());
862 }
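// Hypothetical usage sketch (added comment; the exact storage_type string is an
// assumption based on getType() returning "ARROW"):
//   // given std::shared_ptr<ForeignStorageInterface> fsi and
//   // std::shared_ptr<arrow::Table> my_table
//   registerArrowForeignStorage(fsi);
//   setArrowTable("trips", my_table);
//   // the table would then be created with something like storage_type='ARROW:trips',
//   // so prepareTable()/registerTable() above can look it up by name.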
863 
864 class ArrowCsvForeignStorage : public ArrowForeignStorageBase {
865  public:
867 
868  void prepareTable(const int db_id,
869  const std::string& type,
870  TableDescriptor& td,
871  std::list<ColumnDescriptor>& cols) override;
872  void registerTable(Catalog_Namespace::Catalog* catalog,
873  std::pair<int, int> table_key,
874  const std::string& type,
875  const TableDescriptor& td,
876  const std::list<ColumnDescriptor>& cols,
877  Data_Namespace::AbstractBufferMgr* mgr) override;
878 
879  std::string getType() const override;
880 };
881 
882 void ArrowCsvForeignStorage::prepareTable(const int db_id,
883  const std::string& type,
884  TableDescriptor& td,
885  std::list<ColumnDescriptor>& cols) {
886  td.hasDeletedCol = false;
887 }
888 
889 // TODO: this overlaps with getArrowType() from ArrowResultSetConverter.cpp but with a few
890 // differences in kTEXT and kDATE
891 static std::shared_ptr<arrow::DataType> getArrowImportType(const SQLTypeInfo type) {
892  using namespace arrow;
893  auto ktype = type.get_type();
894  if (IS_INTEGER(ktype)) {
895  switch (type.get_size()) {
896  case 1:
897  return int8();
898  case 2:
899  return int16();
900  case 4:
901  return int32();
902  case 8:
903  return int64();
904  default:
905  CHECK(false);
906  }
907  }
908  switch (ktype) {
909  case kBOOLEAN:
910  return arrow::boolean();
911  case kFLOAT:
912  return float32();
913  case kDOUBLE:
914  return float64();
915  case kCHAR:
916  case kVARCHAR:
917  case kTEXT:
918  return utf8();
919  case kDECIMAL:
920  case kNUMERIC:
921  return decimal(type.get_precision(), type.get_scale());
922  case kTIME:
923  return time32(TimeUnit::SECOND);
924  // case kDATE:
925  // TODO(wamsi) : Remove date64() once date32() support is added in cuDF.
926  // Currently, support for date32() is missing in cuDF. Hence, if a client requests a
927  // date on GPU, return date64() for the time being, until support is added.
928  // return device_type_ == ExecutorDeviceType::GPU ? date64() : date32();
929  case kTIMESTAMP:
930  switch (type.get_precision()) {
931  case 0:
932  return timestamp(TimeUnit::SECOND);
933  case 3:
934  return timestamp(TimeUnit::MILLI);
935  case 6:
936  return timestamp(TimeUnit::MICRO);
937  case 9:
938  return timestamp(TimeUnit::NANO);
939  default:
940  throw std::runtime_error("Unsupported timestamp precision for Arrow: " +
941  std::to_string(type.get_precision()));
942  }
943  case kARRAY:
944  case kINTERVAL_DAY_TIME:
945  case kINTERVAL_YEAR_MONTH:
946  default:
947  throw std::runtime_error(type.get_type_name() + " is not supported in Arrow.");
948  }
949  return nullptr;
950 }
951 
952 void ArrowCsvForeignStorage::registerTable(Catalog_Namespace::Catalog* catalog,
953  std::pair<int, int> table_key,
954  const std::string& info,
955  const TableDescriptor& td,
956  const std::list<ColumnDescriptor>& cols,
957  Data_Namespace::AbstractBufferMgr* mgr) {
958  const DataframeTableDescriptor* df_td =
959  dynamic_cast<const DataframeTableDescriptor*>(&td);
960  bool isDataframe = df_td != nullptr;
961  std::unique_ptr<DataframeTableDescriptor> df_td_owned;
962  if (!isDataframe) {
963  df_td_owned = std::make_unique<DataframeTableDescriptor>(td);
964  CHECK(df_td_owned);
965  df_td = df_td_owned.get();
966  }
967  auto memory_pool = arrow::default_memory_pool();
968  auto arrow_parse_options = arrow::csv::ParseOptions::Defaults();
969  arrow_parse_options.quoting = false;
970  arrow_parse_options.escaping = false;
971  arrow_parse_options.newlines_in_values = false;
972  arrow_parse_options.delimiter = *df_td->delimiter.c_str();
973  auto arrow_read_options = arrow::csv::ReadOptions::Defaults();
974  arrow_read_options.use_threads = true;
975 
976  arrow_read_options.block_size = 20 * 1024 * 1024;
977  arrow_read_options.autogenerate_column_names = false;
978  arrow_read_options.skip_rows =
979  df_td->hasHeader ? (df_td->skipRows + 1) : df_td->skipRows;
980 
981  auto arrow_convert_options = arrow::csv::ConvertOptions::Defaults();
982  arrow_convert_options.check_utf8 = false;
983  arrow_convert_options.include_columns = arrow_read_options.column_names;
984  arrow_convert_options.strings_can_be_null = true;
985 
986  for (auto& c : cols) {
987  if (c.isSystemCol) {
988  continue; // must be processed by base interface implementation
989  }
990  arrow_convert_options.column_types.emplace(c.columnName,
991  getArrowImportType(c.columnType));
992  arrow_read_options.column_names.push_back(c.columnName);
993  }
994 
995  std::shared_ptr<arrow::io::ReadableFile> inp;
996  auto file_result = arrow::io::ReadableFile::Open(info.c_str());
997  ARROW_THROW_NOT_OK(file_result.status());
998  inp = file_result.ValueOrDie();
999 
1000  auto table_reader_result = arrow::csv::TableReader::Make(
1001  memory_pool, inp, arrow_read_options, arrow_parse_options, arrow_convert_options);
1002  ARROW_THROW_NOT_OK(table_reader_result.status());
1003  auto table_reader = table_reader_result.ValueOrDie();
1004 
1005  std::shared_ptr<arrow::Table> arrowTable;
1006  auto time = measure<>::execution([&]() {
1007  auto arrow_table_result = table_reader->Read();
1008  ARROW_THROW_NOT_OK(arrow_table_result.status());
1009  arrowTable = arrow_table_result.ValueOrDie();
1010  });
1011 
1012  VLOG(1) << "Read Arrow CSV file " << info << " in " << time << "ms";
1013 
1014  arrow::Table& table = *arrowTable.get();
1015  parseArrowTable(catalog, table_key, info, td, cols, mgr, table);
1016 }
1017 
1018 std::string ArrowCsvForeignStorage::getType() const {
1019  LOG(INFO) << "CSV backed temporary tables have been activated. Create table `with "
1020  "(storage_type='CSV:path/to/file.csv');`\n";
1021  return "CSV";
1022 }
1023 
1024 void registerArrowCsvForeignStorage(std::shared_ptr<ForeignStorageInterface> fsi) {
1025  fsi->registerPersistentStorageInterface(std::make_unique<ArrowCsvForeignStorage>());
1026 }
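// Added usage note: as the log message in ArrowCsvForeignStorage::getType() indicates,
// a CSV backed table is declared roughly as
//   CREATE TABLE t (...) WITH (storage_type='CSV:path/to/file.csv');
// registerTable() then parses the file with arrow::csv::TableReader, taking the
// delimiter, header and skip-rows settings from the DataframeTableDescriptor.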