OmniSciDB  ca0c39ec8f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ArrowResultSetConverter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "../Shared/DateConverters.h"
18 #include "ArrowResultSet.h"
19 #include "Execute.h"
20 #include "arrow/ipc/dictionary.h"
21 #include "arrow/ipc/options.h"
22 
23 #ifndef _MSC_VER
24 #include <sys/ipc.h>
25 #include <sys/shm.h>
26 #include <sys/types.h>
27 #else
28 // IPC shared memory not yet supported on windows
29 using key_t = size_t;
30 #define IPC_PRIVATE 0
31 #endif
32 
33 #include <algorithm>
34 #include <cerrno>
35 #include <cstdio>
36 #include <cstdlib>
37 #include <future>
38 #include <string>
39 
40 #include "arrow/api.h"
41 #include "arrow/io/memory.h"
42 #include "arrow/ipc/api.h"
43 
44 #include "Shared/ArrowUtil.h"
45 
46 #ifdef HAVE_CUDA
47 #include <arrow/gpu/cuda_api.h>
48 #include <cuda.h>
49 #endif // HAVE_CUDA
50 
51 #define ARROW_RECORDBATCH_MAKE arrow::RecordBatch::Make
52 
53 #define ARROW_CONVERTER_DEBUG true
54 
55 #define ARROW_LOG(category) \
56  VLOG(1) << "[Arrow]" \
57  << "[" << category "] "
58 
59 namespace {
60 
61 /* We can create Arrow buffers which refer memory owned by ResultSet.
62  For safe memory access we should keep a ResultSetPtr to keep
63  data live while buffer lives. Use this custom buffer for that. */
64 class ResultSetBuffer : public arrow::Buffer {
65  public:
66  ResultSetBuffer(const uint8_t* buf, size_t size, ResultSetPtr rs)
67  : arrow::Buffer(buf, size), _rs(rs) {}
68 
69  private:
71 };
72 
75  switch (ti.get_size()) {
76  case 1:
77  return kTINYINT;
78  case 2:
79  return kSMALLINT;
80  case 4:
81  return kINT;
82  case 8:
83  return kBIGINT;
84  default:
85  CHECK(false);
86  }
87  return ti.get_type();
88 }
89 
91  auto logical_type = ti.get_type();
92  if (IS_INTEGER(logical_type)) {
93  switch (ti.get_size()) {
94  case 1:
95  return kTINYINT;
96  case 2:
97  return kSMALLINT;
98  case 4:
99  return kINT;
100  case 8:
101  return kBIGINT;
102  default:
103  CHECK(false);
104  }
105  }
106  return logical_type;
107 }
108 
109 template <typename TYPE, typename VALUE_ARRAY_TYPE>
111  std::shared_ptr<ValueArray>& values,
112  const size_t max_size) {
113  if (!values) {
114  values = std::make_shared<ValueArray>(std::vector<TYPE>());
115  boost::get<std::vector<TYPE>>(*values).reserve(max_size);
116  }
117  CHECK(values);
118  auto values_ty = boost::get<std::vector<TYPE>>(values.get());
119  CHECK(values_ty);
120 
121  auto pval_cty = boost::get<VALUE_ARRAY_TYPE>(&val_cty);
122  CHECK(pval_cty);
123  if constexpr (std::is_same_v<VALUE_ARRAY_TYPE, NullableString>) {
124  if (auto str = boost::get<std::string>(pval_cty)) {
125  values_ty->push_back(*str);
126  } else {
127  values_ty->push_back("");
128  }
129  } else {
130  auto val_ty = static_cast<TYPE>(*pval_cty);
131  values_ty->push_back(val_ty);
132  }
133 }
134 
135 template <typename TYPE, typename VALUE_ARRAY_TYPE>
137  std::shared_ptr<ValueArray>& values,
138  const size_t max_size) {
139  if (!values) {
140  values = std::make_shared<ValueArray>(Vec2<TYPE>());
141  boost::get<Vec2<TYPE>>(*values).reserve(max_size);
142  }
143  CHECK(values);
144 
145  Vec2<TYPE>* values_ty = boost::get<Vec2<TYPE>>(values.get());
146  CHECK(values_ty);
147 
148  values_ty->emplace_back(std::vector<TYPE>{});
149 
150  if (val_ctys) {
151  for (auto val_cty : val_ctys.value()) {
152  auto pval_cty = boost::get<VALUE_ARRAY_TYPE>(&val_cty);
153  CHECK(pval_cty);
154  values_ty->back().emplace_back(static_cast<TYPE>(*pval_cty));
155  }
156  }
157 }
158 
160  const SQLTypeInfo& col_type,
161  std::shared_ptr<std::vector<bool>>& null_bitmap,
162  const size_t max_size) {
163  if (col_type.get_notnull()) {
164  CHECK(!null_bitmap);
165  return;
166  }
167 
168  if (!null_bitmap) {
169  null_bitmap = std::make_shared<std::vector<bool>>();
170  null_bitmap->reserve(max_size);
171  }
172  CHECK(null_bitmap);
173  null_bitmap->push_back(value ? true : false);
174 }
175 
176 template <typename TYPE>
178  const SQLTypeInfo& col_type,
179  std::shared_ptr<std::vector<bool>>& null_bitmap,
180  const size_t max_size) {
181  if (col_type.get_notnull()) {
182  CHECK(!null_bitmap);
183  return;
184  }
185  auto pvalue = boost::get<TYPE>(&value);
186  CHECK(pvalue);
187  bool is_valid = false;
188  if constexpr (std::is_same_v<TYPE, NullableString>) {
189  is_valid = boost::get<std::string>(pvalue) != nullptr;
190  } else {
191  if (col_type.is_boolean()) {
192  is_valid = inline_int_null_val(col_type) != static_cast<int8_t>(*pvalue);
193  } else if (col_type.is_dict_encoded_string()) {
194  is_valid = inline_int_null_val(col_type) != static_cast<int32_t>(*pvalue);
195  } else if (col_type.is_integer() || col_type.is_time() || col_type.is_decimal()) {
196  is_valid = inline_int_null_val(col_type) != static_cast<int64_t>(*pvalue);
197  } else if (col_type.is_fp()) {
198  is_valid = inline_fp_null_val(col_type) != static_cast<double>(*pvalue);
199  } else {
200  UNREACHABLE();
201  }
202  }
203 
204  if (!null_bitmap) {
205  null_bitmap = std::make_shared<std::vector<bool>>();
206  null_bitmap->reserve(max_size);
207  }
208  CHECK(null_bitmap);
209  null_bitmap->push_back(is_valid);
210 }
211 
212 template <typename TYPE, typename enable = void>
213 class null_type {};
214 
215 template <typename TYPE>
216 struct null_type<TYPE, std::enable_if_t<std::is_integral<TYPE>::value>> {
218  static constexpr type value = inline_int_null_value<type>();
219 };
220 
221 template <typename TYPE>
222 struct null_type<TYPE, std::enable_if_t<std::is_floating_point<TYPE>::value>> {
223  using type = TYPE;
224  static constexpr type value = inline_fp_null_value<type>();
225 };
226 
227 template <typename TYPE>
229 
230 template <typename C_TYPE,
231  typename ARROW_TYPE = typename arrow::CTypeTraits<C_TYPE>::ArrowType>
233  size_t col,
234  size_t entry_count,
235  std::shared_ptr<arrow::Array>& out) {
236  CHECK(sizeof(C_TYPE) == result->getColType(col).get_size());
237 
238  std::shared_ptr<arrow::Buffer> values;
239  std::shared_ptr<arrow::Buffer> is_valid;
240  const int64_t buf_size = entry_count * sizeof(C_TYPE);
241  if (result->isZeroCopyColumnarConversionPossible(col)) {
242  values.reset(new ResultSetBuffer(
243  reinterpret_cast<const uint8_t*>(result->getColumnarBuffer(col)),
244  buf_size,
245  result));
246  } else {
247  auto res = arrow::AllocateBuffer(buf_size);
248  CHECK(res.ok());
249  values = std::move(res).ValueOrDie();
250  result->copyColumnIntoBuffer(
251  col, reinterpret_cast<int8_t*>(values->mutable_data()), buf_size);
252  }
253 
254  int64_t null_count = 0;
255  auto res = arrow::AllocateBuffer((entry_count + 7) / 8);
256  CHECK(res.ok());
257  is_valid = std::move(res).ValueOrDie();
258 
259  auto is_valid_data = is_valid->mutable_data();
260 
261  const null_type_t<C_TYPE>* vals =
262  reinterpret_cast<const null_type_t<C_TYPE>*>(values->data());
264 
265  size_t unroll_count = entry_count & 0xFFFFFFFFFFFFFFF8ULL;
266  for (size_t i = 0; i < unroll_count; i += 8) {
267  uint8_t valid_byte = 0;
268  uint8_t valid;
269  valid = vals[i + 0] != null_val;
270  valid_byte |= valid << 0;
271  null_count += !valid;
272  valid = vals[i + 1] != null_val;
273  valid_byte |= valid << 1;
274  null_count += !valid;
275  valid = vals[i + 2] != null_val;
276  valid_byte |= valid << 2;
277  null_count += !valid;
278  valid = vals[i + 3] != null_val;
279  valid_byte |= valid << 3;
280  null_count += !valid;
281  valid = vals[i + 4] != null_val;
282  valid_byte |= valid << 4;
283  null_count += !valid;
284  valid = vals[i + 5] != null_val;
285  valid_byte |= valid << 5;
286  null_count += !valid;
287  valid = vals[i + 6] != null_val;
288  valid_byte |= valid << 6;
289  null_count += !valid;
290  valid = vals[i + 7] != null_val;
291  valid_byte |= valid << 7;
292  null_count += !valid;
293  is_valid_data[i >> 3] = valid_byte;
294  }
295  if (unroll_count != entry_count) {
296  uint8_t valid_byte = 0;
297  for (size_t i = unroll_count; i < entry_count; ++i) {
298  bool valid = vals[i] != null_val;
299  valid_byte |= valid << (i & 7);
300  null_count += !valid;
301  }
302  is_valid_data[unroll_count >> 3] = valid_byte;
303  }
304 
305  if (!null_count) {
306  is_valid.reset();
307  }
308 
309  // TODO: support date/time + scaling
310  // TODO: support booleans
311  if (null_count) {
312  out.reset(
313  new arrow::NumericArray<ARROW_TYPE>(entry_count, values, is_valid, null_count));
314  } else {
315  out.reset(new arrow::NumericArray<ARROW_TYPE>(entry_count, values));
316  }
317 }
318 
319 #ifndef _MSC_VER
320 std::pair<key_t, void*> get_shm(size_t shmsz) {
321  if (!shmsz) {
322  return std::make_pair(IPC_PRIVATE, nullptr);
323  }
324  // Generate a new key for a shared memory segment. Keys to shared memory segments
325  // are OS global, so we need to try a new key if we encounter a collision. It seems
326  // incremental keygen would be deterministically worst-case. If we use a hash
327  // (like djb2) + nonce, we could still get collisions if multiple clients specify
328  // the same nonce, so using rand() in lieu of a better approach
329  // TODO(ptaylor): Is this common? Are these assumptions true?
330  auto key = static_cast<key_t>(rand());
331  int shmid = -1;
332  // IPC_CREAT - indicates we want to create a new segment for this key if it doesn't
333  // exist IPC_EXCL - ensures failure if a segment already exists for this key
334  while ((shmid = shmget(key, shmsz, IPC_CREAT | IPC_EXCL | 0666)) < 0) {
335  // If shmget fails and errno is one of these four values, try a new key.
336  // TODO(ptaylor): is checking for the last three values really necessary? Checking
337  // them by default to be safe. EEXIST - a shared memory segment is already associated
338  // with this key EACCES - a shared memory segment is already associated with this key,
339  // but we don't have permission to access it EINVAL - a shared memory segment is
340  // already associated with this key, but the size is less than shmsz ENOENT -
341  // IPC_CREAT was not set in shmflg and no shared memory segment associated with key
342  // was found
343  if (!(errno & (EEXIST | EACCES | EINVAL | ENOENT))) {
344  throw std::runtime_error("failed to create a shared memory");
345  }
346  key = static_cast<key_t>(rand());
347  }
348  // get a pointer to the shared memory segment
349  auto ipc_ptr = shmat(shmid, NULL, 0);
350  if (reinterpret_cast<int64_t>(ipc_ptr) == -1) {
351  throw std::runtime_error("failed to attach a shared memory");
352  }
353 
354  return std::make_pair(key, ipc_ptr);
355 }
356 #endif
357 
358 std::pair<key_t, std::shared_ptr<arrow::Buffer>> get_shm_buffer(size_t size) {
359 #ifdef _MSC_VER
360  throw std::runtime_error("Arrow IPC not yet supported on Windows.");
361  return std::make_pair(0, nullptr);
362 #else
363  auto [key, ipc_ptr] = get_shm(size);
364  std::shared_ptr<arrow::Buffer> buffer(
365  new arrow::MutableBuffer(static_cast<uint8_t*>(ipc_ptr), size));
366  return std::make_pair<key_t, std::shared_ptr<arrow::Buffer>>(std::move(key),
367  std::move(buffer));
368 #endif
369 }
370 
372  const std::vector<uint8_t>& bitmap,
373  std::vector<int64_t>& vec1d) {
374  /*
375  remap negative values if ArrowStringRemapMode == ONLY_TRANSIENT_STRINGS_REMAPPED or
376  everything if ALL_STRINGS_REMAPPED
377  */
378 
379  auto all_strings_remapped_bitmap = [&column_builder, &vec1d, &bitmap]() {
380  for (size_t i = 0; i < vec1d.size(); i++) {
381  if (bitmap[i]) {
382  vec1d[i] = column_builder.string_remapping.at(vec1d[i]);
383  }
384  }
385  };
386 
387  auto all_strings_remapped = [&column_builder, &vec1d]() {
388  for (size_t i = 0; i < vec1d.size(); i++) {
389  vec1d[i] = column_builder.string_remapping.at(vec1d[i]);
390  }
391  };
392 
393  auto only_transient_strings_remapped = [&column_builder, &vec1d]() {
394  for (size_t i = 0; i < vec1d.size(); i++) {
395  if (vec1d[i] < 0) {
396  vec1d[i] = column_builder.string_remapping.at(vec1d[i]);
397  }
398  }
399  };
400 
401  auto only_transient_strings_remapped_bitmap = [&column_builder, &vec1d, &bitmap]() {
402  for (size_t i = 0; i < vec1d.size(); i++) {
403  if (bitmap[i] && vec1d[i] < 0) {
404  vec1d[i] = column_builder.string_remapping.at(vec1d[i]);
405  }
406  }
407  };
408 
409  switch (column_builder.string_remap_mode) {
411  bitmap.empty() ? all_strings_remapped() : all_strings_remapped_bitmap();
412  break;
414  bitmap.empty() ? only_transient_strings_remapped()
415  : only_transient_strings_remapped_bitmap();
416  break;
417  default:
418  UNREACHABLE();
419  }
420 }
421 
422 } // namespace
423 
424 namespace arrow {
425 
426 key_t get_and_copy_to_shm(const std::shared_ptr<Buffer>& data) {
427 #ifdef _MSC_VER
428  throw std::runtime_error("Arrow IPC not yet supported on Windows.");
429 #else
430  auto [key, ipc_ptr] = get_shm(data->size());
431  // copy the arrow records buffer to shared memory
432  // TODO(ptaylor): I'm sure it's possible to tell Arrow's RecordBatchStreamWriter to
433  // write directly to the shared memory segment as a sink
434  memcpy(ipc_ptr, data->data(), data->size());
435  // detach from the shared memory segment
436  shmdt(ipc_ptr);
437  return key;
438 #endif
439 }
440 
441 } // namespace arrow
442 
447  auto timer = DEBUG_TIMER(__func__);
448  std::shared_ptr<arrow::RecordBatch> record_batch = convertToArrow();
449 
450  struct BuildResultParams {
451  int64_t schemaSize() const {
452  return serialized_schema ? serialized_schema->size() : 0;
453  };
454  int64_t dictSize() const { return serialized_dict ? serialized_dict->size() : 0; };
455  int64_t totalSize() const { return schemaSize() + records_size + dictSize(); }
456  bool hasRecordBatch() const { return records_size > 0; }
457  bool hasDict() const { return dictSize() > 0; }
458 
459  int64_t records_size{0};
460  std::shared_ptr<arrow::Buffer> serialized_schema{nullptr};
461  std::shared_ptr<arrow::Buffer> serialized_dict{nullptr};
462  } result_params;
463 
466  const auto getWireResult = [&]() -> ArrowResult {
467  auto timer = DEBUG_TIMER("serialize batch to wire");
468  const auto total_size = result_params.totalSize();
469  std::vector<char> record_handle_data(total_size);
470  auto serialized_records =
471  arrow::MutableBuffer::Wrap(record_handle_data.data(), total_size);
472 
473  ARROW_ASSIGN_OR_THROW(auto writer, arrow::Buffer::GetWriter(serialized_records));
474 
475  ARROW_THROW_NOT_OK(writer->Write(
476  reinterpret_cast<const uint8_t*>(result_params.serialized_schema->data()),
477  result_params.schemaSize()));
478 
479  if (result_params.hasDict()) {
480  ARROW_THROW_NOT_OK(writer->Write(
481  reinterpret_cast<const uint8_t*>(result_params.serialized_dict->data()),
482  result_params.dictSize()));
483  }
484 
485  arrow::io::FixedSizeBufferWriter stream(SliceMutableBuffer(
486  serialized_records, result_params.schemaSize() + result_params.dictSize()));
487 
488  if (result_params.hasRecordBatch()) {
489  ARROW_THROW_NOT_OK(arrow::ipc::SerializeRecordBatch(
490  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
491  }
492 
493  return {std::vector<char>(0),
494  0,
495  std::vector<char>(0),
496  serialized_records->size(),
497  std::string{""},
498  std::move(record_handle_data)};
499  };
500 
501  const auto getShmResult = [&]() -> ArrowResult {
502  auto timer = DEBUG_TIMER("serialize batch to shared memory");
503  std::shared_ptr<arrow::Buffer> serialized_records;
504  std::vector<char> schema_handle_buffer;
505  std::vector<char> record_handle_buffer(sizeof(key_t), 0);
506  key_t records_shm_key = IPC_PRIVATE;
507  const int64_t total_size = result_params.totalSize();
508 
509  std::tie(records_shm_key, serialized_records) = get_shm_buffer(total_size);
510 
511  memcpy(serialized_records->mutable_data(),
512  result_params.serialized_schema->data(),
513  (size_t)result_params.schemaSize());
514 
515  if (result_params.hasDict()) {
516  memcpy(serialized_records->mutable_data() + result_params.schemaSize(),
517  result_params.serialized_dict->data(),
518  (size_t)result_params.dictSize());
519  }
520 
521  arrow::io::FixedSizeBufferWriter stream(SliceMutableBuffer(
522  serialized_records, result_params.schemaSize() + result_params.dictSize()));
523 
524  if (result_params.hasRecordBatch()) {
525  ARROW_THROW_NOT_OK(arrow::ipc::SerializeRecordBatch(
526  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
527  }
528 
529  memcpy(&record_handle_buffer[0],
530  reinterpret_cast<const unsigned char*>(&records_shm_key),
531  sizeof(key_t));
532 
533  return {schema_handle_buffer,
534  0,
535  record_handle_buffer,
536  serialized_records->size(),
537  std::string{""}};
538  };
539 
540  arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
541  auto options = arrow::ipc::IpcWriteOptions::Defaults();
542  auto dict_stream = arrow::io::BufferOutputStream::Create(1024).ValueOrDie();
543 
544  // If our record batch is going to be empty, we omit it entirely,
545  // only serializing the schema.
546  if (!record_batch->num_rows()) {
547  ARROW_ASSIGN_OR_THROW(result_params.serialized_schema,
548  arrow::ipc::SerializeSchema(*record_batch->schema(),
549  arrow::default_memory_pool()));
550 
551  switch (transport_method_) {
553  return getWireResult();
555  return getShmResult();
556  default:
557  UNREACHABLE();
558  }
559  }
560 
561  ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));
562 
563  ARROW_LOG("CPU") << "found " << dictionaries.size() << " dictionaries";
564 
565  for (auto& pair : dictionaries) {
566  arrow::ipc::IpcPayload payload;
567  int64_t dictionary_id = pair.first;
568  const auto& dictionary = pair.second;
569 
571  GetDictionaryPayload(dictionary_id, dictionary, options, &payload));
572  int32_t metadata_length = 0;
574  WriteIpcPayload(payload, options, dict_stream.get(), &metadata_length));
575  }
576  result_params.serialized_dict = dict_stream->Finish().ValueOrDie();
577 
578  ARROW_ASSIGN_OR_THROW(result_params.serialized_schema,
579  arrow::ipc::SerializeSchema(*record_batch->schema(),
580  arrow::default_memory_pool()));
581 
583  arrow::ipc::GetRecordBatchSize(*record_batch, &result_params.records_size));
584 
585  switch (transport_method_) {
587  return getWireResult();
589  return getShmResult();
590  default:
591  UNREACHABLE();
592  }
593  }
594 #ifdef HAVE_CUDA
596 
597  // Copy the schema to the schema handle
598  auto out_stream_result = arrow::io::BufferOutputStream::Create(1024);
599  ARROW_THROW_NOT_OK(out_stream_result.status());
600  auto out_stream = std::move(out_stream_result).ValueOrDie();
601 
602  arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
603  arrow::ipc::DictionaryMemo current_memo;
604  arrow::ipc::DictionaryMemo serialized_memo;
605 
606  arrow::ipc::IpcPayload schema_payload;
607  ARROW_THROW_NOT_OK(arrow::ipc::GetSchemaPayload(*record_batch->schema(),
608  arrow::ipc::IpcWriteOptions::Defaults(),
609  mapper,
610  &schema_payload));
611  int32_t schema_payload_length = 0;
612  ARROW_THROW_NOT_OK(arrow::ipc::WriteIpcPayload(schema_payload,
613  arrow::ipc::IpcWriteOptions::Defaults(),
614  out_stream.get(),
615  &schema_payload_length));
616  ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));
617  ARROW_LOG("GPU") << "Dictionary "
618  << "found dicts: " << dictionaries.size();
619 
621  arrow::ipc::internal::CollectDictionaries(*record_batch, &current_memo));
622 
623  // now try a dictionary
624  std::shared_ptr<arrow::Schema> dummy_schema;
625  std::vector<std::shared_ptr<arrow::RecordBatch>> dict_batches;
626 
627  for (const auto& pair : dictionaries) {
628  arrow::ipc::IpcPayload payload;
629  const auto& dict_id = pair.first;
630  CHECK_GE(dict_id, 0);
631  ARROW_LOG("GPU") << "Dictionary "
632  << "dict_id: " << dict_id;
633  const auto& dict = pair.second;
634  CHECK(dict);
635 
636  if (!dummy_schema) {
637  auto dummy_field = std::make_shared<arrow::Field>("", dict->type());
638  dummy_schema = std::make_shared<arrow::Schema>(
639  std::vector<std::shared_ptr<arrow::Field>>{dummy_field});
640  }
641  dict_batches.emplace_back(
642  arrow::RecordBatch::Make(dummy_schema, dict->length(), {dict}));
643  }
644 
645  if (!dict_batches.empty()) {
646  ARROW_THROW_NOT_OK(arrow::ipc::WriteRecordBatchStream(
647  dict_batches, arrow::ipc::IpcWriteOptions::Defaults(), out_stream.get()));
648  }
649 
650  auto complete_ipc_stream = out_stream->Finish();
651  ARROW_THROW_NOT_OK(complete_ipc_stream.status());
652  auto serialized_records = std::move(complete_ipc_stream).ValueOrDie();
653 
654  const auto record_key = arrow::get_and_copy_to_shm(serialized_records);
655  std::vector<char> schema_record_key_buffer(sizeof(key_t), 0);
656  memcpy(&schema_record_key_buffer[0],
657  reinterpret_cast<const unsigned char*>(&record_key),
658  sizeof(key_t));
659 
660  arrow::cuda::CudaDeviceManager* manager;
661  ARROW_ASSIGN_OR_THROW(manager, arrow::cuda::CudaDeviceManager::Instance());
662  std::shared_ptr<arrow::cuda::CudaContext> context;
663  ARROW_ASSIGN_OR_THROW(context, manager->GetContext(device_id_));
664 
665  std::shared_ptr<arrow::cuda::CudaBuffer> device_serialized;
666  ARROW_ASSIGN_OR_THROW(device_serialized,
667  SerializeRecordBatch(*record_batch, context.get()));
668 
669  std::shared_ptr<arrow::cuda::CudaIpcMemHandle> cuda_handle;
670  ARROW_ASSIGN_OR_THROW(cuda_handle, device_serialized->ExportForIpc());
671 
672  std::shared_ptr<arrow::Buffer> serialized_cuda_handle;
673  ARROW_ASSIGN_OR_THROW(serialized_cuda_handle,
674  cuda_handle->Serialize(arrow::default_memory_pool()));
675 
676  std::vector<char> record_handle_buffer(serialized_cuda_handle->size(), 0);
677  memcpy(&record_handle_buffer[0],
678  serialized_cuda_handle->data(),
679  serialized_cuda_handle->size());
680 
681  return {schema_record_key_buffer,
682  serialized_records->size(),
683  record_handle_buffer,
684  serialized_cuda_handle->size(),
685  serialized_cuda_handle->ToString()};
686 #else
687  UNREACHABLE();
688  return {std::vector<char>{}, 0, std::vector<char>{}, 0, ""};
689 #endif
690 }
691 
694  arrow::ipc::DictionaryFieldMapper* mapper) const {
695  auto timer = DEBUG_TIMER(__func__);
696  std::shared_ptr<arrow::RecordBatch> arrow_copy = convertToArrow();
697  std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;
698 
700  serialized_schema,
701  arrow::ipc::SerializeSchema(*arrow_copy->schema(), arrow::default_memory_pool()));
702 
703  if (arrow_copy->num_rows()) {
704  auto timer = DEBUG_TIMER("serialize records");
705  ARROW_THROW_NOT_OK(arrow_copy->Validate());
706  ARROW_ASSIGN_OR_THROW(serialized_records,
707  arrow::ipc::SerializeRecordBatch(
708  *arrow_copy, arrow::ipc::IpcWriteOptions::Defaults()));
709  } else {
710  ARROW_ASSIGN_OR_THROW(serialized_records, arrow::AllocateBuffer(0));
711  }
712  return {serialized_schema, serialized_records};
713 }
714 
715 std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::convertToArrow() const {
716  auto timer = DEBUG_TIMER(__func__);
717  const auto col_count = results_->colCount();
718  std::vector<std::shared_ptr<arrow::Field>> fields;
719  CHECK(col_names_.empty() || col_names_.size() == col_count);
720  for (size_t i = 0; i < col_count; ++i) {
721  const auto ti = results_->getColType(i);
722  fields.push_back(makeField(col_names_.empty() ? "" : col_names_[i], ti));
723  }
724 #if ARROW_CONVERTER_DEBUG
725  VLOG(1) << "Arrow fields: ";
726  for (const auto& f : fields) {
727  VLOG(1) << "\t" << f->ToString(true);
728  }
729 #endif
730  return getArrowBatch(arrow::schema(fields));
731 }
732 
733 std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::getArrowBatch(
734  const std::shared_ptr<arrow::Schema>& schema) const {
735  std::vector<std::shared_ptr<arrow::Array>> result_columns;
736 
737  // First, check if the result set is empty.
738  // If so, we return an arrow result set that only
739  // contains the schema (no record batch will be serialized).
740  if (results_->isEmpty()) {
741  return ARROW_RECORDBATCH_MAKE(schema, 0, result_columns);
742  }
743 
744  const size_t entry_count = top_n_ < 0
745  ? results_->entryCount()
746  : std::min(size_t(top_n_), results_->entryCount());
747 
748  const auto col_count = results_->colCount();
749  size_t row_count = 0;
750 
751  result_columns.resize(col_count);
752  std::vector<ColumnBuilder> builders(col_count);
753 
754  // Create array builders
755  for (size_t i = 0; i < col_count; ++i) {
756  initializeColumnBuilder(builders[i], results_->getColType(i), i, schema->field(i));
757  }
758 
759  // TODO(miyu): speed up for columnar buffers
760  auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
761  std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
762  const std::vector<bool>& non_lazy_cols,
763  const size_t start_entry,
764  const size_t end_entry) -> size_t {
765  CHECK_EQ(value_seg.size(), col_count);
766  CHECK_EQ(null_bitmap_seg.size(), col_count);
767  const auto local_entry_count = end_entry - start_entry;
768  size_t seg_row_count = 0;
769  for (size_t i = start_entry; i < end_entry; ++i) {
770  auto row = results_->getRowAtNoTranslations(i, non_lazy_cols);
771  if (row.empty()) {
772  continue;
773  }
774  ++seg_row_count;
775  for (size_t j = 0; j < col_count; ++j) {
776  if (!non_lazy_cols.empty() && non_lazy_cols[j]) {
777  continue;
778  }
779 
780  if (auto scalar_value = boost::get<ScalarTargetValue>(&row[j])) {
781  // TODO(miyu): support more types other than scalar.
782  CHECK(scalar_value);
783  const auto& column = builders[j];
784  switch (column.physical_type) {
785  case kBOOLEAN:
786  create_or_append_value<bool, int64_t>(
787  *scalar_value, value_seg[j], local_entry_count);
788  create_or_append_validity<int64_t>(
789  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
790  break;
791  case kTINYINT:
792  create_or_append_value<int8_t, int64_t>(
793  *scalar_value, value_seg[j], local_entry_count);
794  create_or_append_validity<int64_t>(
795  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
796  break;
797  case kSMALLINT:
798  create_or_append_value<int16_t, int64_t>(
799  *scalar_value, value_seg[j], local_entry_count);
800  create_or_append_validity<int64_t>(
801  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
802  break;
803  case kINT:
804  create_or_append_value<int32_t, int64_t>(
805  *scalar_value, value_seg[j], local_entry_count);
806  create_or_append_validity<int64_t>(
807  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
808  break;
809  case kBIGINT:
810  create_or_append_value<int64_t, int64_t>(
811  *scalar_value, value_seg[j], local_entry_count);
812  create_or_append_validity<int64_t>(
813  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
814  break;
815  case kDECIMAL:
816  create_or_append_value<int64_t, int64_t>(
817  *scalar_value, value_seg[j], local_entry_count);
818  create_or_append_validity<int64_t>(
819  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
820  break;
821  case kFLOAT:
822  create_or_append_value<float, float>(
823  *scalar_value, value_seg[j], local_entry_count);
824  create_or_append_validity<float>(
825  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
826  break;
827  case kDOUBLE:
828  create_or_append_value<double, double>(
829  *scalar_value, value_seg[j], local_entry_count);
830  create_or_append_validity<double>(
831  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
832  break;
833  case kTIME:
834  create_or_append_value<int32_t, int64_t>(
835  *scalar_value, value_seg[j], local_entry_count);
836  create_or_append_validity<int64_t>(
837  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
838  break;
839  case kDATE:
841  ? create_or_append_value<int64_t, int64_t>(
842  *scalar_value, value_seg[j], local_entry_count)
843  : create_or_append_value<int32_t, int64_t>(
844  *scalar_value, value_seg[j], local_entry_count);
845  create_or_append_validity<int64_t>(
846  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
847  break;
848  case kTIMESTAMP:
849  create_or_append_value<int64_t, int64_t>(
850  *scalar_value, value_seg[j], local_entry_count);
851  create_or_append_validity<int64_t>(
852  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
853  break;
854  case kTEXT:
855  create_or_append_value<std::string, NullableString>(
856  *scalar_value, value_seg[j], local_entry_count);
857  create_or_append_validity<NullableString>(
858  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
859  break;
860  default:
861  // TODO(miyu): support more scalar types.
862  throw std::runtime_error(column.col_type.get_type_name() +
863  " is not supported in Arrow result sets.");
864  }
865  } else if (auto array = boost::get<ArrayTargetValue>(&row[j])) {
866  // array := Boost::optional<std::vector<ScalarTargetValue>>
867  const auto& column = builders[j];
868  switch (column.col_type.get_subtype()) {
869  case kBOOLEAN:
870  create_or_append_value<int8_t, int64_t>(
871  *array, value_seg[j], local_entry_count);
873  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
874  break;
875  case kTINYINT:
876  create_or_append_value<int8_t, int64_t>(
877  *array, value_seg[j], local_entry_count);
879  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
880  break;
881  case kSMALLINT:
882  create_or_append_value<int16_t, int64_t>(
883  *array, value_seg[j], local_entry_count);
885  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
886  break;
887  case kINT:
888  create_or_append_value<int32_t, int64_t>(
889  *array, value_seg[j], local_entry_count);
891  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
892  break;
893  case kBIGINT:
894  create_or_append_value<int64_t, int64_t>(
895  *array, value_seg[j], local_entry_count);
897  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
898  break;
899  case kFLOAT:
900  create_or_append_value<float, float>(
901  *array, value_seg[j], local_entry_count);
903  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
904  break;
905  case kDOUBLE:
906  create_or_append_value<double, double>(
907  *array, value_seg[j], local_entry_count);
909  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
910  break;
911  case kTEXT:
912  if (column.col_type.is_dict_encoded_type()) {
913  create_or_append_value<int64_t, int64_t>(
914  *array, value_seg[j], local_entry_count);
916  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
917  break;
918  }
919  default:
920  throw std::runtime_error(column.col_type.get_type_name() +
921  " is not supported in Arrow result sets.");
922  }
923  }
924  }
925  }
926  return seg_row_count;
927  };
928 
929  auto convert_columns = [&](std::vector<std::shared_ptr<arrow::Array>>& result,
930  const std::vector<bool>& non_lazy_cols,
931  const size_t start_col,
932  const size_t end_col) {
933  for (size_t col = start_col; col < end_col; ++col) {
934  if (!non_lazy_cols.empty() && !non_lazy_cols[col]) {
935  continue;
936  }
937 
938  const auto& column = builders[col];
939  switch (column.physical_type) {
940  case kTINYINT:
941  convert_column<int8_t>(results_, col, entry_count, result[col]);
942  break;
943  case kSMALLINT:
944  convert_column<int16_t>(results_, col, entry_count, result[col]);
945  break;
946  case kINT:
947  convert_column<int32_t>(results_, col, entry_count, result[col]);
948  break;
949  case kBIGINT:
950  convert_column<int64_t>(results_, col, entry_count, result[col]);
951  break;
952  case kFLOAT:
953  convert_column<float>(results_, col, entry_count, result[col]);
954  break;
955  case kDOUBLE:
956  convert_column<double>(results_, col, entry_count, result[col]);
957  break;
958  default:
959  throw std::runtime_error(column.col_type.get_type_name() +
960  " is not supported in Arrow column converter.");
961  }
962  }
963  };
964 
965  std::vector<std::shared_ptr<ValueArray>> column_values(col_count, nullptr);
966  std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count, nullptr);
967  const bool multithreaded = entry_count > 10000 && !results_->isTruncated();
968  // Don't believe we ever output directly from a table function, but this
969  // might be possible with a future query plan optimization
970  bool use_columnar_converter = results_->isDirectColumnarConversionPossible() &&
971  (results_->getQueryMemDesc().getQueryDescriptionType() ==
973  results_->getQueryMemDesc().getQueryDescriptionType() ==
975  entry_count == results_->entryCount();
976  std::vector<bool> non_lazy_cols;
977  if (use_columnar_converter) {
978  auto timer = DEBUG_TIMER("columnar converter");
979  std::vector<size_t> non_lazy_col_pos;
980  size_t non_lazy_col_count = 0;
981  const auto& lazy_fetch_info = results_->getLazyFetchInfo();
982 
983  non_lazy_cols.reserve(col_count);
984  non_lazy_col_pos.reserve(col_count);
985  for (size_t i = 0; i < col_count; ++i) {
986  bool is_lazy =
987  lazy_fetch_info.empty() ? false : lazy_fetch_info[i].is_lazily_fetched;
988  // Currently column converter cannot handle some data types.
989  // Treat them as lazy.
990  switch (builders[i].physical_type) {
991  case kBOOLEAN:
992  case kTIME:
993  case kDATE:
994  case kTIMESTAMP:
995  is_lazy = true;
996  break;
997  default:
998  break;
999  }
1000  if (builders[i].field->type()->id() == arrow::Type::DICTIONARY) {
1001  is_lazy = true;
1002  }
1003  non_lazy_cols.emplace_back(!is_lazy);
1004  if (!is_lazy) {
1005  ++non_lazy_col_count;
1006  non_lazy_col_pos.emplace_back(i);
1007  }
1008  }
1009 
1010  if (non_lazy_col_count == col_count) {
1011  non_lazy_cols.clear();
1012  non_lazy_col_pos.clear();
1013  } else {
1014  non_lazy_col_pos.emplace_back(col_count);
1015  }
1016 
1017  std::vector<std::future<void>> child_threads;
1018  size_t num_threads =
1019  std::min(multithreaded ? (size_t)cpu_threads() : (size_t)1, non_lazy_col_count);
1020 
1021  size_t start_col = 0;
1022  size_t end_col = 0;
1023  for (size_t i = 0; i < num_threads; ++i) {
1024  start_col = end_col;
1025  end_col = (i + 1) * non_lazy_col_count / num_threads;
1026  size_t phys_start_col =
1027  non_lazy_col_pos.empty() ? start_col : non_lazy_col_pos[start_col];
1028  size_t phys_end_col =
1029  non_lazy_col_pos.empty() ? end_col : non_lazy_col_pos[end_col];
1030  child_threads.push_back(std::async(std::launch::async,
1031  convert_columns,
1032  std::ref(result_columns),
1033  non_lazy_cols,
1034  phys_start_col,
1035  phys_end_col));
1036  }
1037  for (auto& child : child_threads) {
1038  child.get();
1039  }
1040  row_count = entry_count;
1041  }
1042  if (!use_columnar_converter || !non_lazy_cols.empty()) {
1043  auto timer = DEBUG_TIMER("row converter");
1044  row_count = 0;
1045  if (multithreaded) {
1046  const size_t cpu_count = cpu_threads();
1047  std::vector<std::future<size_t>> child_threads;
1048  std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
1049  cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count, nullptr));
1050  std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
1051  cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count, nullptr));
1052  const auto stride = (entry_count + cpu_count - 1) / cpu_count;
1053  for (size_t i = 0, start_entry = 0; start_entry < entry_count;
1054  ++i, start_entry += stride) {
1055  const auto end_entry = std::min(entry_count, start_entry + stride);
1056  child_threads.push_back(std::async(std::launch::async,
1057  fetch,
1058  std::ref(column_value_segs[i]),
1059  std::ref(null_bitmap_segs[i]),
1060  non_lazy_cols,
1061  start_entry,
1062  end_entry));
1063  }
1064  for (auto& child : child_threads) {
1065  row_count += child.get();
1066  }
1067  {
1068  auto timer = DEBUG_TIMER("append rows to arrow");
1069  for (int i = 0; i < schema->num_fields(); ++i) {
1070  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
1071  continue;
1072  }
1073 
1074  for (size_t j = 0; j < cpu_count; ++j) {
1075  if (!column_value_segs[j][i]) {
1076  continue;
1077  }
1078  append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
1079  }
1080  }
1081  }
1082  } else {
1083  row_count =
1084  fetch(column_values, null_bitmaps, non_lazy_cols, size_t(0), entry_count);
1085  {
1086  auto timer = DEBUG_TIMER("append rows to arrow single thread");
1087  for (int i = 0; i < schema->num_fields(); ++i) {
1088  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
1089  continue;
1090  }
1091 
1092  append(builders[i], *column_values[i], null_bitmaps[i]);
1093  }
1094  }
1095  }
1096 
1097  {
1098  auto timer = DEBUG_TIMER("finish builders");
1099  for (size_t i = 0; i < col_count; ++i) {
1100  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
1101  continue;
1102  }
1103 
1104  result_columns[i] = finishColumnBuilder(builders[i]);
1105  }
1106  }
1107  }
1108 
1109  return ARROW_RECORDBATCH_MAKE(schema, row_count, result_columns);
1110 }
1111 
1112 namespace {
1113 
1114 std::shared_ptr<arrow::DataType> get_arrow_type(const SQLTypeInfo& sql_type,
1115  const ExecutorDeviceType device_type) {
1116  switch (get_physical_type(sql_type)) {
1117  case kBOOLEAN:
1118  return arrow::boolean();
1119  case kTINYINT:
1120  return arrow::int8();
1121  case kSMALLINT:
1122  return arrow::int16();
1123  case kINT:
1124  return arrow::int32();
1125  case kBIGINT:
1126  return arrow::int64();
1127  case kFLOAT:
1128  return arrow::float32();
1129  case kDOUBLE:
1130  return arrow::float64();
1131  case kCHAR:
1132  case kVARCHAR:
1133  case kTEXT:
1134  if (sql_type.is_dict_encoded_string()) {
1135  auto value_type = std::make_shared<arrow::StringType>();
1136  return arrow::dictionary(arrow::int32(), value_type, false);
1137  }
1138  return arrow::utf8();
1139  case kDECIMAL:
1140  case kNUMERIC:
1141  return arrow::decimal(sql_type.get_precision(), sql_type.get_scale());
1142  case kTIME:
1143  return time32(arrow::TimeUnit::SECOND);
1144  case kDATE: {
1145  // TODO(wamsi) : Remove date64() once date32() support is added in cuDF. date32()
1146  // Currently support for date32() is missing in cuDF.Hence, if client requests for
1147  // date on GPU, return date64() for the time being, till support is added.
1148  if (device_type == ExecutorDeviceType::GPU) {
1149  return arrow::date64();
1150  } else {
1151  return arrow::date32();
1152  }
1153  }
1154  case kTIMESTAMP:
1155  switch (sql_type.get_precision()) {
1156  case 0:
1157  return timestamp(arrow::TimeUnit::SECOND);
1158  case 3:
1159  return timestamp(arrow::TimeUnit::MILLI);
1160  case 6:
1161  return timestamp(arrow::TimeUnit::MICRO);
1162  case 9:
1163  return timestamp(arrow::TimeUnit::NANO);
1164  default:
1165  throw std::runtime_error(
1166  "Unsupported timestamp precision for Arrow result sets: " +
1167  std::to_string(sql_type.get_precision()));
1168  }
1169  case kARRAY:
1170  switch (sql_type.get_subtype()) {
1171  case kBOOLEAN:
1172  return arrow::list(arrow::boolean());
1173  case kTINYINT:
1174  return arrow::list(arrow::int8());
1175  case kSMALLINT:
1176  return arrow::list(arrow::int16());
1177  case kINT:
1178  return arrow::list(arrow::int32());
1179  case kBIGINT:
1180  return arrow::list(arrow::int64());
1181  case kFLOAT:
1182  return arrow::list(arrow::float32());
1183  case kDOUBLE:
1184  return arrow::list(arrow::float64());
1185  case kTEXT:
1186  if (sql_type.is_dict_encoded_type()) {
1187  auto value_type = std::make_shared<arrow::StringType>();
1188  return arrow::list(arrow::dictionary(arrow::int32(), value_type, false));
1189  }
1190  default:
1191  throw std::runtime_error("Unsupported array type for Arrow result sets: " +
1192  sql_type.get_type_name());
1193  }
1194  case kINTERVAL_DAY_TIME:
1195  case kINTERVAL_YEAR_MONTH:
1196  default:
1197  throw std::runtime_error(sql_type.get_type_name() +
1198  " is not supported in Arrow result sets.");
1199  }
1200  return nullptr;
1201 }
1202 
1203 } // namespace
1204 
1205 std::shared_ptr<arrow::Field> ArrowResultSetConverter::makeField(
1206  const std::string name,
1207  const SQLTypeInfo& target_type) const {
1208  return arrow::field(
1209  name, get_arrow_type(target_type, device_type_), !target_type.get_notnull());
1210 }
1211 
1213  const ArrowResult& result,
1214  const ExecutorDeviceType device_type,
1215  const size_t device_id,
1216  std::shared_ptr<Data_Namespace::DataMgr>& data_mgr) {
1217 #ifndef _MSC_VER
1218  // CPU buffers skip the sm handle, serializing the entire RecordBatch to df.
1219  // Remove shared memory on sysmem
1220  if (!result.sm_handle.empty()) {
1221  CHECK_EQ(sizeof(key_t), result.sm_handle.size());
1222  const key_t& schema_key = *(key_t*)(&result.sm_handle[0]);
1223  auto shm_id = shmget(schema_key, result.sm_size, 0666);
1224  if (shm_id < 0) {
1225  throw std::runtime_error(
1226  "failed to get an valid shm ID w/ given shm key of the schema");
1227  }
1228  if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
1229  throw std::runtime_error("failed to deallocate Arrow schema on errorno(" +
1230  std::to_string(errno) + ")");
1231  }
1232  }
1233 
1234  if (device_type == ExecutorDeviceType::CPU) {
1235  CHECK_EQ(sizeof(key_t), result.df_handle.size());
1236  const key_t& df_key = *(key_t*)(&result.df_handle[0]);
1237  auto shm_id = shmget(df_key, result.df_size, 0666);
1238  if (shm_id < 0) {
1239  throw std::runtime_error(
1240  "failed to get an valid shm ID w/ given shm key of the data");
1241  }
1242  if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
1243  throw std::runtime_error("failed to deallocate Arrow data frame");
1244  }
1245  }
1246  // CUDA buffers become owned by the caller, and will automatically be freed
1247  // TODO: What if the client never takes ownership of the result? we may want to
1248  // establish a check to see if the GPU buffer still exists, and then free it.
1249 #endif
1250 }
1251 
1253  ColumnBuilder& column_builder,
1254  const SQLTypeInfo& col_type,
1255  const size_t results_col_slot_idx,
1256  const std::shared_ptr<arrow::Field>& field) const {
1257  column_builder.field = field;
1258  column_builder.col_type = col_type;
1259  column_builder.physical_type = col_type.is_dict_encoded_string()
1260  ? get_dict_index_type(col_type)
1261  : get_physical_type(col_type);
1262 
1263  auto value_type = field->type();
1264  if (col_type.is_dict_encoded_type()) {
1265  auto timer = DEBUG_TIMER("Translate string dictionary to Arrow dictionary");
1266  if (!col_type.is_array()) {
1267  column_builder.builder.reset(new arrow::StringDictionary32Builder());
1268  }
1269  // add values to the builder
1270  const int dict_id = col_type.get_comp_param();
1271 
1272  // ResultSet::rowCount(), unlike ResultSet::entryCount(), will return
1273  // the actual number of rows in the result set, taking into account
1274  // things like any limit and offset set
1275  const size_t result_set_rows = results_->rowCount();
1276  // result_set_rows guaranteed > 0 by parent
1277  CHECK_GT(result_set_rows, 0UL);
1278 
1279  auto sdp = results_->getStringDictionaryProxy(dict_id);
1280  const size_t dictionary_proxy_entries = sdp->entryCount();
1281  const double dictionary_to_result_size_ratio =
1282  static_cast<double>(dictionary_proxy_entries) / result_set_rows;
1283 
1284  // We are conservative with when we do a bulk dictionary fetch,
1285  // even though it is generally more efficient than dictionary unique value "plucking",
1286  // for the following reasons:
1287  // 1) The number of actual distinct dictionary values can be much lower than the
1288  // number of result rows, but without getting the expression range (and that would
1289  // only work in some cases), we don't know by how much
1290  // 2) Regardless of the effect of #1, the size of the dictionary generated via
1291  // the "pluck" method will always be at worst equal in size, and very likely
1292  // significantly smaller, than the dictionary created by the bulk dictionary
1293  // fetch method, and smaller Arrow dictionaries are always a win when it comes to
1294  // sending the Arrow results over the wire, and for lowering the processing load
1295  // for clients (which often is a web browser with a lot less compute and memory
1296  // resources than our server.)
1297 
1298  const bool do_dictionary_bulk_fetch =
1299  result_set_rows > min_result_size_for_bulk_dictionary_fetch_ &&
1300  dictionary_to_result_size_ratio <=
1302 
1303  arrow::StringBuilder str_array_builder;
1304 
1305  if (do_dictionary_bulk_fetch) {
1306  VLOG(1) << "Arrow dictionary creation: bulk copying all dictionary "
1307  << " entries for column at offset " << results_col_slot_idx << ". "
1308  << "Column has " << dictionary_proxy_entries << " string entries"
1309  << " for a result set with " << result_set_rows << " rows.";
1310  column_builder.string_remap_mode =
1312  auto str_list = results_->getStringDictionaryPayloadCopy(dict_id);
1313  ARROW_THROW_NOT_OK(str_array_builder.AppendValues(str_list));
1314 
1315  // When we fetch the bulk dictionary, we need to also fetch
1316  // the transient entries only contained in the proxy.
1317  // These values are always negative (starting at -2), and so need
1318  // to be remapped to point to the corresponding entries in the Arrow
1319  // dictionary (they are placed at the end after the materialized
1320  // string entries from StringDictionary)
1321 
1322  int32_t crt_transient_id = static_cast<int32_t>(str_list.size());
1323  auto const& transient_vecmap = sdp->getTransientVector();
1324  for (unsigned index = 0; index < transient_vecmap.size(); ++index) {
1325  ARROW_THROW_NOT_OK(str_array_builder.Append(*transient_vecmap[index]));
1326  auto const old_id = StringDictionaryProxy::transientIndexToId(index);
1327  CHECK(column_builder.string_remapping
1328  .insert(std::make_pair(old_id, crt_transient_id++))
1329  .second);
1330  }
1331  } else {
1332  // Pluck unique dictionary values from ResultSet column
1333  VLOG(1) << "Arrow dictionary creation: serializing unique result set dictionary "
1334  << " entries for column at offset " << results_col_slot_idx << ". "
1335  << "Column has " << dictionary_proxy_entries << " string entries"
1336  << " for a result set with " << result_set_rows << " rows.";
1338 
1339  // ResultSet::getUniqueStringsForDictEncodedTargetCol returns a pair of two vectors,
1340  // the first of int32_t values containing the unique string ids found for
1341  // results_col_slot_idx in the result set, the second containing the associated
1342  // unique strings. Note that the unique string for a unique string id are both
1343  // placed at the same offset in their respective vectors
1344 
1345  auto unique_ids_and_strings =
1346  results_->getUniqueStringsForDictEncodedTargetCol(results_col_slot_idx);
1347  const auto& unique_ids = unique_ids_and_strings.first;
1348  const auto& unique_strings = unique_ids_and_strings.second;
1349  ARROW_THROW_NOT_OK(str_array_builder.AppendValues(unique_strings));
1350  const int32_t num_unique_strings = unique_strings.size();
1351  CHECK_EQ(num_unique_strings, unique_ids.size());
1352  // We need to remap ALL string id values given the Arrow dictionary
1353  // will have "holes", i.e. it is a sparse representation of the underlying
1354  // StringDictionary
1355  for (int32_t unique_string_idx = 0; unique_string_idx < num_unique_strings;
1356  ++unique_string_idx) {
1357  CHECK(
1358  column_builder.string_remapping
1359  .insert(std::make_pair(unique_ids[unique_string_idx], unique_string_idx))
1360  .second);
1361  }
1362  // Note we don't need to get transients from proxy as they are already handled in
1363  // ResultSet::getUniqueStringsForDictEncodedTargetCol
1364  }
1365 
1366  std::shared_ptr<arrow::StringArray> string_array;
1367  ARROW_THROW_NOT_OK(str_array_builder.Finish(&string_array));
1368 
1369  if (col_type.is_array()) {
1370  column_builder.string_array = std::move(string_array);
1371  ARROW_THROW_NOT_OK(arrow::MakeBuilder(
1372  arrow::default_memory_pool(), value_type, &column_builder.builder));
1373  } else {
1374  auto dict_builder =
1375  dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
1376  CHECK(dict_builder);
1377 
1378  ARROW_THROW_NOT_OK(dict_builder->InsertMemoValues(*string_array));
1379  }
1380  } else {
1381  ARROW_THROW_NOT_OK(arrow::MakeBuilder(
1382  arrow::default_memory_pool(), value_type, &column_builder.builder));
1383  }
1384 }
1385 
1386 std::shared_ptr<arrow::Array> ArrowResultSetConverter::finishColumnBuilder(
1387  ColumnBuilder& column_builder) const {
1388  std::shared_ptr<arrow::Array> values;
1389  ARROW_THROW_NOT_OK(column_builder.builder->Finish(&values));
1390  return values;
1391 }
1392 
1393 namespace {
1394 
1395 template <typename BUILDER_TYPE, typename VALUE_ARRAY_TYPE>
1397  const ValueArray& values,
1398  const std::shared_ptr<std::vector<bool>>& is_valid) {
1399  static_assert(!std::is_same<BUILDER_TYPE, arrow::StringDictionary32Builder>::value,
1400  "Dictionary encoded string builder requires function specialization.");
1401 
1402  std::vector<VALUE_ARRAY_TYPE> vals = boost::get<std::vector<VALUE_ARRAY_TYPE>>(values);
1403 
1404  if (scale_epoch_values<BUILDER_TYPE>()) {
1405  auto scale_sec_to_millisec = [](auto seconds) { return seconds * kMilliSecsPerSec; };
1406  auto scale_values = [&](auto epoch) {
1407  return std::is_same<BUILDER_TYPE, arrow::Date32Builder>::value
1409  : scale_sec_to_millisec(epoch);
1410  };
1411  std::transform(vals.begin(), vals.end(), vals.begin(), scale_values);
1412  }
1413 
1414  auto typed_builder = dynamic_cast<BUILDER_TYPE*>(column_builder.builder.get());
1415  CHECK(typed_builder);
1416  if (column_builder.field->nullable()) {
1417  CHECK(is_valid.get());
1418  ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals, *is_valid));
1419  } else {
1420  ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals));
1421  }
1422 }
1423 
1424 template <>
1425 void appendToColumnBuilder<arrow::Decimal128Builder, int64_t>(
1427  const ValueArray& values,
1428  const std::shared_ptr<std::vector<bool>>& is_valid) {
1429  std::vector<int64_t> vals = boost::get<std::vector<int64_t>>(values);
1430  auto typed_builder =
1431  dynamic_cast<arrow::Decimal128Builder*>(column_builder.builder.get());
1432  CHECK(typed_builder);
1433  CHECK_EQ(is_valid->size(), vals.size());
1434  if (column_builder.field->nullable()) {
1435  CHECK(is_valid.get());
1436  for (size_t i = 0; i < vals.size(); i++) {
1437  const auto v = vals[i];
1438  const auto valid = (*is_valid)[i];
1439  if (valid) {
1440  ARROW_THROW_NOT_OK(typed_builder->Append(v));
1441  } else {
1442  ARROW_THROW_NOT_OK(typed_builder->AppendNull());
1443  }
1444  }
1445  } else {
1446  for (const auto& v : vals) {
1447  ARROW_THROW_NOT_OK(typed_builder->Append(v));
1448  }
1449  }
1450 }
1451 
1452 template <>
1453 void appendToColumnBuilder<arrow::StringBuilder, std::string>(
1455  const ValueArray& values,
1456  const std::shared_ptr<std::vector<bool>>& is_valid) {
1457  std::vector<std::string> vals = boost::get<std::vector<std::string>>(values);
1458  auto typed_builder = dynamic_cast<arrow::StringBuilder*>(column_builder.builder.get());
1459  CHECK(typed_builder);
1460  CHECK_EQ(is_valid->size(), vals.size());
1461 
1462  if (column_builder.field->nullable()) {
1463  CHECK(is_valid.get());
1464 
1465  // TODO: Generate this instead of the boolean bitmap
1466  std::vector<uint8_t> transformed_bitmap;
1467  transformed_bitmap.reserve(is_valid->size());
1468  std::for_each(
1469  is_valid->begin(), is_valid->end(), [&transformed_bitmap](const bool is_valid) {
1470  transformed_bitmap.push_back(is_valid ? 1 : 0);
1471  });
1472  ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals, transformed_bitmap.data()));
1473  } else {
1474  ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals));
1475  }
1476 }
1477 
1478 template <>
1479 void appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
1481  const ValueArray& values,
1482  const std::shared_ptr<std::vector<bool>>& is_valid) {
1483  auto typed_builder =
1484  dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
1485  CHECK(typed_builder);
1486 
1487  std::vector<int32_t> vals = boost::get<std::vector<int32_t>>(values);
1488  // remap negative values if ArrowStringRemapMode == ONLY_TRANSIENT_STRINGS_REMAPPED or
1489  // everything if ALL_STRINGS_REMAPPED
1490  CHECK(column_builder.string_remap_mode != ArrowStringRemapMode::INVALID);
1491  for (size_t i = 0; i < vals.size(); i++) {
1492  auto& val = vals[i];
1493  if ((column_builder.string_remap_mode == ArrowStringRemapMode::ALL_STRINGS_REMAPPED ||
1494  val < 0) &&
1495  (*is_valid)[i]) {
1496  vals[i] = column_builder.string_remapping.at(val);
1497  }
1498  }
1499 
1500  if (column_builder.field->nullable()) {
1501  CHECK(is_valid.get());
1502  // TODO(adb): Generate this instead of the boolean bitmap
1503  std::vector<uint8_t> transformed_bitmap;
1504  transformed_bitmap.reserve(is_valid->size());
1505  std::for_each(
1506  is_valid->begin(), is_valid->end(), [&transformed_bitmap](const bool is_valid) {
1507  transformed_bitmap.push_back(is_valid ? 1 : 0);
1508  });
1509 
1510  ARROW_THROW_NOT_OK(typed_builder->AppendIndices(
1511  vals.data(), static_cast<int64_t>(vals.size()), transformed_bitmap.data()));
1512  } else {
1514  typed_builder->AppendIndices(vals.data(), static_cast<int64_t>(vals.size())));
1515  }
1516 }
1517 
1518 template <typename BUILDER_TYPE, typename VALUE_TYPE>
1520  const ValueArray& values,
1521  const std::shared_ptr<std::vector<bool>>& is_valid) {
1522  Vec2<VALUE_TYPE> vals = boost::get<Vec2<VALUE_TYPE>>(values);
1523  auto list_builder = dynamic_cast<arrow::ListBuilder*>(column_builder.builder.get());
1524  CHECK(list_builder);
1525 
1526  auto value_builder = static_cast<BUILDER_TYPE*>(list_builder->value_builder());
1527 
1528  if (column_builder.field->nullable()) {
1529  for (size_t i = 0; i < vals.size(); i++) {
1530  if ((*is_valid)[i]) {
1531  const auto& val = vals[i];
1532  std::vector<uint8_t> bitmap(val.size());
1533  std::transform(val.begin(), val.end(), bitmap.begin(), [](VALUE_TYPE pvalue) {
1534  return static_cast<VALUE_TYPE>(pvalue) != null_type<VALUE_TYPE>::value;
1535  });
1536  ARROW_THROW_NOT_OK(list_builder->Append());
1537  if constexpr (std::is_same_v<BUILDER_TYPE, arrow::BooleanBuilder>) {
1538  std::vector<uint8_t> bval(val.size());
1539  std::copy(val.begin(), val.end(), bval.begin());
1541  value_builder->AppendValues(bval.data(), bval.size(), bitmap.data()));
1542  } else {
1544  value_builder->AppendValues(val.data(), val.size(), bitmap.data()));
1545  }
1546  } else {
1547  ARROW_THROW_NOT_OK(list_builder->AppendNull());
1548  }
1549  }
1550  } else {
1551  for (size_t i = 0; i < vals.size(); i++) {
1552  if ((*is_valid)[i]) {
1553  const auto& val = vals[i];
1554  ARROW_THROW_NOT_OK(list_builder->Append());
1555  if constexpr (std::is_same_v<BUILDER_TYPE, arrow::BooleanBuilder>) {
1556  std::vector<uint8_t> bval(val.size());
1557  std::copy(val.begin(), val.end(), bval.begin());
1558  ARROW_THROW_NOT_OK(value_builder->AppendValues(bval.data(), bval.size()));
1559  } else {
1560  ARROW_THROW_NOT_OK(value_builder->AppendValues(val.data(), val.size()));
1561  }
1562  } else {
1563  ARROW_THROW_NOT_OK(list_builder->AppendNull());
1564  }
1565  }
1566  }
1567 }
1568 
1569 template <>
1570 void appendToListColumnBuilder<arrow::StringDictionaryBuilder, int64_t>(
1572  const ValueArray& values,
1573  const std::shared_ptr<std::vector<bool>>& is_valid) {
1574  Vec2<int64_t> vec2d = boost::get<Vec2<int64_t>>(values);
1575 
1576  auto* list_builder = dynamic_cast<arrow::ListBuilder*>(column_builder.builder.get());
1577  CHECK(list_builder);
1578 
1579  // todo: fix value_builder being a StringDictionaryBuilder and not
1580  // StringDictionary32Builder
1581  auto* value_builder =
1582  dynamic_cast<arrow::StringDictionaryBuilder*>(list_builder->value_builder());
1583  CHECK(value_builder);
1584 
1585  if (column_builder.field->nullable()) {
1586  for (size_t i = 0; i < vec2d.size(); i++) {
1587  if ((*is_valid)[i]) {
1588  auto& vec1d = vec2d[i];
1589  std::vector<uint8_t> bitmap(vec1d.size());
1590  std::transform(vec1d.begin(), vec1d.end(), bitmap.begin(), [](int64_t pvalue) {
1591  return pvalue != null_type<int32_t>::value;
1592  });
1593  ARROW_THROW_NOT_OK(list_builder->Append());
1594  ARROW_THROW_NOT_OK(value_builder->InsertMemoValues(*column_builder.string_array));
1595  remap_string_values(column_builder, bitmap, vec1d);
1596  ARROW_THROW_NOT_OK(value_builder->AppendIndices(
1597  vec1d.data(), static_cast<int64_t>(vec1d.size()), bitmap.data()));
1598  } else {
1599  ARROW_THROW_NOT_OK(list_builder->AppendNull());
1600  }
1601  }
1602  } else {
1603  for (size_t i = 0; i < vec2d.size(); i++) {
1604  if ((*is_valid)[i]) {
1605  auto& vec1d = vec2d[i];
1606  ARROW_THROW_NOT_OK(list_builder->Append());
1607  remap_string_values(column_builder, {}, vec1d);
1608  ARROW_THROW_NOT_OK(value_builder->AppendIndices(vec1d.data(), vec1d.size()));
1609  } else {
1610  ARROW_THROW_NOT_OK(list_builder->AppendNull());
1611  }
1612  }
1613  }
1614 }
1615 
1616 } // namespace
1617 
1619  ColumnBuilder& column_builder,
1620  const ValueArray& values,
1621  const std::shared_ptr<std::vector<bool>>& is_valid) const {
1622  if (column_builder.col_type.is_dict_encoded_string()) {
1623  CHECK_EQ(column_builder.physical_type,
1624  kINT); // assume all dicts use none-encoded type for now
1625  appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
1626  column_builder, values, is_valid);
1627  return;
1628  }
1629  switch (column_builder.physical_type) {
1630  case kBOOLEAN:
1631  appendToColumnBuilder<arrow::BooleanBuilder, bool>(
1632  column_builder, values, is_valid);
1633  break;
1634  case kTINYINT:
1635  appendToColumnBuilder<arrow::Int8Builder, int8_t>(column_builder, values, is_valid);
1636  break;
1637  case kSMALLINT:
1638  appendToColumnBuilder<arrow::Int16Builder, int16_t>(
1639  column_builder, values, is_valid);
1640  break;
1641  case kINT:
1642  appendToColumnBuilder<arrow::Int32Builder, int32_t>(
1643  column_builder, values, is_valid);
1644  break;
1645  case kBIGINT:
1646  appendToColumnBuilder<arrow::Int64Builder, int64_t>(
1647  column_builder, values, is_valid);
1648  break;
1649  case kDECIMAL:
1650  appendToColumnBuilder<arrow::Decimal128Builder, int64_t>(
1651  column_builder, values, is_valid);
1652  break;
1653  case kFLOAT:
1654  appendToColumnBuilder<arrow::FloatBuilder, float>(column_builder, values, is_valid);
1655  break;
1656  case kDOUBLE:
1657  appendToColumnBuilder<arrow::DoubleBuilder, double>(
1658  column_builder, values, is_valid);
1659  break;
1660  case kTIME:
1661  appendToColumnBuilder<arrow::Time32Builder, int32_t>(
1662  column_builder, values, is_valid);
1663  break;
1664  case kTIMESTAMP:
1665  appendToColumnBuilder<arrow::TimestampBuilder, int64_t>(
1666  column_builder, values, is_valid);
1667  break;
1668  case kDATE:
1670  ? appendToColumnBuilder<arrow::Date64Builder, int64_t>(
1671  column_builder, values, is_valid)
1672  : appendToColumnBuilder<arrow::Date32Builder, int32_t>(
1673  column_builder, values, is_valid);
1674  break;
1675  case kARRAY:
1676  if (column_builder.col_type.get_subtype() == kBOOLEAN) {
1677  appendToListColumnBuilder<arrow::BooleanBuilder, int8_t>(
1678  column_builder, values, is_valid);
1679  break;
1680  } else if (column_builder.col_type.get_subtype() == kTINYINT) {
1681  appendToListColumnBuilder<arrow::Int8Builder, int8_t>(
1682  column_builder, values, is_valid);
1683  break;
1684  } else if (column_builder.col_type.get_subtype() == kSMALLINT) {
1685  appendToListColumnBuilder<arrow::Int16Builder, int16_t>(
1686  column_builder, values, is_valid);
1687  break;
1688  } else if (column_builder.col_type.get_subtype() == kINT) {
1689  appendToListColumnBuilder<arrow::Int32Builder, int32_t>(
1690  column_builder, values, is_valid);
1691  break;
1692  } else if (column_builder.col_type.get_subtype() == kBIGINT) {
1693  appendToListColumnBuilder<arrow::Int64Builder, int64_t>(
1694  column_builder, values, is_valid);
1695  break;
1696  } else if (column_builder.col_type.get_subtype() == kFLOAT) {
1697  appendToListColumnBuilder<arrow::FloatBuilder, float>(
1698  column_builder, values, is_valid);
1699  break;
1700  } else if (column_builder.col_type.get_subtype() == kDOUBLE) {
1701  appendToListColumnBuilder<arrow::DoubleBuilder, double>(
1702  column_builder, values, is_valid);
1703  break;
1704  } else if (column_builder.col_type.is_dict_encoded_type()) {
1705  appendToListColumnBuilder<arrow::StringDictionaryBuilder, int64_t>(
1706  column_builder, values, is_valid);
1707  break;
1708  } else {
1709  throw std::runtime_error(column_builder.col_type.get_type_name() +
1710  " is not supported in Arrow result sets.");
1711  }
1712  case kCHAR:
1713  case kVARCHAR:
1714  case kTEXT:
1715  appendToColumnBuilder<arrow::StringBuilder, std::string>(
1716  column_builder, values, is_valid);
1717  break;
1718  default:
1719  // TODO(miyu): support more scalar types.
1720  throw std::runtime_error(column_builder.col_type.get_type_name() +
1721  " is not supported in Arrow result sets.");
1722  }
1723 }
void create_or_append_value(const ScalarTargetValue &val_cty, std::shared_ptr< ValueArray > &values, const size_t max_size)
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:380
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::vector< std::vector< T >> Vec2
const size_t min_result_size_for_bulk_dictionary_fetch_
std::unique_ptr< arrow::ArrayBuilder > builder
HOST DEVICE int get_size() const
Definition: sqltypes.h:389
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
std::shared_ptr< arrow::StringArray > string_array
Definition: sqltypes.h:64
SQLTypes
Definition: sqltypes.h:53
void append(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
void appendToListColumnBuilder(ArrowResultSetConverter::ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid)
void convert_column(ResultSetPtr result, size_t col, size_t entry_count, std::shared_ptr< arrow::Array > &out)
ArrowResult getArrowResult() const
std::vector< char > sm_handle
ExecutorDeviceType
void initializeColumnBuilder(ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const size_t result_col_idx, const std::shared_ptr< arrow::Field > &field) const
ResultSetBuffer(const uint8_t *buf, size_t size, ResultSetPtr rs)
const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch_
void appendToColumnBuilder(ArrowResultSetConverter::ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid)
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
#define ARROW_LOG(category)
bool is_fp() const
Definition: sqltypes.h:579
HOST DEVICE int get_scale() const
Definition: sqltypes.h:384
std::shared_ptr< arrow::Array > finishColumnBuilder(ColumnBuilder &column_builder) const
parquet::Type::type get_physical_type(ReaderPtr &reader, const int logical_column_index)
#define UNREACHABLE()
Definition: Logger.h:266
#define CHECK_GE(x, y)
Definition: Logger.h:235
boost::variant< std::vector< bool >, std::vector< int8_t >, std::vector< int16_t >, std::vector< int32_t >, std::vector< int64_t >, std::vector< arrow::Decimal128 >, std::vector< float >, std::vector< double >, std::vector< std::string >, std::vector< std::vector< int8_t >>, std::vector< std::vector< int16_t >>, std::vector< std::vector< int32_t >>, std::vector< std::vector< int64_t >>, std::vector< std::vector< float >>, std::vector< std::vector< double >>, std::vector< std::vector< std::string >>> ValueArray
std::shared_ptr< arrow::Field > field
std::shared_ptr< ResultSet > ResultSetPtr
std::pair< key_t, std::shared_ptr< arrow::Buffer > > get_shm_buffer(size_t size)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:379
#define CHECK_GT(x, y)
Definition: Logger.h:234
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
bool is_time() const
Definition: sqltypes.h:581
constexpr double f
Definition: Utm.h:31
std::string to_string(char const *&&v)
std::pair< key_t, void * > get_shm(size_t shmsz)
static constexpr int64_t kMilliSecsPerSec
ArrowTransport transport_method_
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
std::shared_ptr< arrow::RecordBatch > getArrowBatch(const std::shared_ptr< arrow::Schema > &schema) const
future< Result > async(Fn &&fn, Args &&...args)
#define ARROW_RECORDBATCH_MAKE
std::vector< std::string > col_names_
bool is_integer() const
Definition: sqltypes.h:577
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
ExecutorDeviceType device_type_
boost::optional< std::vector< ScalarTargetValue >> ArrayTargetValue
Definition: TargetValue.h:181
std::vector< char > df_handle
void remap_string_values(const ArrowResultSetConverter::ColumnBuilder &column_builder, const std::vector< uint8_t > &bitmap, std::vector< int64_t > &vec1d)
std::shared_ptr< arrow::Field > makeField(const std::string name, const SQLTypeInfo &target_type) const
static int32_t transientIndexToId(unsigned const index)
bool is_dict_encoded_type() const
Definition: sqltypes.h:639
SerializedArrowOutput getSerializedArrowOutput(arrow::ipc::DictionaryFieldMapper *mapper) const
OUTPUT transform(INPUT const &input, FUNC const &func)
Definition: misc.h:296
bool is_boolean() const
Definition: sqltypes.h:582
int get_precision() const
Definition: sqltypes.h:382
key_t get_and_copy_to_shm(const std::shared_ptr< Buffer > &data)
static void deallocateArrowResultBuffer(const ArrowResult &result, const ExecutorDeviceType device_type, const size_t device_id, std::shared_ptr< Data_Namespace::DataMgr > &data_mgr)
std::shared_ptr< arrow::DataType > get_arrow_type(const SQLTypeInfo &sql_type, const ExecutorDeviceType device_type)
Definition: sqltypes.h:67
Definition: sqltypes.h:68
std::unordered_map< StrId, ArrowStrId > string_remapping
std::shared_ptr< ResultSet > results_
int64_t sm_size
int64_t df_size
std::string get_type_name() const
Definition: sqltypes.h:503
#define IS_INTEGER(T)
Definition: sqltypes.h:292
Definition: sqltypes.h:56
void create_or_append_validity(const ArrayTargetValue &value, const SQLTypeInfo &col_type, std::shared_ptr< std::vector< bool >> &null_bitmap, const size_t max_size)
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:388
#define CHECK(condition)
Definition: Logger.h:222
#define DEBUG_TIMER(name)
Definition: Logger.h:371
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
int64_t get_epoch_days_from_seconds(const int64_t seconds)
bool is_dict_encoded_string() const
Definition: sqltypes.h:627
Definition: sqltypes.h:60
string name
Definition: setup.in.py:72
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:386
int cpu_threads()
Definition: thread_count.h:25
bool is_decimal() const
Definition: sqltypes.h:578
bool is_array() const
Definition: sqltypes.h:583
#define VLOG(n)
Definition: Logger.h:316
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:180
std::shared_ptr< arrow::RecordBatch > convertToArrow() const