OmniSciDB  c07336695a
ArrowResultSetConverter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "../Shared/DateConverters.h"
18 #include "ArrowResultSet.h"
19 #include "Execute.h"
20 
21 #include <sys/ipc.h>
22 #include <sys/shm.h>
23 #include <sys/types.h>
24 #include <algorithm>
25 #include <cerrno>
26 #include <cstdio>
27 #include <cstdlib>
28 #include <string>
29 
30 #include "arrow/api.h"
31 #include "arrow/io/memory.h"
32 #include "arrow/ipc/api.h"
33 
34 #include "ArrowUtil.h"
35 
36 #ifdef HAVE_CUDA
37 #include <cuda.h>
38 #endif // HAVE_CUDA
39 #include <future>
40 
41 #define ARROW_RECORDBATCH_MAKE arrow::RecordBatch::Make
42 
43 #define APPENDVALUES AppendValues
44 
45 using namespace arrow;
46 
47 namespace {
48 
51  switch (ti.get_size()) {
52  case 1:
53  return kTINYINT;
54  case 2:
55  return kSMALLINT;
56  case 4:
57  return kINT;
58  case 8:
59  return kBIGINT;
60  default:
61  CHECK(false);
62  }
63  return ti.get_type();
64 }
65 
68  switch (ti.get_size()) {
69  case 1:
70  return SQLTypeInfo(kTINYINT, ti.get_notnull());
71  case 2:
72  return SQLTypeInfo(kSMALLINT, ti.get_notnull());
73  case 4:
74  return SQLTypeInfo(kINT, ti.get_notnull());
75  case 8:
76  return SQLTypeInfo(kBIGINT, ti.get_notnull());
77  default:
78  CHECK(false);
79  }
80  return ti;
81 }
82 
84  auto logical_type = ti.get_type();
85  if (IS_INTEGER(logical_type)) {
86  switch (ti.get_size()) {
87  case 1:
88  return kTINYINT;
89  case 2:
90  return kSMALLINT;
91  case 4:
92  return kINT;
93  case 8:
94  return kBIGINT;
95  default:
96  CHECK(false);
97  }
98  }
99  return logical_type;
100 }
101 
102 template <typename TYPE, typename C_TYPE>
104  std::shared_ptr<ValueArray>& values,
105  const size_t max_size) {
106  auto pval_cty = boost::get<C_TYPE>(&val_cty);
107  CHECK(pval_cty);
108  auto val_ty = static_cast<TYPE>(*pval_cty);
109  if (!values) {
110  values = std::make_shared<ValueArray>(std::vector<TYPE>());
111  boost::get<std::vector<TYPE>>(*values).reserve(max_size);
112  }
113  CHECK(values);
114  auto values_ty = boost::get<std::vector<TYPE>>(values.get());
115  CHECK(values_ty);
116  values_ty->push_back(val_ty);
117 }
118 
119 template <typename TYPE>
121  const SQLTypeInfo& col_type,
122  std::shared_ptr<std::vector<bool>>& null_bitmap,
123  const size_t max_size) {
124  if (col_type.get_notnull()) {
125  CHECK(!null_bitmap);
126  return;
127  }
128  auto pvalue = boost::get<TYPE>(&value);
129  CHECK(pvalue);
130  bool is_valid = false;
131  if (col_type.is_boolean()) {
132  is_valid = inline_int_null_val(col_type) != static_cast<int8_t>(*pvalue);
133  } else if (col_type.is_dict_encoded_string()) {
134  is_valid = inline_int_null_val(col_type) != static_cast<int32_t>(*pvalue);
135  } else if (col_type.is_integer() || col_type.is_time()) {
136  is_valid = inline_int_null_val(col_type) != static_cast<int64_t>(*pvalue);
137  } else if (col_type.is_fp()) {
138  is_valid = inline_fp_null_val(col_type) != static_cast<double>(*pvalue);
139  } else {
140  UNREACHABLE();
141  }
142 
143  if (!null_bitmap) {
144  null_bitmap = std::make_shared<std::vector<bool>>();
145  null_bitmap->reserve(max_size);
146  }
147  CHECK(null_bitmap);
148  null_bitmap->push_back(is_valid);
149 }
150 
151 } // namespace
152 
153 namespace arrow {
154 
155 key_t get_and_copy_to_shm(const std::shared_ptr<Buffer>& data) {
156  if (!data->size()) {
157  return IPC_PRIVATE;
158  }
159  // Generate a new key for a shared memory segment. Keys to shared memory segments
160  // are OS global, so we need to try a new key if we encounter a collision. It seems
161  // incremental keygen would be deterministically worst-case. If we use a hash
162  // (like djb2) + nonce, we could still get collisions if multiple clients specify
163  // the same nonce, so using rand() in lieu of a better approach
164  // TODO(ptaylor): Is this common? Are these assumptions true?
165  auto key = static_cast<key_t>(rand());
166  const auto shmsz = data->size();
167  int shmid = -1;
168  // IPC_CREAT - indicates we want to create a new segment for this key if it doesn't
169  // exist IPC_EXCL - ensures failure if a segment already exists for this key
170  while ((shmid = shmget(key, shmsz, IPC_CREAT | IPC_EXCL | 0666)) < 0) {
171  // If shmget fails and errno is one of these four values, try a new key.
172  // TODO(ptaylor): is checking for the last three values really necessary? Checking
173  // them by default to be safe. EEXIST - a shared memory segment is already associated
174  // with this key EACCES - a shared memory segment is already associated with this key,
175  // but we don't have permission to access it EINVAL - a shared memory segment is
176  // already associated with this key, but the size is less than shmsz ENOENT -
177  // IPC_CREAT was not set in shmflg and no shared memory segment associated with key
178  // was found
179  if (!(errno & (EEXIST | EACCES | EINVAL | ENOENT))) {
180  throw std::runtime_error("failed to create a shared memory");
181  }
182  key = static_cast<key_t>(rand());
183  }
184  // get a pointer to the shared memory segment
185  auto ipc_ptr = shmat(shmid, NULL, 0);
186  if (reinterpret_cast<int64_t>(ipc_ptr) == -1) {
187  throw std::runtime_error("failed to attach a shared memory");
188  }
189 
190  // copy the arrow records buffer to shared memory
191  // TODO(ptaylor): I'm sure it's possible to tell Arrow's RecordBatchStreamWriter to
192  // write directly to the shared memory segment as a sink
193  memcpy(ipc_ptr, data->data(), data->size());
194  // detach from the shared memory segment
195  shmdt(ipc_ptr);
196  return key;
197 }
198 
199 } // namespace arrow
200 
201 // WARN(ptaylor): users are responsible for detaching and removing shared memory segments,
202 // e.g.,
203 // int shmid = shmget(...);
204 // auto ipc_ptr = shmat(shmid, ...);
205 // ...
206 // shmdt(ipc_ptr);
207 // shmctl(shmid, IPC_RMID, 0);
208 // WARN(miyu): users are responsible to free all device copies, e.g.,
209 // cudaIpcMemHandle_t mem_handle = ...
210 // void* dev_ptr;
211 // cudaIpcOpenMemHandle(&dev_ptr, mem_handle, cudaIpcMemLazyEnablePeerAccess);
212 // ...
213 // cudaIpcCloseMemHandle(dev_ptr);
214 // cudaFree(dev_ptr);
215 //
216 // TODO(miyu): verify if the server still needs to free its own copies after last uses
218  const auto serialized_arrow_output = getSerializedArrowOutput();
219  const auto& serialized_schema = serialized_arrow_output.schema;
220  const auto& serialized_records = serialized_arrow_output.records;
221 
222  const auto schema_key = arrow::get_and_copy_to_shm(serialized_schema);
223  CHECK(schema_key != IPC_PRIVATE);
224  std::vector<char> schema_handle_buffer(sizeof(key_t), 0);
225  memcpy(&schema_handle_buffer[0],
226  reinterpret_cast<const unsigned char*>(&schema_key),
227  sizeof(key_t));
228  if (device_type_ == ExecutorDeviceType::CPU) {
229  const auto record_key = arrow::get_and_copy_to_shm(serialized_records);
230  std::vector<char> record_handle_buffer(sizeof(key_t), 0);
231  memcpy(&record_handle_buffer[0],
232  reinterpret_cast<const unsigned char*>(&record_key),
233  sizeof(key_t));
234 
235  return {schema_handle_buffer,
236  serialized_schema->size(),
237  record_handle_buffer,
238  serialized_records->size(),
239  nullptr};
240  }
241 #ifdef HAVE_CUDA
242  if (serialized_records->size()) {
243  CHECK(data_mgr_);
244  const auto cuda_mgr = data_mgr_->getCudaMgr();
245  CHECK(cuda_mgr);
246  auto dev_ptr = reinterpret_cast<CUdeviceptr>(
247  cuda_mgr->allocateDeviceMem(serialized_records->size(), device_id_));
248  CUipcMemHandle record_handle;
249  cuIpcGetMemHandle(&record_handle, dev_ptr);
250  cuda_mgr->copyHostToDevice(
251  reinterpret_cast<int8_t*>(dev_ptr),
252  reinterpret_cast<const int8_t*>(serialized_records->data()),
253  serialized_records->size(),
254  device_id_);
255  std::vector<char> record_handle_buffer(sizeof(record_handle), 0);
256  memcpy(&record_handle_buffer[0],
257  reinterpret_cast<unsigned char*>(&record_handle),
258  sizeof(CUipcMemHandle));
259  return {schema_handle_buffer,
260  serialized_schema->size(),
261  record_handle_buffer,
262  serialized_records->size(),
263  reinterpret_cast<int8_t*>(dev_ptr)};
264  }
265 #endif
266  return {schema_handle_buffer, serialized_schema->size(), {}, 0, nullptr};
267 }
268 
// NOTE(review): the signature of this definition (doc lines 269-270) is
// missing from this extract; per the declaration in the file's reference
// index it is `SerializedArrowOutput getSerializedArrowOutput() const`, a
// member of ArrowResultSetConverter — confirm against the original file.
// Converts the result set to an arrow RecordBatch, then serializes its
// schema and its records into two independent IPC buffers.
  arrow::ipc::DictionaryMemo dict_memo;
  std::shared_ptr<arrow::RecordBatch> arrow_copy = convertToArrow(dict_memo);
  std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;

  // Schema and records are serialized separately so a consumer can read the
  // schema without touching the (possibly device-resident) record payload.
  ARROW_THROW_NOT_OK(arrow::ipc::SerializeSchema(
      *arrow_copy->schema(), arrow::default_memory_pool(), &serialized_schema));

  ARROW_THROW_NOT_OK(arrow::ipc::SerializeRecordBatch(
      *arrow_copy, arrow::default_memory_pool(), &serialized_records));
  return {serialized_schema, serialized_records};
}
282 
283 std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::convertToArrow(
284  arrow::ipc::DictionaryMemo& memo) const {
285  const auto col_count = results_->colCount();
286  std::vector<std::shared_ptr<arrow::Field>> fields;
287  CHECK(col_names_.empty() || col_names_.size() == col_count);
288  for (size_t i = 0; i < col_count; ++i) {
289  const auto ti = results_->getColType(i);
290  std::shared_ptr<arrow::Array> dict;
291  if (ti.is_dict_encoded_string()) {
292  const int dict_id = ti.get_comp_param();
293  if (memo.HasDictionaryId(dict_id)) {
294  ARROW_THROW_NOT_OK(memo.GetDictionary(dict_id, &dict));
295  } else {
296  auto str_list = results_->getStringDictionaryPayloadCopy(dict_id);
297 
298  arrow::StringBuilder builder;
299  // TODO(andrewseidl): replace with AppendValues() once Arrow 0.7.1 support is
300  // fully deprecated
301  for (const std::string& val : *str_list) {
302  ARROW_THROW_NOT_OK(builder.Append(val));
303  }
304  ARROW_THROW_NOT_OK(builder.Finish(&dict));
305  ARROW_THROW_NOT_OK(memo.AddDictionary(dict_id, dict));
306  }
307  }
308  fields.push_back(makeField(col_names_.empty() ? "" : col_names_[i], ti, dict));
309  }
310  return getArrowBatch(arrow::schema(fields));
311 }
312 
// Materializes up to top_n_ rows of the result set into an arrow RecordBatch
// matching `schema`. Rows are fetched into per-column value segments plus
// optional validity bitmaps (in parallel for large, untruncated results),
// then appended into per-column arrow builders and finished into arrays.
std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::getArrowBatch(
    const std::shared_ptr<arrow::Schema>& schema) const {
  std::vector<std::shared_ptr<arrow::Array>> result_columns;

  // top_n_ < 0 means "no limit"; otherwise cap at the result's entry count.
  const size_t entry_count = top_n_ < 0
                                 ? results_->entryCount()
                                 : std::min(size_t(top_n_), results_->entryCount());
  if (!entry_count) {
    return ARROW_RECORDBATCH_MAKE(schema, 0, result_columns);
  }
  const auto col_count = results_->colCount();
  size_t row_count = 0;

  std::vector<ColumnBuilder> builders(col_count);

  // Create array builders
  for (size_t i = 0; i < col_count; ++i) {
    initializeColumnBuilder(builders[i], results_->getColType(i), schema->field(i));
  }

  // Fetches entries [start_entry, end_entry) into per-column value/validity
  // segments and returns the number of non-empty rows it saw. Entries may be
  // empty (e.g. unused hash slots), hence seg_row_count <= entry range.
  // TODO(miyu): speed up for columnar buffers
  auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
                   std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
                   const size_t start_entry,
                   const size_t end_entry) -> size_t {
    CHECK_EQ(value_seg.size(), col_count);
    CHECK_EQ(null_bitmap_seg.size(), col_count);
    // Shadows the outer entry_count deliberately: used only as the reserve
    // hint for this segment's vectors.
    const auto entry_count = end_entry - start_entry;
    size_t seg_row_count = 0;
    for (size_t i = start_entry; i < end_entry; ++i) {
      auto row = results_->getRowAtNoTranslations(i);
      if (row.empty()) {
        continue;
      }
      ++seg_row_count;
      for (size_t j = 0; j < col_count; ++j) {
        auto scalar_value = boost::get<ScalarTargetValue>(&row[j]);
        // TODO(miyu): support more types other than scalar.
        CHECK(scalar_value);
        const auto& column = builders[j];
        // Dispatch on the column's physical type: buffer the value at its
        // physical width and (for nullable columns) record a validity bit.
        switch (column.physical_type) {
          case kBOOLEAN:
            create_or_append_value<bool, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kTINYINT:
            create_or_append_value<int8_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kSMALLINT:
            create_or_append_value<int16_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kINT:
            create_or_append_value<int32_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kBIGINT:
            create_or_append_value<int64_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kFLOAT:
            create_or_append_value<float, float>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<float>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kDOUBLE:
            create_or_append_value<double, double>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<double>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kTIME:
            create_or_append_value<int32_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kDATE:
            // GPU dates become arrow date64 (int64); CPU dates become date32
            // (int32) — see getArrowType() for the cuDF-related rationale.
            device_type_ == ExecutorDeviceType::GPU
                ? create_or_append_value<int64_t, int64_t>(
                      *scalar_value, value_seg[j], entry_count)
                : create_or_append_value<int32_t, int64_t>(
                      *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kTIMESTAMP:
            create_or_append_value<int64_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          default:
            // TODO(miyu): support more scalar types.
            throw std::runtime_error(column.col_type.get_type_name() +
                                     " is not supported in Arrow result sets.");
        }
      }
    }
    return seg_row_count;
  };

  std::vector<std::shared_ptr<ValueArray>> column_values(col_count, nullptr);
  std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count, nullptr);
  // Parallel fetch only pays off for large results; truncated results must be
  // read in order, so they stay single-threaded.
  const bool multithreaded = entry_count > 10000 && !results_->isTruncated();
  if (multithreaded) {
    const size_t cpu_count = cpu_threads();
    std::vector<std::future<size_t>> child_threads;
    // One independent segment (values + bitmaps) per worker; segments are
    // merged into the builders afterwards, so workers never share state.
    std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
        cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count, nullptr));
    std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
        cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count, nullptr));
    const auto stride = (entry_count + cpu_count - 1) / cpu_count;
    for (size_t i = 0, start_entry = 0; start_entry < entry_count;
         ++i, start_entry += stride) {
      const auto end_entry = std::min(entry_count, start_entry + stride);
      child_threads.push_back(std::async(std::launch::async,
                                         fetch,
                                         std::ref(column_value_segs[i]),
                                         std::ref(null_bitmap_segs[i]),
                                         start_entry,
                                         end_entry));
    }
    for (auto& child : child_threads) {
      row_count += child.get();
    }
    // Merge segments in worker order so row order is preserved.
    for (int i = 0; i < schema->num_fields(); ++i) {
      reserveColumnBuilderSize(builders[i], row_count);
      for (size_t j = 0; j < cpu_count; ++j) {
        if (!column_value_segs[j][i]) {
          // Worker j saw no non-empty rows in its range for this column.
          continue;
        }
        append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
      }
    }
  } else {
    row_count = fetch(column_values, null_bitmaps, size_t(0), entry_count);
    for (int i = 0; i < schema->num_fields(); ++i) {
      reserveColumnBuilderSize(builders[i], row_count);
      append(builders[i], *column_values[i], null_bitmaps[i]);
    }
  }

  for (size_t i = 0; i < col_count; ++i) {
    result_columns.push_back(finishColumnBuilder(builders[i]));
  }
  return ARROW_RECORDBATCH_MAKE(schema, row_count, result_columns);
}
473 
474 std::shared_ptr<arrow::Field> ArrowResultSetConverter::makeField(
475  const std::string name,
476  const SQLTypeInfo& target_type,
477  const std::shared_ptr<arrow::Array>& dictionary) const {
478  return arrow::field(
479  name, getArrowType(target_type, dictionary), !target_type.get_notnull());
480 }
481 
// Maps an OmniSci SQL type onto the corresponding arrow DataType. For
// dictionary-encoded strings, `dict_values` supplies the dictionary array and
// the index type is derived recursively from the dictionary's integer width.
// Throws for types that have no arrow mapping here.
std::shared_ptr<arrow::DataType> ArrowResultSetConverter::getArrowType(
    const SQLTypeInfo& mapd_type,
    const std::shared_ptr<arrow::Array>& dict_values) const {
  switch (get_physical_type(mapd_type)) {
    case kBOOLEAN:
      return boolean();
    case kTINYINT:
      return int8();
    case kSMALLINT:
      return int16();
    case kINT:
      return int32();
    case kBIGINT:
      return int64();
    case kFLOAT:
      return float32();
    case kDOUBLE:
      return float64();
    case kCHAR:
    case kVARCHAR:
    case kTEXT:
      if (mapd_type.is_dict_encoded_string()) {
        CHECK(dict_values);
        // Index type follows the dictionary's physical integer width.
        const auto index_type =
            getArrowType(get_dict_index_type_info(mapd_type), nullptr);
        return dictionary(index_type, dict_values);
      }
      return utf8();
    case kDECIMAL:
    case kNUMERIC:
      return decimal(mapd_type.get_precision(), mapd_type.get_scale());
    case kTIME:
      return time32(TimeUnit::SECOND);
    case kDATE:
      // TODO(wamsi) : Remove date64() once date32() support is added in cuDF. date32()
      // Currently support for date32() is missing in cuDF.Hence, if client requests for
      // date on GPU, return date64() for the time being, till support is added.
      return device_type_ == ExecutorDeviceType::GPU ? date64() : date32();
    case kTIMESTAMP:
      // Precision (fractional-second digits) selects the arrow time unit.
      switch (mapd_type.get_precision()) {
        case 0:
          return timestamp(TimeUnit::SECOND);
        case 3:
          return timestamp(TimeUnit::MILLI);
        case 6:
          return timestamp(TimeUnit::MICRO);
        case 9:
          return timestamp(TimeUnit::NANO);
        default:
          throw std::runtime_error(
              "Unsupported timestamp precision for Arrow result sets: " +
              std::to_string(mapd_type.get_precision()));
      }
    case kARRAY:
    case kINTERVAL_DAY_TIME:
    // NOTE(review): doc line 537 is missing from this extract — most likely
    // one more unsupported case label (e.g. kINTERVAL_YEAR_MONTH); confirm
    // against the original file.
    default:
      throw std::runtime_error(mapd_type.get_type_name() +
                               " is not supported in Arrow result sets.");
  }
  // Unreachable: every switch path returns or throws.
  return nullptr;
}
544 
// NOTE(review): the opening line of this definition (doc line 545, the
// qualified function name) is missing from this extract; per the file's
// reference index the declaration is
//   static void deallocateArrowResultBuffer(const ArrowResult&,
//       const ExecutorDeviceType, const size_t,
//       std::shared_ptr<Data_Namespace::DataMgr>&);
// — confirm the owning class against the original file.
// Releases the IPC resources published by getArrowResultImpl(): marks the
// schema (and, on CPU, the data-frame) shared-memory segments for removal,
// and frees the device-resident copy on GPU. `device_id` is unused here;
// the device pointer in `result` identifies the allocation.
    const ArrowResult& result,
    const ExecutorDeviceType device_type,
    const size_t device_id,
    std::shared_ptr<Data_Namespace::DataMgr>& data_mgr) {
  // Remove shared memory on sysmem
  CHECK_EQ(sizeof(key_t), result.sm_handle.size());
  const key_t& schema_key = *(key_t*)(&result.sm_handle[0]);
  auto shm_id = shmget(schema_key, result.sm_size, 0666);
  if (shm_id < 0) {
    throw std::runtime_error(
        "failed to get an valid shm ID w/ given shm key of the schema");
  }
  // IPC_RMID: segment is destroyed once the last attached process detaches.
  if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
    throw std::runtime_error("failed to deallocate Arrow schema on errorno(" +
                             std::to_string(errno) + ")");
  }

  if (device_type == ExecutorDeviceType::CPU) {
    // CPU results also carry the record batch in shared memory.
    CHECK_EQ(sizeof(key_t), result.df_handle.size());
    const key_t& df_key = *(key_t*)(&result.df_handle[0]);
    auto shm_id = shmget(df_key, result.df_size, 0666);
    if (shm_id < 0) {
      throw std::runtime_error(
          "failed to get an valid shm ID w/ given shm key of the data");
    }
    if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
      throw std::runtime_error("failed to deallocate Arrow data frame");
    }
    return;
  }

  CHECK(device_type == ExecutorDeviceType::GPU);
  if (!result.df_dev_ptr) {
    throw std::runtime_error("null pointer to data frame on device");
  }

  data_mgr->getCudaMgr()->freeDeviceMem(result.df_dev_ptr);
}
584 
586  ColumnBuilder& column_builder,
587  const SQLTypeInfo& col_type,
588  const std::shared_ptr<arrow::Field>& field) const {
589  column_builder.field = field;
590  column_builder.col_type = col_type;
591  column_builder.physical_type = col_type.is_dict_encoded_string()
592  ? get_dict_index_type(col_type)
593  : get_physical_type(col_type);
594 
595  auto value_type = field->type();
596  if (value_type->id() == Type::DICTIONARY) {
597  value_type = static_cast<const DictionaryType&>(*value_type).index_type();
598  }
600  arrow::MakeBuilder(default_memory_pool(), value_type, &column_builder.builder));
601 }
602 
604  const size_t row_count) const {
605  ARROW_THROW_NOT_OK(column_builder.builder->Reserve(static_cast<int64_t>(row_count)));
606 }
607 
608 std::shared_ptr<arrow::Array> ArrowResultSetConverter::finishColumnBuilder(
609  ColumnBuilder& column_builder) const {
610  std::shared_ptr<Array> values;
611  ARROW_THROW_NOT_OK(column_builder.builder->Finish(&values));
612  if (column_builder.field->type()->id() == Type::DICTIONARY) {
613  return std::make_shared<DictionaryArray>(column_builder.field->type(), values);
614  } else {
615  return values;
616  }
617 }
618 
619 template <typename BuilderType, typename C_TYPE>
621  ColumnBuilder& column_builder,
622  const ValueArray& values,
623  const std::shared_ptr<std::vector<bool>>& is_valid) const {
624  std::vector<C_TYPE> vals = boost::get<std::vector<C_TYPE>>(values);
625 
626  if (scale_epoch_values<BuilderType>()) {
627  auto scale_sec_to_millisec = [](auto seconds) { return seconds * kMilliSecsPerSec; };
628  auto scale_values = [&](auto epoch) {
629  return std::is_same<BuilderType, Date32Builder>::value
631  : scale_sec_to_millisec(epoch);
632  };
633  std::transform(vals.begin(), vals.end(), vals.begin(), scale_values);
634  }
635 
636  auto typed_builder = static_cast<BuilderType*>(column_builder.builder.get());
637  if (column_builder.field->nullable()) {
638  CHECK(is_valid.get());
639  ARROW_THROW_NOT_OK(typed_builder->APPENDVALUES(vals, *is_valid));
640  } else {
641  ARROW_THROW_NOT_OK(typed_builder->APPENDVALUES(vals));
642  }
643 }
644 
646  ColumnBuilder& column_builder,
647  const ValueArray& values,
648  const std::shared_ptr<std::vector<bool>>& is_valid) const {
649  switch (column_builder.physical_type) {
650  case kBOOLEAN:
651  appendToColumnBuilder<BooleanBuilder, bool>(column_builder, values, is_valid);
652  break;
653  case kTINYINT:
654  appendToColumnBuilder<Int8Builder, int8_t>(column_builder, values, is_valid);
655  break;
656  case kSMALLINT:
657  appendToColumnBuilder<Int16Builder, int16_t>(column_builder, values, is_valid);
658  break;
659  case kINT:
660  appendToColumnBuilder<Int32Builder, int32_t>(column_builder, values, is_valid);
661  break;
662  case kBIGINT:
663  appendToColumnBuilder<Int64Builder, int64_t>(column_builder, values, is_valid);
664  break;
665  case kFLOAT:
666  appendToColumnBuilder<FloatBuilder, float>(column_builder, values, is_valid);
667  break;
668  case kDOUBLE:
669  appendToColumnBuilder<DoubleBuilder, double>(column_builder, values, is_valid);
670  break;
671  case kTIME:
672  appendToColumnBuilder<Time32Builder, int32_t>(column_builder, values, is_valid);
673  break;
674  case kTIMESTAMP:
675  appendToColumnBuilder<TimestampBuilder, int64_t>(column_builder, values, is_valid);
676  break;
677  case kDATE:
678  device_type_ == ExecutorDeviceType::GPU
679  ? appendToColumnBuilder<Date64Builder, int64_t>(
680  column_builder, values, is_valid)
681  : appendToColumnBuilder<Date32Builder, int32_t>(
682  column_builder, values, is_valid);
683  break;
684  case kCHAR:
685  case kVARCHAR:
686  case kTEXT:
687  default:
688  // TODO(miyu): support more scalar types.
689  throw std::runtime_error(column_builder.col_type.get_type_name() +
690  " is not supported in Arrow result sets.");
691  }
692 }
693 
694 // helpers for debugging
695 
696 #ifdef ENABLE_ARROW_DEBUG
697 void print_serialized_schema(const uint8_t* data, const size_t length) {
698  io::BufferReader reader(std::make_shared<arrow::Buffer>(data, length));
699  std::shared_ptr<Schema> schema;
700  ARROW_THROW_NOT_OK(ipc::ReadSchema(&reader, &schema));
701 
702  std::cout << "Arrow Schema: " << std::endl;
703  const PrettyPrintOptions options{0};
704  ARROW_THROW_NOT_OK(PrettyPrint(*(schema.get()), options, &std::cout));
705 }
706 
707 void print_serialized_records(const uint8_t* data,
708  const size_t length,
709  const std::shared_ptr<Schema>& schema) {
710  if (data == nullptr || !length) {
711  std::cout << "No row found" << std::endl;
712  return;
713  }
714  std::shared_ptr<RecordBatch> batch;
715 
716  io::BufferReader buffer_reader(std::make_shared<arrow::Buffer>(data, length));
717  ARROW_THROW_NOT_OK(ipc::ReadRecordBatch(schema, &buffer_reader, &batch));
718 }
719 #endif
void create_or_append_value(const ScalarTargetValue &val_cty, std::shared_ptr< ValueArray > &values, const size_t max_size)
#define CHECK_EQ(x, y)
Definition: Logger.h:195
std::unique_ptr< arrow::ArrayBuilder > builder
void appendToColumnBuilder(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
bool is_time() const
Definition: sqltypes.h:452
HOST DEVICE int get_size() const
Definition: sqltypes.h:329
int get_precision() const
Definition: sqltypes.h:322
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:28
Definition: sqltypes.h:51
bool is_fp() const
Definition: sqltypes.h:450
SQLTypes
Definition: sqltypes.h:40
std::shared_ptr< arrow::Field > makeField(const std::string name, const SQLTypeInfo &target_type, const std::shared_ptr< arrow::Array > &dictionary) const
std::vector< char > sm_handle
ArrowResult getArrowResultImpl() const
ExecutorDeviceType
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:326
SQLTypeInfo get_dict_index_type_info(const SQLTypeInfo &ti)
HOST DEVICE int get_scale() const
Definition: sqltypes.h:324
unsigned long long CUdeviceptr
Definition: nocuda.h:27
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:319
#define UNREACHABLE()
Definition: Logger.h:231
std::shared_ptr< arrow::Field > field
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
std::string to_string(char const *&&v)
static constexpr int64_t kMilliSecsPerSec
void reserveColumnBuilderSize(ColumnBuilder &column_builder, const size_t row_count) const
std::string get_type_name() const
Definition: sqltypes.h:422
std::shared_ptr< arrow::DataType > getArrowType(const SQLTypeInfo &mapd_type, const std::shared_ptr< arrow::Array > &dict_values) const
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
SerializedArrowOutput getSerializedArrowOutput() const
bool is_integer() const
Definition: sqltypes.h:448
#define ARROW_RECORDBATCH_MAKE
int8_t * df_dev_ptr
std::vector< char > df_handle
bool is_dict_encoded_string() const
Definition: sqltypes.h:472
void append(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
std::shared_ptr< arrow::Array > finishColumnBuilder(ColumnBuilder &column_builder) const
boost::variant< std::vector< bool >, std::vector< int8_t >, std::vector< int16_t >, std::vector< int32_t >, std::vector< int64_t >, std::vector< float >, std::vector< double >, std::vector< std::string > > ValueArray
size_t append(FILE *f, const size_t size, int8_t *buf)
Appends the specified number of bytes to the end of the file f from buf.
Definition: File.cpp:136
std::shared_ptr< arrow::RecordBatch > getArrowBatch(const std::shared_ptr< arrow::Schema > &schema) const
SQLTypeInfoCore< ArrayContextTypeSizer, ExecutorTypePackaging, DateTimeFacilities > SQLTypeInfo
Definition: sqltypes.h:819
key_t get_and_copy_to_shm(const std::shared_ptr< Buffer > &data)
bool is_boolean() const
Definition: sqltypes.h:453
static void deallocateArrowResultBuffer(const ArrowResult &result, const ExecutorDeviceType device_type, const size_t device_id, std::shared_ptr< Data_Namespace::DataMgr > &data_mgr)
Definition: sqltypes.h:54
Definition: sqltypes.h:55
Definition: Importer.h:66
int64_t sm_size
int64_t df_size
#define IS_INTEGER(T)
Definition: sqltypes.h:158
void initializeColumnBuilder(ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const std::shared_ptr< arrow::Field > &field) const
Definition: sqltypes.h:43
std::shared_ptr< arrow::RecordBatch > convertToArrow(arrow::ipc::DictionaryMemo &memo) const
#define CHECK(condition)
Definition: Logger.h:187
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
int64_t get_epoch_days_from_seconds(const int64_t seconds)
Definition: sqltypes.h:47
int cpu_threads()
Definition: thread_count.h:23
void create_or_append_validity(const ScalarTargetValue &value, const SQLTypeInfo &col_type, std::shared_ptr< std::vector< bool >> &null_bitmap, const size_t max_size)
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:156