OmniSciDB 91042dcc5b
ArrowResultSetConverter.cpp

/*
 * Copyright 2019 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "../Shared/DateConverters.h"
#include "ArrowResultSet.h"
#include "Execute.h"
#include "arrow/ipc/dictionary.h"
#include "arrow/ipc/options.h"

#ifndef _MSC_VER
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#else
// IPC shared memory is not yet supported on Windows.
using key_t = size_t;
#define IPC_PRIVATE 0
#endif

#include <algorithm>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <future>
#include <string>

#include "arrow/api.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/api.h"

#include "Shared/ArrowUtil.h"

#ifdef HAVE_CUDA
#include <arrow/gpu/cuda_api.h>
#include <cuda.h>
#endif  // HAVE_CUDA

#define ARROW_RECORDBATCH_MAKE arrow::RecordBatch::Make

#define ARROW_CONVERTER_DEBUG true

#define ARROW_LOG(category) \
  VLOG(1) << "[Arrow]"      \
          << "[" << category "] "

namespace {

/* Arrow buffers may refer to memory owned by a ResultSet. To keep that memory
   alive for as long as any such buffer is in use, this custom buffer type
   holds a ResultSetPtr alongside the raw pointer. */
class ResultSetBuffer : public arrow::Buffer {
 public:
  ResultSetBuffer(const uint8_t* buf, size_t size, ResultSetPtr rs)
      : arrow::Buffer(buf, size), _rs(rs) {}

 private:
  ResultSetPtr _rs;
};

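/* Example (illustrative sketch, not part of the original file): wrapping a
   columnar ResultSet buffer zero-copy while keeping its owner alive. `rs`,
   `col`, and `buf_size` are placeholders:

     std::shared_ptr<arrow::Buffer> values = std::make_shared<ResultSetBuffer>(
         reinterpret_cast<const uint8_t*>(rs->getColumnarBuffer(col)), buf_size, rs);
*/
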
SQLTypes get_dict_index_type(const SQLTypeInfo& ti) {
  CHECK(ti.is_dict_encoded_string());
  switch (ti.get_size()) {
    case 1:
      return kTINYINT;
    case 2:
      return kSMALLINT;
    case 4:
      return kINT;
    case 8:
      return kBIGINT;
    default:
      CHECK(false);
  }
  return ti.get_type();
}

SQLTypes get_physical_type(const SQLTypeInfo& ti) {
  auto logical_type = ti.get_type();
  if (IS_INTEGER(logical_type)) {
    switch (ti.get_size()) {
      case 1:
        return kTINYINT;
      case 2:
        return kSMALLINT;
      case 4:
        return kINT;
      case 8:
        return kBIGINT;
      default:
        CHECK(false);
    }
  }
  return logical_type;
}

template <typename TYPE, typename VALUE_ARRAY_TYPE>
void create_or_append_value(const ScalarTargetValue& val_cty,
                            std::shared_ptr<ValueArray>& values,
                            const size_t max_size) {
  auto pval_cty = boost::get<VALUE_ARRAY_TYPE>(&val_cty);
  CHECK(pval_cty);
  auto val_ty = static_cast<TYPE>(*pval_cty);
  if (!values) {
    values = std::make_shared<ValueArray>(std::vector<TYPE>());
    boost::get<std::vector<TYPE>>(*values).reserve(max_size);
  }
  CHECK(values);
  auto values_ty = boost::get<std::vector<TYPE>>(values.get());
  CHECK(values_ty);
  values_ty->push_back(val_ty);
}

template <typename TYPE>
void create_or_append_validity(const ScalarTargetValue& value,
                               const SQLTypeInfo& col_type,
                               std::shared_ptr<std::vector<bool>>& null_bitmap,
                               const size_t max_size) {
  if (col_type.get_notnull()) {
    CHECK(!null_bitmap);
    return;
  }
  auto pvalue = boost::get<TYPE>(&value);
  CHECK(pvalue);
  bool is_valid = false;
  if (col_type.is_boolean()) {
    is_valid = inline_int_null_val(col_type) != static_cast<int8_t>(*pvalue);
  } else if (col_type.is_dict_encoded_string()) {
    is_valid = inline_int_null_val(col_type) != static_cast<int32_t>(*pvalue);
  } else if (col_type.is_integer() || col_type.is_time()) {
    is_valid = inline_int_null_val(col_type) != static_cast<int64_t>(*pvalue);
  } else if (col_type.is_fp()) {
    is_valid = inline_fp_null_val(col_type) != static_cast<double>(*pvalue);
  } else if (col_type.is_decimal()) {
    is_valid = inline_int_null_val(col_type) != static_cast<int64_t>(*pvalue);
  } else {
    UNREACHABLE();
  }

  if (!null_bitmap) {
    null_bitmap = std::make_shared<std::vector<bool>>();
    null_bitmap->reserve(max_size);
  }
  CHECK(null_bitmap);
  null_bitmap->push_back(is_valid);
}
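
/* For reference: OmniSci encodes NULLs in-band with type-specific sentinels
   (inline_int_null_val / inline_fp_null_val), so "validity" above reduces to a
   sentinel comparison. A minimal sketch, assuming a nullable INT column:

     SQLTypeInfo ti(kINT, false);                  // notnull == false
     const int64_t null_sentinel = inline_int_null_val(ti);
     const bool is_valid = (fetched_value != null_sentinel);
*/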

template <typename TYPE, typename enable = void>
class null_type {};

template <typename TYPE>
struct null_type<TYPE, std::enable_if_t<std::is_integral<TYPE>::value>> {
  using type = TYPE;
  static constexpr type value = inline_int_null_value<type>();
};

template <typename TYPE>
struct null_type<TYPE, std::enable_if_t<std::is_floating_point<TYPE>::value>> {
  using type = TYPE;
  static constexpr type value = inline_fp_null_value<type>();
};

template <typename TYPE>
using null_type_t = typename null_type<TYPE>::type;

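/* For reference: null_type_t<C_TYPE> mirrors the column's physical type, and
   null_type<C_TYPE>::value is the matching sentinel, e.g. (assuming the usual
   OmniSci conventions) inline_int_null_value<int32_t>() for int32_t columns
   and inline_fp_null_value<float>() for float columns. */
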
template <typename C_TYPE,
          typename ARROW_TYPE = typename arrow::CTypeTraits<C_TYPE>::ArrowType>
void convert_column(ResultSetPtr result,
                    size_t col,
                    size_t entry_count,
                    std::shared_ptr<arrow::Array>& out) {
  CHECK(sizeof(C_TYPE) == result->getColType(col).get_size());

  std::shared_ptr<arrow::Buffer> values;
  std::shared_ptr<arrow::Buffer> is_valid;
  const int64_t buf_size = entry_count * sizeof(C_TYPE);
  if (result->isZeroCopyColumnarConversionPossible(col)) {
    values.reset(new ResultSetBuffer(
        reinterpret_cast<const uint8_t*>(result->getColumnarBuffer(col)),
        buf_size,
        result));
  } else {
    auto res = arrow::AllocateBuffer(buf_size);
    CHECK(res.ok());
    values = std::move(res).ValueOrDie();
    result->copyColumnIntoBuffer(
        col, reinterpret_cast<int8_t*>(values->mutable_data()), buf_size);
  }

  int64_t null_count = 0;
  auto res = arrow::AllocateBuffer((entry_count + 7) / 8);
  CHECK(res.ok());
  is_valid = std::move(res).ValueOrDie();

  auto is_valid_data = is_valid->mutable_data();

  const null_type_t<C_TYPE>* vals =
      reinterpret_cast<const null_type_t<C_TYPE>*>(values->data());
  const null_type_t<C_TYPE> null_val = null_type<C_TYPE>::value;

  size_t unroll_count = entry_count & 0xFFFFFFFFFFFFFFF8ULL;
  for (size_t i = 0; i < unroll_count; i += 8) {
    uint8_t valid_byte = 0;
    uint8_t valid;
    valid = vals[i + 0] != null_val;
    valid_byte |= valid << 0;
    null_count += !valid;
    valid = vals[i + 1] != null_val;
    valid_byte |= valid << 1;
    null_count += !valid;
    valid = vals[i + 2] != null_val;
    valid_byte |= valid << 2;
    null_count += !valid;
    valid = vals[i + 3] != null_val;
    valid_byte |= valid << 3;
    null_count += !valid;
    valid = vals[i + 4] != null_val;
    valid_byte |= valid << 4;
    null_count += !valid;
    valid = vals[i + 5] != null_val;
    valid_byte |= valid << 5;
    null_count += !valid;
    valid = vals[i + 6] != null_val;
    valid_byte |= valid << 6;
    null_count += !valid;
    valid = vals[i + 7] != null_val;
    valid_byte |= valid << 7;
    null_count += !valid;
    is_valid_data[i >> 3] = valid_byte;
  }
  if (unroll_count != entry_count) {
    uint8_t valid_byte = 0;
    for (size_t i = unroll_count; i < entry_count; ++i) {
      bool valid = vals[i] != null_val;
      valid_byte |= valid << (i & 7);
      null_count += !valid;
    }
    is_valid_data[unroll_count >> 3] = valid_byte;
  }

  if (!null_count) {
    is_valid.reset();
  }

  // TODO: support date/time + scaling
  // TODO: support booleans
  if (null_count) {
    out.reset(
        new arrow::NumericArray<ARROW_TYPE>(entry_count, values, is_valid, null_count));
  } else {
    out.reset(new arrow::NumericArray<ARROW_TYPE>(entry_count, values));
  }
}
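
/* The unrolled loop above packs Arrow's LSB-ordered validity bitmap one byte
   (eight entries) at a time. Equivalent scalar logic for a single entry i:

     const bool valid = vals[i] != null_val;
     is_valid_data[i / 8] |= static_cast<uint8_t>(valid) << (i % 8);
     null_count += !valid;
*/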

#ifndef _MSC_VER
std::pair<key_t, void*> get_shm(size_t shmsz) {
  if (!shmsz) {
    return std::make_pair(IPC_PRIVATE, nullptr);
  }
  // Generate a new key for a shared memory segment. Keys to shared memory segments
  // are OS-global, so we need to try a new key if we encounter a collision. It seems
  // incremental keygen would be deterministically worst-case. If we used a hash
  // (like djb2) + nonce, we could still get collisions if multiple clients specify
  // the same nonce, so we use rand() in lieu of a better approach.
  // TODO(ptaylor): Is this common? Are these assumptions true?
  auto key = static_cast<key_t>(rand());
  int shmid = -1;
  // IPC_CREAT - create a new segment for this key if one does not already exist
  // IPC_EXCL - fail if a segment already exists for this key
  while ((shmid = shmget(key, shmsz, IPC_CREAT | IPC_EXCL | 0666)) < 0) {
    // If shmget fails and errno is one of these four values, try a new key.
    // TODO(ptaylor): is checking for the last three values really necessary? Checking
    // them by default to be safe.
    // EEXIST - a shared memory segment is already associated with this key
    // EACCES - a shared memory segment is already associated with this key,
    //          but we don't have permission to access it
    // EINVAL - a shared memory segment is already associated with this key,
    //          but the size is less than shmsz
    // ENOENT - IPC_CREAT was not set in shmflg and no shared memory segment
    //          associated with key was found
    if (errno != EEXIST && errno != EACCES && errno != EINVAL && errno != ENOENT) {
      throw std::runtime_error("failed to create a shared memory segment");
    }
    key = static_cast<key_t>(rand());
  }
  // Attach to the shared memory segment.
  auto ipc_ptr = shmat(shmid, NULL, 0);
  if (reinterpret_cast<int64_t>(ipc_ptr) == -1) {
    throw std::runtime_error("failed to attach to shared memory segment");
  }

  return std::make_pair(key, ipc_ptr);
}
#endif
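
/* Example (illustrative, not part of the original file): a caller pairing
   get_shm() with the matching System V teardown. The 4096-byte size is a
   placeholder:

     auto [key, ptr] = get_shm(4096);               // create + attach
     // ... write up to 4096 bytes through ptr ...
     shmdt(ptr);                                    // detach this mapping
     shmctl(shmget(key, 4096, 0666), IPC_RMID, 0);  // mark segment for removal
*/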

std::pair<key_t, std::shared_ptr<arrow::Buffer>> get_shm_buffer(size_t size) {
#ifdef _MSC_VER
  throw std::runtime_error("Arrow IPC not yet supported on Windows.");
  return std::make_pair(0, nullptr);
#else
  auto [key, ipc_ptr] = get_shm(size);
  std::shared_ptr<arrow::Buffer> buffer(
      new arrow::MutableBuffer(static_cast<uint8_t*>(ipc_ptr), size));
  return std::make_pair<key_t, std::shared_ptr<arrow::Buffer>>(std::move(key),
                                                               std::move(buffer));
#endif
}

}  // namespace

namespace arrow {

key_t get_and_copy_to_shm(const std::shared_ptr<Buffer>& data) {
#ifdef _MSC_VER
  throw std::runtime_error("Arrow IPC not yet supported on Windows.");
#else
  auto [key, ipc_ptr] = get_shm(data->size());
  // copy the arrow records buffer to shared memory
  // TODO(ptaylor): I'm sure it's possible to tell Arrow's RecordBatchStreamWriter to
  // write directly to the shared memory segment as a sink
  memcpy(ipc_ptr, data->data(), data->size());
  // detach from the shared memory segment
  shmdt(ipc_ptr);
  return key;
#endif
}

}  // namespace arrow
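
/* Example (illustrative, not part of the original file): the receiving process
   attaches with the same key to read the serialized records. `key` and `size`
   come from the ArrowResult handle:

     int shmid = shmget(key, size, 0666);
     auto* data = static_cast<const uint8_t*>(shmat(shmid, nullptr, SHM_RDONLY));
     // ... wrap `data` in an arrow::io::BufferReader and parse the IPC stream ...
     shmdt(data);
*/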

ArrowResult ArrowResultSetConverter::getArrowResult() const {
  auto timer = DEBUG_TIMER(__func__);
  std::shared_ptr<arrow::RecordBatch> record_batch = convertToArrow();

  struct BuildResultParams {
    int64_t schemaSize() const {
      return serialized_schema ? serialized_schema->size() : 0;
    };
    int64_t dictSize() const { return serialized_dict ? serialized_dict->size() : 0; };
    int64_t totalSize() const { return schemaSize() + records_size + dictSize(); }
    bool hasRecordBatch() const { return records_size > 0; }
    bool hasDict() const { return dictSize() > 0; }

    int64_t records_size{0};
    std::shared_ptr<arrow::Buffer> serialized_schema{nullptr};
    std::shared_ptr<arrow::Buffer> serialized_dict{nullptr};
  } result_params;

  if (device_type_ == ExecutorDeviceType::CPU ||
      transport_method_ == ArrowTransport::WIRE) {
    const auto getWireResult = [&]() -> ArrowResult {
      auto timer = DEBUG_TIMER("serialize batch to wire");
      const auto total_size = result_params.totalSize();
      std::vector<char> record_handle_data(total_size);
      auto serialized_records =
          arrow::MutableBuffer::Wrap(record_handle_data.data(), total_size);

      ARROW_ASSIGN_OR_THROW(auto writer, arrow::Buffer::GetWriter(serialized_records));

      ARROW_THROW_NOT_OK(writer->Write(
          reinterpret_cast<const uint8_t*>(result_params.serialized_schema->data()),
          result_params.schemaSize()));

      if (result_params.hasDict()) {
        ARROW_THROW_NOT_OK(writer->Write(
            reinterpret_cast<const uint8_t*>(result_params.serialized_dict->data()),
            result_params.dictSize()));
      }

      arrow::io::FixedSizeBufferWriter stream(SliceMutableBuffer(
          serialized_records, result_params.schemaSize() + result_params.dictSize()));

      if (result_params.hasRecordBatch()) {
        ARROW_THROW_NOT_OK(arrow::ipc::SerializeRecordBatch(
            *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
      }

      return {std::vector<char>(0),
              0,
              std::vector<char>(0),
              serialized_records->size(),
              std::string{""},
              std::move(record_handle_data)};
    };

    const auto getShmResult = [&]() -> ArrowResult {
      auto timer = DEBUG_TIMER("serialize batch to shared memory");
      std::shared_ptr<arrow::Buffer> serialized_records;
      std::vector<char> schema_handle_buffer;
      std::vector<char> record_handle_buffer(sizeof(key_t), 0);
      key_t records_shm_key = IPC_PRIVATE;
      const int64_t total_size = result_params.totalSize();

      std::tie(records_shm_key, serialized_records) = get_shm_buffer(total_size);

      memcpy(serialized_records->mutable_data(),
             result_params.serialized_schema->data(),
             (size_t)result_params.schemaSize());

      if (result_params.hasDict()) {
        memcpy(serialized_records->mutable_data() + result_params.schemaSize(),
               result_params.serialized_dict->data(),
               (size_t)result_params.dictSize());
      }

      arrow::io::FixedSizeBufferWriter stream(SliceMutableBuffer(
          serialized_records, result_params.schemaSize() + result_params.dictSize()));

      if (result_params.hasRecordBatch()) {
        ARROW_THROW_NOT_OK(arrow::ipc::SerializeRecordBatch(
            *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
      }

      memcpy(&record_handle_buffer[0],
             reinterpret_cast<const unsigned char*>(&records_shm_key),
             sizeof(key_t));

      return {schema_handle_buffer,
              0,
              record_handle_buffer,
              serialized_records->size(),
              std::string{""}};
    };

    arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
    auto options = arrow::ipc::IpcWriteOptions::Defaults();
    auto dict_stream = arrow::io::BufferOutputStream::Create(1024).ValueOrDie();

    // If our record batch is going to be empty, we omit it entirely,
    // only serializing the schema.
    if (!record_batch->num_rows()) {
      ARROW_ASSIGN_OR_THROW(result_params.serialized_schema,
                            arrow::ipc::SerializeSchema(*record_batch->schema(),
                                                        arrow::default_memory_pool()));

      switch (transport_method_) {
        case ArrowTransport::WIRE:
          return getWireResult();
        case ArrowTransport::SHARED_MEMORY:
          return getShmResult();
        default:
          UNREACHABLE();
      }
    }

    ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));

    ARROW_LOG("CPU") << "found " << dictionaries.size() << " dictionaries";

    for (auto& pair : dictionaries) {
      arrow::ipc::IpcPayload payload;
      int64_t dictionary_id = pair.first;
      const auto& dictionary = pair.second;

      ARROW_THROW_NOT_OK(
          GetDictionaryPayload(dictionary_id, dictionary, options, &payload));
      int32_t metadata_length = 0;
      ARROW_THROW_NOT_OK(
          WriteIpcPayload(payload, options, dict_stream.get(), &metadata_length));
    }
    result_params.serialized_dict = dict_stream->Finish().ValueOrDie();

    ARROW_ASSIGN_OR_THROW(result_params.serialized_schema,
                          arrow::ipc::SerializeSchema(*record_batch->schema(),
                                                      arrow::default_memory_pool()));

    ARROW_THROW_NOT_OK(
        arrow::ipc::GetRecordBatchSize(*record_batch, &result_params.records_size));

    switch (transport_method_) {
      case ArrowTransport::WIRE:
        return getWireResult();
      case ArrowTransport::SHARED_MEMORY:
        return getShmResult();
      default:
        UNREACHABLE();
    }
  }
#ifdef HAVE_CUDA
  CHECK(device_type_ == ExecutorDeviceType::GPU);

  // Copy the schema to the schema handle
  auto out_stream_result = arrow::io::BufferOutputStream::Create(1024);
  ARROW_THROW_NOT_OK(out_stream_result.status());
  auto out_stream = std::move(out_stream_result).ValueOrDie();

  arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
  arrow::ipc::DictionaryMemo current_memo;
  arrow::ipc::DictionaryMemo serialized_memo;

  arrow::ipc::IpcPayload schema_payload;
  ARROW_THROW_NOT_OK(arrow::ipc::GetSchemaPayload(*record_batch->schema(),
                                                  arrow::ipc::IpcWriteOptions::Defaults(),
                                                  mapper,
                                                  &schema_payload));
  int32_t schema_payload_length = 0;
  ARROW_THROW_NOT_OK(arrow::ipc::WriteIpcPayload(schema_payload,
                                                 arrow::ipc::IpcWriteOptions::Defaults(),
                                                 out_stream.get(),
                                                 &schema_payload_length));
  ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));
  ARROW_LOG("GPU") << "Dictionary "
                   << "found dicts: " << dictionaries.size();

  ARROW_THROW_NOT_OK(
      arrow::ipc::internal::CollectDictionaries(*record_batch, &current_memo));

  // now try a dictionary
  std::shared_ptr<arrow::Schema> dummy_schema;
  std::vector<std::shared_ptr<arrow::RecordBatch>> dict_batches;

  for (const auto& pair : dictionaries) {
    arrow::ipc::IpcPayload payload;
    const auto& dict_id = pair.first;
    CHECK_GE(dict_id, 0);
    ARROW_LOG("GPU") << "Dictionary "
                     << "dict_id: " << dict_id;
    const auto& dict = pair.second;
    CHECK(dict);

    if (!dummy_schema) {
      auto dummy_field = std::make_shared<arrow::Field>("", dict->type());
      dummy_schema = std::make_shared<arrow::Schema>(
          std::vector<std::shared_ptr<arrow::Field>>{dummy_field});
    }
    dict_batches.emplace_back(
        arrow::RecordBatch::Make(dummy_schema, dict->length(), {dict}));
  }

  if (!dict_batches.empty()) {
    ARROW_THROW_NOT_OK(arrow::ipc::WriteRecordBatchStream(
        dict_batches, arrow::ipc::IpcWriteOptions::Defaults(), out_stream.get()));
  }

  auto complete_ipc_stream = out_stream->Finish();
  ARROW_THROW_NOT_OK(complete_ipc_stream.status());
  auto serialized_records = std::move(complete_ipc_stream).ValueOrDie();

  const auto record_key = arrow::get_and_copy_to_shm(serialized_records);
  std::vector<char> schema_record_key_buffer(sizeof(key_t), 0);
  memcpy(&schema_record_key_buffer[0],
         reinterpret_cast<const unsigned char*>(&record_key),
         sizeof(key_t));

  arrow::cuda::CudaDeviceManager* manager;
  ARROW_ASSIGN_OR_THROW(manager, arrow::cuda::CudaDeviceManager::Instance());
  std::shared_ptr<arrow::cuda::CudaContext> context;
  ARROW_ASSIGN_OR_THROW(context, manager->GetContext(device_id_));

  std::shared_ptr<arrow::cuda::CudaBuffer> device_serialized;
  ARROW_ASSIGN_OR_THROW(device_serialized,
                        SerializeRecordBatch(*record_batch, context.get()));

  std::shared_ptr<arrow::cuda::CudaIpcMemHandle> cuda_handle;
  ARROW_ASSIGN_OR_THROW(cuda_handle, device_serialized->ExportForIpc());

  std::shared_ptr<arrow::Buffer> serialized_cuda_handle;
  ARROW_ASSIGN_OR_THROW(serialized_cuda_handle,
                        cuda_handle->Serialize(arrow::default_memory_pool()));

  std::vector<char> record_handle_buffer(serialized_cuda_handle->size(), 0);
  memcpy(&record_handle_buffer[0],
         serialized_cuda_handle->data(),
         serialized_cuda_handle->size());

  return {schema_record_key_buffer,
          serialized_records->size(),
          record_handle_buffer,
          serialized_cuda_handle->size(),
          serialized_cuda_handle->ToString()};
#else
  UNREACHABLE();
  return {std::vector<char>{}, 0, std::vector<char>{}, 0, ""};
#endif
}
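
/* Example (illustrative, not part of the original file): the wire payload
   produced by getWireResult() is schema + dictionaries + record batch in Arrow
   IPC stream order, so a client can parse it with the stock stream reader.
   `wire_data` and `wire_size` stand in for the returned df_buffer:

     auto source = std::make_shared<arrow::io::BufferReader>(
         reinterpret_cast<const uint8_t*>(wire_data), wire_size);
     auto reader = arrow::ipc::RecordBatchStreamReader::Open(source).ValueOrDie();
     std::shared_ptr<arrow::RecordBatch> batch;
     ARROW_THROW_NOT_OK(reader->ReadNext(&batch));
*/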

ArrowResultSetConverter::SerializedArrowOutput
ArrowResultSetConverter::getSerializedArrowOutput(
    arrow::ipc::DictionaryFieldMapper* mapper) const {
  auto timer = DEBUG_TIMER(__func__);
  std::shared_ptr<arrow::RecordBatch> arrow_copy = convertToArrow();
  std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;

  ARROW_ASSIGN_OR_THROW(
      serialized_schema,
      arrow::ipc::SerializeSchema(*arrow_copy->schema(), arrow::default_memory_pool()));

  if (arrow_copy->num_rows()) {
    auto timer = DEBUG_TIMER("serialize records");
    ARROW_THROW_NOT_OK(arrow_copy->Validate());
    ARROW_ASSIGN_OR_THROW(serialized_records,
                          arrow::ipc::SerializeRecordBatch(
                              *arrow_copy, arrow::ipc::IpcWriteOptions::Defaults()));
  } else {
    ARROW_ASSIGN_OR_THROW(serialized_records, arrow::AllocateBuffer(0));
  }
  return {serialized_schema, serialized_records};
}

std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::convertToArrow() const {
  auto timer = DEBUG_TIMER(__func__);
  const auto col_count = results_->colCount();
  std::vector<std::shared_ptr<arrow::Field>> fields;
  CHECK(col_names_.empty() || col_names_.size() == col_count);
  for (size_t i = 0; i < col_count; ++i) {
    const auto ti = results_->getColType(i);
    fields.push_back(makeField(col_names_.empty() ? "" : col_names_[i], ti));
  }
#if ARROW_CONVERTER_DEBUG
  VLOG(1) << "Arrow fields: ";
  for (const auto& f : fields) {
    VLOG(1) << "\t" << f->ToString(true);
  }
#endif
  return getArrowBatch(arrow::schema(fields));
}

std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::getArrowBatch(
    const std::shared_ptr<arrow::Schema>& schema) const {
  std::vector<std::shared_ptr<arrow::Array>> result_columns;

  // First, check if the result set is empty.
  // If so, we return an arrow result set that only
  // contains the schema (no record batch will be serialized).
  if (results_->isEmpty()) {
    return ARROW_RECORDBATCH_MAKE(schema, 0, result_columns);
  }

  const size_t entry_count = top_n_ < 0
                                 ? results_->entryCount()
                                 : std::min(size_t(top_n_), results_->entryCount());

  const auto col_count = results_->colCount();
  size_t row_count = 0;

  result_columns.resize(col_count);
  std::vector<ColumnBuilder> builders(col_count);

  // Create array builders
  for (size_t i = 0; i < col_count; ++i) {
    initializeColumnBuilder(builders[i], results_->getColType(i), i, schema->field(i));
  }

  // TODO(miyu): speed up for columnar buffers
  auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
                   std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
                   const std::vector<bool>& non_lazy_cols,
                   const size_t start_entry,
                   const size_t end_entry) -> size_t {
    CHECK_EQ(value_seg.size(), col_count);
    CHECK_EQ(null_bitmap_seg.size(), col_count);
    const auto local_entry_count = end_entry - start_entry;
    size_t seg_row_count = 0;
    for (size_t i = start_entry; i < end_entry; ++i) {
      auto row = results_->getRowAtNoTranslations(i, non_lazy_cols);
      if (row.empty()) {
        continue;
      }
      ++seg_row_count;
      for (size_t j = 0; j < col_count; ++j) {
        if (!non_lazy_cols.empty() && non_lazy_cols[j]) {
          continue;
        }

        auto scalar_value = boost::get<ScalarTargetValue>(&row[j]);
        // TODO(miyu): support more types other than scalar.
        CHECK(scalar_value);
        const auto& column = builders[j];
        switch (column.physical_type) {
          case kBOOLEAN:
            create_or_append_value<bool, int64_t>(
                *scalar_value, value_seg[j], local_entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
            break;
          case kTINYINT:
            create_or_append_value<int8_t, int64_t>(
                *scalar_value, value_seg[j], local_entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
            break;
          case kSMALLINT:
            create_or_append_value<int16_t, int64_t>(
                *scalar_value, value_seg[j], local_entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
            break;
          case kINT:
            create_or_append_value<int32_t, int64_t>(
                *scalar_value, value_seg[j], local_entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
            break;
          case kBIGINT:
            create_or_append_value<int64_t, int64_t>(
                *scalar_value, value_seg[j], local_entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
            break;
          case kDECIMAL:
            create_or_append_value<int64_t, int64_t>(
                *scalar_value, value_seg[j], local_entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
            break;
          case kFLOAT:
            create_or_append_value<float, float>(
                *scalar_value, value_seg[j], local_entry_count);
            create_or_append_validity<float>(
                *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
            break;
          case kDOUBLE:
            create_or_append_value<double, double>(
                *scalar_value, value_seg[j], local_entry_count);
            create_or_append_validity<double>(
                *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
            break;
          case kTIME:
            create_or_append_value<int32_t, int64_t>(
                *scalar_value, value_seg[j], local_entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
            break;
          case kDATE:
            device_type_ == ExecutorDeviceType::GPU
                ? create_or_append_value<int64_t, int64_t>(
                      *scalar_value, value_seg[j], local_entry_count)
                : create_or_append_value<int32_t, int64_t>(
                      *scalar_value, value_seg[j], local_entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
            break;
          case kTIMESTAMP:
            create_or_append_value<int64_t, int64_t>(
                *scalar_value, value_seg[j], local_entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
            break;
          default:
            // TODO(miyu): support more scalar types.
            throw std::runtime_error(column.col_type.get_type_name() +
                                     " is not supported in Arrow result sets.");
        }
      }
    }
    return seg_row_count;
  };

  auto convert_columns = [&](std::vector<std::shared_ptr<arrow::Array>>& result,
                             const std::vector<bool>& non_lazy_cols,
                             const size_t start_col,
                             const size_t end_col) {
    for (size_t col = start_col; col < end_col; ++col) {
      if (!non_lazy_cols.empty() && !non_lazy_cols[col]) {
        continue;
      }

      const auto& column = builders[col];
      switch (column.physical_type) {
        case kTINYINT:
          convert_column<int8_t>(results_, col, entry_count, result[col]);
          break;
        case kSMALLINT:
          convert_column<int16_t>(results_, col, entry_count, result[col]);
          break;
        case kINT:
          convert_column<int32_t>(results_, col, entry_count, result[col]);
          break;
        case kBIGINT:
          convert_column<int64_t>(results_, col, entry_count, result[col]);
          break;
        case kFLOAT:
          convert_column<float>(results_, col, entry_count, result[col]);
          break;
        case kDOUBLE:
          convert_column<double>(results_, col, entry_count, result[col]);
          break;
        default:
          throw std::runtime_error(column.col_type.get_type_name() +
                                   " is not supported in Arrow column converter.");
      }
    }
  };

  std::vector<std::shared_ptr<ValueArray>> column_values(col_count, nullptr);
  std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count, nullptr);
  const bool multithreaded = entry_count > 10000 && !results_->isTruncated();
  // We don't believe we ever output directly from a table function, but this
  // might become possible with a future query plan optimization.
  bool use_columnar_converter = results_->isDirectColumnarConversionPossible() &&
                                (results_->getQueryMemDesc().getQueryDescriptionType() ==
                                     QueryDescriptionType::Projection ||
                                 results_->getQueryMemDesc().getQueryDescriptionType() ==
                                     QueryDescriptionType::TableFunction) &&
                                entry_count == results_->entryCount();
  std::vector<bool> non_lazy_cols;
  if (use_columnar_converter) {
    auto timer = DEBUG_TIMER("columnar converter");
    std::vector<size_t> non_lazy_col_pos;
    size_t non_lazy_col_count = 0;
    const auto& lazy_fetch_info = results_->getLazyFetchInfo();

    non_lazy_cols.reserve(col_count);
    non_lazy_col_pos.reserve(col_count);
    for (size_t i = 0; i < col_count; ++i) {
      bool is_lazy =
          lazy_fetch_info.empty() ? false : lazy_fetch_info[i].is_lazily_fetched;
      // Currently column converter cannot handle some data types.
      // Treat them as lazy.
      switch (builders[i].physical_type) {
        case kBOOLEAN:
        case kTIME:
        case kDATE:
        case kTIMESTAMP:
          is_lazy = true;
          break;
        default:
          break;
      }
      if (builders[i].field->type()->id() == arrow::Type::DICTIONARY) {
        is_lazy = true;
      }
      non_lazy_cols.emplace_back(!is_lazy);
      if (!is_lazy) {
        ++non_lazy_col_count;
        non_lazy_col_pos.emplace_back(i);
      }
    }

    if (non_lazy_col_count == col_count) {
      non_lazy_cols.clear();
      non_lazy_col_pos.clear();
    } else {
      non_lazy_col_pos.emplace_back(col_count);
    }

    std::vector<std::future<void>> child_threads;
    size_t num_threads =
        std::min(multithreaded ? (size_t)cpu_threads() : (size_t)1, non_lazy_col_count);

    size_t start_col = 0;
    size_t end_col = 0;
    for (size_t i = 0; i < num_threads; ++i) {
      start_col = end_col;
      end_col = (i + 1) * non_lazy_col_count / num_threads;
      size_t phys_start_col =
          non_lazy_col_pos.empty() ? start_col : non_lazy_col_pos[start_col];
      size_t phys_end_col =
          non_lazy_col_pos.empty() ? end_col : non_lazy_col_pos[end_col];
      child_threads.push_back(std::async(std::launch::async,
                                         convert_columns,
                                         std::ref(result_columns),
                                         non_lazy_cols,
                                         phys_start_col,
                                         phys_end_col));
    }
    for (auto& child : child_threads) {
      child.get();
    }
    row_count = entry_count;
  }
  if (!use_columnar_converter || !non_lazy_cols.empty()) {
    auto timer = DEBUG_TIMER("row converter");
    row_count = 0;
    if (multithreaded) {
      const size_t cpu_count = cpu_threads();
      std::vector<std::future<size_t>> child_threads;
      std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
          cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count, nullptr));
      std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
          cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count, nullptr));
      const auto stride = (entry_count + cpu_count - 1) / cpu_count;
      for (size_t i = 0, start_entry = 0; start_entry < entry_count;
           ++i, start_entry += stride) {
        const auto end_entry = std::min(entry_count, start_entry + stride);
        child_threads.push_back(std::async(std::launch::async,
                                           fetch,
                                           std::ref(column_value_segs[i]),
                                           std::ref(null_bitmap_segs[i]),
                                           non_lazy_cols,
                                           start_entry,
                                           end_entry));
      }
      for (auto& child : child_threads) {
        row_count += child.get();
      }
      {
        auto timer = DEBUG_TIMER("append rows to arrow");
        for (int i = 0; i < schema->num_fields(); ++i) {
          if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
            continue;
          }

          for (size_t j = 0; j < cpu_count; ++j) {
            if (!column_value_segs[j][i]) {
              continue;
            }
            append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
          }
        }
      }
    } else {
      row_count =
          fetch(column_values, null_bitmaps, non_lazy_cols, size_t(0), entry_count);
      {
        auto timer = DEBUG_TIMER("append rows to arrow single thread");
        for (int i = 0; i < schema->num_fields(); ++i) {
          if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
            continue;
          }

          append(builders[i], *column_values[i], null_bitmaps[i]);
        }
      }
    }

    {
      auto timer = DEBUG_TIMER("finish builders");
      for (size_t i = 0; i < col_count; ++i) {
        if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
          continue;
        }

        result_columns[i] = finishColumnBuilder(builders[i]);
      }
    }
  }

  return ARROW_RECORDBATCH_MAKE(schema, row_count, result_columns);
}

namespace {

std::shared_ptr<arrow::DataType> get_arrow_type(const SQLTypeInfo& sql_type,
                                                const ExecutorDeviceType device_type) {
  switch (get_physical_type(sql_type)) {
    case kBOOLEAN:
      return arrow::boolean();
    case kTINYINT:
      return arrow::int8();
    case kSMALLINT:
      return arrow::int16();
    case kINT:
      return arrow::int32();
    case kBIGINT:
      return arrow::int64();
    case kFLOAT:
      return arrow::float32();
    case kDOUBLE:
      return arrow::float64();
    case kCHAR:
    case kVARCHAR:
    case kTEXT:
      if (sql_type.is_dict_encoded_string()) {
        auto value_type = std::make_shared<arrow::StringType>();
        return dictionary(arrow::int32(), value_type, false);
      }
      return arrow::utf8();
    case kDECIMAL:
    case kNUMERIC:
      return arrow::decimal(sql_type.get_precision(), sql_type.get_scale());
    case kTIME:
      return time32(arrow::TimeUnit::SECOND);
    case kDATE: {
      // TODO(wamsi): Remove date64() once date32() support is added in cuDF.
      // Support for date32() is currently missing in cuDF, so if a client
      // requests dates on GPU we return date64() for the time being, until
      // support is added.
      if (device_type == ExecutorDeviceType::GPU) {
        return arrow::date64();
      } else {
        return arrow::date32();
      }
    }
    case kTIMESTAMP:
      switch (sql_type.get_precision()) {
        case 0:
          return timestamp(arrow::TimeUnit::SECOND);
        case 3:
          return timestamp(arrow::TimeUnit::MILLI);
        case 6:
          return timestamp(arrow::TimeUnit::MICRO);
        case 9:
          return timestamp(arrow::TimeUnit::NANO);
        default:
          throw std::runtime_error(
              "Unsupported timestamp precision for Arrow result sets: " +
              std::to_string(sql_type.get_precision()));
      }
    case kARRAY:
    case kINTERVAL_DAY_TIME:
    case kINTERVAL_YEAR_MONTH:
    default:
      throw std::runtime_error(sql_type.get_type_name() +
                               " is not supported in Arrow result sets.");
  }
  return nullptr;
}

}  // namespace

std::shared_ptr<arrow::Field> ArrowResultSetConverter::makeField(
    const std::string name,
    const SQLTypeInfo& target_type) const {
  return arrow::field(
      name, get_arrow_type(target_type, device_type_), !target_type.get_notnull());
}

void ArrowResultSetConverter::deallocateArrowResultBuffer(
    const ArrowResult& result,
    const ExecutorDeviceType device_type,
    const size_t device_id,
    std::shared_ptr<Data_Namespace::DataMgr>& data_mgr) {
#ifndef _MSC_VER
  // CPU buffers skip the sm handle, serializing the entire RecordBatch to df.
  // Remove shared memory on sysmem
  if (!result.sm_handle.empty()) {
    CHECK_EQ(sizeof(key_t), result.sm_handle.size());
    const key_t& schema_key = *(key_t*)(&result.sm_handle[0]);
    auto shm_id = shmget(schema_key, result.sm_size, 0666);
    if (shm_id < 0) {
      throw std::runtime_error(
          "failed to get a valid shm ID w/ given shm key of the schema");
    }
    if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
      throw std::runtime_error("failed to deallocate Arrow schema on errno(" +
                               std::to_string(errno) + ")");
    }
  }

  if (device_type == ExecutorDeviceType::CPU) {
    CHECK_EQ(sizeof(key_t), result.df_handle.size());
    const key_t& df_key = *(key_t*)(&result.df_handle[0]);
    auto shm_id = shmget(df_key, result.df_size, 0666);
    if (shm_id < 0) {
      throw std::runtime_error(
          "failed to get a valid shm ID w/ given shm key of the data");
    }
    if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
      throw std::runtime_error("failed to deallocate Arrow data frame");
    }
  }
  // CUDA buffers become owned by the caller, and will automatically be freed.
  // TODO: What if the client never takes ownership of the result? We may want to
  // establish a check to see if the GPU buffer still exists, and then free it.
#endif
}

void ArrowResultSetConverter::initializeColumnBuilder(
    ColumnBuilder& column_builder,
    const SQLTypeInfo& col_type,
    const size_t results_col_slot_idx,
    const std::shared_ptr<arrow::Field>& field) const {
  column_builder.field = field;
  column_builder.col_type = col_type;
  column_builder.physical_type = col_type.is_dict_encoded_string()
                                     ? get_dict_index_type(col_type)
                                     : get_physical_type(col_type);

  auto value_type = field->type();
  if (col_type.is_dict_encoded_string()) {
    auto timer = DEBUG_TIMER("Translate string dictionary to Arrow dictionary");
    column_builder.builder.reset(new arrow::StringDictionary32Builder());
    // add values to the builder
    const int dict_id = col_type.get_comp_param();

    // ResultSet::rowCount(), unlike ResultSet::entryCount(), will return
    // the actual number of rows in the result set, taking into account
    // things like any limit and offset set.
    const size_t result_set_rows = results_->rowCount();
    // result_set_rows guaranteed > 0 by parent
    CHECK_GT(result_set_rows, 0UL);

    auto sdp = results_->getStringDictionaryProxy(dict_id);
    const size_t dictionary_proxy_entries = sdp->entryCount();
    const double dictionary_to_result_size_ratio =
        static_cast<double>(dictionary_proxy_entries) / result_set_rows;

    // We are conservative with when we do a bulk dictionary fetch, even though
    // it is generally more efficient than dictionary unique value "plucking",
    // for the following reasons:
    // 1) The number of actual distinct dictionary values can be much lower than
    //    the number of result rows, but without getting the expression range
    //    (and that would only work in some cases), we don't know by how much.
    // 2) Regardless of the effect of #1, the dictionary generated via the
    //    "pluck" method will always be at worst equal in size, and very likely
    //    significantly smaller, than the dictionary created by the bulk fetch
    //    method. Smaller Arrow dictionaries are always a win for sending Arrow
    //    results over the wire and for lowering the processing load on clients
    //    (often a web browser with far less compute and memory than our server).

    const bool do_dictionary_bulk_fetch =
        result_set_rows > min_result_size_for_bulk_dictionary_fetch_ &&
        dictionary_to_result_size_ratio <=
            max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch_;

    arrow::StringBuilder str_array_builder;

    if (do_dictionary_bulk_fetch) {
      VLOG(1) << "Arrow dictionary creation: bulk copying all dictionary "
              << " entries for column at offset " << results_col_slot_idx << ". "
              << "Column has " << dictionary_proxy_entries << " string entries"
              << " for a result set with " << result_set_rows << " rows.";
      column_builder.string_remap_mode =
          ArrowStringRemapMode::ONLY_TRANSIENT_STRINGS_REMAPPED;
      auto str_list = results_->getStringDictionaryPayloadCopy(dict_id);
      ARROW_THROW_NOT_OK(str_array_builder.AppendValues(str_list));

      // When we fetch the bulk dictionary, we need to also fetch
      // the transient entries only contained in the proxy.
      // These values are always negative (starting at -2), and so need
      // to be remapped to point to the corresponding entries in the Arrow
      // dictionary (they are placed at the end after the materialized
      // string entries from StringDictionary).

      const auto& transient_map = sdp->getTransientMapping();
      int32_t crt_transient_id = static_cast<int32_t>(str_list.size());
      for (auto transient_pair : transient_map) {
        ARROW_THROW_NOT_OK(str_array_builder.Append(transient_pair.second));
        CHECK(column_builder.string_remapping
                  .insert(std::make_pair(transient_pair.first, crt_transient_id++))
                  .second);
      }
    } else {
      // Pluck unique dictionary values from ResultSet column
      VLOG(1) << "Arrow dictionary creation: serializing unique result set dictionary "
              << " entries for column at offset " << results_col_slot_idx << ". "
              << "Column has " << dictionary_proxy_entries << " string entries"
              << " for a result set with " << result_set_rows << " rows.";
      column_builder.string_remap_mode = ArrowStringRemapMode::ALL_STRINGS_REMAPPED;

      // ResultSet::getUniqueStringsForDictEncodedTargetCol returns a pair of two
      // vectors, the first of int32_t values containing the unique string ids
      // found for results_col_slot_idx in the result set, the second containing
      // the associated unique strings. Note that a unique string id and its
      // unique string are placed at the same offset in their respective vectors.

      auto unique_ids_and_strings =
          results_->getUniqueStringsForDictEncodedTargetCol(results_col_slot_idx);
      const auto& unique_ids = unique_ids_and_strings.first;
      const auto& unique_strings = unique_ids_and_strings.second;
      ARROW_THROW_NOT_OK(str_array_builder.AppendValues(unique_strings));
      const int32_t num_unique_strings = unique_strings.size();
      CHECK_EQ(num_unique_strings, unique_ids.size());
      // We need to remap ALL string id values given the Arrow dictionary
      // will have "holes", i.e. it is a sparse representation of the
      // underlying StringDictionary.
      for (int32_t unique_string_idx = 0; unique_string_idx < num_unique_strings;
           ++unique_string_idx) {
        CHECK(
            column_builder.string_remapping
                .insert(std::make_pair(unique_ids[unique_string_idx], unique_string_idx))
                .second);
      }
      // Note we don't need to get transients from the proxy as they are already
      // handled in ResultSet::getUniqueStringsForDictEncodedTargetCol.
    }

    std::shared_ptr<arrow::StringArray> string_array;
    ARROW_THROW_NOT_OK(str_array_builder.Finish(&string_array));

    auto dict_builder =
        dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
    CHECK(dict_builder);

    ARROW_THROW_NOT_OK(dict_builder->InsertMemoValues(*string_array));
  } else {
    ARROW_THROW_NOT_OK(arrow::MakeBuilder(
        arrow::default_memory_pool(), value_type, &column_builder.builder));
  }
}
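
/* Example (illustrative, not part of the original file): the dictionary path
   above memoizes the dictionary up front, then appends raw int32 indices later
   (see the StringDictionary32Builder specialization below). In isolation:

     arrow::StringBuilder dict_values;
     ARROW_THROW_NOT_OK(dict_values.AppendValues({"red", "green", "blue"}));
     std::shared_ptr<arrow::StringArray> dict;
     ARROW_THROW_NOT_OK(dict_values.Finish(&dict));

     arrow::StringDictionary32Builder builder;
     ARROW_THROW_NOT_OK(builder.InsertMemoValues(*dict));
     const int32_t indices[] = {2, 0, 1};  // "blue", "red", "green"
     ARROW_THROW_NOT_OK(builder.AppendIndices(indices, 3));
     std::shared_ptr<arrow::Array> out;
     ARROW_THROW_NOT_OK(builder.Finish(&out));
*/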

std::shared_ptr<arrow::Array> ArrowResultSetConverter::finishColumnBuilder(
    ColumnBuilder& column_builder) const {
  std::shared_ptr<arrow::Array> values;
  ARROW_THROW_NOT_OK(column_builder.builder->Finish(&values));
  return values;
}

namespace {

template <typename BUILDER_TYPE, typename VALUE_ARRAY_TYPE>
void appendToColumnBuilder(ArrowResultSetConverter::ColumnBuilder& column_builder,
                           const ValueArray& values,
                           const std::shared_ptr<std::vector<bool>>& is_valid) {
  static_assert(!std::is_same<BUILDER_TYPE, arrow::StringDictionary32Builder>::value,
                "Dictionary encoded string builder requires function specialization.");

  std::vector<VALUE_ARRAY_TYPE> vals = boost::get<std::vector<VALUE_ARRAY_TYPE>>(values);

  if (scale_epoch_values<BUILDER_TYPE>()) {
    auto scale_sec_to_millisec = [](auto seconds) { return seconds * kMilliSecsPerSec; };
    auto scale_values = [&](auto epoch) {
      return std::is_same<BUILDER_TYPE, arrow::Date32Builder>::value
                 ? get_epoch_days_from_seconds(epoch)
                 : scale_sec_to_millisec(epoch);
    };
    std::transform(vals.begin(), vals.end(), vals.begin(), scale_values);
  }

  auto typed_builder = dynamic_cast<BUILDER_TYPE*>(column_builder.builder.get());
  CHECK(typed_builder);
  if (column_builder.field->nullable()) {
    CHECK(is_valid.get());
    ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals, *is_valid));
  } else {
    ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals));
  }
}

template <>
void appendToColumnBuilder<arrow::Decimal128Builder, int64_t>(
    ArrowResultSetConverter::ColumnBuilder& column_builder,
    const ValueArray& values,
    const std::shared_ptr<std::vector<bool>>& is_valid) {
  std::vector<int64_t> vals = boost::get<std::vector<int64_t>>(values);
  auto typed_builder =
      dynamic_cast<arrow::Decimal128Builder*>(column_builder.builder.get());
  CHECK(typed_builder);
  if (column_builder.field->nullable()) {
    CHECK(is_valid.get());
    CHECK_EQ(is_valid->size(), vals.size());
    for (size_t i = 0; i < vals.size(); i++) {
      const auto v = vals[i];
      const auto valid = (*is_valid)[i];
      if (valid) {
        ARROW_THROW_NOT_OK(typed_builder->Append(v));
      } else {
        ARROW_THROW_NOT_OK(typed_builder->AppendNull());
      }
    }
  } else {
    for (const auto& v : vals) {
      ARROW_THROW_NOT_OK(typed_builder->Append(v));
    }
  }
}

template <>
void appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
    ArrowResultSetConverter::ColumnBuilder& column_builder,
    const ValueArray& values,
    const std::shared_ptr<std::vector<bool>>& is_valid) {
  auto typed_builder =
      dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
  CHECK(typed_builder);

  std::vector<int32_t> vals = boost::get<std::vector<int32_t>>(values);
  // Remap negative values if ArrowStringRemapMode == ONLY_TRANSIENT_STRINGS_REMAPPED,
  // or everything if ALL_STRINGS_REMAPPED.
  CHECK(column_builder.string_remap_mode != ArrowStringRemapMode::INVALID);
  for (size_t i = 0; i < vals.size(); i++) {
    auto& val = vals[i];
    // A null is_valid bitmap (non-nullable column) means every entry is valid.
    const bool valid = is_valid ? (*is_valid)[i] : true;
    if ((column_builder.string_remap_mode == ArrowStringRemapMode::ALL_STRINGS_REMAPPED ||
         val < 0) &&
        valid) {
      vals[i] = column_builder.string_remapping.at(val);
    }
  }

  if (column_builder.field->nullable()) {
    CHECK(is_valid.get());
    // TODO(adb): Generate this instead of the boolean bitmap
    std::vector<uint8_t> transformed_bitmap;
    transformed_bitmap.reserve(is_valid->size());
    std::for_each(
        is_valid->begin(), is_valid->end(), [&transformed_bitmap](const bool is_valid) {
          transformed_bitmap.push_back(is_valid ? 1 : 0);
        });

    ARROW_THROW_NOT_OK(typed_builder->AppendIndices(
        vals.data(), static_cast<int64_t>(vals.size()), transformed_bitmap.data()));
  } else {
    ARROW_THROW_NOT_OK(
        typed_builder->AppendIndices(vals.data(), static_cast<int64_t>(vals.size())));
  }
}

}  // namespace

void ArrowResultSetConverter::append(
    ColumnBuilder& column_builder,
    const ValueArray& values,
    const std::shared_ptr<std::vector<bool>>& is_valid) const {
  if (column_builder.col_type.is_dict_encoded_string()) {
    CHECK_EQ(column_builder.physical_type,
             kINT);  // assume all dicts use none-encoded type for now
    appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
        column_builder, values, is_valid);
    return;
  }
  switch (column_builder.physical_type) {
    case kBOOLEAN:
      appendToColumnBuilder<arrow::BooleanBuilder, bool>(
          column_builder, values, is_valid);
      break;
    case kTINYINT:
      appendToColumnBuilder<arrow::Int8Builder, int8_t>(column_builder, values, is_valid);
      break;
    case kSMALLINT:
      appendToColumnBuilder<arrow::Int16Builder, int16_t>(
          column_builder, values, is_valid);
      break;
    case kINT:
      appendToColumnBuilder<arrow::Int32Builder, int32_t>(
          column_builder, values, is_valid);
      break;
    case kBIGINT:
      appendToColumnBuilder<arrow::Int64Builder, int64_t>(
          column_builder, values, is_valid);
      break;
    case kDECIMAL:
      appendToColumnBuilder<arrow::Decimal128Builder, int64_t>(
          column_builder, values, is_valid);
      break;
    case kFLOAT:
      appendToColumnBuilder<arrow::FloatBuilder, float>(column_builder, values, is_valid);
      break;
    case kDOUBLE:
      appendToColumnBuilder<arrow::DoubleBuilder, double>(
          column_builder, values, is_valid);
      break;
    case kTIME:
      appendToColumnBuilder<arrow::Time32Builder, int32_t>(
          column_builder, values, is_valid);
      break;
    case kTIMESTAMP:
      appendToColumnBuilder<arrow::TimestampBuilder, int64_t>(
          column_builder, values, is_valid);
      break;
    case kDATE:
      device_type_ == ExecutorDeviceType::GPU
          ? appendToColumnBuilder<arrow::Date64Builder, int64_t>(
                column_builder, values, is_valid)
          : appendToColumnBuilder<arrow::Date32Builder, int32_t>(
                column_builder, values, is_valid);
      break;
    case kCHAR:
    case kVARCHAR:
    case kTEXT:
    default:
      // TODO(miyu): support more scalar types.
      throw std::runtime_error(column_builder.col_type.get_type_name() +
                               " is not supported in Arrow result sets.");
  }
}
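
/* Example (illustrative sketch, not part of the original file): end-to-end use
   of the converter. The exact constructor signature lives in
   ArrowResultSetConverter.h; the (results, col_names, first_n) form shown here
   is an assumption:

     ArrowResultSetConverter converter(results, {"x", "y"}, -1);
     std::shared_ptr<arrow::RecordBatch> batch = converter.convertToArrow();
     ArrowResult ipc_result = converter.getArrowResult();
*/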