OmniSciDB  cde582ebc3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ArrowResultSetConverter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "../Shared/DateConverters.h"
18 #include "ArrowResultSet.h"
19 #include "Execute.h"
20 #include "arrow/ipc/dictionary.h"
21 #include "arrow/ipc/options.h"
22 
23 #ifndef _MSC_VER
24 #include <sys/ipc.h>
25 #include <sys/shm.h>
26 #include <sys/types.h>
27 #else
28 // IPC shared memory not yet supported on windows
29 using key_t = size_t;
30 #define IPC_PRIVATE 0
31 #endif
32 
33 #include <algorithm>
34 #include <cerrno>
35 #include <cstdio>
36 #include <cstdlib>
37 #include <future>
38 #include <string>
39 
40 #include "arrow/api.h"
41 #include "arrow/io/memory.h"
42 #include "arrow/ipc/api.h"
43 
44 #include "Shared/ArrowUtil.h"
45 
46 #ifdef HAVE_CUDA
47 #include <arrow/gpu/cuda_api.h>
48 #include <cuda.h>
49 #endif // HAVE_CUDA
50 
51 #define ARROW_RECORDBATCH_MAKE arrow::RecordBatch::Make
52 
53 #define ARROW_CONVERTER_DEBUG true
54 
55 #define ARROW_LOG(category) \
56  VLOG(1) << "[Arrow]" \
57  << "[" << category "] "
58 
59 namespace {
60 
61 /* We can create Arrow buffers which refer memory owned by ResultSet.
62  For safe memory access we should keep a ResultSetPtr to keep
63  data live while buffer lives. Use this custom buffer for that. */
64 class ResultSetBuffer : public arrow::Buffer {
65  public:
66  ResultSetBuffer(const uint8_t* buf, size_t size, ResultSetPtr rs)
67  : arrow::Buffer(buf, size), _rs(rs) {}
68 
69  private:
71 };
72 
75  switch (ti.get_size()) {
76  case 1:
77  return kTINYINT;
78  case 2:
79  return kSMALLINT;
80  case 4:
81  return kINT;
82  case 8:
83  return kBIGINT;
84  default:
85  CHECK(false);
86  }
87  return ti.get_type();
88 }
89 
91  auto logical_type = ti.get_type();
92  if (IS_INTEGER(logical_type)) {
93  switch (ti.get_size()) {
94  case 1:
95  return kTINYINT;
96  case 2:
97  return kSMALLINT;
98  case 4:
99  return kINT;
100  case 8:
101  return kBIGINT;
102  default:
103  CHECK(false);
104  }
105  }
106  return logical_type;
107 }
108 
109 template <typename TYPE, typename VALUE_ARRAY_TYPE>
111  std::shared_ptr<ValueArray>& values,
112  const size_t max_size) {
113  if (!values) {
114  values = std::make_shared<ValueArray>(std::vector<TYPE>());
115  boost::get<std::vector<TYPE>>(*values).reserve(max_size);
116  }
117  CHECK(values);
118  auto values_ty = boost::get<std::vector<TYPE>>(values.get());
119  CHECK(values_ty);
120 
121  auto pval_cty = boost::get<VALUE_ARRAY_TYPE>(&val_cty);
122  CHECK(pval_cty);
123  if constexpr (std::is_same_v<VALUE_ARRAY_TYPE, NullableString>) {
124  if (auto str = boost::get<std::string>(pval_cty)) {
125  values_ty->push_back(*str);
126  } else {
127  values_ty->push_back("");
128  }
129  } else {
130  auto val_ty = static_cast<TYPE>(*pval_cty);
131  values_ty->push_back(val_ty);
132  }
133 }
134 
135 template <typename TYPE, typename VALUE_ARRAY_TYPE>
137  std::shared_ptr<ValueArray>& values,
138  const size_t max_size) {
139  if (!values) {
140  values = std::make_shared<ValueArray>(Vec2<TYPE>());
141  boost::get<Vec2<TYPE>>(*values).reserve(max_size);
142  }
143  CHECK(values);
144 
145  Vec2<TYPE>* values_ty = boost::get<Vec2<TYPE>>(values.get());
146  CHECK(values_ty);
147 
148  values_ty->emplace_back(std::vector<TYPE>{});
149 
150  if (val_ctys) {
151  for (auto val_cty : val_ctys.value()) {
152  auto pval_cty = boost::get<VALUE_ARRAY_TYPE>(&val_cty);
153  CHECK(pval_cty);
154  auto val_ty = static_cast<TYPE>(*pval_cty);
155  values_ty->back().emplace_back(val_ty);
156  }
157  }
158 }
159 
161  const SQLTypeInfo& col_type,
162  std::shared_ptr<std::vector<bool>>& null_bitmap,
163  const size_t max_size) {
164  if (col_type.get_notnull()) {
165  CHECK(!null_bitmap);
166  return;
167  }
168 
169  if (!null_bitmap) {
170  null_bitmap = std::make_shared<std::vector<bool>>();
171  null_bitmap->reserve(max_size);
172  }
173  CHECK(null_bitmap);
174  null_bitmap->push_back(value ? true : false);
175 }
176 
177 template <typename TYPE>
179  const SQLTypeInfo& col_type,
180  std::shared_ptr<std::vector<bool>>& null_bitmap,
181  const size_t max_size) {
182  if (col_type.get_notnull()) {
183  CHECK(!null_bitmap);
184  return;
185  }
186  auto pvalue = boost::get<TYPE>(&value);
187  CHECK(pvalue);
188  bool is_valid = false;
189  if constexpr (std::is_same_v<TYPE, NullableString>) {
190  is_valid = boost::get<std::string>(pvalue) != nullptr;
191  } else {
192  if (col_type.is_boolean()) {
193  is_valid = inline_int_null_val(col_type) != static_cast<int8_t>(*pvalue);
194  } else if (col_type.is_dict_encoded_string()) {
195  is_valid = inline_int_null_val(col_type) != static_cast<int32_t>(*pvalue);
196  } else if (col_type.is_integer() || col_type.is_time() || col_type.is_decimal()) {
197  is_valid = inline_int_null_val(col_type) != static_cast<int64_t>(*pvalue);
198  } else if (col_type.is_fp()) {
199  is_valid = inline_fp_null_val(col_type) != static_cast<double>(*pvalue);
200  } else {
201  UNREACHABLE();
202  }
203  }
204 
205  if (!null_bitmap) {
206  null_bitmap = std::make_shared<std::vector<bool>>();
207  null_bitmap->reserve(max_size);
208  }
209  CHECK(null_bitmap);
210  null_bitmap->push_back(is_valid);
211 }
212 
// Trait mapping a C++ value type to the sentinel encoding used for SQL NULL
// (presumably the project-wide inline null values — see the specializations
// below). The unspecialized template is intentionally empty so that using it
// with an unsupported type is a compile error.
template <typename TYPE, typename enable = void>
class null_type {};
215 
216 template <typename TYPE>
217 struct null_type<TYPE, std::enable_if_t<std::is_integral<TYPE>::value>> {
219  static constexpr type value = inline_int_null_value<type>();
220 };
221 
// Floating-point specialization: the NULL sentinel comes from
// inline_fp_null_value, mirroring the integral specialization above.
template <typename TYPE>
struct null_type<TYPE, std::enable_if_t<std::is_floating_point<TYPE>::value>> {
  using type = TYPE;
  static constexpr type value = inline_fp_null_value<type>();
};
227 
228 template <typename TYPE>
230 
231 template <typename C_TYPE,
232  typename ARROW_TYPE = typename arrow::CTypeTraits<C_TYPE>::ArrowType>
234  size_t col,
235  size_t entry_count,
236  std::shared_ptr<arrow::Array>& out) {
237  CHECK(sizeof(C_TYPE) == result->getColType(col).get_size());
238 
239  std::shared_ptr<arrow::Buffer> values;
240  std::shared_ptr<arrow::Buffer> is_valid;
241  const int64_t buf_size = entry_count * sizeof(C_TYPE);
242  if (result->isZeroCopyColumnarConversionPossible(col)) {
243  values.reset(new ResultSetBuffer(
244  reinterpret_cast<const uint8_t*>(result->getColumnarBuffer(col)),
245  buf_size,
246  result));
247  } else {
248  auto res = arrow::AllocateBuffer(buf_size);
249  CHECK(res.ok());
250  values = std::move(res).ValueOrDie();
251  result->copyColumnIntoBuffer(
252  col, reinterpret_cast<int8_t*>(values->mutable_data()), buf_size);
253  }
254 
255  int64_t null_count = 0;
256  auto res = arrow::AllocateBuffer((entry_count + 7) / 8);
257  CHECK(res.ok());
258  is_valid = std::move(res).ValueOrDie();
259 
260  auto is_valid_data = is_valid->mutable_data();
261 
262  const null_type_t<C_TYPE>* vals =
263  reinterpret_cast<const null_type_t<C_TYPE>*>(values->data());
265 
266  size_t unroll_count = entry_count & 0xFFFFFFFFFFFFFFF8ULL;
267  for (size_t i = 0; i < unroll_count; i += 8) {
268  uint8_t valid_byte = 0;
269  uint8_t valid;
270  valid = vals[i + 0] != null_val;
271  valid_byte |= valid << 0;
272  null_count += !valid;
273  valid = vals[i + 1] != null_val;
274  valid_byte |= valid << 1;
275  null_count += !valid;
276  valid = vals[i + 2] != null_val;
277  valid_byte |= valid << 2;
278  null_count += !valid;
279  valid = vals[i + 3] != null_val;
280  valid_byte |= valid << 3;
281  null_count += !valid;
282  valid = vals[i + 4] != null_val;
283  valid_byte |= valid << 4;
284  null_count += !valid;
285  valid = vals[i + 5] != null_val;
286  valid_byte |= valid << 5;
287  null_count += !valid;
288  valid = vals[i + 6] != null_val;
289  valid_byte |= valid << 6;
290  null_count += !valid;
291  valid = vals[i + 7] != null_val;
292  valid_byte |= valid << 7;
293  null_count += !valid;
294  is_valid_data[i >> 3] = valid_byte;
295  }
296  if (unroll_count != entry_count) {
297  uint8_t valid_byte = 0;
298  for (size_t i = unroll_count; i < entry_count; ++i) {
299  bool valid = vals[i] != null_val;
300  valid_byte |= valid << (i & 7);
301  null_count += !valid;
302  }
303  is_valid_data[unroll_count >> 3] = valid_byte;
304  }
305 
306  if (!null_count) {
307  is_valid.reset();
308  }
309 
310  // TODO: support date/time + scaling
311  // TODO: support booleans
312  if (null_count) {
313  out.reset(
314  new arrow::NumericArray<ARROW_TYPE>(entry_count, values, is_valid, null_count));
315  } else {
316  out.reset(new arrow::NumericArray<ARROW_TYPE>(entry_count, values));
317  }
318 }
319 
320 #ifndef _MSC_VER
// Create a new System V shared memory segment of `shmsz` bytes and attach it
// to this process. Returns the segment's IPC key and the attached address.
// A zero-size request creates nothing and returns {IPC_PRIVATE, nullptr}.
// Throws std::runtime_error if a segment cannot be created or attached.
std::pair<key_t, void*> get_shm(size_t shmsz) {
  if (!shmsz) {
    return std::make_pair(IPC_PRIVATE, nullptr);
  }
  // Generate a new key for a shared memory segment. Keys to shared memory segments
  // are OS global, so we need to try a new key if we encounter a collision. It seems
  // incremental keygen would be deterministically worst-case. If we use a hash
  // (like djb2) + nonce, we could still get collisions if multiple clients specify
  // the same nonce, so using rand() in lieu of a better approach
  // TODO(ptaylor): Is this common? Are these assumptions true?
  auto key = static_cast<key_t>(rand());
  int shmid = -1;
  // IPC_CREAT - indicates we want to create a new segment for this key if it doesn't
  // exist IPC_EXCL - ensures failure if a segment already exists for this key
  while ((shmid = shmget(key, shmsz, IPC_CREAT | IPC_EXCL | 0666)) < 0) {
    // Retry with a fresh key only for errors that indicate a key collision or an
    // incompatible existing segment:
    //   EEXIST - a segment is already associated with this key (IPC_EXCL)
    //   EACCES - a segment exists for this key but we lack permission to access it
    //   EINVAL - a segment exists for this key but its size is less than shmsz
    //   ENOENT - IPC_CREAT was not set and no segment was found (defensive)
    // NOTE: errno values are plain integers, not bit flags, so they must be
    // compared individually. The previous test `errno & (EEXIST | ...)` matched
    // almost every errno value and could therefore retry forever on persistent
    // failures such as ENOMEM or ENOSPC instead of throwing.
    if (errno != EEXIST && errno != EACCES && errno != EINVAL && errno != ENOENT) {
      throw std::runtime_error("failed to create a shared memory");
    }
    key = static_cast<key_t>(rand());
  }
  // get a pointer to the shared memory segment
  auto ipc_ptr = shmat(shmid, NULL, 0);
  // shmat signals failure by returning (void*)-1; compare as a pointer rather
  // than casting to int64_t, which is not portable to all pointer widths.
  if (ipc_ptr == reinterpret_cast<void*>(-1)) {
    throw std::runtime_error("failed to attach a shared memory");
  }

  return std::make_pair(key, ipc_ptr);
}
357 #endif
358 
359 std::pair<key_t, std::shared_ptr<arrow::Buffer>> get_shm_buffer(size_t size) {
360 #ifdef _MSC_VER
361  throw std::runtime_error("Arrow IPC not yet supported on Windows.");
362  return std::make_pair(0, nullptr);
363 #else
364  auto [key, ipc_ptr] = get_shm(size);
365  std::shared_ptr<arrow::Buffer> buffer(
366  new arrow::MutableBuffer(static_cast<uint8_t*>(ipc_ptr), size));
367  return std::make_pair<key_t, std::shared_ptr<arrow::Buffer>>(std::move(key),
368  std::move(buffer));
369 #endif
370 }
371 
372 } // namespace
373 
374 namespace arrow {
375 
376 key_t get_and_copy_to_shm(const std::shared_ptr<Buffer>& data) {
377 #ifdef _MSC_VER
378  throw std::runtime_error("Arrow IPC not yet supported on Windows.");
379 #else
380  auto [key, ipc_ptr] = get_shm(data->size());
381  // copy the arrow records buffer to shared memory
382  // TODO(ptaylor): I'm sure it's possible to tell Arrow's RecordBatchStreamWriter to
383  // write directly to the shared memory segment as a sink
384  memcpy(ipc_ptr, data->data(), data->size());
385  // detach from the shared memory segment
386  shmdt(ipc_ptr);
387  return key;
388 #endif
389 }
390 
391 } // namespace arrow
392 
397  auto timer = DEBUG_TIMER(__func__);
398  std::shared_ptr<arrow::RecordBatch> record_batch = convertToArrow();
399 
400  struct BuildResultParams {
401  int64_t schemaSize() const {
402  return serialized_schema ? serialized_schema->size() : 0;
403  };
404  int64_t dictSize() const { return serialized_dict ? serialized_dict->size() : 0; };
405  int64_t totalSize() const { return schemaSize() + records_size + dictSize(); }
406  bool hasRecordBatch() const { return records_size > 0; }
407  bool hasDict() const { return dictSize() > 0; }
408 
409  int64_t records_size{0};
410  std::shared_ptr<arrow::Buffer> serialized_schema{nullptr};
411  std::shared_ptr<arrow::Buffer> serialized_dict{nullptr};
412  } result_params;
413 
416  const auto getWireResult = [&]() -> ArrowResult {
417  auto timer = DEBUG_TIMER("serialize batch to wire");
418  const auto total_size = result_params.totalSize();
419  std::vector<char> record_handle_data(total_size);
420  auto serialized_records =
421  arrow::MutableBuffer::Wrap(record_handle_data.data(), total_size);
422 
423  ARROW_ASSIGN_OR_THROW(auto writer, arrow::Buffer::GetWriter(serialized_records));
424 
425  ARROW_THROW_NOT_OK(writer->Write(
426  reinterpret_cast<const uint8_t*>(result_params.serialized_schema->data()),
427  result_params.schemaSize()));
428 
429  if (result_params.hasDict()) {
430  ARROW_THROW_NOT_OK(writer->Write(
431  reinterpret_cast<const uint8_t*>(result_params.serialized_dict->data()),
432  result_params.dictSize()));
433  }
434 
435  arrow::io::FixedSizeBufferWriter stream(SliceMutableBuffer(
436  serialized_records, result_params.schemaSize() + result_params.dictSize()));
437 
438  if (result_params.hasRecordBatch()) {
439  ARROW_THROW_NOT_OK(arrow::ipc::SerializeRecordBatch(
440  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
441  }
442 
443  return {std::vector<char>(0),
444  0,
445  std::vector<char>(0),
446  serialized_records->size(),
447  std::string{""},
448  std::move(record_handle_data)};
449  };
450 
451  const auto getShmResult = [&]() -> ArrowResult {
452  auto timer = DEBUG_TIMER("serialize batch to shared memory");
453  std::shared_ptr<arrow::Buffer> serialized_records;
454  std::vector<char> schema_handle_buffer;
455  std::vector<char> record_handle_buffer(sizeof(key_t), 0);
456  key_t records_shm_key = IPC_PRIVATE;
457  const int64_t total_size = result_params.totalSize();
458 
459  std::tie(records_shm_key, serialized_records) = get_shm_buffer(total_size);
460 
461  memcpy(serialized_records->mutable_data(),
462  result_params.serialized_schema->data(),
463  (size_t)result_params.schemaSize());
464 
465  if (result_params.hasDict()) {
466  memcpy(serialized_records->mutable_data() + result_params.schemaSize(),
467  result_params.serialized_dict->data(),
468  (size_t)result_params.dictSize());
469  }
470 
471  arrow::io::FixedSizeBufferWriter stream(SliceMutableBuffer(
472  serialized_records, result_params.schemaSize() + result_params.dictSize()));
473 
474  if (result_params.hasRecordBatch()) {
475  ARROW_THROW_NOT_OK(arrow::ipc::SerializeRecordBatch(
476  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
477  }
478 
479  memcpy(&record_handle_buffer[0],
480  reinterpret_cast<const unsigned char*>(&records_shm_key),
481  sizeof(key_t));
482 
483  return {schema_handle_buffer,
484  0,
485  record_handle_buffer,
486  serialized_records->size(),
487  std::string{""}};
488  };
489 
490  arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
491  auto options = arrow::ipc::IpcWriteOptions::Defaults();
492  auto dict_stream = arrow::io::BufferOutputStream::Create(1024).ValueOrDie();
493 
494  // If our record batch is going to be empty, we omit it entirely,
495  // only serializing the schema.
496  if (!record_batch->num_rows()) {
497  ARROW_ASSIGN_OR_THROW(result_params.serialized_schema,
498  arrow::ipc::SerializeSchema(*record_batch->schema(),
499  arrow::default_memory_pool()));
500 
501  switch (transport_method_) {
503  return getWireResult();
505  return getShmResult();
506  default:
507  UNREACHABLE();
508  }
509  }
510 
511  ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));
512 
513  ARROW_LOG("CPU") << "found " << dictionaries.size() << " dictionaries";
514 
515  for (auto& pair : dictionaries) {
516  arrow::ipc::IpcPayload payload;
517  int64_t dictionary_id = pair.first;
518  const auto& dictionary = pair.second;
519 
521  GetDictionaryPayload(dictionary_id, dictionary, options, &payload));
522  int32_t metadata_length = 0;
524  WriteIpcPayload(payload, options, dict_stream.get(), &metadata_length));
525  }
526  result_params.serialized_dict = dict_stream->Finish().ValueOrDie();
527 
528  ARROW_ASSIGN_OR_THROW(result_params.serialized_schema,
529  arrow::ipc::SerializeSchema(*record_batch->schema(),
530  arrow::default_memory_pool()));
531 
533  arrow::ipc::GetRecordBatchSize(*record_batch, &result_params.records_size));
534 
535  switch (transport_method_) {
537  return getWireResult();
539  return getShmResult();
540  default:
541  UNREACHABLE();
542  }
543  }
544 #ifdef HAVE_CUDA
546 
547  // Copy the schema to the schema handle
548  auto out_stream_result = arrow::io::BufferOutputStream::Create(1024);
549  ARROW_THROW_NOT_OK(out_stream_result.status());
550  auto out_stream = std::move(out_stream_result).ValueOrDie();
551 
552  arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
553  arrow::ipc::DictionaryMemo current_memo;
554  arrow::ipc::DictionaryMemo serialized_memo;
555 
556  arrow::ipc::IpcPayload schema_payload;
557  ARROW_THROW_NOT_OK(arrow::ipc::GetSchemaPayload(*record_batch->schema(),
558  arrow::ipc::IpcWriteOptions::Defaults(),
559  mapper,
560  &schema_payload));
561  int32_t schema_payload_length = 0;
562  ARROW_THROW_NOT_OK(arrow::ipc::WriteIpcPayload(schema_payload,
563  arrow::ipc::IpcWriteOptions::Defaults(),
564  out_stream.get(),
565  &schema_payload_length));
566  ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));
567  ARROW_LOG("GPU") << "Dictionary "
568  << "found dicts: " << dictionaries.size();
569 
571  arrow::ipc::internal::CollectDictionaries(*record_batch, &current_memo));
572 
573  // now try a dictionary
574  std::shared_ptr<arrow::Schema> dummy_schema;
575  std::vector<std::shared_ptr<arrow::RecordBatch>> dict_batches;
576 
577  for (const auto& pair : dictionaries) {
578  arrow::ipc::IpcPayload payload;
579  const auto& dict_id = pair.first;
580  CHECK_GE(dict_id, 0);
581  ARROW_LOG("GPU") << "Dictionary "
582  << "dict_id: " << dict_id;
583  const auto& dict = pair.second;
584  CHECK(dict);
585 
586  if (!dummy_schema) {
587  auto dummy_field = std::make_shared<arrow::Field>("", dict->type());
588  dummy_schema = std::make_shared<arrow::Schema>(
589  std::vector<std::shared_ptr<arrow::Field>>{dummy_field});
590  }
591  dict_batches.emplace_back(
592  arrow::RecordBatch::Make(dummy_schema, dict->length(), {dict}));
593  }
594 
595  if (!dict_batches.empty()) {
596  ARROW_THROW_NOT_OK(arrow::ipc::WriteRecordBatchStream(
597  dict_batches, arrow::ipc::IpcWriteOptions::Defaults(), out_stream.get()));
598  }
599 
600  auto complete_ipc_stream = out_stream->Finish();
601  ARROW_THROW_NOT_OK(complete_ipc_stream.status());
602  auto serialized_records = std::move(complete_ipc_stream).ValueOrDie();
603 
604  const auto record_key = arrow::get_and_copy_to_shm(serialized_records);
605  std::vector<char> schema_record_key_buffer(sizeof(key_t), 0);
606  memcpy(&schema_record_key_buffer[0],
607  reinterpret_cast<const unsigned char*>(&record_key),
608  sizeof(key_t));
609 
610  arrow::cuda::CudaDeviceManager* manager;
611  ARROW_ASSIGN_OR_THROW(manager, arrow::cuda::CudaDeviceManager::Instance());
612  std::shared_ptr<arrow::cuda::CudaContext> context;
613  ARROW_ASSIGN_OR_THROW(context, manager->GetContext(device_id_));
614 
615  std::shared_ptr<arrow::cuda::CudaBuffer> device_serialized;
616  ARROW_ASSIGN_OR_THROW(device_serialized,
617  SerializeRecordBatch(*record_batch, context.get()));
618 
619  std::shared_ptr<arrow::cuda::CudaIpcMemHandle> cuda_handle;
620  ARROW_ASSIGN_OR_THROW(cuda_handle, device_serialized->ExportForIpc());
621 
622  std::shared_ptr<arrow::Buffer> serialized_cuda_handle;
623  ARROW_ASSIGN_OR_THROW(serialized_cuda_handle,
624  cuda_handle->Serialize(arrow::default_memory_pool()));
625 
626  std::vector<char> record_handle_buffer(serialized_cuda_handle->size(), 0);
627  memcpy(&record_handle_buffer[0],
628  serialized_cuda_handle->data(),
629  serialized_cuda_handle->size());
630 
631  return {schema_record_key_buffer,
632  serialized_records->size(),
633  record_handle_buffer,
634  serialized_cuda_handle->size(),
635  serialized_cuda_handle->ToString()};
636 #else
637  UNREACHABLE();
638  return {std::vector<char>{}, 0, std::vector<char>{}, 0, ""};
639 #endif
640 }
641 
644  arrow::ipc::DictionaryFieldMapper* mapper) const {
645  auto timer = DEBUG_TIMER(__func__);
646  std::shared_ptr<arrow::RecordBatch> arrow_copy = convertToArrow();
647  std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;
648 
650  serialized_schema,
651  arrow::ipc::SerializeSchema(*arrow_copy->schema(), arrow::default_memory_pool()));
652 
653  if (arrow_copy->num_rows()) {
654  auto timer = DEBUG_TIMER("serialize records");
655  ARROW_THROW_NOT_OK(arrow_copy->Validate());
656  ARROW_ASSIGN_OR_THROW(serialized_records,
657  arrow::ipc::SerializeRecordBatch(
658  *arrow_copy, arrow::ipc::IpcWriteOptions::Defaults()));
659  } else {
660  ARROW_ASSIGN_OR_THROW(serialized_records, arrow::AllocateBuffer(0));
661  }
662  return {serialized_schema, serialized_records};
663 }
664 
665 std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::convertToArrow() const {
666  auto timer = DEBUG_TIMER(__func__);
667  const auto col_count = results_->colCount();
668  std::vector<std::shared_ptr<arrow::Field>> fields;
669  CHECK(col_names_.empty() || col_names_.size() == col_count);
670  for (size_t i = 0; i < col_count; ++i) {
671  const auto ti = results_->getColType(i);
672  fields.push_back(makeField(col_names_.empty() ? "" : col_names_[i], ti));
673  }
674 #if ARROW_CONVERTER_DEBUG
675  VLOG(1) << "Arrow fields: ";
676  for (const auto& f : fields) {
677  VLOG(1) << "\t" << f->ToString(true);
678  }
679 #endif
680  return getArrowBatch(arrow::schema(fields));
681 }
682 
683 std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::getArrowBatch(
684  const std::shared_ptr<arrow::Schema>& schema) const {
685  std::vector<std::shared_ptr<arrow::Array>> result_columns;
686 
687  // First, check if the result set is empty.
688  // If so, we return an arrow result set that only
689  // contains the schema (no record batch will be serialized).
690  if (results_->isEmpty()) {
691  return ARROW_RECORDBATCH_MAKE(schema, 0, result_columns);
692  }
693 
694  const size_t entry_count = top_n_ < 0
695  ? results_->entryCount()
696  : std::min(size_t(top_n_), results_->entryCount());
697 
698  const auto col_count = results_->colCount();
699  size_t row_count = 0;
700 
701  result_columns.resize(col_count);
702  std::vector<ColumnBuilder> builders(col_count);
703 
704  // Create array builders
705  for (size_t i = 0; i < col_count; ++i) {
706  initializeColumnBuilder(builders[i], results_->getColType(i), i, schema->field(i));
707  }
708 
709  // TODO(miyu): speed up for columnar buffers
710  auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
711  std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
712  const std::vector<bool>& non_lazy_cols,
713  const size_t start_entry,
714  const size_t end_entry) -> size_t {
715  CHECK_EQ(value_seg.size(), col_count);
716  CHECK_EQ(null_bitmap_seg.size(), col_count);
717  const auto local_entry_count = end_entry - start_entry;
718  size_t seg_row_count = 0;
719  for (size_t i = start_entry; i < end_entry; ++i) {
720  auto row = results_->getRowAtNoTranslations(i, non_lazy_cols);
721  if (row.empty()) {
722  continue;
723  }
724  ++seg_row_count;
725  for (size_t j = 0; j < col_count; ++j) {
726  if (!non_lazy_cols.empty() && non_lazy_cols[j]) {
727  continue;
728  }
729 
730  if (auto scalar_value = boost::get<ScalarTargetValue>(&row[j])) {
731  // TODO(miyu): support more types other than scalar.
732  CHECK(scalar_value);
733  const auto& column = builders[j];
734  switch (column.physical_type) {
735  case kBOOLEAN:
736  create_or_append_value<bool, int64_t>(
737  *scalar_value, value_seg[j], local_entry_count);
738  create_or_append_validity<int64_t>(
739  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
740  break;
741  case kTINYINT:
742  create_or_append_value<int8_t, int64_t>(
743  *scalar_value, value_seg[j], local_entry_count);
744  create_or_append_validity<int64_t>(
745  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
746  break;
747  case kSMALLINT:
748  create_or_append_value<int16_t, int64_t>(
749  *scalar_value, value_seg[j], local_entry_count);
750  create_or_append_validity<int64_t>(
751  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
752  break;
753  case kINT:
754  create_or_append_value<int32_t, int64_t>(
755  *scalar_value, value_seg[j], local_entry_count);
756  create_or_append_validity<int64_t>(
757  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
758  break;
759  case kBIGINT:
760  create_or_append_value<int64_t, int64_t>(
761  *scalar_value, value_seg[j], local_entry_count);
762  create_or_append_validity<int64_t>(
763  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
764  break;
765  case kDECIMAL:
766  create_or_append_value<int64_t, int64_t>(
767  *scalar_value, value_seg[j], local_entry_count);
768  create_or_append_validity<int64_t>(
769  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
770  break;
771  case kFLOAT:
772  create_or_append_value<float, float>(
773  *scalar_value, value_seg[j], local_entry_count);
774  create_or_append_validity<float>(
775  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
776  break;
777  case kDOUBLE:
778  create_or_append_value<double, double>(
779  *scalar_value, value_seg[j], local_entry_count);
780  create_or_append_validity<double>(
781  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
782  break;
783  case kTIME:
784  create_or_append_value<int32_t, int64_t>(
785  *scalar_value, value_seg[j], local_entry_count);
786  create_or_append_validity<int64_t>(
787  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
788  break;
789  case kDATE:
791  ? create_or_append_value<int64_t, int64_t>(
792  *scalar_value, value_seg[j], local_entry_count)
793  : create_or_append_value<int32_t, int64_t>(
794  *scalar_value, value_seg[j], local_entry_count);
795  create_or_append_validity<int64_t>(
796  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
797  break;
798  case kTIMESTAMP:
799  create_or_append_value<int64_t, int64_t>(
800  *scalar_value, value_seg[j], local_entry_count);
801  create_or_append_validity<int64_t>(
802  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
803  break;
804  case kTEXT:
805  create_or_append_value<std::string, NullableString>(
806  *scalar_value, value_seg[j], local_entry_count);
807  create_or_append_validity<NullableString>(
808  *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
809  break;
810  default:
811  // TODO(miyu): support more scalar types.
812  throw std::runtime_error(column.col_type.get_type_name() +
813  " is not supported in Arrow result sets.");
814  }
815  } else if (auto array = boost::get<ArrayTargetValue>(&row[j])) {
816  // array := Boost::optional<std::vector<ScalarTargetValue>>
817  const auto& column = builders[j];
818  switch (column.col_type.get_subtype()) {
819  case kBOOLEAN:
820  create_or_append_value<int8_t, int64_t>(
821  *array, value_seg[j], local_entry_count);
823  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
824  break;
825  case kTINYINT:
826  create_or_append_value<int8_t, int64_t>(
827  *array, value_seg[j], local_entry_count);
829  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
830  break;
831  case kSMALLINT:
832  create_or_append_value<int16_t, int64_t>(
833  *array, value_seg[j], local_entry_count);
835  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
836  break;
837  case kINT:
838  create_or_append_value<int32_t, int64_t>(
839  *array, value_seg[j], local_entry_count);
841  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
842  break;
843  case kBIGINT:
844  create_or_append_value<int64_t, int64_t>(
845  *array, value_seg[j], local_entry_count);
847  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
848  break;
849  case kFLOAT:
850  create_or_append_value<float, float>(
851  *array, value_seg[j], local_entry_count);
853  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
854  break;
855  case kDOUBLE:
856  create_or_append_value<double, double>(
857  *array, value_seg[j], local_entry_count);
859  *array, column.col_type, null_bitmap_seg[j], local_entry_count);
860  break;
861  default:
862  throw std::runtime_error(column.col_type.get_type_name() +
863  " is not supported in Arrow result sets.");
864  }
865  }
866  }
867  }
868  return seg_row_count;
869  };
870 
871  auto convert_columns = [&](std::vector<std::shared_ptr<arrow::Array>>& result,
872  const std::vector<bool>& non_lazy_cols,
873  const size_t start_col,
874  const size_t end_col) {
875  for (size_t col = start_col; col < end_col; ++col) {
876  if (!non_lazy_cols.empty() && !non_lazy_cols[col]) {
877  continue;
878  }
879 
880  const auto& column = builders[col];
881  switch (column.physical_type) {
882  case kTINYINT:
883  convert_column<int8_t>(results_, col, entry_count, result[col]);
884  break;
885  case kSMALLINT:
886  convert_column<int16_t>(results_, col, entry_count, result[col]);
887  break;
888  case kINT:
889  convert_column<int32_t>(results_, col, entry_count, result[col]);
890  break;
891  case kBIGINT:
892  convert_column<int64_t>(results_, col, entry_count, result[col]);
893  break;
894  case kFLOAT:
895  convert_column<float>(results_, col, entry_count, result[col]);
896  break;
897  case kDOUBLE:
898  convert_column<double>(results_, col, entry_count, result[col]);
899  break;
900  default:
901  throw std::runtime_error(column.col_type.get_type_name() +
902  " is not supported in Arrow column converter.");
903  }
904  }
905  };
906 
907  std::vector<std::shared_ptr<ValueArray>> column_values(col_count, nullptr);
908  std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count, nullptr);
909  const bool multithreaded = entry_count > 10000 && !results_->isTruncated();
910  // Don't believe we ever output directly from a table function, but this
911  // might be possible with a future query plan optimization
912  bool use_columnar_converter = results_->isDirectColumnarConversionPossible() &&
913  (results_->getQueryMemDesc().getQueryDescriptionType() ==
915  results_->getQueryMemDesc().getQueryDescriptionType() ==
917  entry_count == results_->entryCount();
918  std::vector<bool> non_lazy_cols;
919  if (use_columnar_converter) {
920  auto timer = DEBUG_TIMER("columnar converter");
921  std::vector<size_t> non_lazy_col_pos;
922  size_t non_lazy_col_count = 0;
923  const auto& lazy_fetch_info = results_->getLazyFetchInfo();
924 
925  non_lazy_cols.reserve(col_count);
926  non_lazy_col_pos.reserve(col_count);
927  for (size_t i = 0; i < col_count; ++i) {
928  bool is_lazy =
929  lazy_fetch_info.empty() ? false : lazy_fetch_info[i].is_lazily_fetched;
930  // Currently column converter cannot handle some data types.
931  // Treat them as lazy.
932  switch (builders[i].physical_type) {
933  case kBOOLEAN:
934  case kTIME:
935  case kDATE:
936  case kTIMESTAMP:
937  is_lazy = true;
938  break;
939  default:
940  break;
941  }
942  if (builders[i].field->type()->id() == arrow::Type::DICTIONARY) {
943  is_lazy = true;
944  }
945  non_lazy_cols.emplace_back(!is_lazy);
946  if (!is_lazy) {
947  ++non_lazy_col_count;
948  non_lazy_col_pos.emplace_back(i);
949  }
950  }
951 
952  if (non_lazy_col_count == col_count) {
953  non_lazy_cols.clear();
954  non_lazy_col_pos.clear();
955  } else {
956  non_lazy_col_pos.emplace_back(col_count);
957  }
958 
959  std::vector<std::future<void>> child_threads;
960  size_t num_threads =
961  std::min(multithreaded ? (size_t)cpu_threads() : (size_t)1, non_lazy_col_count);
962 
963  size_t start_col = 0;
964  size_t end_col = 0;
965  for (size_t i = 0; i < num_threads; ++i) {
966  start_col = end_col;
967  end_col = (i + 1) * non_lazy_col_count / num_threads;
968  size_t phys_start_col =
969  non_lazy_col_pos.empty() ? start_col : non_lazy_col_pos[start_col];
970  size_t phys_end_col =
971  non_lazy_col_pos.empty() ? end_col : non_lazy_col_pos[end_col];
972  child_threads.push_back(std::async(std::launch::async,
973  convert_columns,
974  std::ref(result_columns),
975  non_lazy_cols,
976  phys_start_col,
977  phys_end_col));
978  }
979  for (auto& child : child_threads) {
980  child.get();
981  }
982  row_count = entry_count;
983  }
984  if (!use_columnar_converter || !non_lazy_cols.empty()) {
985  auto timer = DEBUG_TIMER("row converter");
986  row_count = 0;
987  if (multithreaded) {
988  const size_t cpu_count = cpu_threads();
989  std::vector<std::future<size_t>> child_threads;
990  std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
991  cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count, nullptr));
992  std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
993  cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count, nullptr));
994  const auto stride = (entry_count + cpu_count - 1) / cpu_count;
995  for (size_t i = 0, start_entry = 0; start_entry < entry_count;
996  ++i, start_entry += stride) {
997  const auto end_entry = std::min(entry_count, start_entry + stride);
998  child_threads.push_back(std::async(std::launch::async,
999  fetch,
1000  std::ref(column_value_segs[i]),
1001  std::ref(null_bitmap_segs[i]),
1002  non_lazy_cols,
1003  start_entry,
1004  end_entry));
1005  }
1006  for (auto& child : child_threads) {
1007  row_count += child.get();
1008  }
1009  {
1010  auto timer = DEBUG_TIMER("append rows to arrow");
1011  for (int i = 0; i < schema->num_fields(); ++i) {
1012  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
1013  continue;
1014  }
1015 
1016  for (size_t j = 0; j < cpu_count; ++j) {
1017  if (!column_value_segs[j][i]) {
1018  continue;
1019  }
1020  append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
1021  }
1022  }
1023  }
1024  } else {
1025  row_count =
1026  fetch(column_values, null_bitmaps, non_lazy_cols, size_t(0), entry_count);
1027  {
1028  auto timer = DEBUG_TIMER("append rows to arrow single thread");
1029  for (int i = 0; i < schema->num_fields(); ++i) {
1030  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
1031  continue;
1032  }
1033 
1034  append(builders[i], *column_values[i], null_bitmaps[i]);
1035  }
1036  }
1037  }
1038 
1039  {
1040  auto timer = DEBUG_TIMER("finish builders");
1041  for (size_t i = 0; i < col_count; ++i) {
1042  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
1043  continue;
1044  }
1045 
1046  result_columns[i] = finishColumnBuilder(builders[i]);
1047  }
1048  }
1049  }
1050 
1051  return ARROW_RECORDBATCH_MAKE(schema, row_count, result_columns);
1052 }
1053 
1054 namespace {
1055 
1056 std::shared_ptr<arrow::DataType> get_arrow_type(const SQLTypeInfo& sql_type,
1057  const ExecutorDeviceType device_type) {
1058  switch (get_physical_type(sql_type)) {
1059  case kBOOLEAN:
1060  return arrow::boolean();
1061  case kTINYINT:
1062  return arrow::int8();
1063  case kSMALLINT:
1064  return arrow::int16();
1065  case kINT:
1066  return arrow::int32();
1067  case kBIGINT:
1068  return arrow::int64();
1069  case kFLOAT:
1070  return arrow::float32();
1071  case kDOUBLE:
1072  return arrow::float64();
1073  case kCHAR:
1074  case kVARCHAR:
1075  case kTEXT:
1076  if (sql_type.is_dict_encoded_string()) {
1077  auto value_type = std::make_shared<arrow::StringType>();
1078  return dictionary(arrow::int32(), value_type, false);
1079  }
1080  return arrow::utf8();
1081  case kDECIMAL:
1082  case kNUMERIC:
1083  return arrow::decimal(sql_type.get_precision(), sql_type.get_scale());
1084  case kTIME:
1085  return time32(arrow::TimeUnit::SECOND);
1086  case kDATE: {
1087  // TODO(wamsi) : Remove date64() once date32() support is added in cuDF. date32()
1088  // Currently support for date32() is missing in cuDF.Hence, if client requests for
1089  // date on GPU, return date64() for the time being, till support is added.
1090  if (device_type == ExecutorDeviceType::GPU) {
1091  return arrow::date64();
1092  } else {
1093  return arrow::date32();
1094  }
1095  }
1096  case kTIMESTAMP:
1097  switch (sql_type.get_precision()) {
1098  case 0:
1099  return timestamp(arrow::TimeUnit::SECOND);
1100  case 3:
1101  return timestamp(arrow::TimeUnit::MILLI);
1102  case 6:
1103  return timestamp(arrow::TimeUnit::MICRO);
1104  case 9:
1105  return timestamp(arrow::TimeUnit::NANO);
1106  default:
1107  throw std::runtime_error(
1108  "Unsupported timestamp precision for Arrow result sets: " +
1109  std::to_string(sql_type.get_precision()));
1110  }
1111  case kARRAY:
1112  switch (sql_type.get_subtype()) {
1113  case kBOOLEAN:
1114  return arrow::list(arrow::boolean());
1115  case kTINYINT:
1116  return arrow::list(arrow::int8());
1117  case kSMALLINT:
1118  return arrow::list(arrow::int16());
1119  case kINT:
1120  return arrow::list(arrow::int32());
1121  case kBIGINT:
1122  return arrow::list(arrow::int64());
1123  case kFLOAT:
1124  return arrow::list(arrow::float32());
1125  case kDOUBLE:
1126  return arrow::list(arrow::float64());
1127  default:
1128  throw std::runtime_error("Unsupported array type for Arrow result sets: " +
1129  sql_type.get_type_name());
1130  }
1131  case kINTERVAL_DAY_TIME:
1132  case kINTERVAL_YEAR_MONTH:
1133  default:
1134  throw std::runtime_error(sql_type.get_type_name() +
1135  " is not supported in Arrow result sets.");
1136  }
1137  return nullptr;
1138 }
1139 
1140 } // namespace
1141 
1142 std::shared_ptr<arrow::Field> ArrowResultSetConverter::makeField(
1143  const std::string name,
1144  const SQLTypeInfo& target_type) const {
1145  return arrow::field(
1146  name, get_arrow_type(target_type, device_type_), !target_type.get_notnull());
1147 }
1148 
1150  const ArrowResult& result,
1151  const ExecutorDeviceType device_type,
1152  const size_t device_id,
1153  std::shared_ptr<Data_Namespace::DataMgr>& data_mgr) {
1154 #ifndef _MSC_VER
1155  // CPU buffers skip the sm handle, serializing the entire RecordBatch to df.
1156  // Remove shared memory on sysmem
1157  if (!result.sm_handle.empty()) {
1158  CHECK_EQ(sizeof(key_t), result.sm_handle.size());
1159  const key_t& schema_key = *(key_t*)(&result.sm_handle[0]);
1160  auto shm_id = shmget(schema_key, result.sm_size, 0666);
1161  if (shm_id < 0) {
1162  throw std::runtime_error(
1163  "failed to get an valid shm ID w/ given shm key of the schema");
1164  }
1165  if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
1166  throw std::runtime_error("failed to deallocate Arrow schema on errorno(" +
1167  std::to_string(errno) + ")");
1168  }
1169  }
1170 
1171  if (device_type == ExecutorDeviceType::CPU) {
1172  CHECK_EQ(sizeof(key_t), result.df_handle.size());
1173  const key_t& df_key = *(key_t*)(&result.df_handle[0]);
1174  auto shm_id = shmget(df_key, result.df_size, 0666);
1175  if (shm_id < 0) {
1176  throw std::runtime_error(
1177  "failed to get an valid shm ID w/ given shm key of the data");
1178  }
1179  if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
1180  throw std::runtime_error("failed to deallocate Arrow data frame");
1181  }
1182  }
1183  // CUDA buffers become owned by the caller, and will automatically be freed
1184  // TODO: What if the client never takes ownership of the result? we may want to
1185  // establish a check to see if the GPU buffer still exists, and then free it.
1186 #endif
1187 }
1188 
1190  ColumnBuilder& column_builder,
1191  const SQLTypeInfo& col_type,
1192  const size_t results_col_slot_idx,
1193  const std::shared_ptr<arrow::Field>& field) const {
1194  column_builder.field = field;
1195  column_builder.col_type = col_type;
1196  column_builder.physical_type = col_type.is_dict_encoded_string()
1197  ? get_dict_index_type(col_type)
1198  : get_physical_type(col_type);
1199 
1200  auto value_type = field->type();
1201  if (col_type.is_dict_encoded_string()) {
1202  auto timer = DEBUG_TIMER("Translate string dictionary to Arrow dictionary");
1203  column_builder.builder.reset(new arrow::StringDictionary32Builder());
1204  // add values to the builder
1205  const int dict_id = col_type.get_comp_param();
1206 
1207  // ResultSet::rowCount(), unlike ResultSet::entryCount(), will return
1208  // the actual number of rows in the result set, taking into account
1209  // things like any limit and offset set
1210  const size_t result_set_rows = results_->rowCount();
1211  // result_set_rows guaranteed > 0 by parent
1212  CHECK_GT(result_set_rows, 0UL);
1213 
1214  auto sdp = results_->getStringDictionaryProxy(dict_id);
1215  const size_t dictionary_proxy_entries = sdp->entryCount();
1216  const double dictionary_to_result_size_ratio =
1217  static_cast<double>(dictionary_proxy_entries) / result_set_rows;
1218 
1219  // We are conservative with when we do a bulk dictionary fetch,
1220  // even though it is generally more efficient than dictionary unique value "plucking",
1221  // for the following reasons:
1222  // 1) The number of actual distinct dictionary values can be much lower than the
1223  // number of result rows, but without getting the expression range (and that would
1224  // only work in some cases), we don't know by how much
1225  // 2) Regardless of the effect of #1, the size of the dictionary generated via
1226  // the "pluck" method will always be at worst equal in size, and very likely
1227  // significantly smaller, than the dictionary created by the bulk dictionary
1228  // fetch method, and smaller Arrow dictionaries are always a win when it comes to
1229  // sending the Arrow results over the wire, and for lowering the processing load
1230  // for clients (which often is a web browser with a lot less compute and memory
1231  // resources than our server.)
1232 
1233  const bool do_dictionary_bulk_fetch =
1234  result_set_rows > min_result_size_for_bulk_dictionary_fetch_ &&
1235  dictionary_to_result_size_ratio <=
1237 
1238  arrow::StringBuilder str_array_builder;
1239 
1240  if (do_dictionary_bulk_fetch) {
1241  VLOG(1) << "Arrow dictionary creation: bulk copying all dictionary "
1242  << " entries for column at offset " << results_col_slot_idx << ". "
1243  << "Column has " << dictionary_proxy_entries << " string entries"
1244  << " for a result set with " << result_set_rows << " rows.";
1245  column_builder.string_remap_mode =
1247  auto str_list = results_->getStringDictionaryPayloadCopy(dict_id);
1248  ARROW_THROW_NOT_OK(str_array_builder.AppendValues(str_list));
1249 
1250  // When we fetch the bulk dictionary, we need to also fetch
1251  // the transient entries only contained in the proxy.
1252  // These values are always negative (starting at -2), and so need
1253  // to be remapped to point to the corresponding entries in the Arrow
1254  // dictionary (they are placed at the end after the materialized
1255  // string entries from StringDictionary)
1256 
1257  int32_t crt_transient_id = static_cast<int32_t>(str_list.size());
1258  auto const& transient_vecmap = sdp->getTransientVector();
1259  for (unsigned index = 0; index < transient_vecmap.size(); ++index) {
1260  ARROW_THROW_NOT_OK(str_array_builder.Append(*transient_vecmap[index]));
1261  auto const old_id = StringDictionaryProxy::transientIndexToId(index);
1262  CHECK(column_builder.string_remapping
1263  .insert(std::make_pair(old_id, crt_transient_id++))
1264  .second);
1265  }
1266  } else {
1267  // Pluck unique dictionary values from ResultSet column
1268  VLOG(1) << "Arrow dictionary creation: serializing unique result set dictionary "
1269  << " entries for column at offset " << results_col_slot_idx << ". "
1270  << "Column has " << dictionary_proxy_entries << " string entries"
1271  << " for a result set with " << result_set_rows << " rows.";
1273 
1274  // ResultSet::getUniqueStringsForDictEncodedTargetCol returns a pair of two vectors,
1275  // the first of int32_t values containing the unique string ids found for
1276  // results_col_slot_idx in the result set, the second containing the associated
1277  // unique strings. Note that the unique string for a unique string id are both
1278  // placed at the same offset in their respective vectors
1279 
1280  auto unique_ids_and_strings =
1281  results_->getUniqueStringsForDictEncodedTargetCol(results_col_slot_idx);
1282  const auto& unique_ids = unique_ids_and_strings.first;
1283  const auto& unique_strings = unique_ids_and_strings.second;
1284  ARROW_THROW_NOT_OK(str_array_builder.AppendValues(unique_strings));
1285  const int32_t num_unique_strings = unique_strings.size();
1286  CHECK_EQ(num_unique_strings, unique_ids.size());
1287  // We need to remap ALL string id values given the Arrow dictionary
1288  // will have "holes", i.e. it is a sparse representation of the underlying
1289  // StringDictionary
1290  for (int32_t unique_string_idx = 0; unique_string_idx < num_unique_strings;
1291  ++unique_string_idx) {
1292  CHECK(
1293  column_builder.string_remapping
1294  .insert(std::make_pair(unique_ids[unique_string_idx], unique_string_idx))
1295  .second);
1296  }
1297  // Note we don't need to get transients from proxy as they are already handled in
1298  // ResultSet::getUniqueStringsForDictEncodedTargetCol
1299  }
1300 
1301  std::shared_ptr<arrow::StringArray> string_array;
1302  ARROW_THROW_NOT_OK(str_array_builder.Finish(&string_array));
1303 
1304  auto dict_builder =
1305  dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
1306  CHECK(dict_builder);
1307 
1308  ARROW_THROW_NOT_OK(dict_builder->InsertMemoValues(*string_array));
1309  } else {
1310  ARROW_THROW_NOT_OK(arrow::MakeBuilder(
1311  arrow::default_memory_pool(), value_type, &column_builder.builder));
1312  }
1313 }
1314 
1315 std::shared_ptr<arrow::Array> ArrowResultSetConverter::finishColumnBuilder(
1316  ColumnBuilder& column_builder) const {
1317  std::shared_ptr<arrow::Array> values;
1318  ARROW_THROW_NOT_OK(column_builder.builder->Finish(&values));
1319  return values;
1320 }
1321 
1322 namespace {
1323 
1324 template <typename BUILDER_TYPE, typename VALUE_ARRAY_TYPE>
1326  const ValueArray& values,
1327  const std::shared_ptr<std::vector<bool>>& is_valid) {
1328  static_assert(!std::is_same<BUILDER_TYPE, arrow::StringDictionary32Builder>::value,
1329  "Dictionary encoded string builder requires function specialization.");
1330 
1331  std::vector<VALUE_ARRAY_TYPE> vals = boost::get<std::vector<VALUE_ARRAY_TYPE>>(values);
1332 
1333  if (scale_epoch_values<BUILDER_TYPE>()) {
1334  auto scale_sec_to_millisec = [](auto seconds) { return seconds * kMilliSecsPerSec; };
1335  auto scale_values = [&](auto epoch) {
1336  return std::is_same<BUILDER_TYPE, arrow::Date32Builder>::value
1338  : scale_sec_to_millisec(epoch);
1339  };
1340  std::transform(vals.begin(), vals.end(), vals.begin(), scale_values);
1341  }
1342 
1343  auto typed_builder = dynamic_cast<BUILDER_TYPE*>(column_builder.builder.get());
1344  CHECK(typed_builder);
1345  if (column_builder.field->nullable()) {
1346  CHECK(is_valid.get());
1347  ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals, *is_valid));
1348  } else {
1349  ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals));
1350  }
1351 }
1352 
1353 template <>
1354 void appendToColumnBuilder<arrow::Decimal128Builder, int64_t>(
1356  const ValueArray& values,
1357  const std::shared_ptr<std::vector<bool>>& is_valid) {
1358  std::vector<int64_t> vals = boost::get<std::vector<int64_t>>(values);
1359  auto typed_builder =
1360  dynamic_cast<arrow::Decimal128Builder*>(column_builder.builder.get());
1361  CHECK(typed_builder);
1362  CHECK_EQ(is_valid->size(), vals.size());
1363  if (column_builder.field->nullable()) {
1364  CHECK(is_valid.get());
1365  for (size_t i = 0; i < vals.size(); i++) {
1366  const auto v = vals[i];
1367  const auto valid = (*is_valid)[i];
1368  if (valid) {
1369  ARROW_THROW_NOT_OK(typed_builder->Append(v));
1370  } else {
1371  ARROW_THROW_NOT_OK(typed_builder->AppendNull());
1372  }
1373  }
1374  } else {
1375  for (const auto& v : vals) {
1376  ARROW_THROW_NOT_OK(typed_builder->Append(v));
1377  }
1378  }
1379 }
1380 
1381 template <>
1382 void appendToColumnBuilder<arrow::StringBuilder, std::string>(
1384  const ValueArray& values,
1385  const std::shared_ptr<std::vector<bool>>& is_valid) {
1386  std::vector<std::string> vals = boost::get<std::vector<std::string>>(values);
1387  auto typed_builder = dynamic_cast<arrow::StringBuilder*>(column_builder.builder.get());
1388  CHECK(typed_builder);
1389  CHECK_EQ(is_valid->size(), vals.size());
1390 
1391  if (column_builder.field->nullable()) {
1392  CHECK(is_valid.get());
1393 
1394  // TODO: Generate this instead of the boolean bitmap
1395  std::vector<uint8_t> transformed_bitmap;
1396  transformed_bitmap.reserve(is_valid->size());
1397  std::for_each(
1398  is_valid->begin(), is_valid->end(), [&transformed_bitmap](const bool is_valid) {
1399  transformed_bitmap.push_back(is_valid ? 1 : 0);
1400  });
1401  ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals, transformed_bitmap.data()));
1402  } else {
1403  ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals));
1404  }
1405 }
1406 
1407 template <>
1408 void appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
1410  const ValueArray& values,
1411  const std::shared_ptr<std::vector<bool>>& is_valid) {
1412  auto typed_builder =
1413  dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
1414  CHECK(typed_builder);
1415 
1416  std::vector<int32_t> vals = boost::get<std::vector<int32_t>>(values);
1417  // remap negative values if ArrowStringRemapMode == ONLY_TRANSIENT_STRINGS_REMAPPED or
1418  // everything if ALL_STRINGS_REMAPPED
1419  CHECK(column_builder.string_remap_mode != ArrowStringRemapMode::INVALID);
1420  for (size_t i = 0; i < vals.size(); i++) {
1421  auto& val = vals[i];
1422  if ((column_builder.string_remap_mode == ArrowStringRemapMode::ALL_STRINGS_REMAPPED ||
1423  val < 0) &&
1424  (*is_valid)[i]) {
1425  vals[i] = column_builder.string_remapping.at(val);
1426  }
1427  }
1428 
1429  if (column_builder.field->nullable()) {
1430  CHECK(is_valid.get());
1431  // TODO(adb): Generate this instead of the boolean bitmap
1432  std::vector<uint8_t> transformed_bitmap;
1433  transformed_bitmap.reserve(is_valid->size());
1434  std::for_each(
1435  is_valid->begin(), is_valid->end(), [&transformed_bitmap](const bool is_valid) {
1436  transformed_bitmap.push_back(is_valid ? 1 : 0);
1437  });
1438 
1439  ARROW_THROW_NOT_OK(typed_builder->AppendIndices(
1440  vals.data(), static_cast<int64_t>(vals.size()), transformed_bitmap.data()));
1441  } else {
1443  typed_builder->AppendIndices(vals.data(), static_cast<int64_t>(vals.size())));
1444  }
1445 }
1446 
1447 template <typename BUILDER_TYPE, typename VALUE_TYPE>
1449  const ValueArray& values,
1450  const std::shared_ptr<std::vector<bool>>& is_valid) {
1451  Vec2<VALUE_TYPE> vals = boost::get<Vec2<VALUE_TYPE>>(values);
1452  auto list_builder = dynamic_cast<arrow::ListBuilder*>(column_builder.builder.get());
1453  CHECK(list_builder);
1454 
1455  auto value_builder = static_cast<BUILDER_TYPE*>(list_builder->value_builder());
1456 
1457  if (column_builder.field->nullable()) {
1458  for (size_t i = 0; i < vals.size(); i++) {
1459  if ((*is_valid)[i]) {
1460  const auto& val = vals[i];
1461  std::vector<uint8_t> bitmap(val.size());
1462  std::transform(val.begin(), val.end(), bitmap.begin(), [](VALUE_TYPE pvalue) {
1463  return static_cast<VALUE_TYPE>(pvalue) != null_type<VALUE_TYPE>::value;
1464  });
1465  ARROW_THROW_NOT_OK(list_builder->Append());
1466  if constexpr (std::is_same_v<BUILDER_TYPE, arrow::BooleanBuilder>) {
1467  std::vector<uint8_t> bval(val.size());
1468  std::copy(val.begin(), val.end(), bval.begin());
1470  value_builder->AppendValues(bval.data(), bval.size(), bitmap.data()));
1471  } else {
1473  value_builder->AppendValues(val.data(), val.size(), bitmap.data()));
1474  }
1475  } else {
1476  ARROW_THROW_NOT_OK(list_builder->AppendNull());
1477  }
1478  }
1479  } else {
1480  for (size_t i = 0; i < vals.size(); i++) {
1481  if ((*is_valid)[i]) {
1482  const auto& val = vals[i];
1483  ARROW_THROW_NOT_OK(list_builder->Append());
1484  if constexpr (std::is_same_v<BUILDER_TYPE, arrow::BooleanBuilder>) {
1485  std::vector<uint8_t> bval(val.size());
1486  std::copy(val.begin(), val.end(), bval.begin());
1487  ARROW_THROW_NOT_OK(value_builder->AppendValues(bval.data(), bval.size()));
1488  } else {
1489  ARROW_THROW_NOT_OK(value_builder->AppendValues(val.data(), val.size()));
1490  }
1491  } else {
1492  ARROW_THROW_NOT_OK(list_builder->AppendNull());
1493  }
1494  }
1495  }
1496 }
1497 
1498 } // namespace
1499 
1501  ColumnBuilder& column_builder,
1502  const ValueArray& values,
1503  const std::shared_ptr<std::vector<bool>>& is_valid) const {
1504  if (column_builder.col_type.is_dict_encoded_string()) {
1505  CHECK_EQ(column_builder.physical_type,
1506  kINT); // assume all dicts use none-encoded type for now
1507  appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
1508  column_builder, values, is_valid);
1509  return;
1510  }
1511  switch (column_builder.physical_type) {
1512  case kBOOLEAN:
1513  appendToColumnBuilder<arrow::BooleanBuilder, bool>(
1514  column_builder, values, is_valid);
1515  break;
1516  case kTINYINT:
1517  appendToColumnBuilder<arrow::Int8Builder, int8_t>(column_builder, values, is_valid);
1518  break;
1519  case kSMALLINT:
1520  appendToColumnBuilder<arrow::Int16Builder, int16_t>(
1521  column_builder, values, is_valid);
1522  break;
1523  case kINT:
1524  appendToColumnBuilder<arrow::Int32Builder, int32_t>(
1525  column_builder, values, is_valid);
1526  break;
1527  case kBIGINT:
1528  appendToColumnBuilder<arrow::Int64Builder, int64_t>(
1529  column_builder, values, is_valid);
1530  break;
1531  case kDECIMAL:
1532  appendToColumnBuilder<arrow::Decimal128Builder, int64_t>(
1533  column_builder, values, is_valid);
1534  break;
1535  case kFLOAT:
1536  appendToColumnBuilder<arrow::FloatBuilder, float>(column_builder, values, is_valid);
1537  break;
1538  case kDOUBLE:
1539  appendToColumnBuilder<arrow::DoubleBuilder, double>(
1540  column_builder, values, is_valid);
1541  break;
1542  case kTIME:
1543  appendToColumnBuilder<arrow::Time32Builder, int32_t>(
1544  column_builder, values, is_valid);
1545  break;
1546  case kTIMESTAMP:
1547  appendToColumnBuilder<arrow::TimestampBuilder, int64_t>(
1548  column_builder, values, is_valid);
1549  break;
1550  case kDATE:
1552  ? appendToColumnBuilder<arrow::Date64Builder, int64_t>(
1553  column_builder, values, is_valid)
1554  : appendToColumnBuilder<arrow::Date32Builder, int32_t>(
1555  column_builder, values, is_valid);
1556  break;
1557  case kARRAY:
1558  if (column_builder.col_type.get_subtype() == kBOOLEAN) {
1559  appendToListColumnBuilder<arrow::BooleanBuilder, int8_t>(
1560  column_builder, values, is_valid);
1561  break;
1562  } else if (column_builder.col_type.get_subtype() == kTINYINT) {
1563  appendToListColumnBuilder<arrow::Int8Builder, int8_t>(
1564  column_builder, values, is_valid);
1565  break;
1566  } else if (column_builder.col_type.get_subtype() == kSMALLINT) {
1567  appendToListColumnBuilder<arrow::Int16Builder, int16_t>(
1568  column_builder, values, is_valid);
1569  break;
1570  } else if (column_builder.col_type.get_subtype() == kINT) {
1571  appendToListColumnBuilder<arrow::Int32Builder, int32_t>(
1572  column_builder, values, is_valid);
1573  break;
1574  } else if (column_builder.col_type.get_subtype() == kBIGINT) {
1575  appendToListColumnBuilder<arrow::Int64Builder, int64_t>(
1576  column_builder, values, is_valid);
1577  break;
1578  } else if (column_builder.col_type.get_subtype() == kFLOAT) {
1579  appendToListColumnBuilder<arrow::FloatBuilder, float>(
1580  column_builder, values, is_valid);
1581  break;
1582  } else if (column_builder.col_type.get_subtype() == kDOUBLE) {
1583  appendToListColumnBuilder<arrow::DoubleBuilder, double>(
1584  column_builder, values, is_valid);
1585  break;
1586  } else {
1587  throw std::runtime_error(column_builder.col_type.get_type_name() +
1588  " is not supported in Arrow result sets.");
1589  }
1590  case kCHAR:
1591  case kVARCHAR:
1592  case kTEXT:
1593  appendToColumnBuilder<arrow::StringBuilder, std::string>(
1594  column_builder, values, is_valid);
1595  break;
1596  default:
1597  // TODO(miyu): support more scalar types.
1598  throw std::runtime_error(column_builder.col_type.get_type_name() +
1599  " is not supported in Arrow result sets.");
1600  }
1601 }
void create_or_append_value(const ScalarTargetValue &val_cty, std::shared_ptr< ValueArray > &values, const size_t max_size)
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:330
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::vector< std::vector< T >> Vec2
const size_t min_result_size_for_bulk_dictionary_fetch_
std::unique_ptr< arrow::ArrayBuilder > builder
HOST DEVICE int get_size() const
Definition: sqltypes.h:339
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
Definition: sqltypes.h:49
SQLTypes
Definition: sqltypes.h:38
void append(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
void appendToListColumnBuilder(ArrowResultSetConverter::ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid)
void convert_column(ResultSetPtr result, size_t col, size_t entry_count, std::shared_ptr< arrow::Array > &out)
ArrowResult getArrowResult() const
std::vector< char > sm_handle
ExecutorDeviceType
void initializeColumnBuilder(ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const size_t result_col_idx, const std::shared_ptr< arrow::Field > &field) const
ResultSetBuffer(const uint8_t *buf, size_t size, ResultSetPtr rs)
const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch_
void appendToColumnBuilder(ArrowResultSetConverter::ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid)
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
#define ARROW_LOG(category)
bool is_fp() const
Definition: sqltypes.h:514
HOST DEVICE int get_scale() const
Definition: sqltypes.h:334
std::shared_ptr< arrow::Array > finishColumnBuilder(ColumnBuilder &column_builder) const
parquet::Type::type get_physical_type(ReaderPtr &reader, const int logical_column_index)
#define UNREACHABLE()
Definition: Logger.h:266
#define CHECK_GE(x, y)
Definition: Logger.h:235
std::shared_ptr< arrow::Field > field
std::shared_ptr< ResultSet > ResultSetPtr
std::pair< key_t, std::shared_ptr< arrow::Buffer > > get_shm_buffer(size_t size)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
#define CHECK_GT(x, y)
Definition: Logger.h:234
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
bool is_time() const
Definition: sqltypes.h:516
constexpr double f
Definition: Utm.h:31
std::string to_string(char const *&&v)
std::pair< key_t, void * > get_shm(size_t shmsz)
static constexpr int64_t kMilliSecsPerSec
ArrowTransport transport_method_
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
std::shared_ptr< arrow::RecordBatch > getArrowBatch(const std::shared_ptr< arrow::Schema > &schema) const
future< Result > async(Fn &&fn, Args &&...args)
#define ARROW_RECORDBATCH_MAKE
std::vector< std::string > col_names_
bool is_integer() const
Definition: sqltypes.h:512
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
ExecutorDeviceType device_type_
boost::optional< std::vector< ScalarTargetValue >> ArrayTargetValue
Definition: TargetValue.h:155
std::vector< char > df_handle
std::shared_ptr< arrow::Field > makeField(const std::string name, const SQLTypeInfo &target_type) const
static int32_t transientIndexToId(unsigned const index)
SerializedArrowOutput getSerializedArrowOutput(arrow::ipc::DictionaryFieldMapper *mapper) const
OUTPUT transform(INPUT const &input, FUNC const &func)
Definition: misc.h:296
bool is_boolean() const
Definition: sqltypes.h:517
boost::variant< std::vector< bool >, std::vector< int8_t >, std::vector< int16_t >, std::vector< int32_t >, std::vector< int64_t >, std::vector< arrow::Decimal128 >, std::vector< float >, std::vector< double >, std::vector< std::vector< int8_t >>, std::vector< std::vector< int16_t >>, std::vector< std::vector< int32_t >>, std::vector< std::vector< int64_t >>, std::vector< std::vector< float >>, std::vector< std::vector< double >>, std::vector< std::string >> ValueArray
int get_precision() const
Definition: sqltypes.h:332
key_t get_and_copy_to_shm(const std::shared_ptr< Buffer > &data)
static void deallocateArrowResultBuffer(const ArrowResult &result, const ExecutorDeviceType device_type, const size_t device_id, std::shared_ptr< Data_Namespace::DataMgr > &data_mgr)
std::shared_ptr< arrow::DataType > get_arrow_type(const SQLTypeInfo &sql_type, const ExecutorDeviceType device_type)
Definition: sqltypes.h:52
Definition: sqltypes.h:53
std::unordered_map< StrId, ArrowStrId > string_remapping
std::shared_ptr< ResultSet > results_
int64_t sm_size
int64_t df_size
std::string get_type_name() const
Definition: sqltypes.h:443
#define IS_INTEGER(T)
Definition: sqltypes.h:245
Definition: sqltypes.h:41
void create_or_append_validity(const ArrayTargetValue &value, const SQLTypeInfo &col_type, std::shared_ptr< std::vector< bool >> &null_bitmap, const size_t max_size)
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:338
#define CHECK(condition)
Definition: Logger.h:222
#define DEBUG_TIMER(name)
Definition: Logger.h:371
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
int64_t get_epoch_days_from_seconds(const int64_t seconds)
bool is_dict_encoded_string() const
Definition: sqltypes.h:548
Definition: sqltypes.h:45
string name
Definition: setup.in.py:72
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:336
int cpu_threads()
Definition: thread_count.h:24
bool is_decimal() const
Definition: sqltypes.h:513
#define VLOG(n)
Definition: Logger.h:316
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:154
std::shared_ptr< arrow::RecordBatch > convertToArrow() const