OmniSciDB  bf83d84833
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ArrowResultSetConverter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "../Shared/DateConverters.h"
18 #include "ArrowResultSet.h"
19 #include "Execute.h"
20 
21 #ifndef _MSC_VER
22 #include <sys/ipc.h>
23 #include <sys/shm.h>
24 #include <sys/types.h>
25 #else
26 // IPC shared memory not yet supported on windows
27 using key_t = size_t;
28 #define IPC_PRIVATE 0
29 #endif
30 
31 #include <algorithm>
32 #include <cerrno>
33 #include <cstdio>
34 #include <cstdlib>
35 #include <future>
36 #include <string>
37 
38 #include "arrow/api.h"
39 #include "arrow/io/memory.h"
40 #include "arrow/ipc/api.h"
41 
42 #include "Shared/ArrowUtil.h"
43 
44 #ifdef HAVE_CUDA
45 #include <arrow/gpu/cuda_api.h>
46 #include <cuda.h>
47 #endif // HAVE_CUDA
48 
49 #define ARROW_RECORDBATCH_MAKE arrow::RecordBatch::Make
50 
51 using namespace arrow;
52 
53 namespace {
54 
57  switch (ti.get_size()) {
58  case 1:
59  return kTINYINT;
60  case 2:
61  return kSMALLINT;
62  case 4:
63  return kINT;
64  case 8:
65  return kBIGINT;
66  default:
67  CHECK(false);
68  }
69  return ti.get_type();
70 }
71 
73  auto logical_type = ti.get_type();
74  if (IS_INTEGER(logical_type)) {
75  switch (ti.get_size()) {
76  case 1:
77  return kTINYINT;
78  case 2:
79  return kSMALLINT;
80  case 4:
81  return kINT;
82  case 8:
83  return kBIGINT;
84  default:
85  CHECK(false);
86  }
87  }
88  return logical_type;
89 }
90 
91 template <typename TYPE, typename VALUE_ARRAY_TYPE>
93  std::shared_ptr<ValueArray>& values,
94  const size_t max_size) {
95  auto pval_cty = boost::get<VALUE_ARRAY_TYPE>(&val_cty);
96  CHECK(pval_cty);
97  auto val_ty = static_cast<TYPE>(*pval_cty);
98  if (!values) {
99  values = std::make_shared<ValueArray>(std::vector<TYPE>());
100  boost::get<std::vector<TYPE>>(*values).reserve(max_size);
101  }
102  CHECK(values);
103  auto values_ty = boost::get<std::vector<TYPE>>(values.get());
104  CHECK(values_ty);
105  values_ty->push_back(val_ty);
106 }
107 
108 template <typename TYPE>
110  const SQLTypeInfo& col_type,
111  std::shared_ptr<std::vector<bool>>& null_bitmap,
112  const size_t max_size) {
113  if (col_type.get_notnull()) {
114  CHECK(!null_bitmap);
115  return;
116  }
117  auto pvalue = boost::get<TYPE>(&value);
118  CHECK(pvalue);
119  bool is_valid = false;
120  if (col_type.is_boolean()) {
121  is_valid = inline_int_null_val(col_type) != static_cast<int8_t>(*pvalue);
122  } else if (col_type.is_dict_encoded_string()) {
123  is_valid = inline_int_null_val(col_type) != static_cast<int32_t>(*pvalue);
124  } else if (col_type.is_integer() || col_type.is_time()) {
125  is_valid = inline_int_null_val(col_type) != static_cast<int64_t>(*pvalue);
126  } else if (col_type.is_fp()) {
127  is_valid = inline_fp_null_val(col_type) != static_cast<double>(*pvalue);
128  } else {
129  UNREACHABLE();
130  }
131 
132  if (!null_bitmap) {
133  null_bitmap = std::make_shared<std::vector<bool>>();
134  null_bitmap->reserve(max_size);
135  }
136  CHECK(null_bitmap);
137  null_bitmap->push_back(is_valid);
138 }
139 
// Maps a C++ arithmetic type to the sentinel value used to encode SQL NULL for
// that type. The primary template is intentionally empty; only the integral and
// floating-point specializations (below) provide the `type`/`value` members, so
// instantiating null_type for any other TYPE is a compile-time error at use.
// Declared `struct` to match the specializations' class-key and avoid
// -Wmismatched-tags warnings (the primary previously said `class`).
template <typename TYPE, typename enable = void>
struct null_type {};
142 
143 template <typename TYPE>
144 struct null_type<TYPE, std::enable_if_t<std::is_integral<TYPE>::value>> {
146  static constexpr type value = inline_int_null_value<type>();
147 };
148 
// Floating-point specialization: SQL NULL for a float/double column is encoded
// as the sentinel returned by inline_fp_null_value (declared in a shared
// header); `type` is the floating type itself, so values are compared directly.
template <typename TYPE>
struct null_type<TYPE, std::enable_if_t<std::is_floating_point<TYPE>::value>> {
  using type = TYPE;
  static constexpr type value = inline_fp_null_value<type>();
};
154 
155 template <typename TYPE>
157 
158 template <typename C_TYPE, typename ARROW_TYPE = typename CTypeTraits<C_TYPE>::ArrowType>
160  size_t col,
161  std::unique_ptr<int8_t[]>& values,
162  std::unique_ptr<uint8_t[]>& is_valid,
163  size_t entry_count,
164  std::shared_ptr<Array>& out) {
165  CHECK(sizeof(C_TYPE) == result->getColType(col).get_size());
166  CHECK(!values);
167  CHECK(!is_valid);
168 
169  const int8_t* data_ptr;
170  if (result->isZeroCopyColumnarConversionPossible(col)) {
171  data_ptr = result->getColumnarBuffer(col);
172  } else {
173  values.reset(new int8_t[entry_count * sizeof(C_TYPE)]);
174  result->copyColumnIntoBuffer(col, values.get(), entry_count * sizeof(C_TYPE));
175  data_ptr = values.get();
176  }
177 
178  int64_t null_count = 0;
179  is_valid.reset(new uint8_t[(entry_count + 7) / 8]);
180 
181  const null_type_t<C_TYPE>* vals =
182  reinterpret_cast<const null_type_t<C_TYPE>*>(data_ptr);
184 
185  size_t unroll_count = entry_count & 0xFFFFFFFFFFFFFFF8ULL;
186  for (size_t i = 0; i < unroll_count; i += 8) {
187  uint8_t valid_byte = 0;
188  uint8_t valid;
189  valid = vals[i + 0] != null_val;
190  valid_byte |= valid << 0;
191  null_count += !valid;
192  valid = vals[i + 1] != null_val;
193  valid_byte |= valid << 1;
194  null_count += !valid;
195  valid = vals[i + 2] != null_val;
196  valid_byte |= valid << 2;
197  null_count += !valid;
198  valid = vals[i + 3] != null_val;
199  valid_byte |= valid << 3;
200  null_count += !valid;
201  valid = vals[i + 4] != null_val;
202  valid_byte |= valid << 4;
203  null_count += !valid;
204  valid = vals[i + 5] != null_val;
205  valid_byte |= valid << 5;
206  null_count += !valid;
207  valid = vals[i + 6] != null_val;
208  valid_byte |= valid << 6;
209  null_count += !valid;
210  valid = vals[i + 7] != null_val;
211  valid_byte |= valid << 7;
212  null_count += !valid;
213  is_valid[i >> 3] = valid_byte;
214  }
215  if (unroll_count != entry_count) {
216  uint8_t valid_byte = 0;
217  for (size_t i = unroll_count; i < entry_count; ++i) {
218  bool valid = vals[i] != null_val;
219  valid_byte |= valid << (i & 7);
220  null_count += !valid;
221  }
222  is_valid[unroll_count >> 3] = valid_byte;
223  }
224 
225  if (!null_count) {
226  is_valid.reset();
227  }
228 
229  // TODO: support date/time + scaling
230  // TODO: support booleans
231  std::shared_ptr<Buffer> data(new Buffer(reinterpret_cast<const uint8_t*>(data_ptr),
232  entry_count * sizeof(C_TYPE)));
233  if (null_count) {
234  std::shared_ptr<Buffer> null_bitmap(
235  new Buffer(is_valid.get(), (entry_count + 7) / 8));
236  out.reset(new NumericArray<ARROW_TYPE>(entry_count, data, null_bitmap, null_count));
237  } else {
238  out.reset(new NumericArray<ARROW_TYPE>(entry_count, data));
239  }
240 }
241 
242 #ifndef _MSC_VER
// Allocate a new System V shared-memory segment of `shmsz` bytes, attach it to
// this process, and return the segment's key together with the attached
// address. A zero size yields (IPC_PRIVATE, nullptr) without allocating.
// Throws std::runtime_error if a segment cannot be created or attached.
std::pair<key_t, void*> get_shm(size_t shmsz) {
  if (!shmsz) {
    return std::make_pair(IPC_PRIVATE, nullptr);
  }
  // Generate a new key for a shared memory segment. Keys to shared memory segments
  // are OS global, so we need to try a new key if we encounter a collision. It seems
  // incremental keygen would be deterministically worst-case. If we use a hash
  // (like djb2) + nonce, we could still get collisions if multiple clients specify
  // the same nonce, so using rand() in lieu of a better approach
  // TODO(ptaylor): Is this common? Are these assumptions true?
  auto key = static_cast<key_t>(rand());
  int shmid = -1;
  // IPC_CREAT - indicates we want to create a new segment for this key if it doesn't
  // exist IPC_EXCL - ensures failure if a segment already exists for this key
  while ((shmid = shmget(key, shmsz, IPC_CREAT | IPC_EXCL | 0666)) < 0) {
    // If shmget fails with one of these four codes, the key is unusable but a
    // fresh key may succeed, so retry:
    //   EEXIST - a segment is already associated with this key
    //   EACCES - a segment exists for this key but we lack permission
    //   EINVAL - a segment exists for this key but is smaller than shmsz
    //   ENOENT - no segment for this key (only without IPC_CREAT; kept for safety)
    // Any other errno is a hard failure. NOTE: errno holds a single error code,
    // not a bitmask, so each value must be compared individually — the previous
    // `errno & (EEXIST | EACCES | EINVAL | ENOENT)` test was incorrect and could
    // both mask real failures and retry on unrelated errors.
    if (errno != EEXIST && errno != EACCES && errno != EINVAL && errno != ENOENT) {
      throw std::runtime_error("failed to create a shared memory");
    }
    key = static_cast<key_t>(rand());
  }
  // Attach the segment into our address space; shmat returns (void*)-1 on error,
  // so compare pointers directly instead of round-tripping through int64_t.
  auto ipc_ptr = shmat(shmid, NULL, 0);
  if (ipc_ptr == reinterpret_cast<void*>(-1)) {
    throw std::runtime_error("failed to attach a shared memory");
  }

  return std::make_pair(key, ipc_ptr);
}
279 #endif
280 
281 std::pair<key_t, std::shared_ptr<Buffer>> get_shm_buffer(size_t size) {
282 #ifdef _MSC_VER
283  throw std::runtime_error("Arrow IPC not yet supported on Windows.");
284  return std::make_pair(0, nullptr);
285 #else
286  auto [key, ipc_ptr] = get_shm(size);
287  std::shared_ptr<Buffer> buffer(new MutableBuffer(static_cast<uint8_t*>(ipc_ptr), size));
288  return std::make_pair<key_t, std::shared_ptr<Buffer>>(std::move(key),
289  std::move(buffer));
290 #endif
291 }
292 
293 } // namespace
294 
295 namespace arrow {
296 
297 key_t get_and_copy_to_shm(const std::shared_ptr<Buffer>& data) {
298 #ifdef _MSC_VER
299  throw std::runtime_error("Arrow IPC not yet supported on Windows.");
300 #else
301  auto [key, ipc_ptr] = get_shm(data->size());
302  // copy the arrow records buffer to shared memory
303  // TODO(ptaylor): I'm sure it's possible to tell Arrow's RecordBatchStreamWriter to
304  // write directly to the shared memory segment as a sink
305  memcpy(ipc_ptr, data->data(), data->size());
306  // detach from the shared memory segment
307  shmdt(ipc_ptr);
308  return key;
309 #endif
310 }
311 
312 } // namespace arrow
313 
318  auto timer = DEBUG_TIMER(__func__);
319  std::shared_ptr<arrow::RecordBatch> record_batch = convertToArrow();
320 
321  if (device_type_ == ExecutorDeviceType::CPU ||
322  transport_method_ == ArrowTransport::WIRE) {
323  const auto getWireResult =
324  [&](const int64_t schema_size,
325  const int64_t dict_size,
326  const int64_t records_size,
327  const std::shared_ptr<Buffer>& serialized_schema,
328  const std::shared_ptr<Buffer>& serialized_dict) -> ArrowResult {
329  auto timer = DEBUG_TIMER("serialize batch to wire");
330  std::vector<char> schema_handle_data;
331  std::vector<char> record_handle_data;
332  const int64_t total_size = schema_size + records_size + dict_size;
333  record_handle_data.insert(record_handle_data.end(),
334  serialized_schema->data(),
335  serialized_schema->data() + schema_size);
336 
337  record_handle_data.insert(record_handle_data.end(),
338  serialized_dict->data(),
339  serialized_dict->data() + dict_size);
340 
341  record_handle_data.resize(total_size);
342  auto serialized_records =
343  arrow::MutableBuffer::Wrap(record_handle_data.data(), total_size);
344 
345  io::FixedSizeBufferWriter stream(
346  SliceMutableBuffer(serialized_records, schema_size + dict_size));
347  ARROW_THROW_NOT_OK(ipc::SerializeRecordBatch(
348  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
349 
350  return {std::vector<char>(0),
351  0,
352  std::vector<char>(0),
353  serialized_records->size(),
354  std::string{""},
355  record_handle_data};
356  };
357 
358  const auto getShmResult =
359  [&](const int64_t schema_size,
360  const int64_t dict_size,
361  const int64_t records_size,
362  const std::shared_ptr<Buffer>& serialized_schema,
363  const std::shared_ptr<Buffer>& serialized_dict) -> ArrowResult {
364  auto timer = DEBUG_TIMER("serialize batch to shared memory");
365  std::shared_ptr<Buffer> serialized_records;
366  std::vector<char> schema_handle_buffer;
367  std::vector<char> record_handle_buffer(sizeof(key_t), 0);
368  key_t records_shm_key = IPC_PRIVATE;
369  const int64_t total_size = schema_size + records_size + dict_size;
370 
371  std::tie(records_shm_key, serialized_records) = get_shm_buffer(total_size);
372 
373  memcpy(serialized_records->mutable_data(),
374  serialized_schema->data(),
375  (size_t)schema_size);
376  memcpy(serialized_records->mutable_data() + schema_size,
377  serialized_dict->data(),
378  (size_t)dict_size);
379 
380  io::FixedSizeBufferWriter stream(
381  SliceMutableBuffer(serialized_records, schema_size + dict_size));
382  ARROW_THROW_NOT_OK(ipc::SerializeRecordBatch(
383  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
384  memcpy(&record_handle_buffer[0],
385  reinterpret_cast<const unsigned char*>(&records_shm_key),
386  sizeof(key_t));
387 
388  return {schema_handle_buffer,
389  0,
390  record_handle_buffer,
391  serialized_records->size(),
392  std::string{""}};
393  };
394 
395  std::shared_ptr<Buffer> serialized_schema;
396  int64_t records_size = 0;
397  int64_t schema_size = 0;
398  ipc::DictionaryMemo memo;
399  auto options = ipc::IpcWriteOptions::Defaults();
400  auto dict_stream = arrow::io::BufferOutputStream::Create(1024).ValueOrDie();
401 
402  ARROW_THROW_NOT_OK(CollectDictionaries(*record_batch, &memo));
403  for (auto& pair : memo.dictionaries()) {
404  ipc::IpcPayload payload;
405  int64_t dictionary_id = pair.first;
406  const auto& dictionary = pair.second;
407 
409  GetDictionaryPayload(dictionary_id, dictionary, options, &payload));
410  int32_t metadata_length = 0;
412  WriteIpcPayload(payload, options, dict_stream.get(), &metadata_length));
413  }
414  auto serialized_dict = dict_stream->Finish().ValueOrDie();
415  auto dict_size = serialized_dict->size();
416 
418  serialized_schema,
419  ipc::SerializeSchema(*record_batch->schema(), nullptr, default_memory_pool()));
420  schema_size = serialized_schema->size();
421 
422  ARROW_THROW_NOT_OK(ipc::GetRecordBatchSize(*record_batch, &records_size));
423 
424  switch (transport_method_) {
426  return getWireResult(
427  schema_size, dict_size, records_size, serialized_schema, serialized_dict);
429  return getShmResult(
430  schema_size, dict_size, records_size, serialized_schema, serialized_dict);
431  default:
432  UNREACHABLE();
433  }
434  }
435 #ifdef HAVE_CUDA
436  CHECK(device_type_ == ExecutorDeviceType::GPU);
437 
438  // Copy the schema to the schema handle
439  auto out_stream_result = arrow::io::BufferOutputStream::Create(1024);
440  ARROW_THROW_NOT_OK(out_stream_result.status());
441  auto out_stream = std::move(out_stream_result).ValueOrDie();
442 
443  arrow::ipc::DictionaryMemo current_memo;
444  arrow::ipc::DictionaryMemo serialized_memo;
445 
446  arrow::ipc::IpcPayload schema_payload;
447  ARROW_THROW_NOT_OK(arrow::ipc::GetSchemaPayload(*record_batch->schema(),
448  arrow::ipc::IpcWriteOptions::Defaults(),
449  &serialized_memo,
450  &schema_payload));
451  int32_t schema_payload_length = 0;
452  ARROW_THROW_NOT_OK(arrow::ipc::WriteIpcPayload(schema_payload,
453  arrow::ipc::IpcWriteOptions::Defaults(),
454  out_stream.get(),
455  &schema_payload_length));
456 
457  ARROW_THROW_NOT_OK(CollectDictionaries(*record_batch, &current_memo));
458 
459  // now try a dictionary
460  std::shared_ptr<arrow::Schema> dummy_schema;
461  std::vector<std::shared_ptr<arrow::RecordBatch>> dict_batches;
462  for (int i = 0; i < record_batch->schema()->num_fields(); i++) {
463  auto field = record_batch->schema()->field(i);
464  if (field->type()->id() == arrow::Type::DICTIONARY) {
465  int64_t dict_id = -1;
466  ARROW_THROW_NOT_OK(current_memo.GetId(field.get(), &dict_id));
467  CHECK_GE(dict_id, 0);
468  std::shared_ptr<Array> dict;
469  ARROW_THROW_NOT_OK(current_memo.GetDictionary(dict_id, &dict));
470  CHECK(dict);
471 
472  if (!dummy_schema) {
473  auto dummy_field = std::make_shared<arrow::Field>("", dict->type());
474  dummy_schema = std::make_shared<arrow::Schema>(
475  std::vector<std::shared_ptr<arrow::Field>>{dummy_field});
476  }
477  dict_batches.emplace_back(
478  arrow::RecordBatch::Make(dummy_schema, dict->length(), {dict}));
479  }
480  }
481 
482  if (!dict_batches.empty()) {
483  ARROW_THROW_NOT_OK(arrow::ipc::WriteRecordBatchStream(
484  dict_batches, ipc::IpcWriteOptions::Defaults(), out_stream.get()));
485  }
486 
487  auto complete_ipc_stream = out_stream->Finish();
488  ARROW_THROW_NOT_OK(complete_ipc_stream.status());
489  auto serialized_records = std::move(complete_ipc_stream).ValueOrDie();
490 
491  const auto record_key = arrow::get_and_copy_to_shm(serialized_records);
492  std::vector<char> schema_record_key_buffer(sizeof(key_t), 0);
493  memcpy(&schema_record_key_buffer[0],
494  reinterpret_cast<const unsigned char*>(&record_key),
495  sizeof(key_t));
496 
497  arrow::cuda::CudaDeviceManager* manager;
498  ARROW_ASSIGN_OR_THROW(manager, arrow::cuda::CudaDeviceManager::Instance());
499  std::shared_ptr<arrow::cuda::CudaContext> context;
500  ARROW_ASSIGN_OR_THROW(context, manager->GetContext(device_id_));
501 
502  std::shared_ptr<arrow::cuda::CudaBuffer> device_serialized;
503  ARROW_ASSIGN_OR_THROW(device_serialized,
504  SerializeRecordBatch(*record_batch, context.get()));
505 
506  std::shared_ptr<arrow::cuda::CudaIpcMemHandle> cuda_handle;
507  ARROW_ASSIGN_OR_THROW(cuda_handle, device_serialized->ExportForIpc());
508 
509  std::shared_ptr<arrow::Buffer> serialized_cuda_handle;
510  ARROW_ASSIGN_OR_THROW(serialized_cuda_handle,
511  cuda_handle->Serialize(arrow::default_memory_pool()));
512 
513  std::vector<char> record_handle_buffer(serialized_cuda_handle->size(), 0);
514  memcpy(&record_handle_buffer[0],
515  serialized_cuda_handle->data(),
516  serialized_cuda_handle->size());
517 
518  return {schema_record_key_buffer,
519  serialized_records->size(),
520  record_handle_buffer,
521  serialized_cuda_handle->size(),
522  serialized_cuda_handle->ToString()};
523 #else
524  UNREACHABLE();
525  return {std::vector<char>{}, 0, std::vector<char>{}, 0, ""};
526 #endif
527 }
528 
531  arrow::ipc::DictionaryMemo* memo) const {
532  auto timer = DEBUG_TIMER(__func__);
533  std::shared_ptr<arrow::RecordBatch> arrow_copy = convertToArrow();
534  std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;
535 
536  ARROW_ASSIGN_OR_THROW(serialized_schema,
537  arrow::ipc::SerializeSchema(
538  *arrow_copy->schema(), memo, arrow::default_memory_pool()));
539 
540  ARROW_THROW_NOT_OK(CollectDictionaries(*arrow_copy, memo));
541 
542  if (arrow_copy->num_rows()) {
543  auto timer = DEBUG_TIMER("serialize records");
544  ARROW_THROW_NOT_OK(arrow_copy->Validate());
545  ARROW_ASSIGN_OR_THROW(serialized_records,
546  arrow::ipc::SerializeRecordBatch(
547  *arrow_copy, arrow::ipc::IpcWriteOptions::Defaults()));
548  } else {
549  ARROW_ASSIGN_OR_THROW(serialized_records, arrow::AllocateBuffer(0));
550  }
551  return {serialized_schema, serialized_records};
552 }
553 
554 std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::convertToArrow() const {
555  auto timer = DEBUG_TIMER(__func__);
556  const auto col_count = results_->colCount();
557  std::vector<std::shared_ptr<arrow::Field>> fields;
558  CHECK(col_names_.empty() || col_names_.size() == col_count);
559  for (size_t i = 0; i < col_count; ++i) {
560  const auto ti = results_->getColType(i);
561  fields.push_back(makeField(col_names_.empty() ? "" : col_names_[i], ti));
562  }
563  return getArrowBatch(arrow::schema(fields));
564 }
565 
566 std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::getArrowBatch(
567  const std::shared_ptr<arrow::Schema>& schema) const {
568  std::vector<std::shared_ptr<arrow::Array>> result_columns;
569 
570  const size_t entry_count = top_n_ < 0
571  ? results_->entryCount()
572  : std::min(size_t(top_n_), results_->entryCount());
573  if (!entry_count) {
574  return ARROW_RECORDBATCH_MAKE(schema, 0, result_columns);
575  }
576  const auto col_count = results_->colCount();
577  size_t row_count = 0;
578 
579  result_columns.resize(col_count);
580  std::vector<ColumnBuilder> builders(col_count);
581 
582  // Create array builders
583  for (size_t i = 0; i < col_count; ++i) {
584  initializeColumnBuilder(builders[i], results_->getColType(i), schema->field(i));
585  }
586 
587  // TODO(miyu): speed up for columnar buffers
588  auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
589  std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
590  const std::vector<bool>& non_lazy_cols,
591  const size_t start_entry,
592  const size_t end_entry) -> size_t {
593  CHECK_EQ(value_seg.size(), col_count);
594  CHECK_EQ(null_bitmap_seg.size(), col_count);
595  const auto entry_count = end_entry - start_entry;
596  size_t seg_row_count = 0;
597  for (size_t i = start_entry; i < end_entry; ++i) {
598  auto row = results_->getRowAtNoTranslations(i, non_lazy_cols);
599  if (row.empty()) {
600  continue;
601  }
602  ++seg_row_count;
603  for (size_t j = 0; j < col_count; ++j) {
604  if (!non_lazy_cols.empty() && non_lazy_cols[j]) {
605  continue;
606  }
607 
608  auto scalar_value = boost::get<ScalarTargetValue>(&row[j]);
609  // TODO(miyu): support more types other than scalar.
610  CHECK(scalar_value);
611  const auto& column = builders[j];
612  switch (column.physical_type) {
613  case kBOOLEAN:
614  create_or_append_value<bool, int64_t>(
615  *scalar_value, value_seg[j], entry_count);
616  create_or_append_validity<int64_t>(
617  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
618  break;
619  case kTINYINT:
620  create_or_append_value<int8_t, int64_t>(
621  *scalar_value, value_seg[j], entry_count);
622  create_or_append_validity<int64_t>(
623  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
624  break;
625  case kSMALLINT:
626  create_or_append_value<int16_t, int64_t>(
627  *scalar_value, value_seg[j], entry_count);
628  create_or_append_validity<int64_t>(
629  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
630  break;
631  case kINT:
632  create_or_append_value<int32_t, int64_t>(
633  *scalar_value, value_seg[j], entry_count);
634  create_or_append_validity<int64_t>(
635  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
636  break;
637  case kBIGINT:
638  create_or_append_value<int64_t, int64_t>(
639  *scalar_value, value_seg[j], entry_count);
640  create_or_append_validity<int64_t>(
641  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
642  break;
643  case kFLOAT:
644  create_or_append_value<float, float>(
645  *scalar_value, value_seg[j], entry_count);
646  create_or_append_validity<float>(
647  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
648  break;
649  case kDOUBLE:
650  create_or_append_value<double, double>(
651  *scalar_value, value_seg[j], entry_count);
652  create_or_append_validity<double>(
653  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
654  break;
655  case kTIME:
656  create_or_append_value<int32_t, int64_t>(
657  *scalar_value, value_seg[j], entry_count);
658  create_or_append_validity<int64_t>(
659  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
660  break;
661  case kDATE:
662  device_type_ == ExecutorDeviceType::GPU
663  ? create_or_append_value<int64_t, int64_t>(
664  *scalar_value, value_seg[j], entry_count)
665  : create_or_append_value<int32_t, int64_t>(
666  *scalar_value, value_seg[j], entry_count);
667  create_or_append_validity<int64_t>(
668  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
669  break;
670  case kTIMESTAMP:
671  create_or_append_value<int64_t, int64_t>(
672  *scalar_value, value_seg[j], entry_count);
673  create_or_append_validity<int64_t>(
674  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
675  break;
676  default:
677  // TODO(miyu): support more scalar types.
678  throw std::runtime_error(column.col_type.get_type_name() +
679  " is not supported in Arrow result sets.");
680  }
681  }
682  }
683  return seg_row_count;
684  };
685 
686  auto convert_columns = [&](std::vector<std::unique_ptr<int8_t[]>>& values,
687  std::vector<std::unique_ptr<uint8_t[]>>& is_valid,
688  std::vector<std::shared_ptr<arrow::Array>>& result,
689  const std::vector<bool>& non_lazy_cols,
690  const size_t start_col,
691  const size_t end_col) {
692  for (size_t col = start_col; col < end_col; ++col) {
693  if (!non_lazy_cols.empty() && !non_lazy_cols[col]) {
694  continue;
695  }
696 
697  const auto& column = builders[col];
698  switch (column.physical_type) {
699  case kTINYINT:
700  convert_column<int8_t>(
701  results_, col, values[col], is_valid[col], entry_count, result[col]);
702  break;
703  case kSMALLINT:
704  convert_column<int16_t>(
705  results_, col, values[col], is_valid[col], entry_count, result[col]);
706  break;
707  case kINT:
708  convert_column<int32_t>(
709  results_, col, values[col], is_valid[col], entry_count, result[col]);
710  break;
711  case kBIGINT:
712  convert_column<int64_t>(
713  results_, col, values[col], is_valid[col], entry_count, result[col]);
714  break;
715  case kFLOAT:
716  convert_column<float>(
717  results_, col, values[col], is_valid[col], entry_count, result[col]);
718  break;
719  case kDOUBLE:
720  convert_column<double>(
721  results_, col, values[col], is_valid[col], entry_count, result[col]);
722  break;
723  default:
724  throw std::runtime_error(column.col_type.get_type_name() +
725  " is not supported in Arrow column converter.");
726  }
727  }
728  };
729 
730  std::vector<std::shared_ptr<ValueArray>> column_values(col_count, nullptr);
731  std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count, nullptr);
732  const bool multithreaded = entry_count > 10000 && !results_->isTruncated();
733  bool use_columnar_converter = results_->isDirectColumnarConversionPossible() &&
734  results_->getQueryMemDesc().getQueryDescriptionType() ==
736  entry_count == results_->entryCount();
737  std::vector<bool> non_lazy_cols;
738  if (use_columnar_converter) {
739  auto timer = DEBUG_TIMER("columnar converter");
740  std::vector<size_t> non_lazy_col_pos;
741  size_t non_lazy_col_count = 0;
742  const auto& lazy_fetch_info = results_->getLazyFetchInfo();
743 
744  non_lazy_cols.reserve(col_count);
745  non_lazy_col_pos.reserve(col_count);
746  for (size_t i = 0; i < col_count; ++i) {
747  bool is_lazy =
748  lazy_fetch_info.empty() ? false : lazy_fetch_info[i].is_lazily_fetched;
749  // Currently column converter cannot handle some data types.
750  // Treat them as lazy.
751  switch (builders[i].physical_type) {
752  case kBOOLEAN:
753  case kTIME:
754  case kDATE:
755  case kTIMESTAMP:
756  is_lazy = true;
757  break;
758  default:
759  break;
760  }
761  if (builders[i].field->type()->id() == Type::DICTIONARY) {
762  is_lazy = true;
763  }
764  non_lazy_cols.emplace_back(!is_lazy);
765  if (!is_lazy) {
766  ++non_lazy_col_count;
767  non_lazy_col_pos.emplace_back(i);
768  }
769  }
770 
771  if (non_lazy_col_count == col_count) {
772  non_lazy_cols.clear();
773  non_lazy_col_pos.clear();
774  } else {
775  non_lazy_col_pos.emplace_back(col_count);
776  }
777 
778  values_.resize(col_count);
779  is_valid_.resize(col_count);
780  std::vector<std::future<void>> child_threads;
781  size_t num_threads =
782  std::min(multithreaded ? (size_t)cpu_threads() : (size_t)1, non_lazy_col_count);
783 
784  size_t start_col = 0;
785  size_t end_col = 0;
786  for (size_t i = 0; i < num_threads; ++i) {
787  start_col = end_col;
788  end_col = (i + 1) * non_lazy_col_count / num_threads;
789  size_t phys_start_col =
790  non_lazy_col_pos.empty() ? start_col : non_lazy_col_pos[start_col];
791  size_t phys_end_col =
792  non_lazy_col_pos.empty() ? end_col : non_lazy_col_pos[end_col];
793  child_threads.push_back(std::async(std::launch::async,
794  convert_columns,
795  std::ref(values_),
796  std::ref(is_valid_),
797  std::ref(result_columns),
798  non_lazy_cols,
799  phys_start_col,
800  phys_end_col));
801  }
802  for (auto& child : child_threads) {
803  child.get();
804  }
805  row_count = entry_count;
806  }
807  if (!use_columnar_converter || !non_lazy_cols.empty()) {
808  auto timer = DEBUG_TIMER("row converter");
809  row_count = 0;
810  if (multithreaded) {
811  const size_t cpu_count = cpu_threads();
812  std::vector<std::future<size_t>> child_threads;
813  std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
814  cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count, nullptr));
815  std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
816  cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count, nullptr));
817  const auto stride = (entry_count + cpu_count - 1) / cpu_count;
818  for (size_t i = 0, start_entry = 0; start_entry < entry_count;
819  ++i, start_entry += stride) {
820  const auto end_entry = std::min(entry_count, start_entry + stride);
821  child_threads.push_back(std::async(std::launch::async,
822  fetch,
823  std::ref(column_value_segs[i]),
824  std::ref(null_bitmap_segs[i]),
825  non_lazy_cols,
826  start_entry,
827  end_entry));
828  }
829  for (auto& child : child_threads) {
830  row_count += child.get();
831  }
832  {
833  auto timer = DEBUG_TIMER("append rows to arrow");
834  for (int i = 0; i < schema->num_fields(); ++i) {
835  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
836  continue;
837  }
838 
839  for (size_t j = 0; j < cpu_count; ++j) {
840  if (!column_value_segs[j][i]) {
841  continue;
842  }
843  append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
844  }
845  }
846  }
847  } else {
848  row_count =
849  fetch(column_values, null_bitmaps, non_lazy_cols, size_t(0), entry_count);
850  {
851  auto timer = DEBUG_TIMER("append rows to arrow single thread");
852  for (int i = 0; i < schema->num_fields(); ++i) {
853  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
854  continue;
855  }
856 
857  append(builders[i], *column_values[i], null_bitmaps[i]);
858  }
859  }
860  }
861 
862  {
863  auto timer = DEBUG_TIMER("finish builders");
864  for (size_t i = 0; i < col_count; ++i) {
865  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
866  continue;
867  }
868 
869  result_columns[i] = finishColumnBuilder(builders[i]);
870  }
871  }
872  }
873 
874  return ARROW_RECORDBATCH_MAKE(schema, row_count, result_columns);
875 }
876 
namespace {

// Translate an OmniSci SQL type descriptor (plus the target device) into the
// corresponding Arrow DataType. Throws std::runtime_error for SQL types with
// no Arrow mapping and for unsupported timestamp precisions.
std::shared_ptr<arrow::DataType> get_arrow_type(const SQLTypeInfo& sql_type,
                                                const ExecutorDeviceType device_type) {
  // Dispatch on the physical (storage) type, not the logical SQL type.
  switch (get_physical_type(sql_type)) {
    case kBOOLEAN:
      return arrow::boolean();
    case kTINYINT:
      return arrow::int8();
    case kSMALLINT:
      return arrow::int16();
    case kINT:
      return arrow::int32();
    case kBIGINT:
      return arrow::int64();
    case kFLOAT:
      return arrow::float32();
    case kDOUBLE:
      return arrow::float64();
    case kCHAR:
    case kVARCHAR:
    case kTEXT:
      if (sql_type.is_dict_encoded_string()) {
        // Dictionary-encoded strings become an Arrow dictionary keyed by
        // int32 ids over a utf8 value array (non-ordered).
        auto value_type = std::make_shared<StringType>();
        return dictionary(int32(), value_type, false);
      }
      return utf8();
    case kDECIMAL:
    case kNUMERIC:
      return decimal(sql_type.get_precision(), sql_type.get_scale());
    case kTIME:
      // Times are stored as seconds-since-midnight in 32 bits.
      return time32(TimeUnit::SECOND);
    case kDATE:
      // TODO(wamsi) : Remove date64() once date32() support is added in cuDF. date32()
      // Currently support for date32() is missing in cuDF.Hence, if client requests for
      // date on GPU, return date64() for the time being, till support is added.
      return device_type == ExecutorDeviceType::GPU ? date64() : date32();
    case kTIMESTAMP:
      // The SQL precision (fractional-second digits) selects the Arrow unit.
      switch (sql_type.get_precision()) {
        case 0:
          return timestamp(TimeUnit::SECOND);
        case 3:
          return timestamp(TimeUnit::MILLI);
        case 6:
          return timestamp(TimeUnit::MICRO);
        case 9:
          return timestamp(TimeUnit::NANO);
        default:
          throw std::runtime_error(
              "Unsupported timestamp precision for Arrow result sets: " +
              std::to_string(sql_type.get_precision()));
      }
    case kARRAY:
    case kINTERVAL_DAY_TIME:
    // NOTE(review): a case label (presumably kINTERVAL_YEAR_MONTH) appears to
    // have been lost in extraction here — confirm against upstream. All of
    // these labels fall through to the unsupported-type error below.
    default:
      throw std::runtime_error(sql_type.get_type_name() +
                               " is not supported in Arrow result sets.");
  }
  return nullptr;
}

}  // namespace
940 
941 std::shared_ptr<arrow::Field> ArrowResultSetConverter::makeField(
942  const std::string name,
943  const SQLTypeInfo& target_type) const {
944  return arrow::field(
945  name, get_arrow_type(target_type, device_type_), !target_type.get_notnull());
946 }
947 
949  const ArrowResult& result,
950  const ExecutorDeviceType device_type,
951  const size_t device_id,
952  std::shared_ptr<Data_Namespace::DataMgr>& data_mgr) {
953 #ifndef _MSC_VER
954  // CPU buffers skip the sm handle, serializing the entire RecordBatch to df.
955  // Remove shared memory on sysmem
956  if (!result.sm_handle.empty()) {
957  CHECK_EQ(sizeof(key_t), result.sm_handle.size());
958  const key_t& schema_key = *(key_t*)(&result.sm_handle[0]);
959  auto shm_id = shmget(schema_key, result.sm_size, 0666);
960  if (shm_id < 0) {
961  throw std::runtime_error(
962  "failed to get an valid shm ID w/ given shm key of the schema");
963  }
964  if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
965  throw std::runtime_error("failed to deallocate Arrow schema on errorno(" +
966  std::to_string(errno) + ")");
967  }
968  }
969 
970  if (device_type == ExecutorDeviceType::CPU) {
971  CHECK_EQ(sizeof(key_t), result.df_handle.size());
972  const key_t& df_key = *(key_t*)(&result.df_handle[0]);
973  auto shm_id = shmget(df_key, result.df_size, 0666);
974  if (shm_id < 0) {
975  throw std::runtime_error(
976  "failed to get an valid shm ID w/ given shm key of the data");
977  }
978  if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
979  throw std::runtime_error("failed to deallocate Arrow data frame");
980  }
981  }
982  // CUDA buffers become owned by the caller, and will automatically be freed
983  // TODO: What if the client never takes ownership of the result? we may want to
984  // establish a check to see if the GPU buffer still exists, and then free it.
985 #endif
986 }
987 
989  ColumnBuilder& column_builder,
990  const SQLTypeInfo& col_type,
991  const std::shared_ptr<arrow::Field>& field) const {
992  column_builder.field = field;
993  column_builder.col_type = col_type;
994  column_builder.physical_type = col_type.is_dict_encoded_string()
995  ? get_dict_index_type(col_type)
996  : get_physical_type(col_type);
997 
998  auto value_type = field->type();
999  if (col_type.is_dict_encoded_string()) {
1000  column_builder.builder.reset(new StringDictionary32Builder());
1001  // add values to the builder
1002  const int dict_id = col_type.get_comp_param();
1003  auto str_list = results_->getStringDictionaryPayloadCopy(dict_id);
1004 
1005  arrow::StringBuilder str_array_builder;
1006  ARROW_THROW_NOT_OK(str_array_builder.AppendValues(*str_list));
1007  std::shared_ptr<StringArray> string_array;
1008  ARROW_THROW_NOT_OK(str_array_builder.Finish(&string_array));
1009 
1010  auto dict_builder =
1011  dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
1012  CHECK(dict_builder);
1013 
1014  ARROW_THROW_NOT_OK(dict_builder->InsertMemoValues(*string_array));
1015  } else {
1017  arrow::MakeBuilder(default_memory_pool(), value_type, &column_builder.builder));
1018  }
1019 }
1020 
1021 std::shared_ptr<arrow::Array> ArrowResultSetConverter::finishColumnBuilder(
1022  ColumnBuilder& column_builder) const {
1023  std::shared_ptr<Array> values;
1024  ARROW_THROW_NOT_OK(column_builder.builder->Finish(&values));
1025  return values;
1026 }
1027 
1028 namespace {
1029 
1030 template <typename BUILDER_TYPE, typename VALUE_ARRAY_TYPE>
1032  const ValueArray& values,
1033  const std::shared_ptr<std::vector<bool>>& is_valid) {
1034  static_assert(!std::is_same<BUILDER_TYPE, arrow::StringDictionary32Builder>::value,
1035  "Dictionary encoded string builder requires function specialization.");
1036 
1037  std::vector<VALUE_ARRAY_TYPE> vals = boost::get<std::vector<VALUE_ARRAY_TYPE>>(values);
1038 
1039  if (scale_epoch_values<BUILDER_TYPE>()) {
1040  auto scale_sec_to_millisec = [](auto seconds) { return seconds * kMilliSecsPerSec; };
1041  auto scale_values = [&](auto epoch) {
1042  return std::is_same<BUILDER_TYPE, Date32Builder>::value
1044  : scale_sec_to_millisec(epoch);
1045  };
1046  std::transform(vals.begin(), vals.end(), vals.begin(), scale_values);
1047  }
1048 
1049  auto typed_builder = dynamic_cast<BUILDER_TYPE*>(column_builder.builder.get());
1050  CHECK(typed_builder);
1051  if (column_builder.field->nullable()) {
1052  CHECK(is_valid.get());
1053  ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals, *is_valid));
1054  } else {
1055  ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals));
1056  }
1057 }
1058 
1059 template <>
1060 void appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
1062  const ValueArray& values,
1063  const std::shared_ptr<std::vector<bool>>& is_valid) {
1064  auto typed_builder =
1065  dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
1066  CHECK(typed_builder);
1067 
1068  std::vector<int32_t> vals = boost::get<std::vector<int32_t>>(values);
1069 
1070  if (column_builder.field->nullable()) {
1071  CHECK(is_valid.get());
1072  // TODO(adb): Generate this instead of the boolean bitmap
1073  std::vector<uint8_t> transformed_bitmap;
1074  transformed_bitmap.reserve(is_valid->size());
1075  std::for_each(
1076  is_valid->begin(), is_valid->end(), [&transformed_bitmap](const bool is_valid) {
1077  transformed_bitmap.push_back(is_valid ? 1 : 0);
1078  });
1079 
1080  ARROW_THROW_NOT_OK(typed_builder->AppendIndices(
1081  vals.data(), static_cast<int64_t>(vals.size()), transformed_bitmap.data()));
1082  } else {
1084  typed_builder->AppendIndices(vals.data(), static_cast<int64_t>(vals.size())));
1085  }
1086 }
1087 
1088 } // namespace
1089 
1091  ColumnBuilder& column_builder,
1092  const ValueArray& values,
1093  const std::shared_ptr<std::vector<bool>>& is_valid) const {
1094  if (column_builder.col_type.is_dict_encoded_string()) {
1095  CHECK_EQ(column_builder.physical_type,
1096  kINT); // assume all dicts use none-encoded type for now
1097  appendToColumnBuilder<StringDictionary32Builder, int32_t>(
1098  column_builder, values, is_valid);
1099  return;
1100  }
1101  switch (column_builder.physical_type) {
1102  case kBOOLEAN:
1103  appendToColumnBuilder<BooleanBuilder, bool>(column_builder, values, is_valid);
1104  break;
1105  case kTINYINT:
1106  appendToColumnBuilder<Int8Builder, int8_t>(column_builder, values, is_valid);
1107  break;
1108  case kSMALLINT:
1109  appendToColumnBuilder<Int16Builder, int16_t>(column_builder, values, is_valid);
1110  break;
1111  case kINT:
1112  appendToColumnBuilder<Int32Builder, int32_t>(column_builder, values, is_valid);
1113  break;
1114  case kBIGINT:
1115  appendToColumnBuilder<Int64Builder, int64_t>(column_builder, values, is_valid);
1116  break;
1117  case kFLOAT:
1118  appendToColumnBuilder<FloatBuilder, float>(column_builder, values, is_valid);
1119  break;
1120  case kDOUBLE:
1121  appendToColumnBuilder<DoubleBuilder, double>(column_builder, values, is_valid);
1122  break;
1123  case kTIME:
1124  appendToColumnBuilder<Time32Builder, int32_t>(column_builder, values, is_valid);
1125  break;
1126  case kTIMESTAMP:
1127  appendToColumnBuilder<TimestampBuilder, int64_t>(column_builder, values, is_valid);
1128  break;
1129  case kDATE:
1130  device_type_ == ExecutorDeviceType::GPU
1131  ? appendToColumnBuilder<Date64Builder, int64_t>(
1132  column_builder, values, is_valid)
1133  : appendToColumnBuilder<Date32Builder, int32_t>(
1134  column_builder, values, is_valid);
1135  break;
1136  case kCHAR:
1137  case kVARCHAR:
1138  case kTEXT:
1139  default:
1140  // TODO(miyu): support more scalar types.
1141  throw std::runtime_error(column_builder.col_type.get_type_name() +
1142  " is not supported in Arrow result sets.");
1143  }
1144 }
void create_or_append_value(const ScalarTargetValue &val_cty, std::shared_ptr< ValueArray > &values, const size_t max_size)
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::unique_ptr< arrow::ArrayBuilder > builder
HOST DEVICE int get_size() const
Definition: sqltypes.h:321
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
Definition: sqltypes.h:48
SQLTypes
Definition: sqltypes.h:37
void append(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
ArrowResult getArrowResult() const
std::vector< char > sm_handle
ExecutorDeviceType
void appendToColumnBuilder(ArrowResultSetConverter::ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid)
void convert_column(ResultSetPtr result, size_t col, std::unique_ptr< int8_t[]> &values, std::unique_ptr< uint8_t[]> &is_valid, size_t entry_count, std::shared_ptr< Array > &out)
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
bool is_fp() const
Definition: sqltypes.h:482
HOST DEVICE int get_scale() const
Definition: sqltypes.h:316
std::shared_ptr< arrow::Array > finishColumnBuilder(ColumnBuilder &column_builder) const
#define UNREACHABLE()
Definition: Logger.h:241
#define CHECK_GE(x, y)
Definition: Logger.h:210
std::shared_ptr< arrow::Field > field
boost::variant< std::vector< bool >, std::vector< int8_t >, std::vector< int16_t >, std::vector< int32_t >, std::vector< int64_t >, std::vector< float >, std::vector< double >, std::vector< std::string >> ValueArray
std::shared_ptr< ResultSet > ResultSetPtr
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:311
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
bool is_time() const
Definition: sqltypes.h:484
std::string to_string(char const *&&v)
std::pair< key_t, void * > get_shm(size_t shmsz)
static constexpr int64_t kMilliSecsPerSec
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
std::shared_ptr< arrow::RecordBatch > getArrowBatch(const std::shared_ptr< arrow::Schema > &schema) const
#define ARROW_RECORDBATCH_MAKE
bool is_integer() const
Definition: sqltypes.h:480
std::vector< char > df_handle
std::shared_ptr< arrow::Field > makeField(const std::string name, const SQLTypeInfo &target_type) const
size_t append(FILE *f, const size_t size, int8_t *buf)
Appends the specified number of bytes to the end of the file f from buf.
Definition: File.cpp:141
bool is_boolean() const
Definition: sqltypes.h:485
SerializedArrowOutput getSerializedArrowOutput(arrow::ipc::DictionaryMemo *memo) const
int get_precision() const
Definition: sqltypes.h:314
key_t get_and_copy_to_shm(const std::shared_ptr< Buffer > &data)
static void deallocateArrowResultBuffer(const ArrowResult &result, const ExecutorDeviceType device_type, const size_t device_id, std::shared_ptr< Data_Namespace::DataMgr > &data_mgr)
std::shared_ptr< arrow::DataType > get_arrow_type(const SQLTypeInfo &sql_type, const ExecutorDeviceType device_type)
Definition: sqltypes.h:51
Definition: sqltypes.h:52
int64_t sm_size
std::pair< key_t, std::shared_ptr< Buffer > > get_shm_buffer(size_t size)
int64_t df_size
std::string get_type_name() const
Definition: sqltypes.h:414
#define IS_INTEGER(T)
Definition: sqltypes.h:236
Definition: sqltypes.h:40
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:320
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
int64_t get_epoch_days_from_seconds(const int64_t seconds)
void initializeColumnBuilder(ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const std::shared_ptr< arrow::Field > &field) const
bool is_dict_encoded_string() const
Definition: sqltypes.h:512
Definition: sqltypes.h:44
parquet::Type::type get_physical_type(std::unique_ptr< parquet::arrow::FileReader > &reader, const int logical_column_index)
string name
Definition: setup.py:35
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:318
int cpu_threads()
Definition: thread_count.h:24
void create_or_append_validity(const ScalarTargetValue &value, const SQLTypeInfo &col_type, std::shared_ptr< std::vector< bool >> &null_bitmap, const size_t max_size)
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:156
std::shared_ptr< arrow::RecordBatch > convertToArrow() const