19 #include <arrow/api.h>
20 #include <arrow/csv/reader.h>
21 #include <arrow/io/file.h>
22 #include <arrow/util/decimal.h>
23 #include <tbb/parallel_for.h>
24 #include <tbb/task_group.h>
47 std::vector<std::shared_ptr<arrow::ArrayData>>
chunks;
52 void append(
const std::vector<ForeignStorageColumnBuffer>& column_buffers)
override;
57 const size_t numBytes)
override;
61 const size_t numBytes)
override;
64 std::pair<int, int> table_key,
65 const std::string&
type,
67 const std::list<ColumnDescriptor>& cols,
68 Data_Namespace::AbstractBufferMgr* mgr,
69 const arrow::Table&
table);
74 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
79 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
81 template <
typename T,
typename ChunkType>
84 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
87 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array,
92 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array);
100 const std::shared_ptr<arrow::Array>& chunk,
107 const std::vector<std::shared_ptr<arrow::Array>>& chunks,
111 std::map<std::array<int, 3>, std::vector<ArrowFragment>>
m_columns;
// generateNullValues (fragment): forwards to setNullValues<T> with the element
// type T that matches the column. Only part of the parameter list and the
// dispatch calls are visible in this excerpt.
115 const std::vector<Frag>& fragments,
116 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array,
// Byte width of one element as reported by the column's SQL type.
118 const size_t typeSize = columnType.
get_size();
// Integer instantiations. The conditions selecting each call are on lines not
// visible here -- presumably keyed on typeSize (1/2/4/8 bytes); TODO confirm.
122 setNullValues<int8_t>(fragments, arr_col_chunked_array);
125 setNullValues<int16_t>(fragments, arr_col_chunked_array);
128 setNullValues<int32_t>(fragments, arr_col_chunked_array);
131 setNullValues<int64_t>(fragments, arr_col_chunked_array);
// Floating-point instantiations.
139 setNullValues<float>(fragments, arr_col_chunked_array);
141 setNullValues<double>(fragments, arr_col_chunked_array);
// setNullValues<T> (fragment): for every chunk referenced by `fragments`,
// overwrites the value slot of each null element with a sentinel of type T.
// The chunk's data buffer is written in place through a const_cast.
146 template <
typename T>
148 const std::vector<Frag>& fragments,
149 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
// Sentinel: min() when T is signed, max() otherwise. std::is_signed is also
// true for float/double, where numeric_limits<T>::min() is the smallest
// positive normal value rather than the most negative one.
150 const T null_value = std::is_signed<T>::value ? std::numeric_limits<T>::min()
151 : std::numeric_limits<T>::max();
// Range over fragment indices. The call that consumes this range and lambda is
// on a line not visible here -- presumably tbb::parallel_for.
154 tbb::blocked_range<size_t>(0, fragments.size()),
155 [&](
const tbb::blocked_range<size_t>& r0) {
156 for (
size_t f = r0.begin();
f != r0.end(); ++
f) {
// Nested range over the chunk indices first_chunk..last_chunk (inclusive) of
// fragment f.
158 tbb::blocked_range<size_t>(fragments[
f].first_chunk,
159 fragments[
f].last_chunk + 1),
160 [&](
const tbb::blocked_range<size_t>& r1) {
161 for (
auto chunk_index = r1.begin(); chunk_index != r1.end();
163 auto chunk = arr_col_chunked_array->chunk(chunk_index).get();
// Chunk in which every element is null; how this case is handled is on lines
// not visible in this excerpt.
164 if (chunk->data()->null_count == chunk->data()->length) {
// buffers[1] is read as the value buffer; const is cast away so it can be
// patched in place.
172 auto data =
const_cast<uint8_t*
>(chunk->data()->buffers[1]->data());
// Patch only when there is a value buffer and a validity bitmap.
173 if (data && chunk->null_bitmap()) {
176 T* dataT =
reinterpret_cast<T*
>(data);
177 const uint8_t* bitmap_data = chunk->null_bitmap_data();
178 const int64_t length = chunk->length();
// All bitmap bytes except the last; the last byte is handled by the tail loop
// below. NOTE(review): this assumes the bitmap buffer holds exactly
// ceil(length / 8) bytes -- confirm no padded buffers reach this code.
179 const int64_t bitmap_length = chunk->null_bitmap()->size() - 1;
// Full bytes: each bitmap byte covers eight consecutive values.
// NOTE(review): indexing starts at element 0 of the buffers; the chunk's
// ArrayData offset is not applied in the visible lines -- confirm chunks are
// never sliced.
181 for (int64_t bitmap_idx = 0; bitmap_idx < bitmap_length;
183 T*
res = dataT + bitmap_idx * 8;
184 for (int8_t bitmap_offset = 0; bitmap_offset < 8; ++bitmap_offset) {
// The bit is inverted before testing, so a cleared bitmap bit marks a null.
185 auto is_null = (~bitmap_data[bitmap_idx] >> bitmap_offset) & 1;
// Written unconditionally: non-null slots are stored back unchanged.
186 auto val =
is_null ? null_value : res[bitmap_offset];
187 res[bitmap_offset] = val;
// Tail: elements covered by the last bitmap byte, bounded by `length`.
191 for (int64_t
j = bitmap_length * 8;
j < length; ++
j) {
192 auto is_null = (~bitmap_data[bitmap_length] >> (
j % 8)) & 1;
193 auto val =
is_null ? null_value : dataT[
j];
// setNulls<T> (fragment): fills `count` elements of type T starting at `data`
// with the null sentinel for T. The function signature line is not visible in
// this excerpt.
203 template <
typename T>
// View the raw byte pointer as an array of T.
205 T* dataT =
reinterpret_cast<T*
>(data);
// Sentinel: min() when T is signed, max() otherwise. For float/double
// (also "signed" per std::is_signed) min() is the smallest positive normal
// value.
206 const T null_value = std::is_signed<T>::value ? std::numeric_limits<T>::min()
207 : std::numeric_limits<T>::max();
208 std::fill(dataT, dataT + count, null_value);
// generateSentinelValues (fragment): forwards to setNulls<T> with the element
// type T that matches the column, filling `count` elements at `data`.
// Byte width of one element as reported by the column's SQL type.
214 const size_t type_size = columnType.
get_size();
// Integer instantiations. The conditions selecting each call are on lines not
// visible here -- presumably keyed on type_size; TODO confirm.
218 setNulls<int8_t>(data,
count);
221 setNulls<int16_t>(data,
count);
224 setNulls<int32_t>(data,
count);
227 setNulls<int64_t>(data,
count);
// Floating-point columns: 4-byte elements are float; the double call follows
// on the other (not fully visible) branch.
234 if (type_size == 4) {
235 setNulls<float>(data,
count);
237 setNulls<double>(data,
count);
243 const std::shared_ptr<arrow::Array>& chunk,
// makeFragment (fragment): accumulates chunk offsets/sizes into `arrowFrag` and
// validates that each chunk's buffer layout matches the column kind. The loop
// header and the origin of `offset`, `size`, `i` and `varlen` are on lines not
// visible in this excerpt.
254 const std::vector<std::shared_ptr<arrow::Array>>& chunks,
// Add this chunk's offset and row count to the fragment totals.
262 arrowFrag.
offset += offset;
263 arrowFrag.
sz += size;
265 auto& buffers = chunks[
i]->data()->buffers;
// An array with at most two buffers is rejected here as fixed-length data.
// The enclosing condition is not visible -- presumably the is_varlen branch.
268 if (buffers.size() <= 2) {
269 throw std::runtime_error(
270 "Importing fixed length arrow array as variable length column");
// buffers[1] is read as uint32 offsets; the difference between the entries at
// offset + size and offset is added to `varlen`.
272 auto offsets_buffer =
reinterpret_cast<const uint32_t*
>(buffers[1]->data());
273 varlen += offsets_buffer[offset + size] - offsets_buffer[offset];
274 }
// Otherwise the array must have exactly two buffers.
// NOTE(review): the message below misspells "variable" as "varialbe"; it is a
// runtime string and is left untouched here.
else if (buffers.size() != 2) {
275 throw std::runtime_error(
276 "Importing varialbe length arrow array as fixed length column");
// calculateFragmentsOffsets (fragment): walks the chunks of `array` and builds
// a list of Frag records, using maxFragRows as the per-fragment row budget.
// The declarations of `sz` and `offset`, the else branch header and the loop
// advance are on lines not visible in this excerpt.
285 size_t maxFragRows) {
286 std::vector<Frag> fragments;
// Seed the list with a first fragment whose four fields are all zero.
289 fragments.push_back({0, 0, 0, 0});
290 size_t num_chunks = (size_t)array.num_chunks();
// No increment in the loop header: `i` is advanced inside the body (not
// visible here).
291 for (
size_t i = 0;
i < num_chunks;) {
292 auto& chunk = *array.chunk(
i);
293 auto& frag = *fragments.rbegin();
// Room left in the current fragment exceeds the rows left in this chunk, so
// the rest of the chunk goes into the current fragment.
294 if (maxFragRows - sz > chunk.length() - offset) {
295 sz += chunk.length() - offset;
// Last chunk of the array: close the current (final) fragment on it.
296 if (
i == num_chunks - 1) {
297 fragments.rbegin()->last_chunk = num_chunks - 1;
298 fragments.rbegin()->last_chunk_size =
299 array.chunk((
int)num_chunks - 1)->length() - offset;
// Presumably the else path (fragment is full): record how many rows of this
// chunk complete the fragment, move the in-chunk offset past them, and open a
// new fragment starting at the same chunk and that offset.
305 frag.last_chunk_size = maxFragRows - sz;
306 offset += maxFragRows - sz;
308 fragments.push_back({
i, offset, 0, 0});
// Drop a trailing fragment whose last_chunk_size is 0.
// NOTE(review): the first operand compares first_chunk with itself and is
// always true, so the test reduces to last_chunk_size == 0. Possibly
// first_chunk == last_chunk was intended -- confirm.
311 if (fragments.rbegin()->first_chunk == fragments.rbegin()->first_chunk &&
312 fragments.rbegin()->last_chunk_size == 0) {
314 fragments.pop_back();
// parseArrowTable (fragment): for each column descriptor, takes the matching
// Arrow column, converts dictionary/decimal columns, splits the column into
// fragments and creates buffers in `mgr` for each fragment. Many lines of the
// original body are not visible in this excerpt.
320 std::pair<int, int> table_key,
321 const std::string&
type,
323 const std::list<ColumnDescriptor>& cols,
324 Data_Namespace::AbstractBufferMgr* mgr,
325 const arrow::Table&
table) {
// First pass: remember the string dictionary of every dict-encoded string
// column, keyed by (table_key.first, table_key.second, columnId).
327 for (
auto& c : cols) {
328 std::array<int, 3> col_key{table_key.first, table_key.second, c.columnId};
332 if (c.columnType.is_dict_encoded_string()) {
334 dictionaries[col_key] = dictDesc->
stringDict.get();
// Range over column positions 0..cols.size(). The call consuming the range and
// lambda is not visible -- presumably tbb::parallel_for.
341 tbb::blocked_range(0, (
int)cols.size()),
342 [
this, &tg, &table_key, &td, mgr, &table, &cols, &dictionaries](
auto range) {
// `cols` is a std::list, so position an iterator at the first column of this
// sub-range and advance it in step with col_idx.
343 auto columnIter = std::next(cols.begin(), range.begin());
344 for (
auto col_idx = range.begin(); col_idx != range.end(); col_idx++) {
345 auto& c = *(columnIter++);
// Four-component key with a trailing 0, plus the three-component column key.
352 ChunkKey key{table_key.first, table_key.second, c.columnId, 0};
353 std::array<int, 3> col_key{table_key.first, table_key.second, c.columnId};
// More descriptor columns than Arrow columns: report it. What happens after
// the log statement is not visible here.
355 if (col_idx >= table.num_columns()) {
356 LOG(
ERROR) <<
"Number of columns read from Arrow (" << table.num_columns()
357 <<
") mismatch CREATE TABLE request: " << cols.size();
361 auto arr_col_chunked_array = table.column(col_idx);
362 auto column_type = c.columnType.get_type();
// Dict-encoded string column: the Arrow column is replaced depending on its
// Arrow type id (the case labels and conversion calls are not visible).
364 if (c.columnType.is_dict_encoded_string()) {
367 switch (arr_col_chunked_array->type()->id()) {
369 arr_col_chunked_array =
373 arr_col_chunked_array =
// Decimal conversion chosen by the column's byte size (case labels not
// visible -- presumably 2, 4 and 8).
380 switch (c.columnType.get_size()) {
382 arr_col_chunked_array = createDecimalColumn<int16_t, arrow::Int16Array>(
383 c, arr_col_chunked_array);
386 arr_col_chunked_array = createDecimalColumn<int32_t, arrow::Int32Array>(
387 c, arr_col_chunked_array);
390 arr_col_chunked_array = createDecimalColumn<int64_t, arrow::Int64Array>(
391 c, arr_col_chunked_array);
// True when every element of the column is null (the assignment target is not
// visible -- presumably the `empty` flag passed to makeFragment below).
400 arr_col_chunked_array->null_count() == arr_col_chunked_array->length();
405 auto ctype = c.columnType.get_type();
// One slot per fragment; sized up front before references into it are taken.
407 col.resize(fragments.size());
409 for (
size_t f = 0;
f < fragments.size();
f++) {
// Variable-length means a TEXT column that is not dictionary encoded.
412 bool is_varlen = ctype ==
kTEXT && !c.columnType.is_dict_encoded_string();
414 fragments[
f], frag, arr_col_chunked_array->chunks(), is_varlen, empty);
// Variable-length path: two buffers are created with key `k` (its
// construction is not visible). The first gets an encoder; the second is sized
// as frag.sz elements of its own SQL type.
417 if (ctype ==
kTEXT && !c.columnType.is_dict_encoded_string()) {
421 auto b = mgr->createBuffer(
k);
423 b->initEncoder(c.columnType);
427 auto b = mgr->createBuffer(
k);
429 b->setSize(frag.sz * b->getSqlType().get_size());
// Fixed-width path: a single buffer of frag.sz elements with an encoder.
432 auto b = mgr->createBuffer(key);
433 b->setSize(frag.sz * c.columnType.get_size());
434 b->initEncoder(c.columnType);
436 size_t type_size = c.columnType.get_size();
// Encoder statistics are computed in a task on `tg`. The task keeps a raw
// pointer to `frag`, so `frag` must stay valid until the group is waited on.
437 tg.run([b, fr = &frag, type_size]() {
439 for (
size_t i = 0;
i < fr->chunks.size();
i++) {
440 auto& chunk = fr->chunks[
i];
// Only the first chunk starts at the fragment's offset; the last chunk takes
// whatever rows remain, the others contribute everything after `offset`.
441 int offset = (
i == 0) ? fr->offset : 0;
442 size_t size = (
i == fr->chunks.size() - 1) ? (fr->sz - sz)
443 : (chunk->length - offset);
// NOTE(review): the pointer is advanced by `offset` only; the ArrayData's own
// `offset` field is not added here, whereas read() adds array_data->offset for
// fixed-width data -- confirm this is intended.
445 auto data = chunk->buffers[1]->data();
446 b->getEncoder()->updateStatsEncoded(
447 (
const int8_t*)data + offset * type_size, size);
451 b->getEncoder()->setNumElems(frag.sz);
455 !c.columnType.is_string()) {
466 const std::vector<ForeignStorageColumnBuffer>& column_buffers) {
// read (fragment): copies the data of one fragment into `dest`, chunk by
// chunk. chunk_key[0..2] select the column and chunk_key[3] the fragment.
473 const size_t numBytes) {
474 std::array<int, 3> col_key{chunk_key[0], chunk_key[1], chunk_key[2]};
// Both lookups use at(), so an unknown column or fragment index throws
// std::out_of_range.
475 auto& frag =
m_columns.at(col_key).at(chunk_key[3]);
// A fragment without chunks is only accepted for fragment index 0.
477 CHECK(!frag.chunks.empty() || !chunk_key[3]);
478 int64_t sz = 0, copied = 0;
479 int varlen_offset = 0;
480 size_t read_size = 0;
481 for (
size_t i = 0;
i < frag.chunks.size();
i++) {
482 auto& array_data = frag.chunks[
i];
// Only the first chunk starts at the fragment's offset; the last chunk takes
// the rows still missing from frag.sz, the others everything after `offset`.
483 int offset = (
i == 0) ? frag.offset : 0;
484 size_t size = (
i == frag.chunks.size() - 1) ? (frag.sz - read_size)
485 : (array_data->length - offset);
// Pick the source buffer: buffers[1] or buffers[2] depending on conditions
// that are not visible in this excerpt.
487 arrow::Buffer* bp =
nullptr;
490 bp = array_data->buffers[1].get();
492 CHECK_GE(array_data->buffers.size(), 3UL);
494 bp = array_data->buffers[2].get();
495 }
// Remaining case: use buffers[1] unless every element of the chunk is null
// (then bp stays nullptr).
else if (array_data->null_count != array_data->length) {
497 CHECK_GE(array_data->buffers.size(), 2UL);
498 bp = array_data->buffers[1].get();
// Five-component key ending in 2: the buffer is treated as uint32 offsets
// (presumably the index buffer of a variable-length column).
// NOTE(review): only `offset` is added to the pointer here; array_data->offset
// is not, unlike the string-data path below -- confirm.
502 if (chunk_key.size() == 5 && chunk_key[4] == 2) {
503 auto data =
reinterpret_cast<const uint32_t*
>(bp->data()) + offset;
504 auto dest_ui32 =
reinterpret_cast<uint32_t*
>(
dest);
// `size` rows need size + 1 offset entries.
507 sz = (size + 1) *
sizeof(uint32_t);
515 sz -=
sizeof(uint32_t);
// Rebase the copied offsets: subtract this chunk's first offset and add the
// running total, so offsets continue across chunks.
520 varlen_offset -= data[0];
525 data + (sz /
sizeof(uint32_t)),
527 [varlen_offset](uint32_t val) {
return val + varlen_offset; });
528 varlen_offset += data[(sz /
sizeof(uint32_t)) - 1];
// Fixed-width data: the element width comes from the Arrow type.
// NOTE(review): the dynamic_cast result is dereferenced without a null check
// in the visible lines, and bit_width() / 8 is 0 for types narrower than a
// byte -- confirm such types never reach this path.
531 auto fixed_type =
dynamic_cast<arrow::FixedWidthType*
>(array_data->type.get());
535 bp->data() + (array_data->offset + offset) * (fixed_type->bit_width() / 8),
536 sz = size * (fixed_type->bit_width() / 8));
// String bytes: buffers[1] holds uint32 offsets; copy the byte range between
// the offsets of the first and one-past-last requested row.
538 auto offsets_buffer =
539 reinterpret_cast<const uint32_t*
>(array_data->buffers[1]->data());
540 auto string_buffer_offset = offsets_buffer[offset + array_data->offset];
541 auto string_buffer_size =
542 offsets_buffer[offset + array_data->offset + size] - string_buffer_offset;
543 std::memcpy(dest, bp->data() + string_buffer_offset, sz = string_buffer_size);
// Byte count for `size` fixed-width elements; how it is used is on lines not
// visible here.
548 auto fixed_type =
dynamic_cast<arrow::FixedWidthType*
>(array_data->type.get());
550 sz = size * (fixed_type->bit_width() / 8);
// tryZeroCopy (fragment): returns a pointer directly into the Arrow buffer of
// a fragment instead of copying it. chunk_key[0..2] select the column and
// chunk_key[3] the fragment.
564 const size_t numBytes) {
565 std::array<int, 3> col_key{chunk_key[0], chunk_key[1], chunk_key[2]};
// at() throws std::out_of_range for an unknown column or fragment index.
566 auto& frag =
m_columns.at(col_key).at(chunk_key[3]);
// Fragments that do not consist of exactly one chunk take a separate path
// (its body is not visible -- presumably zero-copy is declined).
569 if (frag.chunks.size() != 1) {
573 auto& array_data = frag.chunks[0];
574 int offset = frag.offset;
// Pick the source buffer: buffers[1] or buffers[2] depending on conditions
// that are not visible in this excerpt.
576 arrow::Buffer* bp =
nullptr;
579 bp = array_data->buffers[1].get();
581 CHECK_GE(array_data->buffers.size(), 3UL);
583 bp = array_data->buffers[2].get();
584 }
// Remaining case: use buffers[1] unless every element of the chunk is null.
else if (array_data->null_count != array_data->length) {
586 CHECK_GE(array_data->buffers.size(), 2UL);
587 bp = array_data->buffers[1].get();
// The Arrow buffer is exposed as a mutable int8_t*; constness is cast away.
595 auto data =
reinterpret_cast<int8_t*
>(
const_cast<uint8_t*
>(bp->data()));
// Five-component key ending in 2 is handled separately (body not visible).
598 if (chunk_key.size() == 5 && chunk_key[4] == 2) {
// Fixed-width data: skip (array offset + fragment offset) elements.
// NOTE(review): the dynamic_cast result is used without a null check in the
// visible lines.
607 auto fixed_type =
dynamic_cast<arrow::FixedWidthType*
>(array_data->type.get());
609 return data + (array_data->offset + offset) * (fixed_type->bit_width() / 8);
// String bytes: start at the byte offset stored for the first requested row.
613 auto offsets_buffer =
reinterpret_cast<const uint32_t*
>(array_data->buffers[1]->data());
614 auto string_buffer_offset = offsets_buffer[offset + array_data->offset];
615 return data + string_buffer_offset;
// createDictionaryEncodedColumn (fragment): flattens a chunked string column
// into one list of string views and returns a single-chunk Int32 column of the
// same total length. The call that fills the index buffer is on a line not
// visible here.
618 std::shared_ptr<arrow::ChunkedArray>
622 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
// offsets[i] = number of rows in all chunks before chunk i; bulk_size ends up
// as the total row count. Note the running size_t total is stored as int.
624 size_t bulk_size = 0;
625 std::vector<int> offsets(arr_col_chunked_array->num_chunks());
626 for (
int i = 0;
i < arr_col_chunked_array->num_chunks();
i++) {
627 offsets[
i] = bulk_size;
628 bulk_size += arr_col_chunked_array->chunk(
i)->length();
// One view per row of the whole column.
631 std::vector<std::string_view> bulk(bulk_size);
// Range over chunk indices (presumably consumed by tbb::parallel_for). Each
// chunk writes to bulk[offsets[i] .. offsets[i] + length), so chunks do not
// overlap.
634 tbb::blocked_range<int>(0, arr_col_chunked_array->num_chunks()),
635 [&bulk, &arr_col_chunked_array, &offsets](
const tbb::blocked_range<int>&
r) {
636 for (
int i = r.begin();
i < r.end();
i++) {
// Unchecked cast: the chunk is assumed to be an arrow::StringArray.
637 auto chunk = std::static_pointer_cast<arrow::StringArray>(
638 arr_col_chunked_array->chunk(
i));
639 auto offset = offsets[
i];
640 for (
int j = 0;
j < chunk->length();
j++) {
// The views point into the Arrow chunk's memory; no string is copied.
641 auto view = chunk->GetView(
j);
642 bulk[offset +
j] = std::string_view(view.data(), view.length());
// Output buffer with one int32 per row.
647 std::shared_ptr<arrow::Buffer> indices_buf;
648 auto res = arrow::AllocateBuffer(bulk_size *
sizeof(int32_t));
650 indices_buf = std::move(
res).ValueOrDie();
// raw_data is filled on a line not visible here -- presumably with dictionary
// ids for `bulk`; TODO confirm.
651 auto raw_data =
reinterpret_cast<int*
>(indices_buf->mutable_data());
// The result array is built from just the length and the data buffer.
653 auto array = std::make_shared<arrow::Int32Array>(bulk_size, indices_buf);
654 return std::make_shared<arrow::ChunkedArray>(array);
// convertArrowDictionary (fragment): for each Arrow dictionary chunk, rewrites
// its indices through `indices_mapping` and emits an Int32 chunk of the same
// length. How `indices_mapping` is filled is on lines not visible here.
660 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
663 std::vector<std::shared_ptr<arrow::Array>> converted_chunks;
664 for (
auto& chunk : arr_col_chunked_array->chunks()) {
// Unchecked casts: the chunk is assumed to be a DictionaryArray whose
// dictionary is a StringArray.
665 auto dict_array = std::static_pointer_cast<arrow::DictionaryArray>(chunk);
666 auto values = std::static_pointer_cast<arrow::StringArray>(dict_array->dictionary());
// Views of the chunk's dictionary strings (no copies).
667 std::vector<std::string_view> strings(values->length());
668 for (
int i = 0;
i < values->length();
i++) {
669 auto view = values->GetView(
i);
670 strings[
i] = std::string_view(view.data(), view.length());
// The indices are assumed to be int32 (unchecked cast).
673 std::static_pointer_cast<arrow::Int32Array>(dict_array->indices());
// One slot per dictionary entry: Arrow dictionary position -> mapped id
// (presumably the id in the target StringDictionary; filled on lines not
// visible).
674 std::vector<int> indices_mapping(values->length());
// Output buffer with one int32 per row of this chunk.
678 std::shared_ptr<arrow::Buffer> dict_indices_buf;
679 auto res = arrow::AllocateBuffer(arrow_indices->length() *
sizeof(int32_t));
681 dict_indices_buf = std::move(
res).ValueOrDie();
682 auto raw_data =
reinterpret_cast<int32_t*
>(dict_indices_buf->mutable_data());
// Translate every row's index through the mapping.
// NOTE(review): Value(i) is read for every row without a validity check in
// the visible lines -- confirm how null rows are handled.
684 for (
int i = 0;
i < arrow_indices->length();
i++) {
685 raw_data[
i] = indices_mapping[arrow_indices->Value(
i)];
// The converted chunk is built from just the length and the data buffer.
688 converted_chunks.push_back(
689 std::make_shared<arrow::Int32Array>(arrow_indices->length(), dict_indices_buf));
691 return std::make_shared<arrow::ChunkedArray>(converted_chunks);
// createDecimalColumn<T, ChunkType> (fragment): converts a chunked Decimal128
// column into one contiguous buffer of T and wraps it in a single ChunkType
// array. T is the stored integer type, ChunkType the Arrow array class built
// on top of the buffer.
694 template <
typename T,
typename ChunkType>
697 std::shared_ptr<arrow::ChunkedArray> arr_col_chunked_array) {
// offsets[i] = number of rows in all chunks before chunk i; column_size ends
// up as the total row count. Note the running size_t total is stored as int.
698 size_t column_size = 0;
699 std::vector<int> offsets(arr_col_chunked_array->num_chunks());
700 for (
int i = 0;
i < arr_col_chunked_array->num_chunks();
i++) {
701 offsets[
i] = column_size;
702 column_size += arr_col_chunked_array->chunk(
i)->length();
// The allocation producing `res` is on a line not visible here.
705 std::shared_ptr<arrow::Buffer> result_buffer;
708 result_buffer = std::move(
res).ValueOrDie();
710 T* buffer_data =
reinterpret_cast<T*
>(result_buffer->mutable_data());
// Range over chunk indices (presumably consumed by tbb::parallel_for). The
// lambda holds its own copy of the column's shared_ptr.
712 tbb::blocked_range(0, arr_col_chunked_array->num_chunks()),
713 [buffer_data, &offsets, arr_col_chunked_array](
auto& range) {
714 for (
int chunk_idx = range.begin(); chunk_idx < range.end(); chunk_idx++) {
// Each chunk writes to its own slice of the output buffer.
715 auto offset = offsets[chunk_idx];
716 T* chunk_buffer = buffer_data + offset;
// Unchecked cast: the chunk is assumed to be a Decimal128Array.
718 auto decimalArray = std::static_pointer_cast<arrow::Decimal128Array>(
719 arr_col_chunked_array->chunk(chunk_idx));
// True when the whole column is null. The value is the same for every chunk
// but is recomputed inside the chunk loop.
721 arr_col_chunked_array->null_count() == arr_col_chunked_array->length();
722 for (
int i = 0;
i < decimalArray->length();
i++) {
// Null rows (whole column null, whole chunk null, or this row null) get the
// integer null sentinel for T.
723 if (empty || decimalArray->null_count() == decimalArray->length() ||
724 decimalArray->IsNull(
i)) {
725 chunk_buffer[
i] = inline_int_null_value<T>();
// Non-null rows: the 128-bit decimal is converted to int64_t.
// NOTE(review): values that do not fit in 64 bits (or in T) would not survive
// this conversion -- confirm precision is bounded upstream.
727 arrow::Decimal128 val(decimalArray->GetValue(
i));
729 static_cast<int64_t
>(val);
// The result array is built from just the length and the data buffer.
734 auto array = std::make_shared<ChunkType>(column_size, result_buffer);
735 return std::make_shared<arrow::ChunkedArray>(array);
743 const std::string&
type,
745 std::list<ColumnDescriptor>& cols)
override;
747 std::pair<int, int> table_key,
748 const std::string&
type,
750 const std::list<ColumnDescriptor>& cols,
751 Data_Namespace::AbstractBufferMgr* mgr)
override;
753 std::string
getType()
const override;
757 static std::map<std::string, std::shared_ptr<arrow::Table>>
tables;
761 std::map<std::string, std::shared_ptr<arrow::Table>>();
764 using namespace arrow;
797 type.set_comp_param(
sizeof(uint32_t) * 8);
801 const auto& decimal_type =
static_cast<const arrow::DecimalType&
>(
type);
807 switch (static_cast<const arrow::TimestampType&>(type).unit()) {
808 case TimeUnit::SECOND:
810 case TimeUnit::MILLI:
812 case TimeUnit::MICRO:
818 throw std::runtime_error(type.ToString() +
" is not yet supported.");
823 const std::string&
name,
825 std::list<ColumnDescriptor>& cols) {
829 for (
auto&
field :
table->schema()->fields()) {
838 std::pair<int, int> table_key,
839 const std::string& info,
841 const std::list<ColumnDescriptor>& cols,
842 Data_Namespace::AbstractBufferMgr* mgr) {
847 LOG(
INFO) <<
"CSV backed temporary tables has been activated. Create table `with "
848 "(storage_type='CSV:path/to/file.csv');`\n";
861 fsi->registerPersistentStorageInterface(std::make_unique<ArrowForeignStorage>());
869 const std::string&
type,
871 std::list<ColumnDescriptor>& cols)
override;
873 std::pair<int, int> table_key,
874 const std::string&
type,
876 const std::list<ColumnDescriptor>& cols,
877 Data_Namespace::AbstractBufferMgr* mgr)
override;
879 std::string
getType()
const override;
883 const std::string&
type,
885 std::list<ColumnDescriptor>& cols) {
892 using namespace arrow;
910 return arrow::boolean();
923 return time32(TimeUnit::SECOND);
932 return timestamp(TimeUnit::SECOND);
934 return timestamp(TimeUnit::MILLI);
936 return timestamp(TimeUnit::MICRO);
938 return timestamp(TimeUnit::NANO);
940 throw std::runtime_error(
"Unsupported timestamp precision for Arrow: " +
947 throw std::runtime_error(type.
get_type_name() +
" is not supported in Arrow.");
// ArrowCsvForeignStorage::registerTable (fragment): reads the CSV file named
// by `info` into an Arrow table using the Arrow CSV reader. What is done with
// the table afterwards is on lines not visible in this excerpt.
953 std::pair<int, int> table_key,
954 const std::string& info,
956 const std::list<ColumnDescriptor>& cols,
957 Data_Namespace::AbstractBufferMgr* mgr) {
// df_td is set on a line not visible here; when it is null a default
// DataframeTableDescriptor is built from `td` and owned locally.
960 bool isDataframe = df_td ?
true :
false;
961 std::unique_ptr<DataframeTableDescriptor> df_td_owned;
963 df_td_owned = std::make_unique<DataframeTableDescriptor>(td);
965 df_td = df_td_owned.get();
967 auto memory_pool = arrow::default_memory_pool();
// Parse options: quoting, escaping and embedded newlines are all disabled.
968 auto arrow_parse_options = arrow::csv::ParseOptions::Defaults();
969 arrow_parse_options.quoting =
false;
970 arrow_parse_options.escaping =
false;
971 arrow_parse_options.newlines_in_values =
false;
// Only the first character of the delimiter string is used (this is '\0' if
// the string is empty).
972 arrow_parse_options.delimiter = *df_td->
delimiter.c_str();
// Read options: multi-threaded, 20 MiB blocks, column names supplied
// explicitly below rather than auto-generated.
973 auto arrow_read_options = arrow::csv::ReadOptions::Defaults();
974 arrow_read_options.use_threads =
true;
976 arrow_read_options.block_size = 20 * 1024 * 1024;
977 arrow_read_options.autogenerate_column_names =
false;
978 arrow_read_options.skip_rows =
// Convert options: no UTF-8 validation; string columns may contain nulls.
981 auto arrow_convert_options = arrow::csv::ConvertOptions::Defaults();
982 arrow_convert_options.check_utf8 =
false;
// NOTE(review): include_columns is copied from column_names here, before the
// loop below appends to column_names -- confirm the copy is meant to be taken
// at this point.
983 arrow_convert_options.include_columns = arrow_read_options.column_names;
984 arrow_convert_options.strings_can_be_null =
true;
// Register every descriptor column by name: its Arrow type (the second
// emplace argument is not visible) and its name in the header list.
986 for (
auto& c : cols) {
990 arrow_convert_options.column_types.emplace(c.columnName,
992 arrow_read_options.column_names.push_back(c.columnName);
// `info` is used as the path of the file to open. Any status checks between
// the calls below are on lines not visible here.
995 std::shared_ptr<arrow::io::ReadableFile> inp;
996 auto file_result = arrow::io::ReadableFile::Open(info.c_str());
998 inp = file_result.ValueOrDie();
1000 auto table_reader_result = arrow::csv::TableReader::Make(
1001 memory_pool, inp, arrow_read_options, arrow_parse_options, arrow_convert_options);
1003 auto table_reader = table_reader_result.ValueOrDie();
1005 std::shared_ptr<arrow::Table> arrowTable;
// Read the whole file into one table.
1007 auto arrow_table_result = table_reader->Read();
1009 arrowTable = arrow_table_result.ValueOrDie();
// `time` is computed on a line not visible here.
1012 VLOG(1) <<
"Read Arrow CSV file " << info <<
" in " << time <<
"ms";
1014 arrow::Table&
table = *arrowTable.get();
1019 LOG(
INFO) <<
"CSV backed temporary tables has been activated. Create table `with "
1020 "(storage_type='CSV:path/to/file.csv');`\n";
1025 fsi->registerPersistentStorageInterface(std::make_unique<ArrowCsvForeignStorage>());
std::vector< int > ChunkKey
static SQLTypeInfo getOmnisciType(const arrow::DataType &type)
void setNulls(int8_t *data, int count)
void setNullValues(const std::vector< Frag > &fragments, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
HOST DEVICE int get_size() const
void prepareTable(const int db_id, const std::string &type, TableDescriptor &td, std::list< ColumnDescriptor > &cols) override
#define ARROW_THROW_NOT_OK(s)
class for a per-database catalog. also includes metadata for the current database and the current use...
static TimeT::rep execution(F func, Args &&...args)
void registerArrowForeignStorage(std::shared_ptr< ForeignStorageInterface > fsi)
HOST DEVICE int get_scale() const
std::string getType() const override
void read(const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, int8_t *dest, const size_t numBytes) override
void generateSentinelValues(int8_t *data, const SQLTypeInfo &columnType, size_t count)
size_t first_chunk_offset
void setArrowTable(std::string name, std::shared_ptr< arrow::Table > table)
void append(const std::vector< ForeignStorageColumnBuffer > &column_buffers) override
void registerArrowCsvForeignStorage(std::shared_ptr< ForeignStorageInterface > fsi)
HOST DEVICE SQLTypes get_type() const
int8_t * tryZeroCopy(const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, const size_t numBytes) override
std::shared_ptr< arrow::ChunkedArray > convertArrowDictionary(StringDictionary *dict, const ColumnDescriptor &c, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
void releaseArrowTable(std::string name)
std::shared_ptr< StringDictionary > stringDict
static std::map< std::string, std::shared_ptr< arrow::Table > > tables
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
void prepareTable(const int db_id, const std::string &type, TableDescriptor &td, std::list< ColumnDescriptor > &cols) override
std::string getType() const override
DEVICE void fill(ARGS &&...args)
CONSTEXPR DEVICE bool is_null(const T &value)
std::map< std::array< int, 3 >, std::vector< ArrowFragment > > m_columns
void parseArrowTable(Catalog_Namespace::Catalog *catalog, std::pair< int, int > table_key, const std::string &type, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols, Data_Namespace::AbstractBufferMgr *mgr, const arrow::Table &table)
std::vector< std::shared_ptr< arrow::ArrayData > > chunks
void getSizeAndOffset(const Frag &frag, const std::shared_ptr< arrow::Array > &chunk, size_t i, int &size, int &offset)
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
specifies the content in-memory of a row in the column metadata table
static std::shared_ptr< arrow::DataType > getArrowImportType(const SQLTypeInfo type)
int64_t makeFragment(const Frag &frag, ArrowFragment &arrowFrag, const std::vector< std::shared_ptr< arrow::Array >> &chunks, bool is_varlen, bool is_empty)
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
int get_precision() const
void generateNullValues(const std::vector< Frag > &fragments, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array, const SQLTypeInfo &columnType)
std::string get_type_name() const
std::shared_ptr< arrow::ChunkedArray > createDictionaryEncodedColumn(StringDictionary *dict, const ColumnDescriptor &c, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
void registerTable(Catalog_Namespace::Catalog *catalog, std::pair< int, int > table_key, const std::string &type, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols, Data_Namespace::AbstractBufferMgr *mgr) override
std::vector< Frag > calculateFragmentsOffsets(const arrow::ChunkedArray &array, size_t maxFragRows)
std::shared_ptr< arrow::ChunkedArray > createDecimalColumn(const ColumnDescriptor &c, std::shared_ptr< arrow::ChunkedArray > arr_col_chunked_array)
bool is_dict_encoded_string() const
constexpr auto is_datetime(SQLTypes type)
void registerTable(Catalog_Namespace::Catalog *catalog, std::pair< int, int > table_key, const std::string &type, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols, Data_Namespace::AbstractBufferMgr *mgr) override
specifies the content in-memory of a row in the table metadata table