OmniSciDB  b24e664e58
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ArrowResultSetConverter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "../Shared/DateConverters.h"
18 #include "ArrowResultSet.h"
19 #include "Execute.h"
20 
21 #include <sys/ipc.h>
22 #include <sys/shm.h>
23 #include <sys/types.h>
24 #include <algorithm>
25 #include <cerrno>
26 #include <cstdio>
27 #include <cstdlib>
28 #include <string>
29 
30 #include "arrow/api.h"
31 #include "arrow/io/memory.h"
32 #include "arrow/ipc/api.h"
33 
34 #include "ArrowUtil.h"
35 
36 #ifdef HAVE_CUDA
37 #include <cuda.h>
38 #endif // HAVE_CUDA
39 #include <future>
40 
41 #define ARROW_RECORDBATCH_MAKE arrow::RecordBatch::Make
42 
43 #define APPENDVALUES AppendValues
44 
45 using namespace arrow;
46 
47 namespace {
48 
51  switch (ti.get_size()) {
52  case 1:
53  return kTINYINT;
54  case 2:
55  return kSMALLINT;
56  case 4:
57  return kINT;
58  case 8:
59  return kBIGINT;
60  default:
61  CHECK(false);
62  }
63  return ti.get_type();
64 }
65 
68  switch (ti.get_size()) {
69  case 1:
70  return SQLTypeInfo(kTINYINT, ti.get_notnull());
71  case 2:
72  return SQLTypeInfo(kSMALLINT, ti.get_notnull());
73  case 4:
74  return SQLTypeInfo(kINT, ti.get_notnull());
75  case 8:
76  return SQLTypeInfo(kBIGINT, ti.get_notnull());
77  default:
78  CHECK(false);
79  }
80  return ti;
81 }
82 
84  auto logical_type = ti.get_type();
85  if (IS_INTEGER(logical_type)) {
86  switch (ti.get_size()) {
87  case 1:
88  return kTINYINT;
89  case 2:
90  return kSMALLINT;
91  case 4:
92  return kINT;
93  case 8:
94  return kBIGINT;
95  default:
96  CHECK(false);
97  }
98  }
99  return logical_type;
100 }
101 
102 template <typename TYPE, typename C_TYPE>
104  std::shared_ptr<ValueArray>& values,
105  const size_t max_size) {
106  auto pval_cty = boost::get<C_TYPE>(&val_cty);
107  CHECK(pval_cty);
108  auto val_ty = static_cast<TYPE>(*pval_cty);
109  if (!values) {
110  values = std::make_shared<ValueArray>(std::vector<TYPE>());
111  boost::get<std::vector<TYPE>>(*values).reserve(max_size);
112  }
113  CHECK(values);
114  auto values_ty = boost::get<std::vector<TYPE>>(values.get());
115  CHECK(values_ty);
116  values_ty->push_back(val_ty);
117 }
118 
119 template <typename TYPE>
121  const SQLTypeInfo& col_type,
122  std::shared_ptr<std::vector<bool>>& null_bitmap,
123  const size_t max_size) {
124  if (col_type.get_notnull()) {
125  CHECK(!null_bitmap);
126  return;
127  }
128  auto pvalue = boost::get<TYPE>(&value);
129  CHECK(pvalue);
130  bool is_valid = false;
131  if (col_type.is_boolean()) {
132  is_valid = inline_int_null_val(col_type) != static_cast<int8_t>(*pvalue);
133  } else if (col_type.is_dict_encoded_string()) {
134  is_valid = inline_int_null_val(col_type) != static_cast<int32_t>(*pvalue);
135  } else if (col_type.is_integer() || col_type.is_time()) {
136  is_valid = inline_int_null_val(col_type) != static_cast<int64_t>(*pvalue);
137  } else if (col_type.is_fp()) {
138  is_valid = inline_fp_null_val(col_type) != static_cast<double>(*pvalue);
139  } else {
140  UNREACHABLE();
141  }
142 
143  if (!null_bitmap) {
144  null_bitmap = std::make_shared<std::vector<bool>>();
145  null_bitmap->reserve(max_size);
146  }
147  CHECK(null_bitmap);
148  null_bitmap->push_back(is_valid);
149 }
150 
151 } // namespace
152 
153 namespace arrow {
154 
155 key_t get_and_copy_to_shm(const std::shared_ptr<Buffer>& data) {
156  if (!data->size()) {
157  return IPC_PRIVATE;
158  }
159  // Generate a new key for a shared memory segment. Keys to shared memory segments
160  // are OS global, so we need to try a new key if we encounter a collision. It seems
161  // incremental keygen would be deterministically worst-case. If we use a hash
162  // (like djb2) + nonce, we could still get collisions if multiple clients specify
163  // the same nonce, so using rand() in lieu of a better approach
164  // TODO(ptaylor): Is this common? Are these assumptions true?
165  auto key = static_cast<key_t>(rand());
166  const auto shmsz = data->size();
167  int shmid = -1;
168  // IPC_CREAT - indicates we want to create a new segment for this key if it doesn't
169  // exist IPC_EXCL - ensures failure if a segment already exists for this key
170  while ((shmid = shmget(key, shmsz, IPC_CREAT | IPC_EXCL | 0666)) < 0) {
171  // If shmget fails and errno is one of these four values, try a new key.
172  // TODO(ptaylor): is checking for the last three values really necessary? Checking
173  // them by default to be safe. EEXIST - a shared memory segment is already associated
174  // with this key EACCES - a shared memory segment is already associated with this key,
175  // but we don't have permission to access it EINVAL - a shared memory segment is
176  // already associated with this key, but the size is less than shmsz ENOENT -
177  // IPC_CREAT was not set in shmflg and no shared memory segment associated with key
178  // was found
179  if (!(errno & (EEXIST | EACCES | EINVAL | ENOENT))) {
180  throw std::runtime_error("failed to create a shared memory");
181  }
182  key = static_cast<key_t>(rand());
183  }
184  // get a pointer to the shared memory segment
185  auto ipc_ptr = shmat(shmid, NULL, 0);
186  if (reinterpret_cast<int64_t>(ipc_ptr) == -1) {
187  throw std::runtime_error("failed to attach a shared memory");
188  }
189 
190  // copy the arrow records buffer to shared memory
191  // TODO(ptaylor): I'm sure it's possible to tell Arrow's RecordBatchStreamWriter to
192  // write directly to the shared memory segment as a sink
193  memcpy(ipc_ptr, data->data(), data->size());
194  // detach from the shared memory segment
195  shmdt(ipc_ptr);
196  return key;
197 }
198 
199 } // namespace arrow
200 
201 // WARN(ptaylor): users are responsible for detaching and removing shared memory segments,
202 // e.g.,
203 // int shmid = shmget(...);
204 // auto ipc_ptr = shmat(shmid, ...);
205 // ...
206 // shmdt(ipc_ptr);
207 // shmctl(shmid, IPC_RMID, 0);
208 // WARN(miyu): users are responsible to free all device copies, e.g.,
209 // cudaIpcMemHandle_t mem_handle = ...
210 // void* dev_ptr;
211 // cudaIpcOpenMemHandle(&dev_ptr, mem_handle, cudaIpcMemLazyEnablePeerAccess);
212 // ...
213 // cudaIpcCloseMemHandle(dev_ptr);
214 // cudaFree(dev_ptr);
215 //
216 // TODO(miyu): verify if the server still needs to free its own copies after last uses
218  const auto serialized_arrow_output = getSerializedArrowOutput();
219  const auto& serialized_schema = serialized_arrow_output.schema;
220  const auto& serialized_records = serialized_arrow_output.records;
221 
222  const auto schema_key = arrow::get_and_copy_to_shm(serialized_schema);
223  CHECK(schema_key != IPC_PRIVATE);
224  std::vector<char> schema_handle_buffer(sizeof(key_t), 0);
225  memcpy(&schema_handle_buffer[0],
226  reinterpret_cast<const unsigned char*>(&schema_key),
227  sizeof(key_t));
228  if (device_type_ == ExecutorDeviceType::CPU) {
229  const auto record_key = arrow::get_and_copy_to_shm(serialized_records);
230  std::vector<char> record_handle_buffer(sizeof(key_t), 0);
231  memcpy(&record_handle_buffer[0],
232  reinterpret_cast<const unsigned char*>(&record_key),
233  sizeof(key_t));
234 
235  return {schema_handle_buffer,
236  serialized_schema->size(),
237  record_handle_buffer,
238  serialized_records->size(),
239  nullptr};
240  }
241 #ifdef HAVE_CUDA
242  if (serialized_records->size()) {
243  CHECK(data_mgr_);
244  const auto cuda_mgr = data_mgr_->getCudaMgr();
245  CHECK(cuda_mgr);
246  auto dev_ptr = reinterpret_cast<CUdeviceptr>(
247  cuda_mgr->allocateDeviceMem(serialized_records->size(), device_id_));
248  CUipcMemHandle record_handle;
249  cuIpcGetMemHandle(&record_handle, dev_ptr);
250  cuda_mgr->copyHostToDevice(
251  reinterpret_cast<int8_t*>(dev_ptr),
252  reinterpret_cast<const int8_t*>(serialized_records->data()),
253  serialized_records->size(),
254  device_id_);
255  std::vector<char> record_handle_buffer(sizeof(record_handle), 0);
256  memcpy(&record_handle_buffer[0],
257  reinterpret_cast<unsigned char*>(&record_handle),
258  sizeof(CUipcMemHandle));
259  return {schema_handle_buffer,
260  serialized_schema->size(),
261  record_handle_buffer,
262  serialized_records->size(),
263  reinterpret_cast<int8_t*>(dev_ptr)};
264  }
265 #endif
266  return {schema_handle_buffer, serialized_schema->size(), {}, 0, nullptr};
267 }
268 
271  arrow::ipc::DictionaryMemo dict_memo;
272  std::shared_ptr<arrow::RecordBatch> arrow_copy = convertToArrow(dict_memo);
273  std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;
274 
275  ARROW_THROW_NOT_OK(arrow::ipc::SerializeSchema(
276  *arrow_copy->schema(), arrow::default_memory_pool(), &serialized_schema));
277 
278  if (arrow_copy->num_rows()) {
279  ARROW_THROW_NOT_OK(arrow_copy->Validate());
280  ARROW_THROW_NOT_OK(arrow::ipc::SerializeRecordBatch(
281  *arrow_copy, arrow::default_memory_pool(), &serialized_records));
282  } else {
283  ARROW_THROW_NOT_OK(arrow::AllocateBuffer(0, &serialized_records));
284  }
285  return {serialized_schema, serialized_records};
286 }
287 
288 std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::convertToArrow(
289  arrow::ipc::DictionaryMemo& memo) const {
290  const auto col_count = results_->colCount();
291  std::vector<std::shared_ptr<arrow::Field>> fields;
292  CHECK(col_names_.empty() || col_names_.size() == col_count);
293  for (size_t i = 0; i < col_count; ++i) {
294  const auto ti = results_->getColType(i);
295  std::shared_ptr<arrow::Array> dict;
296  if (ti.is_dict_encoded_string()) {
297  const int dict_id = ti.get_comp_param();
298  if (memo.HasDictionaryId(dict_id)) {
299  ARROW_THROW_NOT_OK(memo.GetDictionary(dict_id, &dict));
300  } else {
301  auto str_list = results_->getStringDictionaryPayloadCopy(dict_id);
302 
303  arrow::StringBuilder builder;
304  // TODO(andrewseidl): replace with AppendValues() once Arrow 0.7.1 support is
305  // fully deprecated
306  for (const std::string& val : *str_list) {
307  ARROW_THROW_NOT_OK(builder.Append(val));
308  }
309  ARROW_THROW_NOT_OK(builder.Finish(&dict));
310  ARROW_THROW_NOT_OK(memo.AddDictionary(dict_id, dict));
311  }
312  }
313  fields.push_back(makeField(col_names_.empty() ? "" : col_names_[i], ti, dict));
314  }
315  return getArrowBatch(arrow::schema(fields));
316 }
317 
// Converts up to top_n_ (or all) result-set entries into an Arrow RecordBatch
// matching `schema`. Values are first gathered into per-column vectors plus
// optional null bitmaps (in parallel for large result sets), then appended to
// the Arrow builders and finished into arrays.
std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::getArrowBatch(
    const std::shared_ptr<arrow::Schema>& schema) const {
  std::vector<std::shared_ptr<arrow::Array>> result_columns;

  // top_n_ < 0 means "no limit"; otherwise cap at top_n_ entries.
  const size_t entry_count = top_n_ < 0
                                 ? results_->entryCount()
                                 : std::min(size_t(top_n_), results_->entryCount());
  if (!entry_count) {
    return ARROW_RECORDBATCH_MAKE(schema, 0, result_columns);
  }
  const auto col_count = results_->colCount();
  size_t row_count = 0;

  std::vector<ColumnBuilder> builders(col_count);

  // Create array builders
  for (size_t i = 0; i < col_count; ++i) {
    initializeColumnBuilder(builders[i], results_->getColType(i), schema->field(i));
  }

  // TODO(miyu): speed up for columnar buffers
  // Gathers entries [start_entry, end_entry) into per-column value vectors and
  // null bitmaps; returns the number of non-empty rows encountered.
  auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
                   std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
                   const size_t start_entry,
                   const size_t end_entry) -> size_t {
    CHECK_EQ(value_seg.size(), col_count);
    CHECK_EQ(null_bitmap_seg.size(), col_count);
    // Shadows the outer entry_count: here it is the size of this segment,
    // used to reserve capacity in the per-column vectors.
    const auto entry_count = end_entry - start_entry;
    size_t seg_row_count = 0;
    for (size_t i = start_entry; i < end_entry; ++i) {
      auto row = results_->getRowAtNoTranslations(i);
      if (row.empty()) {
        // Empty slot in the result set storage, not an actual row.
        continue;
      }
      ++seg_row_count;
      for (size_t j = 0; j < col_count; ++j) {
        auto scalar_value = boost::get<ScalarTargetValue>(&row[j]);
        // TODO(miyu): support more types other than scalar.
        CHECK(scalar_value);
        const auto& column = builders[j];
        // Dispatch on the physical type: append the value (with the proper
        // narrowing cast) and, for nullable columns, a validity bit.
        switch (column.physical_type) {
          case kBOOLEAN:
            create_or_append_value<bool, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kTINYINT:
            create_or_append_value<int8_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kSMALLINT:
            create_or_append_value<int16_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kINT:
            create_or_append_value<int32_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kBIGINT:
            create_or_append_value<int64_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kFLOAT:
            create_or_append_value<float, float>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<float>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kDOUBLE:
            create_or_append_value<double, double>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<double>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kTIME:
            create_or_append_value<int32_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kDATE:
            // GPU uses date64 (64-bit), CPU uses date32 (32-bit) — see
            // getArrowType for the cuDF-related rationale.
            device_type_ == ExecutorDeviceType::GPU
                ? create_or_append_value<int64_t, int64_t>(
                      *scalar_value, value_seg[j], entry_count)
                : create_or_append_value<int32_t, int64_t>(
                      *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kTIMESTAMP:
            create_or_append_value<int64_t, int64_t>(
                *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          default:
            // TODO(miyu): support more scalar types.
            throw std::runtime_error(column.col_type.get_type_name() +
                                     " is not supported in Arrow result sets.");
        }
      }
    }
    return seg_row_count;
  };

  std::vector<std::shared_ptr<ValueArray>> column_values(col_count, nullptr);
  std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count, nullptr);
  // Parallelize only for large, non-truncated result sets: each worker fetches
  // a contiguous stride of entries into its own segment vectors.
  const bool multithreaded = entry_count > 10000 && !results_->isTruncated();
  if (multithreaded) {
    const size_t cpu_count = cpu_threads();
    std::vector<std::future<size_t>> child_threads;
    std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
        cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count, nullptr));
    std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
        cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count, nullptr));
    const auto stride = (entry_count + cpu_count - 1) / cpu_count;
    for (size_t i = 0, start_entry = 0; start_entry < entry_count;
         ++i, start_entry += stride) {
      const auto end_entry = std::min(entry_count, start_entry + stride);
      child_threads.push_back(std::async(std::launch::async,
                                         fetch,
                                         std::ref(column_value_segs[i]),
                                         std::ref(null_bitmap_segs[i]),
                                         start_entry,
                                         end_entry));
    }
    for (auto& child : child_threads) {
      row_count += child.get();
    }
    // Merge the per-thread segments into the builders, in segment order so the
    // original entry order is preserved.
    for (int i = 0; i < schema->num_fields(); ++i) {
      reserveColumnBuilderSize(builders[i], row_count);
      for (size_t j = 0; j < cpu_count; ++j) {
        if (!column_value_segs[j][i]) {
          // Segment produced no rows for this column (e.g. all-empty stride).
          continue;
        }
        append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
      }
    }
  } else {
    // Single-threaded path: one fetch over the whole entry range.
    row_count = fetch(column_values, null_bitmaps, size_t(0), entry_count);
    for (int i = 0; i < schema->num_fields(); ++i) {
      reserveColumnBuilderSize(builders[i], row_count);
      append(builders[i], *column_values[i], null_bitmaps[i]);
    }
  }

  for (size_t i = 0; i < col_count; ++i) {
    result_columns.push_back(finishColumnBuilder(builders[i]));
  }
  return ARROW_RECORDBATCH_MAKE(schema, row_count, result_columns);
}
478 
479 std::shared_ptr<arrow::Field> ArrowResultSetConverter::makeField(
480  const std::string name,
481  const SQLTypeInfo& target_type,
482  const std::shared_ptr<arrow::Array>& dictionary) const {
483  return arrow::field(
484  name, getArrowType(target_type, dictionary), !target_type.get_notnull());
485 }
486 
487 std::shared_ptr<arrow::DataType> ArrowResultSetConverter::getArrowType(
488  const SQLTypeInfo& mapd_type,
489  const std::shared_ptr<arrow::Array>& dict_values) const {
490  switch (get_physical_type(mapd_type)) {
491  case kBOOLEAN:
492  return boolean();
493  case kTINYINT:
494  return int8();
495  case kSMALLINT:
496  return int16();
497  case kINT:
498  return int32();
499  case kBIGINT:
500  return int64();
501  case kFLOAT:
502  return float32();
503  case kDOUBLE:
504  return float64();
505  case kCHAR:
506  case kVARCHAR:
507  case kTEXT:
508  if (mapd_type.is_dict_encoded_string()) {
509  CHECK(dict_values);
510  const auto index_type =
511  getArrowType(get_dict_index_type_info(mapd_type), nullptr);
512  return dictionary(index_type, dict_values);
513  }
514  return utf8();
515  case kDECIMAL:
516  case kNUMERIC:
517  return decimal(mapd_type.get_precision(), mapd_type.get_scale());
518  case kTIME:
519  return time32(TimeUnit::SECOND);
520  case kDATE:
521  // TODO(wamsi) : Remove date64() once date32() support is added in cuDF. date32()
522  // Currently support for date32() is missing in cuDF.Hence, if client requests for
523  // date on GPU, return date64() for the time being, till support is added.
524  return device_type_ == ExecutorDeviceType::GPU ? date64() : date32();
525  case kTIMESTAMP:
526  switch (mapd_type.get_precision()) {
527  case 0:
528  return timestamp(TimeUnit::SECOND);
529  case 3:
530  return timestamp(TimeUnit::MILLI);
531  case 6:
532  return timestamp(TimeUnit::MICRO);
533  case 9:
534  return timestamp(TimeUnit::NANO);
535  default:
536  throw std::runtime_error(
537  "Unsupported timestamp precision for Arrow result sets: " +
538  std::to_string(mapd_type.get_precision()));
539  }
540  case kARRAY:
541  case kINTERVAL_DAY_TIME:
543  default:
544  throw std::runtime_error(mapd_type.get_type_name() +
545  " is not supported in Arrow result sets.");
546  }
547  return nullptr;
548 }
549 
551  const ArrowResult& result,
552  const ExecutorDeviceType device_type,
553  const size_t device_id,
554  std::shared_ptr<Data_Namespace::DataMgr>& data_mgr) {
555  // Remove shared memory on sysmem
556  CHECK_EQ(sizeof(key_t), result.sm_handle.size());
557  const key_t& schema_key = *(key_t*)(&result.sm_handle[0]);
558  auto shm_id = shmget(schema_key, result.sm_size, 0666);
559  if (shm_id < 0) {
560  throw std::runtime_error(
561  "failed to get an valid shm ID w/ given shm key of the schema");
562  }
563  if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
564  throw std::runtime_error("failed to deallocate Arrow schema on errorno(" +
565  std::to_string(errno) + ")");
566  }
567 
568  if (device_type == ExecutorDeviceType::CPU) {
569  CHECK_EQ(sizeof(key_t), result.df_handle.size());
570  const key_t& df_key = *(key_t*)(&result.df_handle[0]);
571  auto shm_id = shmget(df_key, result.df_size, 0666);
572  if (shm_id < 0) {
573  throw std::runtime_error(
574  "failed to get an valid shm ID w/ given shm key of the data");
575  }
576  if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
577  throw std::runtime_error("failed to deallocate Arrow data frame");
578  }
579  return;
580  }
581 
582  CHECK(device_type == ExecutorDeviceType::GPU);
583  if (!result.df_dev_ptr) {
584  throw std::runtime_error("null pointer to data frame on device");
585  }
586 
587  data_mgr->getCudaMgr()->freeDeviceMem(result.df_dev_ptr);
588 }
589 
591  ColumnBuilder& column_builder,
592  const SQLTypeInfo& col_type,
593  const std::shared_ptr<arrow::Field>& field) const {
594  column_builder.field = field;
595  column_builder.col_type = col_type;
596  column_builder.physical_type = col_type.is_dict_encoded_string()
597  ? get_dict_index_type(col_type)
598  : get_physical_type(col_type);
599 
600  auto value_type = field->type();
601  if (value_type->id() == Type::DICTIONARY) {
602  value_type = static_cast<const DictionaryType&>(*value_type).index_type();
603  }
605  arrow::MakeBuilder(default_memory_pool(), value_type, &column_builder.builder));
606 }
607 
609  const size_t row_count) const {
610  ARROW_THROW_NOT_OK(column_builder.builder->Reserve(static_cast<int64_t>(row_count)));
611 }
612 
613 std::shared_ptr<arrow::Array> ArrowResultSetConverter::finishColumnBuilder(
614  ColumnBuilder& column_builder) const {
615  std::shared_ptr<Array> values;
616  ARROW_THROW_NOT_OK(column_builder.builder->Finish(&values));
617  if (column_builder.field->type()->id() == Type::DICTIONARY) {
618  return std::make_shared<DictionaryArray>(column_builder.field->type(), values);
619  } else {
620  return values;
621  }
622 }
623 
624 template <typename BuilderType, typename C_TYPE>
626  ColumnBuilder& column_builder,
627  const ValueArray& values,
628  const std::shared_ptr<std::vector<bool>>& is_valid) const {
629  std::vector<C_TYPE> vals = boost::get<std::vector<C_TYPE>>(values);
630 
631  if (scale_epoch_values<BuilderType>()) {
632  auto scale_sec_to_millisec = [](auto seconds) { return seconds * kMilliSecsPerSec; };
633  auto scale_values = [&](auto epoch) {
634  return std::is_same<BuilderType, Date32Builder>::value
636  : scale_sec_to_millisec(epoch);
637  };
638  std::transform(vals.begin(), vals.end(), vals.begin(), scale_values);
639  }
640 
641  auto typed_builder = static_cast<BuilderType*>(column_builder.builder.get());
642  if (column_builder.field->nullable()) {
643  CHECK(is_valid.get());
644  ARROW_THROW_NOT_OK(typed_builder->APPENDVALUES(vals, *is_valid));
645  } else {
646  ARROW_THROW_NOT_OK(typed_builder->APPENDVALUES(vals));
647  }
648 }
649 
651  ColumnBuilder& column_builder,
652  const ValueArray& values,
653  const std::shared_ptr<std::vector<bool>>& is_valid) const {
654  switch (column_builder.physical_type) {
655  case kBOOLEAN:
656  appendToColumnBuilder<BooleanBuilder, bool>(column_builder, values, is_valid);
657  break;
658  case kTINYINT:
659  appendToColumnBuilder<Int8Builder, int8_t>(column_builder, values, is_valid);
660  break;
661  case kSMALLINT:
662  appendToColumnBuilder<Int16Builder, int16_t>(column_builder, values, is_valid);
663  break;
664  case kINT:
665  appendToColumnBuilder<Int32Builder, int32_t>(column_builder, values, is_valid);
666  break;
667  case kBIGINT:
668  appendToColumnBuilder<Int64Builder, int64_t>(column_builder, values, is_valid);
669  break;
670  case kFLOAT:
671  appendToColumnBuilder<FloatBuilder, float>(column_builder, values, is_valid);
672  break;
673  case kDOUBLE:
674  appendToColumnBuilder<DoubleBuilder, double>(column_builder, values, is_valid);
675  break;
676  case kTIME:
677  appendToColumnBuilder<Time32Builder, int32_t>(column_builder, values, is_valid);
678  break;
679  case kTIMESTAMP:
680  appendToColumnBuilder<TimestampBuilder, int64_t>(column_builder, values, is_valid);
681  break;
682  case kDATE:
683  device_type_ == ExecutorDeviceType::GPU
684  ? appendToColumnBuilder<Date64Builder, int64_t>(
685  column_builder, values, is_valid)
686  : appendToColumnBuilder<Date32Builder, int32_t>(
687  column_builder, values, is_valid);
688  break;
689  case kCHAR:
690  case kVARCHAR:
691  case kTEXT:
692  default:
693  // TODO(miyu): support more scalar types.
694  throw std::runtime_error(column_builder.col_type.get_type_name() +
695  " is not supported in Arrow result sets.");
696  }
697 }
698 
699 // helpers for debugging
700 
701 #ifdef ENABLE_ARROW_DEBUG
702 void print_serialized_schema(const uint8_t* data, const size_t length) {
703  io::BufferReader reader(std::make_shared<arrow::Buffer>(data, length));
704  std::shared_ptr<Schema> schema;
705  ARROW_THROW_NOT_OK(ipc::ReadSchema(&reader, &schema));
706 
707  std::cout << "Arrow Schema: " << std::endl;
708  const PrettyPrintOptions options{0};
709  ARROW_THROW_NOT_OK(PrettyPrint(*(schema.get()), options, &std::cout));
710 }
711 
712 void print_serialized_records(const uint8_t* data,
713  const size_t length,
714  const std::shared_ptr<Schema>& schema) {
715  if (data == nullptr || !length) {
716  std::cout << "No row found" << std::endl;
717  return;
718  }
719  std::shared_ptr<RecordBatch> batch;
720 
721  io::BufferReader buffer_reader(std::make_shared<arrow::Buffer>(data, length));
722  ARROW_THROW_NOT_OK(ipc::ReadRecordBatch(schema, &buffer_reader, &batch));
723 }
724 #endif
bool is_fp() const
Definition: sqltypes.h:481
bool is_boolean() const
Definition: sqltypes.h:484
void create_or_append_value(const ScalarTargetValue &val_cty, std::shared_ptr< ValueArray > &values, const size_t max_size)
#define CHECK_EQ(x, y)
Definition: Logger.h:198
std::unique_ptr< arrow::ArrayBuilder > builder
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:28
Definition: sqltypes.h:52
SQLTypes
Definition: sqltypes.h:41
void append(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
std::vector< char > sm_handle
ExecutorDeviceType
SQLTypeInfo get_dict_index_type_info(const SQLTypeInfo &ti)
std::shared_ptr< arrow::Array > finishColumnBuilder(ColumnBuilder &column_builder) const
HOST DEVICE int get_scale() const
Definition: sqltypes.h:331
unsigned long long CUdeviceptr
Definition: nocuda.h:27
std::shared_ptr< arrow::Field > makeField(const std::string name, const SQLTypeInfo &target_type, const std::shared_ptr< arrow::Array > &dictionary) const
#define UNREACHABLE()
Definition: Logger.h:234
HOST DEVICE int get_size() const
Definition: sqltypes.h:336
std::shared_ptr< arrow::Field > field
boost::variant< std::vector< bool >, std::vector< int8_t >, std::vector< int16_t >, std::vector< int32_t >, std::vector< int64_t >, std::vector< float >, std::vector< double >, std::vector< std::string >> ValueArray
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
std::string to_string(char const *&&v)
void reserveColumnBuilderSize(ColumnBuilder &column_builder, const size_t row_count) const
std::string get_type_name() const
Definition: sqltypes.h:429
static constexpr int64_t kMilliSecsPerSec
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
std::shared_ptr< arrow::RecordBatch > getArrowBatch(const std::shared_ptr< arrow::Schema > &schema) const
#define ARROW_RECORDBATCH_MAKE
CHECK(cgen_state)
bool is_time() const
Definition: sqltypes.h:483
int8_t * df_dev_ptr
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:326
std::shared_ptr< arrow::DataType > getArrowType(const SQLTypeInfo &mapd_type, const std::shared_ptr< arrow::Array > &dict_values) const
std::vector< char > df_handle
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:333
size_t append(FILE *f, const size_t size, int8_t *buf)
Appends the specified number of bytes to the end of the file f from buf.
Definition: File.cpp:136
void appendToColumnBuilder(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
SQLTypeInfoCore< ArrayContextTypeSizer, ExecutorTypePackaging, DateTimeFacilities > SQLTypeInfo
Definition: sqltypes.h:852
key_t get_and_copy_to_shm(const std::shared_ptr< Buffer > &data)
static void deallocateArrowResultBuffer(const ArrowResult &result, const ExecutorDeviceType device_type, const size_t device_id, std::shared_ptr< Data_Namespace::DataMgr > &data_mgr)
Definition: sqltypes.h:55
Definition: sqltypes.h:56
int64_t sm_size
bool is_integer() const
Definition: sqltypes.h:479
int64_t df_size
#define IS_INTEGER(T)
Definition: sqltypes.h:161
Definition: sqltypes.h:44
ArrowResult getArrowResultImpl() const
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
int64_t get_epoch_days_from_seconds(const int64_t seconds)
void initializeColumnBuilder(ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const std::shared_ptr< arrow::Field > &field) const
SerializedArrowOutput getSerializedArrowOutput() const
int get_precision() const
Definition: sqltypes.h:329
Definition: sqltypes.h:48
bool is_dict_encoded_string() const
Definition: sqltypes.h:503
int cpu_threads()
Definition: thread_count.h:25
void create_or_append_validity(const ScalarTargetValue &value, const SQLTypeInfo &col_type, std::shared_ptr< std::vector< bool >> &null_bitmap, const size_t max_size)
std::shared_ptr< arrow::RecordBatch > convertToArrow(arrow::ipc::DictionaryMemo &memo) const
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:156