ArrowResultSetConverter.cpp
/*
 * Copyright 2019 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "../Shared/DateConverters.h"
#include "ArrowResultSet.h"
#include "Execute.h"

#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <algorithm>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <future>
#include <string>

#include "arrow/api.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/api.h"

#include "Shared/ArrowUtil.h"

#ifdef HAVE_CUDA
#include <arrow/gpu/cuda_api.h>
#include <cuda.h>
#endif  // HAVE_CUDA

#define ARROW_RECORDBATCH_MAKE arrow::RecordBatch::Make

using namespace arrow;

namespace {

SQLTypes get_dict_index_type(const SQLTypeInfo& ti) {
  CHECK(ti.is_dict_encoded_string());
  switch (ti.get_size()) {
    case 1:
      return kTINYINT;
    case 2:
      return kSMALLINT;
    case 4:
      return kINT;
    case 8:
      return kBIGINT;
    default:
      CHECK(false);
  }
  return ti.get_type();
}

SQLTypes get_physical_type(const SQLTypeInfo& ti) {
  auto logical_type = ti.get_type();
  if (IS_INTEGER(logical_type)) {
    switch (ti.get_size()) {
      case 1:
        return kTINYINT;
      case 2:
        return kSMALLINT;
      case 4:
        return kINT;
      case 8:
        return kBIGINT;
      default:
        CHECK(false);
    }
  }
  return logical_type;
}

template <typename TYPE, typename VALUE_ARRAY_TYPE>
void create_or_append_value(const ScalarTargetValue& val_cty,
                            std::shared_ptr<ValueArray>& values,
                            const size_t max_size) {
  auto pval_cty = boost::get<VALUE_ARRAY_TYPE>(&val_cty);
  CHECK(pval_cty);
  auto val_ty = static_cast<TYPE>(*pval_cty);
  if (!values) {
    values = std::make_shared<ValueArray>(std::vector<TYPE>());
    boost::get<std::vector<TYPE>>(*values).reserve(max_size);
  }
  CHECK(values);
  auto values_ty = boost::get<std::vector<TYPE>>(values.get());
  CHECK(values_ty);
  values_ty->push_back(val_ty);
}

template <typename TYPE>
void create_or_append_validity(const ScalarTargetValue& value,
                               const SQLTypeInfo& col_type,
                               std::shared_ptr<std::vector<bool>>& null_bitmap,
                               const size_t max_size) {
  if (col_type.get_notnull()) {
    CHECK(!null_bitmap);
    return;
  }
  auto pvalue = boost::get<TYPE>(&value);
  CHECK(pvalue);
  bool is_valid = false;
  if (col_type.is_boolean()) {
    is_valid = inline_int_null_val(col_type) != static_cast<int8_t>(*pvalue);
  } else if (col_type.is_dict_encoded_string()) {
    is_valid = inline_int_null_val(col_type) != static_cast<int32_t>(*pvalue);
  } else if (col_type.is_integer() || col_type.is_time()) {
    is_valid = inline_int_null_val(col_type) != static_cast<int64_t>(*pvalue);
  } else if (col_type.is_fp()) {
    is_valid = inline_fp_null_val(col_type) != static_cast<double>(*pvalue);
  } else {
    UNREACHABLE();
  }

  if (!null_bitmap) {
    null_bitmap = std::make_shared<std::vector<bool>>();
    null_bitmap->reserve(max_size);
  }
  CHECK(null_bitmap);
  null_bitmap->push_back(is_valid);
}
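
create_or_append_validity works by comparing each raw value against the column type's inline null sentinel and recording the outcome in a boolean validity vector. A standalone illustration of that pattern follows (plain C++; the INT32_MIN sentinel is an assumption for a nullable 32-bit integer column, not a quote of inline_int_null_val):

#include <cstdint>
#include <limits>
#include <vector>

// Hypothetical stand-in for inline_int_null_val() on a nullable 32-bit column.
constexpr int32_t kNullSentinel = std::numeric_limits<int32_t>::min();

// Values arrive with nulls encoded inline; the validity vector marks each
// slot true (valid) or false (null), mirroring create_or_append_validity.
std::vector<bool> build_validity(const std::vector<int32_t>& vals) {
  std::vector<bool> validity;
  validity.reserve(vals.size());
  for (const auto v : vals) {
    validity.push_back(v != kNullSentinel);
  }
  return validity;
}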

std::pair<key_t, void*> get_shm(size_t shmsz) {
  if (!shmsz) {
    return std::make_pair(IPC_PRIVATE, nullptr);
  }
  // Generate a new key for a shared memory segment. Keys to shared memory segments
  // are OS-global, so we need to try a new key if we encounter a collision. Incremental
  // key generation would be deterministically worst-case, and a hash (like djb2) plus a
  // nonce could still collide if multiple clients specify the same nonce, so we use
  // rand() in lieu of a better approach.
  // TODO(ptaylor): Is this common? Are these assumptions true?
  auto key = static_cast<key_t>(rand());
  int shmid = -1;
  // IPC_CREAT - create a new segment for this key if one doesn't already exist
  // IPC_EXCL - ensure failure if a segment already exists for this key
  while ((shmid = shmget(key, shmsz, IPC_CREAT | IPC_EXCL | 0666)) < 0) {
    // If shmget fails and errno is one of these four values, try a new key.
    // TODO(ptaylor): is checking for the last three values really necessary? Checking
    // them by default to be safe.
    // EEXIST - a shared memory segment is already associated with this key
    // EACCES - a segment is already associated with this key, but we don't have
    //          permission to access it
    // EINVAL - a segment is already associated with this key, but its size is less
    //          than shmsz
    // ENOENT - IPC_CREAT was not set in shmflg and no segment associated with key
    //          was found
    // Note: errno values are not bit flags, so they must be compared individually
    // rather than tested with a bitwise mask.
    if (errno != EEXIST && errno != EACCES && errno != EINVAL && errno != ENOENT) {
      throw std::runtime_error("failed to create a shared memory segment");
    }
    key = static_cast<key_t>(rand());
  }
  // attach and get a pointer to the shared memory segment
  auto ipc_ptr = shmat(shmid, NULL, 0);
  if (reinterpret_cast<int64_t>(ipc_ptr) == -1) {
    throw std::runtime_error("failed to attach a shared memory segment");
  }

  return std::make_pair(key, ipc_ptr);
}
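
For context, a client that receives one of these keys performs the other half of the System V lifecycle: look up the segment, attach, read, detach, and mark it for removal (deallocateArrowResultBuffer below handles the removal server-side for CPU results). A minimal consumer-side sketch, not part of this translation unit; the key and size are assumed to arrive via the ArrowResult handle fields:

#include <sys/ipc.h>
#include <sys/shm.h>
#include <stdexcept>

// Hypothetical consumer of a segment created by get_shm().
void* attach_segment(key_t key, size_t size) {
  int shmid = shmget(key, size, 0666);  // look up the existing segment
  if (shmid < 0) {
    throw std::runtime_error("no segment for this key");
  }
  void* ptr = shmat(shmid, nullptr, 0);  // map it into our address space
  if (reinterpret_cast<int64_t>(ptr) == -1) {
    throw std::runtime_error("failed to attach");
  }
  // ... consume the bytes, then release:
  // shmdt(ptr);                  // unmap from this process
  // shmctl(shmid, IPC_RMID, 0);  // destroy once all attachments are gone
  return ptr;
}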

std::pair<key_t, std::shared_ptr<Buffer>> get_shm_buffer(size_t size) {
  auto [key, ipc_ptr] = get_shm(size);
  std::shared_ptr<Buffer> buffer(new MutableBuffer(static_cast<uint8_t*>(ipc_ptr), size));
  return std::make_pair<key_t, std::shared_ptr<Buffer>>(std::move(key),
                                                        std::move(buffer));
}

}  // namespace

namespace arrow {

key_t get_and_copy_to_shm(const std::shared_ptr<Buffer>& data) {
  auto [key, ipc_ptr] = get_shm(data->size());
  // copy the arrow records buffer to shared memory
  // TODO(ptaylor): it should be possible to tell Arrow's RecordBatchStreamWriter to
  // write directly to the shared memory segment as a sink
  memcpy(ipc_ptr, data->data(), data->size());
  // detach from the shared memory segment
  shmdt(ipc_ptr);
  return key;
}

}  // namespace arrow
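
The reading side would invert get_and_copy_to_shm: attach the same segment and stream-read the schema, dictionaries, and record batch that were written back to back. A sketch under the assumption of Arrow 1.x's Result-based reader API (RecordBatchStreamReader::Open); error handling elided:

#include <sys/shm.h>
#include <arrow/buffer.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/reader.h>

std::shared_ptr<arrow::RecordBatch> read_batch_from_shm(key_t key, size_t size) {
  int shmid = shmget(key, size, 0666);
  auto* ipc_ptr = static_cast<const uint8_t*>(shmat(shmid, nullptr, SHM_RDONLY));
  // Wrap the shared memory in a zero-copy arrow::Buffer and stream-read the
  // schema, dictionary, and record batch messages written back to back.
  auto buffer = std::make_shared<arrow::Buffer>(ipc_ptr, size);
  auto reader = arrow::ipc::RecordBatchStreamReader::Open(
                    std::make_shared<arrow::io::BufferReader>(buffer))
                    .ValueOrDie();
  std::shared_ptr<arrow::RecordBatch> batch;
  ARROW_THROW_NOT_OK(reader->ReadNext(&batch));
  shmdt(ipc_ptr);
  return batch;
}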

ArrowResult ArrowResultSetConverter::getArrowResult() const {
  auto timer = DEBUG_TIMER(__func__);
  std::shared_ptr<arrow::RecordBatch> record_batch = convertToArrow();

  if (device_type_ == ExecutorDeviceType::CPU ||
      transport_method_ == ArrowTransport::WIRE) {
    const auto getWireResult =
        [&](const int64_t schema_size,
            const int64_t dict_size,
            const int64_t records_size,
            const std::shared_ptr<Buffer>& serialized_schema,
            const std::shared_ptr<Buffer>& serialized_dict) -> ArrowResult {
      auto timer = DEBUG_TIMER("serialize batch to wire");
      std::vector<char> schema_handle_data;
      std::vector<char> record_handle_data;
      const int64_t total_size = schema_size + records_size + dict_size;
      record_handle_data.insert(record_handle_data.end(),
                                serialized_schema->data(),
                                serialized_schema->data() + schema_size);

      record_handle_data.insert(record_handle_data.end(),
                                serialized_dict->data(),
                                serialized_dict->data() + dict_size);

      record_handle_data.resize(total_size);
      auto serialized_records =
          arrow::MutableBuffer::Wrap(record_handle_data.data(), total_size);

      io::FixedSizeBufferWriter stream(
          SliceMutableBuffer(serialized_records, schema_size + dict_size));
      ARROW_THROW_NOT_OK(ipc::SerializeRecordBatch(
          *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));

      return {std::vector<char>(0),
              0,
              std::vector<char>(0),
              serialized_records->size(),
              std::string{""},
              record_handle_data};
    };

    const auto getShmResult =
        [&](const int64_t schema_size,
            const int64_t dict_size,
            const int64_t records_size,
            const std::shared_ptr<Buffer>& serialized_schema,
            const std::shared_ptr<Buffer>& serialized_dict) -> ArrowResult {
      auto timer = DEBUG_TIMER("serialize batch to shared memory");
      std::shared_ptr<Buffer> serialized_records;
      std::vector<char> schema_handle_buffer;
      std::vector<char> record_handle_buffer(sizeof(key_t), 0);
      key_t records_shm_key = IPC_PRIVATE;
      const int64_t total_size = schema_size + records_size + dict_size;

      std::tie(records_shm_key, serialized_records) = get_shm_buffer(total_size);

      memcpy(serialized_records->mutable_data(),
             serialized_schema->data(),
             (size_t)schema_size);
      memcpy(serialized_records->mutable_data() + schema_size,
             serialized_dict->data(),
             (size_t)dict_size);

      io::FixedSizeBufferWriter stream(
          SliceMutableBuffer(serialized_records, schema_size + dict_size));
      ARROW_THROW_NOT_OK(ipc::SerializeRecordBatch(
          *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
      memcpy(&record_handle_buffer[0],
             reinterpret_cast<const unsigned char*>(&records_shm_key),
             sizeof(key_t));

      return {schema_handle_buffer,
              0,
              record_handle_buffer,
              serialized_records->size(),
              std::string{""}};
    };

    std::shared_ptr<Buffer> serialized_schema;
    int64_t records_size = 0;
    int64_t schema_size = 0;
    ipc::DictionaryMemo memo;
    auto options = ipc::IpcWriteOptions::Defaults();
    auto dict_stream = arrow::io::BufferOutputStream::Create(1024).ValueOrDie();

    ARROW_THROW_NOT_OK(CollectDictionaries(*record_batch, &memo));
    for (auto& pair : memo.dictionaries()) {
      ipc::IpcPayload payload;
      int64_t dictionary_id = pair.first;
      const auto& dictionary = pair.second;

      ARROW_THROW_NOT_OK(
          GetDictionaryPayload(dictionary_id, dictionary, options, &payload));
      int32_t metadata_length = 0;
      ARROW_THROW_NOT_OK(
          WriteIpcPayload(payload, options, dict_stream.get(), &metadata_length));
    }
    auto serialized_dict = dict_stream->Finish().ValueOrDie();
    auto dict_size = serialized_dict->size();

    ARROW_ASSIGN_OR_THROW(
        serialized_schema,
        ipc::SerializeSchema(*record_batch->schema(), nullptr, default_memory_pool()));
    schema_size = serialized_schema->size();

    ARROW_THROW_NOT_OK(ipc::GetRecordBatchSize(*record_batch, &records_size));

    switch (transport_method_) {
      case ArrowTransport::WIRE:
        return getWireResult(
            schema_size, dict_size, records_size, serialized_schema, serialized_dict);
      case ArrowTransport::SHARED_MEMORY:
        return getShmResult(
            schema_size, dict_size, records_size, serialized_schema, serialized_dict);
      default:
        UNREACHABLE();
    }
  }
#ifdef HAVE_CUDA
  CHECK(device_type_ == ExecutorDeviceType::GPU);

  // Copy the schema to the schema handle
  auto out_stream_result = arrow::io::BufferOutputStream::Create(1024);
  ARROW_THROW_NOT_OK(out_stream_result.status());
  auto out_stream = std::move(out_stream_result).ValueOrDie();

  arrow::ipc::DictionaryMemo current_memo;
  arrow::ipc::DictionaryMemo serialized_memo;

  arrow::ipc::IpcPayload schema_payload;
  ARROW_THROW_NOT_OK(arrow::ipc::GetSchemaPayload(*record_batch->schema(),
                                                  arrow::ipc::IpcWriteOptions::Defaults(),
                                                  &serialized_memo,
                                                  &schema_payload));
  int32_t schema_payload_length = 0;
  ARROW_THROW_NOT_OK(arrow::ipc::WriteIpcPayload(schema_payload,
                                                 arrow::ipc::IpcWriteOptions::Defaults(),
                                                 out_stream.get(),
                                                 &schema_payload_length));

  ARROW_THROW_NOT_OK(CollectDictionaries(*record_batch, &current_memo));

  // now serialize the dictionaries, if any
  std::shared_ptr<arrow::Schema> dummy_schema;
  std::vector<std::shared_ptr<arrow::RecordBatch>> dict_batches;
  for (int i = 0; i < record_batch->schema()->num_fields(); i++) {
    auto field = record_batch->schema()->field(i);
    if (field->type()->id() == arrow::Type::DICTIONARY) {
      int64_t dict_id = -1;
      ARROW_THROW_NOT_OK(current_memo.GetId(field.get(), &dict_id));
      CHECK_GE(dict_id, 0);
      std::shared_ptr<Array> dict;
      ARROW_THROW_NOT_OK(current_memo.GetDictionary(dict_id, &dict));
      CHECK(dict);

      if (!dummy_schema) {
        auto dummy_field = std::make_shared<arrow::Field>("", dict->type());
        dummy_schema = std::make_shared<arrow::Schema>(
            std::vector<std::shared_ptr<arrow::Field>>{dummy_field});
      }
      dict_batches.emplace_back(
          arrow::RecordBatch::Make(dummy_schema, dict->length(), {dict}));
    }
  }

  if (!dict_batches.empty()) {
    ARROW_THROW_NOT_OK(arrow::ipc::WriteRecordBatchStream(
        dict_batches, ipc::IpcWriteOptions::Defaults(), out_stream.get()));
  }

  auto complete_ipc_stream = out_stream->Finish();
  ARROW_THROW_NOT_OK(complete_ipc_stream.status());
  auto serialized_records = std::move(complete_ipc_stream).ValueOrDie();

  const auto record_key = arrow::get_and_copy_to_shm(serialized_records);
  std::vector<char> schema_record_key_buffer(sizeof(key_t), 0);
  memcpy(&schema_record_key_buffer[0],
         reinterpret_cast<const unsigned char*>(&record_key),
         sizeof(key_t));

  arrow::cuda::CudaDeviceManager* manager;
  ARROW_ASSIGN_OR_THROW(manager, arrow::cuda::CudaDeviceManager::Instance());
  std::shared_ptr<arrow::cuda::CudaContext> context;
  ARROW_ASSIGN_OR_THROW(context, manager->GetContext(device_id_));

  std::shared_ptr<arrow::cuda::CudaBuffer> device_serialized;
  ARROW_ASSIGN_OR_THROW(device_serialized,
                        SerializeRecordBatch(*record_batch, context.get()));

  std::shared_ptr<arrow::cuda::CudaIpcMemHandle> cuda_handle;
  ARROW_ASSIGN_OR_THROW(cuda_handle, device_serialized->ExportForIpc());

  std::shared_ptr<arrow::Buffer> serialized_cuda_handle;
  ARROW_ASSIGN_OR_THROW(serialized_cuda_handle,
                        cuda_handle->Serialize(arrow::default_memory_pool()));

  std::vector<char> record_handle_buffer(serialized_cuda_handle->size(), 0);
  memcpy(&record_handle_buffer[0],
         serialized_cuda_handle->data(),
         serialized_cuda_handle->size());

  return {schema_record_key_buffer,
          serialized_records->size(),
          record_handle_buffer,
          serialized_cuda_handle->size(),
          serialized_cuda_handle->ToString()};
#else
  UNREACHABLE();
  return {std::vector<char>{}, 0, std::vector<char>{}, 0, ""};
#endif
}

ArrowResultSetConverter::SerializedArrowOutput
ArrowResultSetConverter::getSerializedArrowOutput(
    arrow::ipc::DictionaryMemo* memo) const {
  auto timer = DEBUG_TIMER(__func__);
  std::shared_ptr<arrow::RecordBatch> arrow_copy = convertToArrow();
  std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;

  ARROW_ASSIGN_OR_THROW(serialized_schema,
                        arrow::ipc::SerializeSchema(
                            *arrow_copy->schema(), memo, arrow::default_memory_pool()));

  ARROW_THROW_NOT_OK(CollectDictionaries(*arrow_copy, memo));

  if (arrow_copy->num_rows()) {
    auto timer = DEBUG_TIMER("serialize records");
    ARROW_THROW_NOT_OK(arrow_copy->Validate());
    ARROW_ASSIGN_OR_THROW(serialized_records,
                          arrow::ipc::SerializeRecordBatch(
                              *arrow_copy, arrow::ipc::IpcWriteOptions::Defaults()));
  } else {
    ARROW_ASSIGN_OR_THROW(serialized_records, arrow::AllocateBuffer(0));
  }
  return {serialized_schema, serialized_records};
}

std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::convertToArrow() const {
  const auto col_count = results_->colCount();
  std::vector<std::shared_ptr<arrow::Field>> fields;
  CHECK(col_names_.empty() || col_names_.size() == col_count);
  for (size_t i = 0; i < col_count; ++i) {
    const auto ti = results_->getColType(i);
    fields.push_back(makeField(col_names_.empty() ? "" : col_names_[i], ti));
  }
  return getArrowBatch(arrow::schema(fields));
}

std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::getArrowBatch(
    const std::shared_ptr<arrow::Schema>& schema) const {
  std::vector<std::shared_ptr<arrow::Array>> result_columns;

  const size_t entry_count = top_n_ < 0
                                 ? results_->entryCount()
                                 : std::min(size_t(top_n_), results_->entryCount());
  if (!entry_count) {
    return ARROW_RECORDBATCH_MAKE(schema, 0, result_columns);
  }
  const auto col_count = results_->colCount();
  size_t row_count = 0;

  std::vector<ColumnBuilder> builders(col_count);

  // Create array builders
  for (size_t i = 0; i < col_count; ++i) {
    initializeColumnBuilder(builders[i], results_->getColType(i), schema->field(i));
  }

  // TODO(miyu): speed up for columnar buffers
  auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
                   std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
                   const size_t start_entry,
                   const size_t end_entry) -> size_t {
    CHECK_EQ(value_seg.size(), col_count);
    CHECK_EQ(null_bitmap_seg.size(), col_count);
    const auto entry_count = end_entry - start_entry;
    size_t seg_row_count = 0;
    for (size_t i = start_entry; i < end_entry; ++i) {
      auto row = results_->getRowAtNoTranslations(i);
      if (row.empty()) {
        continue;
      }
      ++seg_row_count;
      for (size_t j = 0; j < col_count; ++j) {
        auto scalar_value = boost::get<ScalarTargetValue>(&row[j]);
        // TODO(miyu): support more types other than scalar.
        CHECK(scalar_value);
        const auto& column = builders[j];
        switch (column.physical_type) {
          case kBOOLEAN:
            create_or_append_value<bool, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kTINYINT:
            create_or_append_value<int8_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kSMALLINT:
            create_or_append_value<int16_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kINT:
            create_or_append_value<int32_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kBIGINT:
            create_or_append_value<int64_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kFLOAT:
            create_or_append_value<float, float>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<float>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kDOUBLE:
            create_or_append_value<double, double>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<double>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kTIME:
            create_or_append_value<int32_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kDATE:
            device_type_ == ExecutorDeviceType::GPU
                ? create_or_append_value<int64_t, int64_t>(
                      *scalar_value, value_seg[j], entry_count)
                : create_or_append_value<int32_t, int64_t>(
                      *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kTIMESTAMP:
            create_or_append_value<int64_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          default:
            // TODO(miyu): support more scalar types.
            throw std::runtime_error(column.col_type.get_type_name() +
                                     " is not supported in Arrow result sets.");
        }
      }
    }
    return seg_row_count;
  };

  std::vector<std::shared_ptr<ValueArray>> column_values(col_count, nullptr);
  std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count, nullptr);
  const bool multithreaded = entry_count > 10000 && !results_->isTruncated();
  if (multithreaded) {
    const size_t cpu_count = cpu_threads();
    std::vector<std::future<size_t>> child_threads;
    std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
        cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count, nullptr));
    std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
        cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count, nullptr));
    const auto stride = (entry_count + cpu_count - 1) / cpu_count;
    for (size_t i = 0, start_entry = 0; start_entry < entry_count;
         ++i, start_entry += stride) {
      const auto end_entry = std::min(entry_count, start_entry + stride);
      child_threads.push_back(std::async(std::launch::async,
                                         fetch,
                                         std::ref(column_value_segs[i]),
                                         std::ref(null_bitmap_segs[i]),
                                         start_entry,
                                         end_entry));
    }
    for (auto& child : child_threads) {
      row_count += child.get();
    }
    for (int i = 0; i < schema->num_fields(); ++i) {
      for (size_t j = 0; j < cpu_count; ++j) {
        if (!column_value_segs[j][i]) {
          continue;
        }
        append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
      }
    }
  } else {
    row_count = fetch(column_values, null_bitmaps, size_t(0), entry_count);
    for (int i = 0; i < schema->num_fields(); ++i) {
      append(builders[i], *column_values[i], null_bitmaps[i]);
    }
  }

  for (size_t i = 0; i < col_count; ++i) {
    result_columns.push_back(finishColumnBuilder(builders[i]));
  }
  return ARROW_RECORDBATCH_MAKE(schema, row_count, result_columns);
}

namespace {

std::shared_ptr<arrow::DataType> get_arrow_type(const SQLTypeInfo& sql_type,
                                                const ExecutorDeviceType device_type) {
  switch (get_physical_type(sql_type)) {
    case kBOOLEAN:
      return boolean();
    case kTINYINT:
      return int8();
    case kSMALLINT:
      return int16();
    case kINT:
      return int32();
    case kBIGINT:
      return int64();
    case kFLOAT:
      return float32();
    case kDOUBLE:
      return float64();
    case kCHAR:
    case kVARCHAR:
    case kTEXT:
      if (sql_type.is_dict_encoded_string()) {
        auto value_type = std::make_shared<StringType>();
        return dictionary(int32(), value_type, false);
      }
      return utf8();
    case kDECIMAL:
    case kNUMERIC:
      return decimal(sql_type.get_precision(), sql_type.get_scale());
    case kTIME:
      return time32(TimeUnit::SECOND);
    case kDATE:
      // TODO(wamsi): Remove date64() once date32() support is added to cuDF.
      // Support for date32() is currently missing in cuDF, so if a client
      // requests dates on GPU, return date64() until that support lands.
      return device_type == ExecutorDeviceType::GPU ? date64() : date32();
    case kTIMESTAMP:
      switch (sql_type.get_precision()) {
        case 0:
          return timestamp(TimeUnit::SECOND);
        case 3:
          return timestamp(TimeUnit::MILLI);
        case 6:
          return timestamp(TimeUnit::MICRO);
        case 9:
          return timestamp(TimeUnit::NANO);
        default:
          throw std::runtime_error(
              "Unsupported timestamp precision for Arrow result sets: " +
              std::to_string(sql_type.get_precision()));
      }
    case kARRAY:
    case kINTERVAL_DAY_TIME:
    case kINTERVAL_YEAR_MONTH:
    default:
      throw std::runtime_error(sql_type.get_type_name() +
                               " is not supported in Arrow result sets.");
  }
  return nullptr;
}

}  // namespace

std::shared_ptr<arrow::Field> ArrowResultSetConverter::makeField(
    const std::string name,
    const SQLTypeInfo& target_type) const {
  return arrow::field(
      name, get_arrow_type(target_type, device_type_), !target_type.get_notnull());
}

void ArrowResultSetConverter::deallocateArrowResultBuffer(
    const ArrowResult& result,
    const ExecutorDeviceType device_type,
    const size_t device_id,
    std::shared_ptr<Data_Namespace::DataMgr>& data_mgr) {
  // CPU buffers skip the sm handle, serializing the entire RecordBatch to df.
  // Remove shared memory on sysmem
  if (!result.sm_handle.empty()) {
    CHECK_EQ(sizeof(key_t), result.sm_handle.size());
    const key_t& schema_key = *(key_t*)(&result.sm_handle[0]);
    auto shm_id = shmget(schema_key, result.sm_size, 0666);
    if (shm_id < 0) {
      throw std::runtime_error(
          "failed to get a valid shm ID with the given shm key of the schema");
    }
    if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
      throw std::runtime_error("failed to deallocate Arrow schema on errno(" +
                               std::to_string(errno) + ")");
    }
  }

  if (device_type == ExecutorDeviceType::CPU) {
    CHECK_EQ(sizeof(key_t), result.df_handle.size());
    const key_t& df_key = *(key_t*)(&result.df_handle[0]);
    auto shm_id = shmget(df_key, result.df_size, 0666);
    if (shm_id < 0) {
      throw std::runtime_error(
          "failed to get a valid shm ID with the given shm key of the data");
    }
    if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
      throw std::runtime_error("failed to deallocate Arrow data frame");
    }
  }
  // CUDA buffers become owned by the caller, and will automatically be freed
  // TODO: What if the client never takes ownership of the result? We may want to
  // establish a check to see if the GPU buffer still exists, and then free it.
}

void ArrowResultSetConverter::initializeColumnBuilder(
    ColumnBuilder& column_builder,
    const SQLTypeInfo& col_type,
    const std::shared_ptr<arrow::Field>& field) const {
  column_builder.field = field;
  column_builder.col_type = col_type;
  column_builder.physical_type = col_type.is_dict_encoded_string()
                                     ? get_dict_index_type(col_type)
                                     : get_physical_type(col_type);

  auto value_type = field->type();
  if (col_type.is_dict_encoded_string()) {
    column_builder.builder.reset(new StringDictionary32Builder());
    // add values to the builder
    const int dict_id = col_type.get_comp_param();
    auto str_list = results_->getStringDictionaryPayloadCopy(dict_id);

    arrow::StringBuilder str_array_builder;
    ARROW_THROW_NOT_OK(str_array_builder.AppendValues(*str_list));
    std::shared_ptr<StringArray> string_array;
    ARROW_THROW_NOT_OK(str_array_builder.Finish(&string_array));

    auto dict_builder =
        dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
    CHECK(dict_builder);

    ARROW_THROW_NOT_OK(dict_builder->InsertMemoValues(*string_array));
  } else {
    ARROW_THROW_NOT_OK(
        arrow::MakeBuilder(default_memory_pool(), value_type, &column_builder.builder));
  }
}
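
The dictionary path above memoizes the entire string dictionary once, so each row only appends an integer index; that is what makes dict-encoded text cheap to convert. A minimal self-contained sketch of that memoize-then-append pattern (assuming the arrow/api.h include already at the top of this file), using the same Arrow calls this file already relies on:

// Sketch: build a dictionary-encoded column by seeding the dictionary once,
// then appending integer indices per row.
std::shared_ptr<arrow::Array> make_dict_column_sketch() {
  arrow::StringBuilder str_builder;
  ARROW_THROW_NOT_OK(str_builder.AppendValues({"apple", "banana", "cherry"}));
  std::shared_ptr<arrow::StringArray> dict_values;
  ARROW_THROW_NOT_OK(str_builder.Finish(&dict_values));

  arrow::StringDictionary32Builder dict_builder;
  // Memoize the full dictionary up front so indices stay aligned with it.
  ARROW_THROW_NOT_OK(dict_builder.InsertMemoValues(*dict_values));

  // Rows are now just indices into the memoized dictionary.
  std::vector<int32_t> indices{2, 0, 0, 1};  // cherry, apple, apple, banana
  ARROW_THROW_NOT_OK(dict_builder.AppendIndices(
      indices.data(), static_cast<int64_t>(indices.size())));

  std::shared_ptr<arrow::Array> dict_column;
  ARROW_THROW_NOT_OK(dict_builder.Finish(&dict_column));
  return dict_column;
}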

std::shared_ptr<arrow::Array> ArrowResultSetConverter::finishColumnBuilder(
    ColumnBuilder& column_builder) const {
  std::shared_ptr<Array> values;
  ARROW_THROW_NOT_OK(column_builder.builder->Finish(&values));
  return values;
}

namespace {

template <typename BUILDER_TYPE, typename VALUE_ARRAY_TYPE>
void appendToColumnBuilder(ArrowResultSetConverter::ColumnBuilder& column_builder,
                           const ValueArray& values,
                           const std::shared_ptr<std::vector<bool>>& is_valid) {
  static_assert(!std::is_same<BUILDER_TYPE, arrow::StringDictionary32Builder>::value,
                "Dictionary encoded string builder requires function specialization.");

  std::vector<VALUE_ARRAY_TYPE> vals = boost::get<std::vector<VALUE_ARRAY_TYPE>>(values);

  if (scale_epoch_values<BUILDER_TYPE>()) {
    auto scale_sec_to_millisec = [](auto seconds) { return seconds * kMilliSecsPerSec; };
    auto scale_values = [&](auto epoch) {
      return std::is_same<BUILDER_TYPE, Date32Builder>::value
                 ? get_epoch_days_from_seconds(epoch)
                 : scale_sec_to_millisec(epoch);
    };
    std::transform(vals.begin(), vals.end(), vals.begin(), scale_values);
  }

  auto typed_builder = dynamic_cast<BUILDER_TYPE*>(column_builder.builder.get());
  CHECK(typed_builder);
  if (column_builder.field->nullable()) {
    CHECK(is_valid.get());
    ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals, *is_valid));
  } else {
    ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals));
  }
}
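
For the Date32 path, get_epoch_days_from_seconds (from Shared/DateConverters.h) reduces epoch seconds to whole days. A sketch of the floor-division arithmetic that conversion needs so pre-1970 timestamps land on the earlier day; the 86400-second day is the usual assumption, and this mirrors the intent of the real helper rather than quoting it:

#include <cstdint>

constexpr int64_t kSecsPerDay = 86400;

// Epoch seconds -> epoch days with floor semantics, so that e.g. -1 second
// (1969-12-31 23:59:59 UTC) maps to day -1 rather than day 0.
int64_t epoch_days_from_seconds_sketch(const int64_t seconds) {
  return seconds < 0 ? (seconds - (kSecsPerDay - 1)) / kSecsPerDay
                     : seconds / kSecsPerDay;
}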

template <>
void appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
    ArrowResultSetConverter::ColumnBuilder& column_builder,
    const ValueArray& values,
    const std::shared_ptr<std::vector<bool>>& is_valid) {
  auto typed_builder =
      dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
  CHECK(typed_builder);

  std::vector<int32_t> vals = boost::get<std::vector<int32_t>>(values);

  if (column_builder.field->nullable()) {
    CHECK(is_valid.get());
    // TODO(adb): Generate this instead of the boolean bitmap
    std::vector<uint8_t> transformed_bitmap;
    transformed_bitmap.reserve(is_valid->size());
    std::for_each(
        is_valid->begin(), is_valid->end(), [&transformed_bitmap](const bool is_valid) {
          transformed_bitmap.push_back(is_valid ? 1 : 0);
        });

    ARROW_THROW_NOT_OK(typed_builder->AppendIndices(
        vals.data(), static_cast<int64_t>(vals.size()), transformed_bitmap.data()));
  } else {
    ARROW_THROW_NOT_OK(
        typed_builder->AppendIndices(vals.data(), static_cast<int64_t>(vals.size())));
  }
}

}  // namespace

void ArrowResultSetConverter::append(
    ColumnBuilder& column_builder,
    const ValueArray& values,
    const std::shared_ptr<std::vector<bool>>& is_valid) const {
  if (column_builder.col_type.is_dict_encoded_string()) {
    CHECK_EQ(column_builder.physical_type,
             kINT);  // assume all dicts use none-encoded type for now
    appendToColumnBuilder<StringDictionary32Builder, int32_t>(
        column_builder, values, is_valid);
    return;
  }
  switch (column_builder.physical_type) {
    case kBOOLEAN:
      appendToColumnBuilder<BooleanBuilder, bool>(column_builder, values, is_valid);
      break;
    case kTINYINT:
      appendToColumnBuilder<Int8Builder, int8_t>(column_builder, values, is_valid);
      break;
    case kSMALLINT:
      appendToColumnBuilder<Int16Builder, int16_t>(column_builder, values, is_valid);
      break;
    case kINT:
      appendToColumnBuilder<Int32Builder, int32_t>(column_builder, values, is_valid);
      break;
    case kBIGINT:
      appendToColumnBuilder<Int64Builder, int64_t>(column_builder, values, is_valid);
      break;
    case kFLOAT:
      appendToColumnBuilder<FloatBuilder, float>(column_builder, values, is_valid);
      break;
    case kDOUBLE:
      appendToColumnBuilder<DoubleBuilder, double>(column_builder, values, is_valid);
      break;
    case kTIME:
      appendToColumnBuilder<Time32Builder, int32_t>(column_builder, values, is_valid);
      break;
    case kTIMESTAMP:
      appendToColumnBuilder<TimestampBuilder, int64_t>(column_builder, values, is_valid);
      break;
    case kDATE:
      device_type_ == ExecutorDeviceType::GPU
          ? appendToColumnBuilder<Date64Builder, int64_t>(
                column_builder, values, is_valid)
          : appendToColumnBuilder<Date32Builder, int32_t>(
                column_builder, values, is_valid);
      break;
    case kCHAR:
    case kVARCHAR:
    case kTEXT:
    default:
      // TODO(miyu): support more scalar types.
      throw std::runtime_error(column_builder.col_type.get_type_name() +
                               " is not supported in Arrow result sets.");
  }
}