OmniSciDB  eb3a3d0a03
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ArrowResultSetConverter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "../Shared/DateConverters.h"
18 #include "ArrowResultSet.h"
19 #include "Execute.h"
20 #include "arrow/ipc/dictionary.h"
21 #include "arrow/ipc/options.h"
22 
23 #ifndef _MSC_VER
24 #include <sys/ipc.h>
25 #include <sys/shm.h>
26 #include <sys/types.h>
27 #else
28 // IPC shared memory not yet supported on windows
29 using key_t = size_t;
30 #define IPC_PRIVATE 0
31 #endif
32 
33 #include <algorithm>
34 #include <cerrno>
35 #include <cstdio>
36 #include <cstdlib>
37 #include <future>
38 #include <string>
39 
40 #include "arrow/api.h"
41 #include "arrow/io/memory.h"
42 #include "arrow/ipc/api.h"
43 
44 #include "Shared/ArrowUtil.h"
45 
46 #ifdef HAVE_CUDA
47 #include <arrow/gpu/cuda_api.h>
48 #include <cuda.h>
49 #endif // HAVE_CUDA
50 
51 #define ARROW_RECORDBATCH_MAKE arrow::RecordBatch::Make
52 
53 #define ARROW_CONVERTER_DEBUG true
54 
55 #define ARROW_LOG(category) \
56  VLOG(1) << "[Arrow]" \
57  << "[" << category "] "
58 
59 namespace {
60 
61 /* We can create Arrow buffers which refer memory owned by ResultSet.
62  For safe memory access we should keep a ResultSetPtr to keep
63  data live while buffer lives. Use this custom buffer for that. */
64 class ResultSetBuffer : public arrow::Buffer {
65  public:
66  ResultSetBuffer(const uint8_t* buf, size_t size, ResultSetPtr rs)
67  : arrow::Buffer(buf, size), _rs(rs) {}
68 
69  private:
71 };
72 
75  switch (ti.get_size()) {
76  case 1:
77  return kTINYINT;
78  case 2:
79  return kSMALLINT;
80  case 4:
81  return kINT;
82  case 8:
83  return kBIGINT;
84  default:
85  CHECK(false);
86  }
87  return ti.get_type();
88 }
89 
91  auto logical_type = ti.get_type();
92  if (IS_INTEGER(logical_type)) {
93  switch (ti.get_size()) {
94  case 1:
95  return kTINYINT;
96  case 2:
97  return kSMALLINT;
98  case 4:
99  return kINT;
100  case 8:
101  return kBIGINT;
102  default:
103  CHECK(false);
104  }
105  }
106  return logical_type;
107 }
108 
109 template <typename TYPE, typename VALUE_ARRAY_TYPE>
111  std::shared_ptr<ValueArray>& values,
112  const size_t max_size) {
113  auto pval_cty = boost::get<VALUE_ARRAY_TYPE>(&val_cty);
114  CHECK(pval_cty);
115  auto val_ty = static_cast<TYPE>(*pval_cty);
116  if (!values) {
117  values = std::make_shared<ValueArray>(std::vector<TYPE>());
118  boost::get<std::vector<TYPE>>(*values).reserve(max_size);
119  }
120  CHECK(values);
121  auto values_ty = boost::get<std::vector<TYPE>>(values.get());
122  CHECK(values_ty);
123  values_ty->push_back(val_ty);
124 }
125 
126 template <typename TYPE>
128  const SQLTypeInfo& col_type,
129  std::shared_ptr<std::vector<bool>>& null_bitmap,
130  const size_t max_size) {
131  if (col_type.get_notnull()) {
132  CHECK(!null_bitmap);
133  return;
134  }
135  auto pvalue = boost::get<TYPE>(&value);
136  CHECK(pvalue);
137  bool is_valid = false;
138  if (col_type.is_boolean()) {
139  is_valid = inline_int_null_val(col_type) != static_cast<int8_t>(*pvalue);
140  } else if (col_type.is_dict_encoded_string()) {
141  is_valid = inline_int_null_val(col_type) != static_cast<int32_t>(*pvalue);
142  } else if (col_type.is_integer() || col_type.is_time()) {
143  is_valid = inline_int_null_val(col_type) != static_cast<int64_t>(*pvalue);
144  } else if (col_type.is_fp()) {
145  is_valid = inline_fp_null_val(col_type) != static_cast<double>(*pvalue);
146  } else if (col_type.is_decimal()) {
147  is_valid = inline_int_null_val(col_type) != static_cast<int64_t>(*pvalue);
148  } else {
149  UNREACHABLE();
150  }
151 
152  if (!null_bitmap) {
153  null_bitmap = std::make_shared<std::vector<bool>>();
154  null_bitmap->reserve(max_size);
155  }
156  CHECK(null_bitmap);
157  null_bitmap->push_back(is_valid);
158 }
159 
// Primary template: deliberately empty. Only the specializations provide the
// `type` / `value` members; the second parameter exists solely as the SFINAE
// slot those specializations constrain.
template <typename TYPE, typename SFINAE = void>
struct null_type {};
162 
163 template <typename TYPE>
164 struct null_type<TYPE, std::enable_if_t<std::is_integral<TYPE>::value>> {
166  static constexpr type value = inline_int_null_value<type>();
167 };
168 
169 template <typename TYPE>
170 struct null_type<TYPE, std::enable_if_t<std::is_floating_point<TYPE>::value>> {
171  using type = TYPE;
172  static constexpr type value = inline_fp_null_value<type>();
173 };
174 
175 template <typename TYPE>
177 
178 template <typename C_TYPE,
179  typename ARROW_TYPE = typename arrow::CTypeTraits<C_TYPE>::ArrowType>
181  size_t col,
182  size_t entry_count,
183  std::shared_ptr<arrow::Array>& out) {
184  CHECK(sizeof(C_TYPE) == result->getColType(col).get_size());
185 
186  std::shared_ptr<arrow::Buffer> values;
187  std::shared_ptr<arrow::Buffer> is_valid;
188  const int64_t buf_size = entry_count * sizeof(C_TYPE);
189  if (result->isZeroCopyColumnarConversionPossible(col)) {
190  values.reset(new ResultSetBuffer(
191  reinterpret_cast<const uint8_t*>(result->getColumnarBuffer(col)),
192  buf_size,
193  result));
194  } else {
195  auto res = arrow::AllocateBuffer(buf_size);
196  CHECK(res.ok());
197  values = std::move(res).ValueOrDie();
198  result->copyColumnIntoBuffer(
199  col, reinterpret_cast<int8_t*>(values->mutable_data()), buf_size);
200  }
201 
202  int64_t null_count = 0;
203  auto res = arrow::AllocateBuffer((entry_count + 7) / 8);
204  CHECK(res.ok());
205  is_valid = std::move(res).ValueOrDie();
206 
207  auto is_valid_data = is_valid->mutable_data();
208 
209  const null_type_t<C_TYPE>* vals =
210  reinterpret_cast<const null_type_t<C_TYPE>*>(values->data());
212 
213  size_t unroll_count = entry_count & 0xFFFFFFFFFFFFFFF8ULL;
214  for (size_t i = 0; i < unroll_count; i += 8) {
215  uint8_t valid_byte = 0;
216  uint8_t valid;
217  valid = vals[i + 0] != null_val;
218  valid_byte |= valid << 0;
219  null_count += !valid;
220  valid = vals[i + 1] != null_val;
221  valid_byte |= valid << 1;
222  null_count += !valid;
223  valid = vals[i + 2] != null_val;
224  valid_byte |= valid << 2;
225  null_count += !valid;
226  valid = vals[i + 3] != null_val;
227  valid_byte |= valid << 3;
228  null_count += !valid;
229  valid = vals[i + 4] != null_val;
230  valid_byte |= valid << 4;
231  null_count += !valid;
232  valid = vals[i + 5] != null_val;
233  valid_byte |= valid << 5;
234  null_count += !valid;
235  valid = vals[i + 6] != null_val;
236  valid_byte |= valid << 6;
237  null_count += !valid;
238  valid = vals[i + 7] != null_val;
239  valid_byte |= valid << 7;
240  null_count += !valid;
241  is_valid_data[i >> 3] = valid_byte;
242  }
243  if (unroll_count != entry_count) {
244  uint8_t valid_byte = 0;
245  for (size_t i = unroll_count; i < entry_count; ++i) {
246  bool valid = vals[i] != null_val;
247  valid_byte |= valid << (i & 7);
248  null_count += !valid;
249  }
250  is_valid_data[unroll_count >> 3] = valid_byte;
251  }
252 
253  if (!null_count) {
254  is_valid.reset();
255  }
256 
257  // TODO: support date/time + scaling
258  // TODO: support booleans
259  if (null_count) {
260  out.reset(
261  new arrow::NumericArray<ARROW_TYPE>(entry_count, values, is_valid, null_count));
262  } else {
263  out.reset(new arrow::NumericArray<ARROW_TYPE>(entry_count, values));
264  }
265 }
266 
267 #ifndef _MSC_VER
// Creates a new System V shared memory segment of `shmsz` bytes under a
// randomly chosen key and attaches it to this process.
//
// Returns the key of the new segment and the address it is attached at. When
// `shmsz` is zero no segment is created and {IPC_PRIVATE, nullptr} is returned.
//
// Throws std::runtime_error if the segment cannot be created or attached.
std::pair<key_t, void*> get_shm(size_t shmsz) {
  if (!shmsz) {
    return std::make_pair(IPC_PRIVATE, nullptr);
  }
  // Keys to shared memory segments are OS global, so we need to try a new key
  // if we encounter a collision. It seems incremental keygen would be
  // deterministically worst-case. If we use a hash (like djb2) + nonce, we could
  // still get collisions if multiple clients specify the same nonce, so using
  // rand() in lieu of a better approach.
  // TODO(ptaylor): Is this common? Are these assumptions true?
  const auto next_key = [] {
    key_t candidate;
    do {
      candidate = static_cast<key_t>(rand());
      // IPC_PRIVATE would create an anonymous segment that nobody can look up
      // by key afterwards, so never hand it out.
    } while (candidate == IPC_PRIVATE);
    return candidate;
  };

  auto key = next_key();
  int shmid = -1;
  // IPC_CREAT - create a new segment for this key if it doesn't exist.
  // IPC_EXCL  - fail if a segment already exists for this key.
  while ((shmid = shmget(key, shmsz, IPC_CREAT | IPC_EXCL | 0666)) < 0) {
    // With IPC_CREAT | IPC_EXCL a key collision is reported as EEXIST; that is
    // the only failure a different key can fix. errno values are plain integers,
    // not bit flags, so they must be compared for equality. Everything else
    // (e.g. EINVAL for an out-of-range size, ENOMEM, ENOSPC) is permanent and
    // retrying would spin forever.
    if (errno != EEXIST) {
      throw std::runtime_error("failed to create a shared memory");
    }
    key = next_key();
  }
  // get a pointer to the shared memory segment
  auto ipc_ptr = shmat(shmid, nullptr, 0);
  if (ipc_ptr == reinterpret_cast<void*>(-1)) {
    // Nobody will ever learn the key of this segment; remove it instead of
    // leaking it system-wide.
    shmctl(shmid, IPC_RMID, nullptr);
    throw std::runtime_error("failed to attach a shared memory");
  }

  return std::make_pair(key, ipc_ptr);
}
304 #endif
305 
306 std::pair<key_t, std::shared_ptr<arrow::Buffer>> get_shm_buffer(size_t size) {
307 #ifdef _MSC_VER
308  throw std::runtime_error("Arrow IPC not yet supported on Windows.");
309  return std::make_pair(0, nullptr);
310 #else
311  auto [key, ipc_ptr] = get_shm(size);
312  std::shared_ptr<arrow::Buffer> buffer(
313  new arrow::MutableBuffer(static_cast<uint8_t*>(ipc_ptr), size));
314  return std::make_pair<key_t, std::shared_ptr<arrow::Buffer>>(std::move(key),
315  std::move(buffer));
316 #endif
317 }
318 
319 } // namespace
320 
321 namespace arrow {
322 
323 key_t get_and_copy_to_shm(const std::shared_ptr<Buffer>& data) {
324 #ifdef _MSC_VER
325  throw std::runtime_error("Arrow IPC not yet supported on Windows.");
326 #else
327  auto [key, ipc_ptr] = get_shm(data->size());
328  // copy the arrow records buffer to shared memory
329  // TODO(ptaylor): I'm sure it's possible to tell Arrow's RecordBatchStreamWriter to
330  // write directly to the shared memory segment as a sink
331  memcpy(ipc_ptr, data->data(), data->size());
332  // detach from the shared memory segment
333  shmdt(ipc_ptr);
334  return key;
335 #endif
336 }
337 
338 } // namespace arrow
339 
344  auto timer = DEBUG_TIMER(__func__);
345  std::shared_ptr<arrow::RecordBatch> record_batch = convertToArrow();
346 
347  struct BuildResultParams {
348  int64_t schemaSize() const {
349  return serialized_schema ? serialized_schema->size() : 0;
350  };
351  int64_t dictSize() const { return serialized_dict ? serialized_dict->size() : 0; };
352  int64_t totalSize() const { return schemaSize() + records_size + dictSize(); }
353  bool hasRecordBatch() const { return records_size > 0; }
354  bool hasDict() const { return dictSize() > 0; }
355 
356  int64_t records_size{0};
357  std::shared_ptr<arrow::Buffer> serialized_schema{nullptr};
358  std::shared_ptr<arrow::Buffer> serialized_dict{nullptr};
359  } result_params;
360 
363  const auto getWireResult = [&]() -> ArrowResult {
364  auto timer = DEBUG_TIMER("serialize batch to wire");
365  const auto total_size = result_params.totalSize();
366  std::vector<char> record_handle_data(total_size);
367  auto serialized_records =
368  arrow::MutableBuffer::Wrap(record_handle_data.data(), total_size);
369 
370  ARROW_ASSIGN_OR_THROW(auto writer, arrow::Buffer::GetWriter(serialized_records));
371 
372  ARROW_THROW_NOT_OK(writer->Write(
373  reinterpret_cast<const uint8_t*>(result_params.serialized_schema->data()),
374  result_params.schemaSize()));
375 
376  if (result_params.hasDict()) {
377  ARROW_THROW_NOT_OK(writer->Write(
378  reinterpret_cast<const uint8_t*>(result_params.serialized_dict->data()),
379  result_params.dictSize()));
380  }
381 
382  arrow::io::FixedSizeBufferWriter stream(SliceMutableBuffer(
383  serialized_records, result_params.schemaSize() + result_params.dictSize()));
384 
385  if (result_params.hasRecordBatch()) {
386  ARROW_THROW_NOT_OK(arrow::ipc::SerializeRecordBatch(
387  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
388  }
389 
390  return {std::vector<char>(0),
391  0,
392  std::vector<char>(0),
393  serialized_records->size(),
394  std::string{""},
395  std::move(record_handle_data)};
396  };
397 
398  const auto getShmResult = [&]() -> ArrowResult {
399  auto timer = DEBUG_TIMER("serialize batch to shared memory");
400  std::shared_ptr<arrow::Buffer> serialized_records;
401  std::vector<char> schema_handle_buffer;
402  std::vector<char> record_handle_buffer(sizeof(key_t), 0);
403  key_t records_shm_key = IPC_PRIVATE;
404  const int64_t total_size = result_params.totalSize();
405 
406  std::tie(records_shm_key, serialized_records) = get_shm_buffer(total_size);
407 
408  memcpy(serialized_records->mutable_data(),
409  result_params.serialized_schema->data(),
410  (size_t)result_params.schemaSize());
411 
412  if (result_params.hasDict()) {
413  memcpy(serialized_records->mutable_data() + result_params.schemaSize(),
414  result_params.serialized_dict->data(),
415  (size_t)result_params.dictSize());
416  }
417 
418  arrow::io::FixedSizeBufferWriter stream(SliceMutableBuffer(
419  serialized_records, result_params.schemaSize() + result_params.dictSize()));
420 
421  if (result_params.hasRecordBatch()) {
422  ARROW_THROW_NOT_OK(arrow::ipc::SerializeRecordBatch(
423  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
424  }
425 
426  memcpy(&record_handle_buffer[0],
427  reinterpret_cast<const unsigned char*>(&records_shm_key),
428  sizeof(key_t));
429 
430  return {schema_handle_buffer,
431  0,
432  record_handle_buffer,
433  serialized_records->size(),
434  std::string{""}};
435  };
436 
437  arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
438  auto options = arrow::ipc::IpcWriteOptions::Defaults();
439  auto dict_stream = arrow::io::BufferOutputStream::Create(1024).ValueOrDie();
440 
441  // If our record batch is going to be empty, we omit it entirely,
442  // only serializing the schema.
443  if (!record_batch->num_rows()) {
444  ARROW_ASSIGN_OR_THROW(result_params.serialized_schema,
445  arrow::ipc::SerializeSchema(*record_batch->schema(),
446  arrow::default_memory_pool()));
447 
448  switch (transport_method_) {
450  return getWireResult();
452  return getShmResult();
453  default:
454  UNREACHABLE();
455  }
456  }
457 
458  ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));
459 
460  ARROW_LOG("CPU") << "found " << dictionaries.size() << " dictionaries";
461 
462  for (auto& pair : dictionaries) {
463  arrow::ipc::IpcPayload payload;
464  int64_t dictionary_id = pair.first;
465  const auto& dictionary = pair.second;
466 
468  GetDictionaryPayload(dictionary_id, dictionary, options, &payload));
469  int32_t metadata_length = 0;
471  WriteIpcPayload(payload, options, dict_stream.get(), &metadata_length));
472  }
473  result_params.serialized_dict = dict_stream->Finish().ValueOrDie();
474 
475  ARROW_ASSIGN_OR_THROW(result_params.serialized_schema,
476  arrow::ipc::SerializeSchema(*record_batch->schema(),
477  arrow::default_memory_pool()));
478 
480  arrow::ipc::GetRecordBatchSize(*record_batch, &result_params.records_size));
481 
482  switch (transport_method_) {
484  return getWireResult();
486  return getShmResult();
487  default:
488  UNREACHABLE();
489  }
490  }
491 #ifdef HAVE_CUDA
493 
494  // Copy the schema to the schema handle
495  auto out_stream_result = arrow::io::BufferOutputStream::Create(1024);
496  ARROW_THROW_NOT_OK(out_stream_result.status());
497  auto out_stream = std::move(out_stream_result).ValueOrDie();
498 
499  arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
500  arrow::ipc::DictionaryMemo current_memo;
501  arrow::ipc::DictionaryMemo serialized_memo;
502 
503  arrow::ipc::IpcPayload schema_payload;
504  ARROW_THROW_NOT_OK(arrow::ipc::GetSchemaPayload(*record_batch->schema(),
505  arrow::ipc::IpcWriteOptions::Defaults(),
506  mapper,
507  &schema_payload));
508  int32_t schema_payload_length = 0;
509  ARROW_THROW_NOT_OK(arrow::ipc::WriteIpcPayload(schema_payload,
510  arrow::ipc::IpcWriteOptions::Defaults(),
511  out_stream.get(),
512  &schema_payload_length));
513  ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));
514  ARROW_LOG("GPU") << "Dictionary "
515  << "found dicts: " << dictionaries.size();
516 
518  arrow::ipc::internal::CollectDictionaries(*record_batch, &current_memo));
519 
520  // now try a dictionary
521  std::shared_ptr<arrow::Schema> dummy_schema;
522  std::vector<std::shared_ptr<arrow::RecordBatch>> dict_batches;
523 
524  for (const auto& pair : dictionaries) {
525  arrow::ipc::IpcPayload payload;
526  const auto& dict_id = pair.first;
527  CHECK_GE(dict_id, 0);
528  ARROW_LOG("GPU") << "Dictionary "
529  << "dict_id: " << dict_id;
530  const auto& dict = pair.second;
531  CHECK(dict);
532 
533  if (!dummy_schema) {
534  auto dummy_field = std::make_shared<arrow::Field>("", dict->type());
535  dummy_schema = std::make_shared<arrow::Schema>(
536  std::vector<std::shared_ptr<arrow::Field>>{dummy_field});
537  }
538  dict_batches.emplace_back(
539  arrow::RecordBatch::Make(dummy_schema, dict->length(), {dict}));
540  }
541 
542  if (!dict_batches.empty()) {
543  ARROW_THROW_NOT_OK(arrow::ipc::WriteRecordBatchStream(
544  dict_batches, arrow::ipc::IpcWriteOptions::Defaults(), out_stream.get()));
545  }
546 
547  auto complete_ipc_stream = out_stream->Finish();
548  ARROW_THROW_NOT_OK(complete_ipc_stream.status());
549  auto serialized_records = std::move(complete_ipc_stream).ValueOrDie();
550 
551  const auto record_key = arrow::get_and_copy_to_shm(serialized_records);
552  std::vector<char> schema_record_key_buffer(sizeof(key_t), 0);
553  memcpy(&schema_record_key_buffer[0],
554  reinterpret_cast<const unsigned char*>(&record_key),
555  sizeof(key_t));
556 
557  arrow::cuda::CudaDeviceManager* manager;
558  ARROW_ASSIGN_OR_THROW(manager, arrow::cuda::CudaDeviceManager::Instance());
559  std::shared_ptr<arrow::cuda::CudaContext> context;
560  ARROW_ASSIGN_OR_THROW(context, manager->GetContext(device_id_));
561 
562  std::shared_ptr<arrow::cuda::CudaBuffer> device_serialized;
563  ARROW_ASSIGN_OR_THROW(device_serialized,
564  SerializeRecordBatch(*record_batch, context.get()));
565 
566  std::shared_ptr<arrow::cuda::CudaIpcMemHandle> cuda_handle;
567  ARROW_ASSIGN_OR_THROW(cuda_handle, device_serialized->ExportForIpc());
568 
569  std::shared_ptr<arrow::Buffer> serialized_cuda_handle;
570  ARROW_ASSIGN_OR_THROW(serialized_cuda_handle,
571  cuda_handle->Serialize(arrow::default_memory_pool()));
572 
573  std::vector<char> record_handle_buffer(serialized_cuda_handle->size(), 0);
574  memcpy(&record_handle_buffer[0],
575  serialized_cuda_handle->data(),
576  serialized_cuda_handle->size());
577 
578  return {schema_record_key_buffer,
579  serialized_records->size(),
580  record_handle_buffer,
581  serialized_cuda_handle->size(),
582  serialized_cuda_handle->ToString()};
583 #else
584  UNREACHABLE();
585  return {std::vector<char>{}, 0, std::vector<char>{}, 0, ""};
586 #endif
587 }
588 
591  arrow::ipc::DictionaryFieldMapper* mapper) const {
592  auto timer = DEBUG_TIMER(__func__);
593  std::shared_ptr<arrow::RecordBatch> arrow_copy = convertToArrow();
594  std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;
595 
597  serialized_schema,
598  arrow::ipc::SerializeSchema(*arrow_copy->schema(), arrow::default_memory_pool()));
599 
600  if (arrow_copy->num_rows()) {
601  auto timer = DEBUG_TIMER("serialize records");
602  ARROW_THROW_NOT_OK(arrow_copy->Validate());
603  ARROW_ASSIGN_OR_THROW(serialized_records,
604  arrow::ipc::SerializeRecordBatch(
605  *arrow_copy, arrow::ipc::IpcWriteOptions::Defaults()));
606  } else {
607  ARROW_ASSIGN_OR_THROW(serialized_records, arrow::AllocateBuffer(0));
608  }
609  return {serialized_schema, serialized_records};
610 }
611 
612 std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::convertToArrow() const {
613  auto timer = DEBUG_TIMER(__func__);
614  const auto col_count = results_->colCount();
615  std::vector<std::shared_ptr<arrow::Field>> fields;
616  CHECK(col_names_.empty() || col_names_.size() == col_count);
617  for (size_t i = 0; i < col_count; ++i) {
618  const auto ti = results_->getColType(i);
619  fields.push_back(makeField(col_names_.empty() ? "" : col_names_[i], ti));
620  }
621 #if ARROW_CONVERTER_DEBUG
622  VLOG(1) << "Arrow fields: ";
623  for (const auto& f : fields) {
624  VLOG(1) << "\t" << f->ToString(true);
625  }
626 #endif
627  return getArrowBatch(arrow::schema(fields));
628 }
629 
630 std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::getArrowBatch(
631  const std::shared_ptr<arrow::Schema>& schema) const {
632  std::vector<std::shared_ptr<arrow::Array>> result_columns;
633 
634  // First, check if the result set is empty.
635  // If so, we return an arrow result set that only
636  // contains the schema (no record batch will be serialized).
637  if (results_->isEmpty()) {
638  return ARROW_RECORDBATCH_MAKE(schema, 0, result_columns);
639  }
640 
641  const size_t entry_count = top_n_ < 0
642  ? results_->entryCount()
643  : std::min(size_t(top_n_), results_->entryCount());
644 
645  const auto col_count = results_->colCount();
646  size_t row_count = 0;
647 
648  result_columns.resize(col_count);
649  std::vector<ColumnBuilder> builders(col_count);
650 
651  // Create array builders
652  for (size_t i = 0; i < col_count; ++i) {
653  initializeColumnBuilder(builders[i], results_->getColType(i), schema->field(i));
654  }
655 
656  // TODO(miyu): speed up for columnar buffers
657  auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
658  std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
659  const std::vector<bool>& non_lazy_cols,
660  const size_t start_entry,
661  const size_t end_entry) -> size_t {
662  CHECK_EQ(value_seg.size(), col_count);
663  CHECK_EQ(null_bitmap_seg.size(), col_count);
664  const auto entry_count = end_entry - start_entry;
665  size_t seg_row_count = 0;
666  for (size_t i = start_entry; i < end_entry; ++i) {
667  auto row = results_->getRowAtNoTranslations(i, non_lazy_cols);
668  if (row.empty()) {
669  continue;
670  }
671  ++seg_row_count;
672  for (size_t j = 0; j < col_count; ++j) {
673  if (!non_lazy_cols.empty() && non_lazy_cols[j]) {
674  continue;
675  }
676 
677  auto scalar_value = boost::get<ScalarTargetValue>(&row[j]);
678  // TODO(miyu): support more types other than scalar.
679  CHECK(scalar_value);
680  const auto& column = builders[j];
681  switch (column.physical_type) {
682  case kBOOLEAN:
683  create_or_append_value<bool, int64_t>(
684  *scalar_value, value_seg[j], entry_count);
685  create_or_append_validity<int64_t>(
686  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
687  break;
688  case kTINYINT:
689  create_or_append_value<int8_t, int64_t>(
690  *scalar_value, value_seg[j], entry_count);
691  create_or_append_validity<int64_t>(
692  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
693  break;
694  case kSMALLINT:
695  create_or_append_value<int16_t, int64_t>(
696  *scalar_value, value_seg[j], entry_count);
697  create_or_append_validity<int64_t>(
698  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
699  break;
700  case kINT:
701  create_or_append_value<int32_t, int64_t>(
702  *scalar_value, value_seg[j], entry_count);
703  create_or_append_validity<int64_t>(
704  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
705  break;
706  case kBIGINT:
707  create_or_append_value<int64_t, int64_t>(
708  *scalar_value, value_seg[j], entry_count);
709  create_or_append_validity<int64_t>(
710  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
711  break;
712  case kDECIMAL:
713  create_or_append_value<int64_t, int64_t>(
714  *scalar_value, value_seg[j], entry_count);
715  create_or_append_validity<int64_t>(
716  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
717  break;
718  case kFLOAT:
719  create_or_append_value<float, float>(
720  *scalar_value, value_seg[j], entry_count);
721  create_or_append_validity<float>(
722  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
723  break;
724  case kDOUBLE:
725  create_or_append_value<double, double>(
726  *scalar_value, value_seg[j], entry_count);
727  create_or_append_validity<double>(
728  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
729  break;
730  case kTIME:
731  create_or_append_value<int32_t, int64_t>(
732  *scalar_value, value_seg[j], entry_count);
733  create_or_append_validity<int64_t>(
734  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
735  break;
736  case kDATE:
738  ? create_or_append_value<int64_t, int64_t>(
739  *scalar_value, value_seg[j], entry_count)
740  : create_or_append_value<int32_t, int64_t>(
741  *scalar_value, value_seg[j], entry_count);
742  create_or_append_validity<int64_t>(
743  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
744  break;
745  case kTIMESTAMP:
746  create_or_append_value<int64_t, int64_t>(
747  *scalar_value, value_seg[j], entry_count);
748  create_or_append_validity<int64_t>(
749  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
750  break;
751  default:
752  // TODO(miyu): support more scalar types.
753  throw std::runtime_error(column.col_type.get_type_name() +
754  " is not supported in Arrow result sets.");
755  }
756  }
757  }
758  return seg_row_count;
759  };
760 
761  auto convert_columns = [&](std::vector<std::shared_ptr<arrow::Array>>& result,
762  const std::vector<bool>& non_lazy_cols,
763  const size_t start_col,
764  const size_t end_col) {
765  for (size_t col = start_col; col < end_col; ++col) {
766  if (!non_lazy_cols.empty() && !non_lazy_cols[col]) {
767  continue;
768  }
769 
770  const auto& column = builders[col];
771  switch (column.physical_type) {
772  case kTINYINT:
773  convert_column<int8_t>(results_, col, entry_count, result[col]);
774  break;
775  case kSMALLINT:
776  convert_column<int16_t>(results_, col, entry_count, result[col]);
777  break;
778  case kINT:
779  convert_column<int32_t>(results_, col, entry_count, result[col]);
780  break;
781  case kBIGINT:
782  convert_column<int64_t>(results_, col, entry_count, result[col]);
783  break;
784  case kFLOAT:
785  convert_column<float>(results_, col, entry_count, result[col]);
786  break;
787  case kDOUBLE:
788  convert_column<double>(results_, col, entry_count, result[col]);
789  break;
790  default:
791  throw std::runtime_error(column.col_type.get_type_name() +
792  " is not supported in Arrow column converter.");
793  }
794  }
795  };
796 
797  std::vector<std::shared_ptr<ValueArray>> column_values(col_count, nullptr);
798  std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count, nullptr);
799  const bool multithreaded = entry_count > 10000 && !results_->isTruncated();
800  bool use_columnar_converter = results_->isDirectColumnarConversionPossible() &&
801  results_->getQueryMemDesc().getQueryDescriptionType() ==
803  entry_count == results_->entryCount();
804  std::vector<bool> non_lazy_cols;
805  if (use_columnar_converter) {
806  auto timer = DEBUG_TIMER("columnar converter");
807  std::vector<size_t> non_lazy_col_pos;
808  size_t non_lazy_col_count = 0;
809  const auto& lazy_fetch_info = results_->getLazyFetchInfo();
810 
811  non_lazy_cols.reserve(col_count);
812  non_lazy_col_pos.reserve(col_count);
813  for (size_t i = 0; i < col_count; ++i) {
814  bool is_lazy =
815  lazy_fetch_info.empty() ? false : lazy_fetch_info[i].is_lazily_fetched;
816  // Currently column converter cannot handle some data types.
817  // Treat them as lazy.
818  switch (builders[i].physical_type) {
819  case kBOOLEAN:
820  case kTIME:
821  case kDATE:
822  case kTIMESTAMP:
823  is_lazy = true;
824  break;
825  default:
826  break;
827  }
828  if (builders[i].field->type()->id() == arrow::Type::DICTIONARY) {
829  is_lazy = true;
830  }
831  non_lazy_cols.emplace_back(!is_lazy);
832  if (!is_lazy) {
833  ++non_lazy_col_count;
834  non_lazy_col_pos.emplace_back(i);
835  }
836  }
837 
838  if (non_lazy_col_count == col_count) {
839  non_lazy_cols.clear();
840  non_lazy_col_pos.clear();
841  } else {
842  non_lazy_col_pos.emplace_back(col_count);
843  }
844 
845  std::vector<std::future<void>> child_threads;
846  size_t num_threads =
847  std::min(multithreaded ? (size_t)cpu_threads() : (size_t)1, non_lazy_col_count);
848 
849  size_t start_col = 0;
850  size_t end_col = 0;
851  for (size_t i = 0; i < num_threads; ++i) {
852  start_col = end_col;
853  end_col = (i + 1) * non_lazy_col_count / num_threads;
854  size_t phys_start_col =
855  non_lazy_col_pos.empty() ? start_col : non_lazy_col_pos[start_col];
856  size_t phys_end_col =
857  non_lazy_col_pos.empty() ? end_col : non_lazy_col_pos[end_col];
858  child_threads.push_back(std::async(std::launch::async,
859  convert_columns,
860  std::ref(result_columns),
861  non_lazy_cols,
862  phys_start_col,
863  phys_end_col));
864  }
865  for (auto& child : child_threads) {
866  child.get();
867  }
868  row_count = entry_count;
869  }
870  if (!use_columnar_converter || !non_lazy_cols.empty()) {
871  auto timer = DEBUG_TIMER("row converter");
872  row_count = 0;
873  if (multithreaded) {
874  const size_t cpu_count = cpu_threads();
875  std::vector<std::future<size_t>> child_threads;
876  std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
877  cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count, nullptr));
878  std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
879  cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count, nullptr));
880  const auto stride = (entry_count + cpu_count - 1) / cpu_count;
881  for (size_t i = 0, start_entry = 0; start_entry < entry_count;
882  ++i, start_entry += stride) {
883  const auto end_entry = std::min(entry_count, start_entry + stride);
884  child_threads.push_back(std::async(std::launch::async,
885  fetch,
886  std::ref(column_value_segs[i]),
887  std::ref(null_bitmap_segs[i]),
888  non_lazy_cols,
889  start_entry,
890  end_entry));
891  }
892  for (auto& child : child_threads) {
893  row_count += child.get();
894  }
895  {
896  auto timer = DEBUG_TIMER("append rows to arrow");
897  for (int i = 0; i < schema->num_fields(); ++i) {
898  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
899  continue;
900  }
901 
902  for (size_t j = 0; j < cpu_count; ++j) {
903  if (!column_value_segs[j][i]) {
904  continue;
905  }
906  append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
907  }
908  }
909  }
910  } else {
911  row_count =
912  fetch(column_values, null_bitmaps, non_lazy_cols, size_t(0), entry_count);
913  {
914  auto timer = DEBUG_TIMER("append rows to arrow single thread");
915  for (int i = 0; i < schema->num_fields(); ++i) {
916  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
917  continue;
918  }
919 
920  append(builders[i], *column_values[i], null_bitmaps[i]);
921  }
922  }
923  }
924 
925  {
926  auto timer = DEBUG_TIMER("finish builders");
927  for (size_t i = 0; i < col_count; ++i) {
928  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
929  continue;
930  }
931 
932  result_columns[i] = finishColumnBuilder(builders[i]);
933  }
934  }
935  }
936 
937  return ARROW_RECORDBATCH_MAKE(schema, row_count, result_columns);
938 }
939 
940 namespace {
941 
942 std::shared_ptr<arrow::DataType> get_arrow_type(const SQLTypeInfo& sql_type,
943  const ExecutorDeviceType device_type) {
944  switch (get_physical_type(sql_type)) {
945  case kBOOLEAN:
946  return arrow::boolean();
947  case kTINYINT:
948  return arrow::int8();
949  case kSMALLINT:
950  return arrow::int16();
951  case kINT:
952  return arrow::int32();
953  case kBIGINT:
954  return arrow::int64();
955  case kFLOAT:
956  return arrow::float32();
957  case kDOUBLE:
958  return arrow::float64();
959  case kCHAR:
960  case kVARCHAR:
961  case kTEXT:
962  if (sql_type.is_dict_encoded_string()) {
963  auto value_type = std::make_shared<arrow::StringType>();
964  return dictionary(arrow::int32(), value_type, false);
965  }
966  return arrow::utf8();
967  case kDECIMAL:
968  case kNUMERIC:
969  return arrow::decimal(sql_type.get_precision(), sql_type.get_scale());
970  case kTIME:
971  return time32(arrow::TimeUnit::SECOND);
972  case kDATE: {
973  // TODO(wamsi) : Remove date64() once date32() support is added in cuDF. date32()
974  // Currently support for date32() is missing in cuDF.Hence, if client requests for
975  // date on GPU, return date64() for the time being, till support is added.
976  if (device_type == ExecutorDeviceType::GPU) {
977  return arrow::date64();
978  } else {
979  return arrow::date32();
980  }
981  }
982  case kTIMESTAMP:
983  switch (sql_type.get_precision()) {
984  case 0:
985  return timestamp(arrow::TimeUnit::SECOND);
986  case 3:
987  return timestamp(arrow::TimeUnit::MILLI);
988  case 6:
989  return timestamp(arrow::TimeUnit::MICRO);
990  case 9:
991  return timestamp(arrow::TimeUnit::NANO);
992  default:
993  throw std::runtime_error(
994  "Unsupported timestamp precision for Arrow result sets: " +
995  std::to_string(sql_type.get_precision()));
996  }
997  case kARRAY:
998  case kINTERVAL_DAY_TIME:
1000  default:
1001  throw std::runtime_error(sql_type.get_type_name() +
1002  " is not supported in Arrow result sets.");
1003  }
1004  return nullptr;
1005 }
1006 
1007 } // namespace
1008 
1009 std::shared_ptr<arrow::Field> ArrowResultSetConverter::makeField(
1010  const std::string name,
1011  const SQLTypeInfo& target_type) const {
1012  return arrow::field(
1013  name, get_arrow_type(target_type, device_type_), !target_type.get_notnull());
1014 }
1015 
1017  const ArrowResult& result,
1018  const ExecutorDeviceType device_type,
1019  const size_t device_id,
1020  std::shared_ptr<Data_Namespace::DataMgr>& data_mgr) {
1021 #ifndef _MSC_VER
1022  // CPU buffers skip the sm handle, serializing the entire RecordBatch to df.
1023  // Remove shared memory on sysmem
1024  if (!result.sm_handle.empty()) {
1025  CHECK_EQ(sizeof(key_t), result.sm_handle.size());
1026  const key_t& schema_key = *(key_t*)(&result.sm_handle[0]);
1027  auto shm_id = shmget(schema_key, result.sm_size, 0666);
1028  if (shm_id < 0) {
1029  throw std::runtime_error(
1030  "failed to get an valid shm ID w/ given shm key of the schema");
1031  }
1032  if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
1033  throw std::runtime_error("failed to deallocate Arrow schema on errorno(" +
1034  std::to_string(errno) + ")");
1035  }
1036  }
1037 
1038  if (device_type == ExecutorDeviceType::CPU) {
1039  CHECK_EQ(sizeof(key_t), result.df_handle.size());
1040  const key_t& df_key = *(key_t*)(&result.df_handle[0]);
1041  auto shm_id = shmget(df_key, result.df_size, 0666);
1042  if (shm_id < 0) {
1043  throw std::runtime_error(
1044  "failed to get an valid shm ID w/ given shm key of the data");
1045  }
1046  if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
1047  throw std::runtime_error("failed to deallocate Arrow data frame");
1048  }
1049  }
1050  // CUDA buffers become owned by the caller, and will automatically be freed
1051  // TODO: What if the client never takes ownership of the result? we may want to
1052  // establish a check to see if the GPU buffer still exists, and then free it.
1053 #endif
1054 }
1055 
1057  ColumnBuilder& column_builder,
1058  const SQLTypeInfo& col_type,
1059  const std::shared_ptr<arrow::Field>& field) const {
1060  column_builder.field = field;
1061  column_builder.col_type = col_type;
1062  column_builder.physical_type = col_type.is_dict_encoded_string()
1063  ? get_dict_index_type(col_type)
1064  : get_physical_type(col_type);
1065 
1066  auto value_type = field->type();
1067  if (col_type.is_dict_encoded_string()) {
1068  column_builder.builder.reset(new arrow::StringDictionary32Builder());
1069  // add values to the builder
1070  const int dict_id = col_type.get_comp_param();
1071  auto str_list = results_->getStringDictionaryPayloadCopy(dict_id);
1072 
1073  arrow::StringBuilder str_array_builder;
1074  ARROW_THROW_NOT_OK(str_array_builder.AppendValues(*str_list));
1075  std::shared_ptr<arrow::StringArray> string_array;
1076  ARROW_THROW_NOT_OK(str_array_builder.Finish(&string_array));
1077 
1078  auto dict_builder =
1079  dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
1080  CHECK(dict_builder);
1081 
1082  ARROW_THROW_NOT_OK(dict_builder->InsertMemoValues(*string_array));
1083  } else {
1084  ARROW_THROW_NOT_OK(arrow::MakeBuilder(
1085  arrow::default_memory_pool(), value_type, &column_builder.builder));
1086  }
1087 }
1088 
1089 std::shared_ptr<arrow::Array> ArrowResultSetConverter::finishColumnBuilder(
1090  ColumnBuilder& column_builder) const {
1091  std::shared_ptr<arrow::Array> values;
1092  ARROW_THROW_NOT_OK(column_builder.builder->Finish(&values));
1093  return values;
1094 }
1095 
1096 namespace {
1097 
1098 template <typename BUILDER_TYPE, typename VALUE_ARRAY_TYPE>
1100  const ValueArray& values,
1101  const std::shared_ptr<std::vector<bool>>& is_valid) {
1102  static_assert(!std::is_same<BUILDER_TYPE, arrow::StringDictionary32Builder>::value,
1103  "Dictionary encoded string builder requires function specialization.");
1104 
1105  std::vector<VALUE_ARRAY_TYPE> vals = boost::get<std::vector<VALUE_ARRAY_TYPE>>(values);
1106 
1107  if (scale_epoch_values<BUILDER_TYPE>()) {
1108  auto scale_sec_to_millisec = [](auto seconds) { return seconds * kMilliSecsPerSec; };
1109  auto scale_values = [&](auto epoch) {
1110  return std::is_same<BUILDER_TYPE, arrow::Date32Builder>::value
1112  : scale_sec_to_millisec(epoch);
1113  };
1114  std::transform(vals.begin(), vals.end(), vals.begin(), scale_values);
1115  }
1116 
1117  auto typed_builder = dynamic_cast<BUILDER_TYPE*>(column_builder.builder.get());
1118  CHECK(typed_builder);
1119  if (column_builder.field->nullable()) {
1120  CHECK(is_valid.get());
1121  ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals, *is_valid));
1122  } else {
1123  ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals));
1124  }
1125 }
1126 
1127 template <>
1128 void appendToColumnBuilder<arrow::Decimal128Builder, int64_t>(
1130  const ValueArray& values,
1131  const std::shared_ptr<std::vector<bool>>& is_valid) {
1132  std::vector<int64_t> vals = boost::get<std::vector<int64_t>>(values);
1133  auto typed_builder =
1134  dynamic_cast<arrow::Decimal128Builder*>(column_builder.builder.get());
1135  CHECK(typed_builder);
1136  CHECK_EQ(is_valid->size(), vals.size());
1137  if (column_builder.field->nullable()) {
1138  CHECK(is_valid.get());
1139  for (size_t i = 0; i < vals.size(); i++) {
1140  const auto v = vals[i];
1141  const auto valid = (*is_valid)[i];
1142  if (valid) {
1143  ARROW_THROW_NOT_OK(typed_builder->Append(v));
1144  } else {
1145  ARROW_THROW_NOT_OK(typed_builder->AppendNull());
1146  }
1147  }
1148  } else {
1149  for (const auto& v : vals) {
1150  ARROW_THROW_NOT_OK(typed_builder->Append(v));
1151  }
1152  }
1153 }
1154 
1155 template <>
1156 void appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
1158  const ValueArray& values,
1159  const std::shared_ptr<std::vector<bool>>& is_valid) {
1160  auto typed_builder =
1161  dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
1162  CHECK(typed_builder);
1163 
1164  std::vector<int32_t> vals = boost::get<std::vector<int32_t>>(values);
1165 
1166  if (column_builder.field->nullable()) {
1167  CHECK(is_valid.get());
1168  // TODO(adb): Generate this instead of the boolean bitmap
1169  std::vector<uint8_t> transformed_bitmap;
1170  transformed_bitmap.reserve(is_valid->size());
1171  std::for_each(
1172  is_valid->begin(), is_valid->end(), [&transformed_bitmap](const bool is_valid) {
1173  transformed_bitmap.push_back(is_valid ? 1 : 0);
1174  });
1175 
1176  ARROW_THROW_NOT_OK(typed_builder->AppendIndices(
1177  vals.data(), static_cast<int64_t>(vals.size()), transformed_bitmap.data()));
1178  } else {
1180  typed_builder->AppendIndices(vals.data(), static_cast<int64_t>(vals.size())));
1181  }
1182 }
1183 
1184 } // namespace
1185 
1187  ColumnBuilder& column_builder,
1188  const ValueArray& values,
1189  const std::shared_ptr<std::vector<bool>>& is_valid) const {
1190  if (column_builder.col_type.is_dict_encoded_string()) {
1191  CHECK_EQ(column_builder.physical_type,
1192  kINT); // assume all dicts use none-encoded type for now
1193  appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
1194  column_builder, values, is_valid);
1195  return;
1196  }
1197  switch (column_builder.physical_type) {
1198  case kBOOLEAN:
1199  appendToColumnBuilder<arrow::BooleanBuilder, bool>(
1200  column_builder, values, is_valid);
1201  break;
1202  case kTINYINT:
1203  appendToColumnBuilder<arrow::Int8Builder, int8_t>(column_builder, values, is_valid);
1204  break;
1205  case kSMALLINT:
1206  appendToColumnBuilder<arrow::Int16Builder, int16_t>(
1207  column_builder, values, is_valid);
1208  break;
1209  case kINT:
1210  appendToColumnBuilder<arrow::Int32Builder, int32_t>(
1211  column_builder, values, is_valid);
1212  break;
1213  case kBIGINT:
1214  appendToColumnBuilder<arrow::Int64Builder, int64_t>(
1215  column_builder, values, is_valid);
1216  break;
1217  case kDECIMAL:
1218  appendToColumnBuilder<arrow::Decimal128Builder, int64_t>(
1219  column_builder, values, is_valid);
1220  break;
1221  case kFLOAT:
1222  appendToColumnBuilder<arrow::FloatBuilder, float>(column_builder, values, is_valid);
1223  break;
1224  case kDOUBLE:
1225  appendToColumnBuilder<arrow::DoubleBuilder, double>(
1226  column_builder, values, is_valid);
1227  break;
1228  case kTIME:
1229  appendToColumnBuilder<arrow::Time32Builder, int32_t>(
1230  column_builder, values, is_valid);
1231  break;
1232  case kTIMESTAMP:
1233  appendToColumnBuilder<arrow::TimestampBuilder, int64_t>(
1234  column_builder, values, is_valid);
1235  break;
1236  case kDATE:
1238  ? appendToColumnBuilder<arrow::Date64Builder, int64_t>(
1239  column_builder, values, is_valid)
1240  : appendToColumnBuilder<arrow::Date32Builder, int32_t>(
1241  column_builder, values, is_valid);
1242  break;
1243  case kCHAR:
1244  case kVARCHAR:
1245  case kTEXT:
1246  default:
1247  // TODO(miyu): support more scalar types.
1248  throw std::runtime_error(column_builder.col_type.get_type_name() +
1249  " is not supported in Arrow result sets.");
1250  }
1251 }
void create_or_append_value(const ScalarTargetValue &val_cty, std::shared_ptr< ValueArray > &values, const size_t max_size)
#define CHECK_EQ(x, y)
Definition: Logger.h:217
std::unique_ptr< arrow::ArrayBuilder > builder
HOST DEVICE int get_size() const
Definition: sqltypes.h:339
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
Definition: sqltypes.h:49
SQLTypes
Definition: sqltypes.h:38
void append(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
void convert_column(ResultSetPtr result, size_t col, size_t entry_count, std::shared_ptr< arrow::Array > &out)
ArrowResult getArrowResult() const
std::vector< char > sm_handle
ExecutorDeviceType
ResultSetBuffer(const uint8_t *buf, size_t size, ResultSetPtr rs)
void appendToColumnBuilder(ArrowResultSetConverter::ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid)
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
#define ARROW_LOG(category)
bool is_fp() const
Definition: sqltypes.h:508
HOST DEVICE int get_scale() const
Definition: sqltypes.h:334
std::shared_ptr< arrow::Array > finishColumnBuilder(ColumnBuilder &column_builder) const
string name
Definition: setup.in.py:72
parquet::Type::type get_physical_type(ReaderPtr &reader, const int logical_column_index)
#define UNREACHABLE()
Definition: Logger.h:253
#define CHECK_GE(x, y)
Definition: Logger.h:222
std::shared_ptr< arrow::Field > field
#define DICTIONARY
std::shared_ptr< ResultSet > ResultSetPtr
std::pair< key_t, std::shared_ptr< arrow::Buffer > > get_shm_buffer(size_t size)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
bool is_time() const
Definition: sqltypes.h:510
std::string to_string(char const *&&v)
std::pair< key_t, void * > get_shm(size_t shmsz)
static constexpr int64_t kMilliSecsPerSec
ArrowTransport transport_method_
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
std::shared_ptr< arrow::RecordBatch > getArrowBatch(const std::shared_ptr< arrow::Schema > &schema) const
future< Result > async(Fn &&fn, Args &&...args)
#define ARROW_RECORDBATCH_MAKE
std::vector< std::string > col_names_
bool is_integer() const
Definition: sqltypes.h:506
ExecutorDeviceType device_type_
std::vector< char > df_handle
std::shared_ptr< arrow::Field > makeField(const std::string name, const SQLTypeInfo &target_type) const
SerializedArrowOutput getSerializedArrowOutput(arrow::ipc::DictionaryFieldMapper *mapper) const
bool is_boolean() const
Definition: sqltypes.h:511
int get_precision() const
Definition: sqltypes.h:332
key_t get_and_copy_to_shm(const std::shared_ptr< Buffer > &data)
static void deallocateArrowResultBuffer(const ArrowResult &result, const ExecutorDeviceType device_type, const size_t device_id, std::shared_ptr< Data_Namespace::DataMgr > &data_mgr)
std::shared_ptr< arrow::DataType > get_arrow_type(const SQLTypeInfo &sql_type, const ExecutorDeviceType device_type)
Definition: sqltypes.h:52
Definition: sqltypes.h:53
std::shared_ptr< ResultSet > results_
int64_t sm_size
boost::variant< std::vector< bool >, std::vector< int8_t >, std::vector< int16_t >, std::vector< int32_t >, std::vector< int64_t >, std::vector< arrow::Decimal128 >, std::vector< float >, std::vector< double >, std::vector< std::string >> ValueArray
int64_t df_size
std::string get_type_name() const
Definition: sqltypes.h:432
#define IS_INTEGER(T)
Definition: sqltypes.h:245
Definition: sqltypes.h:41
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:338
#define CHECK(condition)
Definition: Logger.h:209
#define DEBUG_TIMER(name)
Definition: Logger.h:352
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
char * f
int64_t get_epoch_days_from_seconds(const int64_t seconds)
void initializeColumnBuilder(ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const std::shared_ptr< arrow::Field > &field) const
bool is_dict_encoded_string() const
Definition: sqltypes.h:541
Definition: sqltypes.h:45
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:336
int cpu_threads()
Definition: thread_count.h:24
bool is_decimal() const
Definition: sqltypes.h:507
void create_or_append_validity(const ScalarTargetValue &value, const SQLTypeInfo &col_type, std::shared_ptr< std::vector< bool >> &null_bitmap, const size_t max_size)
#define VLOG(n)
Definition: Logger.h:303
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:156
std::shared_ptr< arrow::RecordBatch > convertToArrow() const