OmniSciDB  a667adc9c8
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ArrowResultSetConverter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "../Shared/DateConverters.h"
18 #include "ArrowResultSet.h"
19 #include "Execute.h"
20 #include "arrow/ipc/dictionary.h"
21 #include "arrow/ipc/options.h"
22 
23 #ifndef _MSC_VER
24 #include <sys/ipc.h>
25 #include <sys/shm.h>
26 #include <sys/types.h>
27 #else
28 // IPC shared memory not yet supported on windows
29 using key_t = size_t;
30 #define IPC_PRIVATE 0
31 #endif
32 
33 #include <algorithm>
34 #include <cerrno>
35 #include <cstdio>
36 #include <cstdlib>
37 #include <future>
38 #include <string>
39 
40 #include "arrow/api.h"
41 #include "arrow/io/memory.h"
42 #include "arrow/ipc/api.h"
43 
44 #include "Shared/ArrowUtil.h"
45 
46 #ifdef HAVE_CUDA
47 #include <arrow/gpu/cuda_api.h>
48 #include <cuda.h>
49 #endif // HAVE_CUDA
50 
51 #define ARROW_RECORDBATCH_MAKE arrow::RecordBatch::Make
52 
53 #define ARROW_CONVERTER_DEBUG true
54 
55 #define ARROW_LOG(category) \
56  VLOG(1) << "[Arrow]" \
57  << "[" << category "] "
58 
59 using namespace arrow;
60 
61 namespace {
62 
63 /* We can create Arrow buffers which refer memory owned by ResultSet.
64  For safe memory access we should keep a ResultSetPtr to keep
65  data live while buffer lives. Use this custom buffer for that. */
66 class ResultSetBuffer : public Buffer {
67  public:
68  ResultSetBuffer(const uint8_t* buf, size_t size, ResultSetPtr rs)
69  : Buffer(buf, size), _rs(rs) {}
70 
71  private:
73 };
74 
77  switch (ti.get_size()) {
78  case 1:
79  return kTINYINT;
80  case 2:
81  return kSMALLINT;
82  case 4:
83  return kINT;
84  case 8:
85  return kBIGINT;
86  default:
87  CHECK(false);
88  }
89  return ti.get_type();
90 }
91 
93  auto logical_type = ti.get_type();
94  if (IS_INTEGER(logical_type)) {
95  switch (ti.get_size()) {
96  case 1:
97  return kTINYINT;
98  case 2:
99  return kSMALLINT;
100  case 4:
101  return kINT;
102  case 8:
103  return kBIGINT;
104  default:
105  CHECK(false);
106  }
107  }
108  return logical_type;
109 }
110 
111 template <typename TYPE, typename VALUE_ARRAY_TYPE>
113  std::shared_ptr<ValueArray>& values,
114  const size_t max_size) {
115  auto pval_cty = boost::get<VALUE_ARRAY_TYPE>(&val_cty);
116  CHECK(pval_cty);
117  auto val_ty = static_cast<TYPE>(*pval_cty);
118  if (!values) {
119  values = std::make_shared<ValueArray>(std::vector<TYPE>());
120  boost::get<std::vector<TYPE>>(*values).reserve(max_size);
121  }
122  CHECK(values);
123  auto values_ty = boost::get<std::vector<TYPE>>(values.get());
124  CHECK(values_ty);
125  values_ty->push_back(val_ty);
126 }
127 
128 template <typename TYPE>
130  const SQLTypeInfo& col_type,
131  std::shared_ptr<std::vector<bool>>& null_bitmap,
132  const size_t max_size) {
133  if (col_type.get_notnull()) {
134  CHECK(!null_bitmap);
135  return;
136  }
137  auto pvalue = boost::get<TYPE>(&value);
138  CHECK(pvalue);
139  bool is_valid = false;
140  if (col_type.is_boolean()) {
141  is_valid = inline_int_null_val(col_type) != static_cast<int8_t>(*pvalue);
142  } else if (col_type.is_dict_encoded_string()) {
143  is_valid = inline_int_null_val(col_type) != static_cast<int32_t>(*pvalue);
144  } else if (col_type.is_integer() || col_type.is_time()) {
145  is_valid = inline_int_null_val(col_type) != static_cast<int64_t>(*pvalue);
146  } else if (col_type.is_fp()) {
147  is_valid = inline_fp_null_val(col_type) != static_cast<double>(*pvalue);
148  } else {
149  UNREACHABLE();
150  }
151 
152  if (!null_bitmap) {
153  null_bitmap = std::make_shared<std::vector<bool>>();
154  null_bitmap->reserve(max_size);
155  }
156  CHECK(null_bitmap);
157  null_bitmap->push_back(is_valid);
158 }
159 
160 template <typename TYPE, typename enable = void>
161 class null_type {};
162 
163 template <typename TYPE>
164 struct null_type<TYPE, std::enable_if_t<std::is_integral<TYPE>::value>> {
166  static constexpr type value = inline_int_null_value<type>();
167 };
168 
// Trait specialization mapping a floating-point C type to the sentinel value
// used to encode NULL for that type (via inline_fp_null_value). The companion
// integral specialization above does the same through inline_int_null_value.
template <typename TYPE>
struct null_type<TYPE, std::enable_if_t<std::is_floating_point<TYPE>::value>> {
 using type = TYPE;
 static constexpr type value = inline_fp_null_value<type>();
};
174 
175 template <typename TYPE>
177 
178 template <typename C_TYPE, typename ARROW_TYPE = typename CTypeTraits<C_TYPE>::ArrowType>
180  size_t col,
181  size_t entry_count,
182  std::shared_ptr<Array>& out) {
183  CHECK(sizeof(C_TYPE) == result->getColType(col).get_size());
184 
185  std::shared_ptr<arrow::Buffer> values;
186  std::shared_ptr<arrow::Buffer> is_valid;
187  const int64_t buf_size = entry_count * sizeof(C_TYPE);
188  if (result->isZeroCopyColumnarConversionPossible(col)) {
189  values.reset(new ResultSetBuffer(
190  reinterpret_cast<const uint8_t*>(result->getColumnarBuffer(col)),
191  buf_size,
192  result));
193  } else {
194  auto res = arrow::AllocateBuffer(buf_size);
195  CHECK(res.ok());
196  values = std::move(res).ValueOrDie();
197  result->copyColumnIntoBuffer(
198  col, reinterpret_cast<int8_t*>(values->mutable_data()), buf_size);
199  }
200 
201  int64_t null_count = 0;
202  auto res = arrow::AllocateBuffer((entry_count + 7) / 8);
203  CHECK(res.ok());
204  is_valid = std::move(res).ValueOrDie();
205 
206  auto is_valid_data = is_valid->mutable_data();
207 
208  const null_type_t<C_TYPE>* vals =
209  reinterpret_cast<const null_type_t<C_TYPE>*>(values->data());
211 
212  size_t unroll_count = entry_count & 0xFFFFFFFFFFFFFFF8ULL;
213  for (size_t i = 0; i < unroll_count; i += 8) {
214  uint8_t valid_byte = 0;
215  uint8_t valid;
216  valid = vals[i + 0] != null_val;
217  valid_byte |= valid << 0;
218  null_count += !valid;
219  valid = vals[i + 1] != null_val;
220  valid_byte |= valid << 1;
221  null_count += !valid;
222  valid = vals[i + 2] != null_val;
223  valid_byte |= valid << 2;
224  null_count += !valid;
225  valid = vals[i + 3] != null_val;
226  valid_byte |= valid << 3;
227  null_count += !valid;
228  valid = vals[i + 4] != null_val;
229  valid_byte |= valid << 4;
230  null_count += !valid;
231  valid = vals[i + 5] != null_val;
232  valid_byte |= valid << 5;
233  null_count += !valid;
234  valid = vals[i + 6] != null_val;
235  valid_byte |= valid << 6;
236  null_count += !valid;
237  valid = vals[i + 7] != null_val;
238  valid_byte |= valid << 7;
239  null_count += !valid;
240  is_valid_data[i >> 3] = valid_byte;
241  }
242  if (unroll_count != entry_count) {
243  uint8_t valid_byte = 0;
244  for (size_t i = unroll_count; i < entry_count; ++i) {
245  bool valid = vals[i] != null_val;
246  valid_byte |= valid << (i & 7);
247  null_count += !valid;
248  }
249  is_valid_data[unroll_count >> 3] = valid_byte;
250  }
251 
252  if (!null_count) {
253  is_valid.reset();
254  }
255 
256  // TODO: support date/time + scaling
257  // TODO: support booleans
258  if (null_count) {
259  out.reset(new NumericArray<ARROW_TYPE>(entry_count, values, is_valid, null_count));
260  } else {
261  out.reset(new NumericArray<ARROW_TYPE>(entry_count, values));
262  }
263 }
264 
265 #ifndef _MSC_VER
// Create a new System V shared memory segment of `shmsz` bytes and attach it
// to this process.
//
// Returns {segment key, mapped address}. For a zero size no segment is
// created and {IPC_PRIVATE, nullptr} is returned.
// Throws std::runtime_error if a segment cannot be created or attached.
std::pair<key_t, void*> get_shm(size_t shmsz) {
  if (!shmsz) {
    return std::make_pair(IPC_PRIVATE, nullptr);
  }
  // Generate a new key for a shared memory segment. Keys to shared memory segments
  // are OS global, so we need to try a new key if we encounter a collision. It seems
  // incremental keygen would be deterministically worst-case. If we use a hash
  // (like djb2) + nonce, we could still get collisions if multiple clients specify
  // the same nonce, so using rand() in lieu of a better approach
  // TODO(ptaylor): Is this common? Are these assumptions true?
  auto key = static_cast<key_t>(rand());
  int shmid = -1;
  // IPC_CREAT - indicates we want to create a new segment for this key if it
  // doesn't exist. IPC_EXCL - ensures failure if a segment already exists for
  // this key.
  while ((shmid = shmget(key, shmsz, IPC_CREAT | IPC_EXCL | 0666)) < 0) {
    // If shmget fails with one of the errno values below, the key is (or may
    // be) already in use, so retry with a new key:
    //   EEXIST - a shared memory segment is already associated with this key
    //   EACCES - a segment exists for this key, but we lack permission
    //   EINVAL - a segment exists for this key, but its size is less than shmsz
    //   ENOENT - no segment and IPC_CREAT was not set (defensive; we do set it)
    // NOTE: errno values are plain integer codes, not bit flags, so they must
    // be compared individually. The previous `errno & (EEXIST | ...)` bitmask
    // test was incorrect: it could match unrelated errors (spinning forever)
    // and miss genuine retryable ones.
    if (errno != EEXIST && errno != EACCES && errno != EINVAL && errno != ENOENT) {
      throw std::runtime_error("failed to create a shared memory");
    }
    key = static_cast<key_t>(rand());
  }
  // Get a pointer to the shared memory segment; shmat returns (void*)-1 on
  // failure.
  auto ipc_ptr = shmat(shmid, NULL, 0);
  if (ipc_ptr == reinterpret_cast<void*>(-1)) {
    throw std::runtime_error("failed to attach a shared memory");
  }

  return std::make_pair(key, ipc_ptr);
}
302 #endif
303 
304 std::pair<key_t, std::shared_ptr<Buffer>> get_shm_buffer(size_t size) {
305 #ifdef _MSC_VER
306  throw std::runtime_error("Arrow IPC not yet supported on Windows.");
307  return std::make_pair(0, nullptr);
308 #else
309  auto [key, ipc_ptr] = get_shm(size);
310  std::shared_ptr<Buffer> buffer(new MutableBuffer(static_cast<uint8_t*>(ipc_ptr), size));
311  return std::make_pair<key_t, std::shared_ptr<Buffer>>(std::move(key),
312  std::move(buffer));
313 #endif
314 }
315 
316 } // namespace
317 
318 namespace arrow {
319 
320 key_t get_and_copy_to_shm(const std::shared_ptr<Buffer>& data) {
321 #ifdef _MSC_VER
322  throw std::runtime_error("Arrow IPC not yet supported on Windows.");
323 #else
324  auto [key, ipc_ptr] = get_shm(data->size());
325  // copy the arrow records buffer to shared memory
326  // TODO(ptaylor): I'm sure it's possible to tell Arrow's RecordBatchStreamWriter to
327  // write directly to the shared memory segment as a sink
328  memcpy(ipc_ptr, data->data(), data->size());
329  // detach from the shared memory segment
330  shmdt(ipc_ptr);
331  return key;
332 #endif
333 }
334 
335 } // namespace arrow
336 
341  auto timer = DEBUG_TIMER(__func__);
342  std::shared_ptr<arrow::RecordBatch> record_batch = convertToArrow();
343 
344  if (device_type_ == ExecutorDeviceType::CPU ||
345  transport_method_ == ArrowTransport::WIRE) {
346  const auto getWireResult =
347  [&](const int64_t schema_size,
348  const int64_t dict_size,
349  const int64_t records_size,
350  const std::shared_ptr<Buffer>& serialized_schema,
351  const std::shared_ptr<Buffer>& serialized_dict) -> ArrowResult {
352  auto timer = DEBUG_TIMER("serialize batch to wire");
353  const int64_t total_size = schema_size + records_size + dict_size;
354  std::vector<char> record_handle_data(total_size);
355  auto serialized_records =
356  arrow::MutableBuffer::Wrap(record_handle_data.data(), total_size);
357 
358  ARROW_ASSIGN_OR_THROW(auto writer, Buffer::GetWriter(serialized_records));
359 
360  ARROW_THROW_NOT_OK(writer->Write(
361  reinterpret_cast<const uint8_t*>(serialized_schema->data()), schema_size));
362 
363  ARROW_THROW_NOT_OK(writer->Write(
364  reinterpret_cast<const uint8_t*>(serialized_dict->data()), dict_size));
365 
366  io::FixedSizeBufferWriter stream(
367  SliceMutableBuffer(serialized_records, schema_size + dict_size));
368 
369  ARROW_THROW_NOT_OK(ipc::SerializeRecordBatch(
370  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
371 
372  return {std::vector<char>(0),
373  0,
374  std::vector<char>(0),
375  serialized_records->size(),
376  std::string{""},
377  std::move(record_handle_data)};
378  };
379 
380  const auto getShmResult =
381  [&](const int64_t schema_size,
382  const int64_t dict_size,
383  const int64_t records_size,
384  const std::shared_ptr<Buffer>& serialized_schema,
385  const std::shared_ptr<Buffer>& serialized_dict) -> ArrowResult {
386  auto timer = DEBUG_TIMER("serialize batch to shared memory");
387  std::shared_ptr<Buffer> serialized_records;
388  std::vector<char> schema_handle_buffer;
389  std::vector<char> record_handle_buffer(sizeof(key_t), 0);
390  key_t records_shm_key = IPC_PRIVATE;
391  const int64_t total_size = schema_size + records_size + dict_size;
392 
393  std::tie(records_shm_key, serialized_records) = get_shm_buffer(total_size);
394 
395  memcpy(serialized_records->mutable_data(),
396  serialized_schema->data(),
397  (size_t)schema_size);
398  memcpy(serialized_records->mutable_data() + schema_size,
399  serialized_dict->data(),
400  (size_t)dict_size);
401 
402  io::FixedSizeBufferWriter stream(
403  SliceMutableBuffer(serialized_records, schema_size + dict_size));
404  ARROW_THROW_NOT_OK(ipc::SerializeRecordBatch(
405  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
406  memcpy(&record_handle_buffer[0],
407  reinterpret_cast<const unsigned char*>(&records_shm_key),
408  sizeof(key_t));
409 
410  return {schema_handle_buffer,
411  0,
412  record_handle_buffer,
413  serialized_records->size(),
414  std::string{""}};
415  };
416 
417  std::shared_ptr<Buffer> serialized_schema;
418  int64_t records_size = 0;
419  int64_t schema_size = 0;
420  ipc::DictionaryFieldMapper mapper(*record_batch->schema());
421  auto options = ipc::IpcWriteOptions::Defaults();
422  auto dict_stream = arrow::io::BufferOutputStream::Create(1024).ValueOrDie();
423 
424  ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));
425 
426  ARROW_LOG("CPU") << "found " << dictionaries.size() << " dictionaries";
427 
428  for (auto& pair : dictionaries) {
429  ipc::IpcPayload payload;
430  int64_t dictionary_id = pair.first;
431  const auto& dictionary = pair.second;
432 
434  GetDictionaryPayload(dictionary_id, dictionary, options, &payload));
435  int32_t metadata_length = 0;
437  WriteIpcPayload(payload, options, dict_stream.get(), &metadata_length));
438  }
439  auto serialized_dict = dict_stream->Finish().ValueOrDie();
440  auto dict_size = serialized_dict->size();
441 
443  serialized_schema,
444  ipc::SerializeSchema(*record_batch->schema(), default_memory_pool()));
445  schema_size = serialized_schema->size();
446 
447  ARROW_THROW_NOT_OK(ipc::GetRecordBatchSize(*record_batch, &records_size));
448 
449  switch (transport_method_) {
451  return getWireResult(
452  schema_size, dict_size, records_size, serialized_schema, serialized_dict);
454  return getShmResult(
455  schema_size, dict_size, records_size, serialized_schema, serialized_dict);
456  default:
457  UNREACHABLE();
458  }
459  }
460 #ifdef HAVE_CUDA
461  CHECK(device_type_ == ExecutorDeviceType::GPU);
462 
463  // Copy the schema to the schema handle
464  auto out_stream_result = arrow::io::BufferOutputStream::Create(1024);
465  ARROW_THROW_NOT_OK(out_stream_result.status());
466  auto out_stream = std::move(out_stream_result).ValueOrDie();
467 
468  ipc::DictionaryFieldMapper mapper(*record_batch->schema());
469  arrow::ipc::DictionaryMemo current_memo;
470  arrow::ipc::DictionaryMemo serialized_memo;
471 
472  arrow::ipc::IpcPayload schema_payload;
473  ARROW_THROW_NOT_OK(arrow::ipc::GetSchemaPayload(*record_batch->schema(),
474  arrow::ipc::IpcWriteOptions::Defaults(),
475  mapper,
476  &schema_payload));
477  int32_t schema_payload_length = 0;
478  ARROW_THROW_NOT_OK(arrow::ipc::WriteIpcPayload(schema_payload,
479  arrow::ipc::IpcWriteOptions::Defaults(),
480  out_stream.get(),
481  &schema_payload_length));
482  ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));
483  ARROW_LOG("GPU") << "Dictionary "
484  << "found dicts: " << dictionaries.size();
485 
487  arrow::ipc::internal::CollectDictionaries(*record_batch, &current_memo));
488 
489  // now try a dictionary
490  std::shared_ptr<arrow::Schema> dummy_schema;
491  std::vector<std::shared_ptr<arrow::RecordBatch>> dict_batches;
492 
493  for (const auto& pair : dictionaries) {
494  ipc::IpcPayload payload;
495  const auto& dict_id = pair.first;
496  CHECK_GE(dict_id, 0);
497  ARROW_LOG("GPU") << "Dictionary "
498  << "dict_id: " << dict_id;
499  const auto& dict = pair.second;
500  CHECK(dict);
501 
502  if (!dummy_schema) {
503  auto dummy_field = std::make_shared<arrow::Field>("", dict->type());
504  dummy_schema = std::make_shared<arrow::Schema>(
505  std::vector<std::shared_ptr<arrow::Field>>{dummy_field});
506  }
507  dict_batches.emplace_back(
508  arrow::RecordBatch::Make(dummy_schema, dict->length(), {dict}));
509  }
510 
511  if (!dict_batches.empty()) {
512  ARROW_THROW_NOT_OK(arrow::ipc::WriteRecordBatchStream(
513  dict_batches, ipc::IpcWriteOptions::Defaults(), out_stream.get()));
514  }
515 
516  auto complete_ipc_stream = out_stream->Finish();
517  ARROW_THROW_NOT_OK(complete_ipc_stream.status());
518  auto serialized_records = std::move(complete_ipc_stream).ValueOrDie();
519 
520  const auto record_key = arrow::get_and_copy_to_shm(serialized_records);
521  std::vector<char> schema_record_key_buffer(sizeof(key_t), 0);
522  memcpy(&schema_record_key_buffer[0],
523  reinterpret_cast<const unsigned char*>(&record_key),
524  sizeof(key_t));
525 
526  arrow::cuda::CudaDeviceManager* manager;
527  ARROW_ASSIGN_OR_THROW(manager, arrow::cuda::CudaDeviceManager::Instance());
528  std::shared_ptr<arrow::cuda::CudaContext> context;
529  ARROW_ASSIGN_OR_THROW(context, manager->GetContext(device_id_));
530 
531  std::shared_ptr<arrow::cuda::CudaBuffer> device_serialized;
532  ARROW_ASSIGN_OR_THROW(device_serialized,
533  SerializeRecordBatch(*record_batch, context.get()));
534 
535  std::shared_ptr<arrow::cuda::CudaIpcMemHandle> cuda_handle;
536  ARROW_ASSIGN_OR_THROW(cuda_handle, device_serialized->ExportForIpc());
537 
538  std::shared_ptr<arrow::Buffer> serialized_cuda_handle;
539  ARROW_ASSIGN_OR_THROW(serialized_cuda_handle,
540  cuda_handle->Serialize(arrow::default_memory_pool()));
541 
542  std::vector<char> record_handle_buffer(serialized_cuda_handle->size(), 0);
543  memcpy(&record_handle_buffer[0],
544  serialized_cuda_handle->data(),
545  serialized_cuda_handle->size());
546 
547  return {schema_record_key_buffer,
548  serialized_records->size(),
549  record_handle_buffer,
550  serialized_cuda_handle->size(),
551  serialized_cuda_handle->ToString()};
552 #else
553  UNREACHABLE();
554  return {std::vector<char>{}, 0, std::vector<char>{}, 0, ""};
555 #endif
556 }
557 
560  arrow::ipc::DictionaryFieldMapper* mapper) const {
561  auto timer = DEBUG_TIMER(__func__);
562  std::shared_ptr<arrow::RecordBatch> arrow_copy = convertToArrow();
563  std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;
564 
566  serialized_schema,
567  arrow::ipc::SerializeSchema(*arrow_copy->schema(), arrow::default_memory_pool()));
568 
569  if (arrow_copy->num_rows()) {
570  auto timer = DEBUG_TIMER("serialize records");
571  ARROW_THROW_NOT_OK(arrow_copy->Validate());
572  ARROW_ASSIGN_OR_THROW(serialized_records,
573  arrow::ipc::SerializeRecordBatch(
574  *arrow_copy, arrow::ipc::IpcWriteOptions::Defaults()));
575  } else {
576  ARROW_ASSIGN_OR_THROW(serialized_records, arrow::AllocateBuffer(0));
577  }
578  return {serialized_schema, serialized_records};
579 }
580 
581 std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::convertToArrow() const {
582  auto timer = DEBUG_TIMER(__func__);
583  const auto col_count = results_->colCount();
584  std::vector<std::shared_ptr<arrow::Field>> fields;
585  CHECK(col_names_.empty() || col_names_.size() == col_count);
586  for (size_t i = 0; i < col_count; ++i) {
587  const auto ti = results_->getColType(i);
588  fields.push_back(makeField(col_names_.empty() ? "" : col_names_[i], ti));
589  }
590 #if ARROW_CONVERTER_DEBUG
591  VLOG(1) << "Arrow fields: ";
592  for (const auto& f : fields) {
593  VLOG(1) << "\t" << f->ToString(true);
594  }
595 #endif
596  return getArrowBatch(arrow::schema(fields));
597 }
598 
599 std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::getArrowBatch(
600  const std::shared_ptr<arrow::Schema>& schema) const {
601  std::vector<std::shared_ptr<arrow::Array>> result_columns;
602 
603  const size_t entry_count = top_n_ < 0
604  ? results_->entryCount()
605  : std::min(size_t(top_n_), results_->entryCount());
606  if (!entry_count) {
607  return ARROW_RECORDBATCH_MAKE(schema, 0, result_columns);
608  }
609  const auto col_count = results_->colCount();
610  size_t row_count = 0;
611 
612  result_columns.resize(col_count);
613  std::vector<ColumnBuilder> builders(col_count);
614 
615  // Create array builders
616  for (size_t i = 0; i < col_count; ++i) {
617  initializeColumnBuilder(builders[i], results_->getColType(i), schema->field(i));
618  }
619 
620  // TODO(miyu): speed up for columnar buffers
621  auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
622  std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
623  const std::vector<bool>& non_lazy_cols,
624  const size_t start_entry,
625  const size_t end_entry) -> size_t {
626  CHECK_EQ(value_seg.size(), col_count);
627  CHECK_EQ(null_bitmap_seg.size(), col_count);
628  const auto entry_count = end_entry - start_entry;
629  size_t seg_row_count = 0;
630  for (size_t i = start_entry; i < end_entry; ++i) {
631  auto row = results_->getRowAtNoTranslations(i, non_lazy_cols);
632  if (row.empty()) {
633  continue;
634  }
635  ++seg_row_count;
636  for (size_t j = 0; j < col_count; ++j) {
637  if (!non_lazy_cols.empty() && non_lazy_cols[j]) {
638  continue;
639  }
640 
641  auto scalar_value = boost::get<ScalarTargetValue>(&row[j]);
642  // TODO(miyu): support more types other than scalar.
643  CHECK(scalar_value);
644  const auto& column = builders[j];
645  switch (column.physical_type) {
646  case kBOOLEAN:
647  create_or_append_value<bool, int64_t>(
648  *scalar_value, value_seg[j], entry_count);
649  create_or_append_validity<int64_t>(
650  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
651  break;
652  case kTINYINT:
653  create_or_append_value<int8_t, int64_t>(
654  *scalar_value, value_seg[j], entry_count);
655  create_or_append_validity<int64_t>(
656  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
657  break;
658  case kSMALLINT:
659  create_or_append_value<int16_t, int64_t>(
660  *scalar_value, value_seg[j], entry_count);
661  create_or_append_validity<int64_t>(
662  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
663  break;
664  case kINT:
665  create_or_append_value<int32_t, int64_t>(
666  *scalar_value, value_seg[j], entry_count);
667  create_or_append_validity<int64_t>(
668  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
669  break;
670  case kBIGINT:
671  create_or_append_value<int64_t, int64_t>(
672  *scalar_value, value_seg[j], entry_count);
673  create_or_append_validity<int64_t>(
674  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
675  break;
676  case kFLOAT:
677  create_or_append_value<float, float>(
678  *scalar_value, value_seg[j], entry_count);
679  create_or_append_validity<float>(
680  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
681  break;
682  case kDOUBLE:
683  create_or_append_value<double, double>(
684  *scalar_value, value_seg[j], entry_count);
685  create_or_append_validity<double>(
686  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
687  break;
688  case kTIME:
689  create_or_append_value<int32_t, int64_t>(
690  *scalar_value, value_seg[j], entry_count);
691  create_or_append_validity<int64_t>(
692  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
693  break;
694  case kDATE:
695  device_type_ == ExecutorDeviceType::GPU
696  ? create_or_append_value<int64_t, int64_t>(
697  *scalar_value, value_seg[j], entry_count)
698  : create_or_append_value<int32_t, int64_t>(
699  *scalar_value, value_seg[j], entry_count);
700  create_or_append_validity<int64_t>(
701  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
702  break;
703  case kTIMESTAMP:
704  create_or_append_value<int64_t, int64_t>(
705  *scalar_value, value_seg[j], entry_count);
706  create_or_append_validity<int64_t>(
707  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
708  break;
709  default:
710  // TODO(miyu): support more scalar types.
711  throw std::runtime_error(column.col_type.get_type_name() +
712  " is not supported in Arrow result sets.");
713  }
714  }
715  }
716  return seg_row_count;
717  };
718 
719  auto convert_columns = [&](std::vector<std::shared_ptr<arrow::Array>>& result,
720  const std::vector<bool>& non_lazy_cols,
721  const size_t start_col,
722  const size_t end_col) {
723  for (size_t col = start_col; col < end_col; ++col) {
724  if (!non_lazy_cols.empty() && !non_lazy_cols[col]) {
725  continue;
726  }
727 
728  const auto& column = builders[col];
729  switch (column.physical_type) {
730  case kTINYINT:
731  convert_column<int8_t>(results_, col, entry_count, result[col]);
732  break;
733  case kSMALLINT:
734  convert_column<int16_t>(results_, col, entry_count, result[col]);
735  break;
736  case kINT:
737  convert_column<int32_t>(results_, col, entry_count, result[col]);
738  break;
739  case kBIGINT:
740  convert_column<int64_t>(results_, col, entry_count, result[col]);
741  break;
742  case kFLOAT:
743  convert_column<float>(results_, col, entry_count, result[col]);
744  break;
745  case kDOUBLE:
746  convert_column<double>(results_, col, entry_count, result[col]);
747  break;
748  default:
749  throw std::runtime_error(column.col_type.get_type_name() +
750  " is not supported in Arrow column converter.");
751  }
752  }
753  };
754 
755  std::vector<std::shared_ptr<ValueArray>> column_values(col_count, nullptr);
756  std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count, nullptr);
757  const bool multithreaded = entry_count > 10000 && !results_->isTruncated();
758  bool use_columnar_converter = results_->isDirectColumnarConversionPossible() &&
759  results_->getQueryMemDesc().getQueryDescriptionType() ==
761  entry_count == results_->entryCount();
762  std::vector<bool> non_lazy_cols;
763  if (use_columnar_converter) {
764  auto timer = DEBUG_TIMER("columnar converter");
765  std::vector<size_t> non_lazy_col_pos;
766  size_t non_lazy_col_count = 0;
767  const auto& lazy_fetch_info = results_->getLazyFetchInfo();
768 
769  non_lazy_cols.reserve(col_count);
770  non_lazy_col_pos.reserve(col_count);
771  for (size_t i = 0; i < col_count; ++i) {
772  bool is_lazy =
773  lazy_fetch_info.empty() ? false : lazy_fetch_info[i].is_lazily_fetched;
774  // Currently column converter cannot handle some data types.
775  // Treat them as lazy.
776  switch (builders[i].physical_type) {
777  case kBOOLEAN:
778  case kTIME:
779  case kDATE:
780  case kTIMESTAMP:
781  is_lazy = true;
782  break;
783  default:
784  break;
785  }
786  if (builders[i].field->type()->id() == Type::DICTIONARY) {
787  is_lazy = true;
788  }
789  non_lazy_cols.emplace_back(!is_lazy);
790  if (!is_lazy) {
791  ++non_lazy_col_count;
792  non_lazy_col_pos.emplace_back(i);
793  }
794  }
795 
796  if (non_lazy_col_count == col_count) {
797  non_lazy_cols.clear();
798  non_lazy_col_pos.clear();
799  } else {
800  non_lazy_col_pos.emplace_back(col_count);
801  }
802 
803  std::vector<std::future<void>> child_threads;
804  size_t num_threads =
805  std::min(multithreaded ? (size_t)cpu_threads() : (size_t)1, non_lazy_col_count);
806 
807  size_t start_col = 0;
808  size_t end_col = 0;
809  for (size_t i = 0; i < num_threads; ++i) {
810  start_col = end_col;
811  end_col = (i + 1) * non_lazy_col_count / num_threads;
812  size_t phys_start_col =
813  non_lazy_col_pos.empty() ? start_col : non_lazy_col_pos[start_col];
814  size_t phys_end_col =
815  non_lazy_col_pos.empty() ? end_col : non_lazy_col_pos[end_col];
816  child_threads.push_back(std::async(std::launch::async,
817  convert_columns,
818  std::ref(result_columns),
819  non_lazy_cols,
820  phys_start_col,
821  phys_end_col));
822  }
823  for (auto& child : child_threads) {
824  child.get();
825  }
826  row_count = entry_count;
827  }
828  if (!use_columnar_converter || !non_lazy_cols.empty()) {
829  auto timer = DEBUG_TIMER("row converter");
830  row_count = 0;
831  if (multithreaded) {
832  const size_t cpu_count = cpu_threads();
833  std::vector<std::future<size_t>> child_threads;
834  std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
835  cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count, nullptr));
836  std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
837  cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count, nullptr));
838  const auto stride = (entry_count + cpu_count - 1) / cpu_count;
839  for (size_t i = 0, start_entry = 0; start_entry < entry_count;
840  ++i, start_entry += stride) {
841  const auto end_entry = std::min(entry_count, start_entry + stride);
842  child_threads.push_back(std::async(std::launch::async,
843  fetch,
844  std::ref(column_value_segs[i]),
845  std::ref(null_bitmap_segs[i]),
846  non_lazy_cols,
847  start_entry,
848  end_entry));
849  }
850  for (auto& child : child_threads) {
851  row_count += child.get();
852  }
853  {
854  auto timer = DEBUG_TIMER("append rows to arrow");
855  for (int i = 0; i < schema->num_fields(); ++i) {
856  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
857  continue;
858  }
859 
860  for (size_t j = 0; j < cpu_count; ++j) {
861  if (!column_value_segs[j][i]) {
862  continue;
863  }
864  append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
865  }
866  }
867  }
868  } else {
869  row_count =
870  fetch(column_values, null_bitmaps, non_lazy_cols, size_t(0), entry_count);
871  {
872  auto timer = DEBUG_TIMER("append rows to arrow single thread");
873  for (int i = 0; i < schema->num_fields(); ++i) {
874  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
875  continue;
876  }
877 
878  append(builders[i], *column_values[i], null_bitmaps[i]);
879  }
880  }
881  }
882 
883  {
884  auto timer = DEBUG_TIMER("finish builders");
885  for (size_t i = 0; i < col_count; ++i) {
886  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
887  continue;
888  }
889 
890  result_columns[i] = finishColumnBuilder(builders[i]);
891  }
892  }
893  }
894 
895  return ARROW_RECORDBATCH_MAKE(schema, row_count, result_columns);
896 }
897 
898 namespace {
899 
900 std::shared_ptr<arrow::DataType> get_arrow_type(const SQLTypeInfo& sql_type,
901  const ExecutorDeviceType device_type) {
902  switch (get_physical_type(sql_type)) {
903  case kBOOLEAN:
904  return arrow::boolean();
905  case kTINYINT:
906  return arrow::int8();
907  case kSMALLINT:
908  return arrow::int16();
909  case kINT:
910  return arrow::int32();
911  case kBIGINT:
912  return arrow::int64();
913  case kFLOAT:
914  return arrow::float32();
915  case kDOUBLE:
916  return arrow::float64();
917  case kCHAR:
918  case kVARCHAR:
919  case kTEXT:
920  if (sql_type.is_dict_encoded_string()) {
921  auto value_type = std::make_shared<StringType>();
922  return dictionary(int32(), value_type, false);
923  }
924  return utf8();
925  case kDECIMAL:
926  case kNUMERIC:
927  return decimal(sql_type.get_precision(), sql_type.get_scale());
928  case kTIME:
929  return time32(TimeUnit::SECOND);
930  case kDATE: {
931  // TODO(wamsi) : Remove date64() once date32() support is added in cuDF. date32()
932  // Currently support for date32() is missing in cuDF.Hence, if client requests for
933  // date on GPU, return date64() for the time being, till support is added.
934  if (device_type == ExecutorDeviceType::GPU) {
935  return date64();
936  } else {
937  return date32();
938  }
939  }
940  case kTIMESTAMP:
941  switch (sql_type.get_precision()) {
942  case 0:
943  return timestamp(TimeUnit::SECOND);
944  case 3:
945  return timestamp(TimeUnit::MILLI);
946  case 6:
947  return timestamp(TimeUnit::MICRO);
948  case 9:
949  return timestamp(TimeUnit::NANO);
950  default:
951  throw std::runtime_error(
952  "Unsupported timestamp precision for Arrow result sets: " +
953  std::to_string(sql_type.get_precision()));
954  }
955  case kARRAY:
956  case kINTERVAL_DAY_TIME:
958  default:
959  throw std::runtime_error(sql_type.get_type_name() +
960  " is not supported in Arrow result sets.");
961  }
962  return nullptr;
963 }
964 
965 } // namespace
966 
967 std::shared_ptr<arrow::Field> ArrowResultSetConverter::makeField(
968  const std::string name,
969  const SQLTypeInfo& target_type) const {
970  return arrow::field(
971  name, get_arrow_type(target_type, device_type_), !target_type.get_notnull());
972 }
973 
975  const ArrowResult& result,
976  const ExecutorDeviceType device_type,
977  const size_t device_id,
978  std::shared_ptr<Data_Namespace::DataMgr>& data_mgr) {
979 #ifndef _MSC_VER
980  // CPU buffers skip the sm handle, serializing the entire RecordBatch to df.
981  // Remove shared memory on sysmem
982  if (!result.sm_handle.empty()) {
983  CHECK_EQ(sizeof(key_t), result.sm_handle.size());
984  const key_t& schema_key = *(key_t*)(&result.sm_handle[0]);
985  auto shm_id = shmget(schema_key, result.sm_size, 0666);
986  if (shm_id < 0) {
987  throw std::runtime_error(
988  "failed to get an valid shm ID w/ given shm key of the schema");
989  }
990  if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
991  throw std::runtime_error("failed to deallocate Arrow schema on errorno(" +
992  std::to_string(errno) + ")");
993  }
994  }
995 
996  if (device_type == ExecutorDeviceType::CPU) {
997  CHECK_EQ(sizeof(key_t), result.df_handle.size());
998  const key_t& df_key = *(key_t*)(&result.df_handle[0]);
999  auto shm_id = shmget(df_key, result.df_size, 0666);
1000  if (shm_id < 0) {
1001  throw std::runtime_error(
1002  "failed to get an valid shm ID w/ given shm key of the data");
1003  }
1004  if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
1005  throw std::runtime_error("failed to deallocate Arrow data frame");
1006  }
1007  }
1008  // CUDA buffers become owned by the caller, and will automatically be freed
1009  // TODO: What if the client never takes ownership of the result? we may want to
1010  // establish a check to see if the GPU buffer still exists, and then free it.
1011 #endif
1012 }
1013 
1015  ColumnBuilder& column_builder,
1016  const SQLTypeInfo& col_type,
1017  const std::shared_ptr<arrow::Field>& field) const {
1018  column_builder.field = field;
1019  column_builder.col_type = col_type;
1020  column_builder.physical_type = col_type.is_dict_encoded_string()
1021  ? get_dict_index_type(col_type)
1022  : get_physical_type(col_type);
1023 
1024  auto value_type = field->type();
1025  if (col_type.is_dict_encoded_string()) {
1026  column_builder.builder.reset(new StringDictionary32Builder());
1027  // add values to the builder
1028  const int dict_id = col_type.get_comp_param();
1029  auto str_list = results_->getStringDictionaryPayloadCopy(dict_id);
1030 
1031  arrow::StringBuilder str_array_builder;
1032  ARROW_THROW_NOT_OK(str_array_builder.AppendValues(*str_list));
1033  std::shared_ptr<StringArray> string_array;
1034  ARROW_THROW_NOT_OK(str_array_builder.Finish(&string_array));
1035 
1036  auto dict_builder =
1037  dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
1038  CHECK(dict_builder);
1039 
1040  ARROW_THROW_NOT_OK(dict_builder->InsertMemoValues(*string_array));
1041  } else {
1043  arrow::MakeBuilder(default_memory_pool(), value_type, &column_builder.builder));
1044  }
1045 }
1046 
1047 std::shared_ptr<arrow::Array> ArrowResultSetConverter::finishColumnBuilder(
1048  ColumnBuilder& column_builder) const {
1049  std::shared_ptr<Array> values;
1050  ARROW_THROW_NOT_OK(column_builder.builder->Finish(&values));
1051  return values;
1052 }
1053 
1054 namespace {
1055 
1056 template <typename BUILDER_TYPE, typename VALUE_ARRAY_TYPE>
1058  const ValueArray& values,
1059  const std::shared_ptr<std::vector<bool>>& is_valid) {
1060  static_assert(!std::is_same<BUILDER_TYPE, arrow::StringDictionary32Builder>::value,
1061  "Dictionary encoded string builder requires function specialization.");
1062 
1063  std::vector<VALUE_ARRAY_TYPE> vals = boost::get<std::vector<VALUE_ARRAY_TYPE>>(values);
1064 
1065  if (scale_epoch_values<BUILDER_TYPE>()) {
1066  auto scale_sec_to_millisec = [](auto seconds) { return seconds * kMilliSecsPerSec; };
1067  auto scale_values = [&](auto epoch) {
1068  return std::is_same<BUILDER_TYPE, Date32Builder>::value
1070  : scale_sec_to_millisec(epoch);
1071  };
1072  std::transform(vals.begin(), vals.end(), vals.begin(), scale_values);
1073  }
1074 
1075  auto typed_builder = dynamic_cast<BUILDER_TYPE*>(column_builder.builder.get());
1076  CHECK(typed_builder);
1077  if (column_builder.field->nullable()) {
1078  CHECK(is_valid.get());
1079  ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals, *is_valid));
1080  } else {
1081  ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals));
1082  }
1083 }
1084 
1085 template <>
1086 void appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
1088  const ValueArray& values,
1089  const std::shared_ptr<std::vector<bool>>& is_valid) {
1090  auto typed_builder =
1091  dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
1092  CHECK(typed_builder);
1093 
1094  std::vector<int32_t> vals = boost::get<std::vector<int32_t>>(values);
1095 
1096  if (column_builder.field->nullable()) {
1097  CHECK(is_valid.get());
1098  // TODO(adb): Generate this instead of the boolean bitmap
1099  std::vector<uint8_t> transformed_bitmap;
1100  transformed_bitmap.reserve(is_valid->size());
1101  std::for_each(
1102  is_valid->begin(), is_valid->end(), [&transformed_bitmap](const bool is_valid) {
1103  transformed_bitmap.push_back(is_valid ? 1 : 0);
1104  });
1105 
1106  ARROW_THROW_NOT_OK(typed_builder->AppendIndices(
1107  vals.data(), static_cast<int64_t>(vals.size()), transformed_bitmap.data()));
1108  } else {
1110  typed_builder->AppendIndices(vals.data(), static_cast<int64_t>(vals.size())));
1111  }
1112 }
1113 
1114 } // namespace
1115 
1117  ColumnBuilder& column_builder,
1118  const ValueArray& values,
1119  const std::shared_ptr<std::vector<bool>>& is_valid) const {
1120  if (column_builder.col_type.is_dict_encoded_string()) {
1121  CHECK_EQ(column_builder.physical_type,
1122  kINT); // assume all dicts use none-encoded type for now
1123  appendToColumnBuilder<StringDictionary32Builder, int32_t>(
1124  column_builder, values, is_valid);
1125  return;
1126  }
1127  switch (column_builder.physical_type) {
1128  case kBOOLEAN:
1129  appendToColumnBuilder<BooleanBuilder, bool>(column_builder, values, is_valid);
1130  break;
1131  case kTINYINT:
1132  appendToColumnBuilder<Int8Builder, int8_t>(column_builder, values, is_valid);
1133  break;
1134  case kSMALLINT:
1135  appendToColumnBuilder<Int16Builder, int16_t>(column_builder, values, is_valid);
1136  break;
1137  case kINT:
1138  appendToColumnBuilder<Int32Builder, int32_t>(column_builder, values, is_valid);
1139  break;
1140  case kBIGINT:
1141  appendToColumnBuilder<Int64Builder, int64_t>(column_builder, values, is_valid);
1142  break;
1143  case kFLOAT:
1144  appendToColumnBuilder<FloatBuilder, float>(column_builder, values, is_valid);
1145  break;
1146  case kDOUBLE:
1147  appendToColumnBuilder<DoubleBuilder, double>(column_builder, values, is_valid);
1148  break;
1149  case kTIME:
1150  appendToColumnBuilder<Time32Builder, int32_t>(column_builder, values, is_valid);
1151  break;
1152  case kTIMESTAMP:
1153  appendToColumnBuilder<TimestampBuilder, int64_t>(column_builder, values, is_valid);
1154  break;
1155  case kDATE:
1156  device_type_ == ExecutorDeviceType::GPU
1157  ? appendToColumnBuilder<Date64Builder, int64_t>(
1158  column_builder, values, is_valid)
1159  : appendToColumnBuilder<Date32Builder, int32_t>(
1160  column_builder, values, is_valid);
1161  break;
1162  case kCHAR:
1163  case kVARCHAR:
1164  case kTEXT:
1165  default:
1166  // TODO(miyu): support more scalar types.
1167  throw std::runtime_error(column_builder.col_type.get_type_name() +
1168  " is not supported in Arrow result sets.");
1169  }
1170 }
void create_or_append_value(const ScalarTargetValue &val_cty, std::shared_ptr< ValueArray > &values, const size_t max_size)
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::unique_ptr< arrow::ArrayBuilder > builder
HOST DEVICE int get_size() const
Definition: sqltypes.h:324
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
Definition: sqltypes.h:48
SQLTypes
Definition: sqltypes.h:37
void append(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
ArrowResult getArrowResult() const
std::vector< char > sm_handle
ExecutorDeviceType
ResultSetBuffer(const uint8_t *buf, size_t size, ResultSetPtr rs)
void appendToColumnBuilder(ArrowResultSetConverter::ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid)
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
#define ARROW_LOG(category)
bool is_fp() const
Definition: sqltypes.h:492
HOST DEVICE int get_scale() const
Definition: sqltypes.h:319
std::shared_ptr< arrow::Array > finishColumnBuilder(ColumnBuilder &column_builder) const
string name
Definition: setup.in.py:62
parquet::Type::type get_physical_type(ReaderPtr &reader, const int logical_column_index)
#define UNREACHABLE()
Definition: Logger.h:241
#define CHECK_GE(x, y)
Definition: Logger.h:210
std::shared_ptr< arrow::Field > field
#define DICTIONARY
boost::variant< std::vector< bool >, std::vector< int8_t >, std::vector< int16_t >, std::vector< int32_t >, std::vector< int64_t >, std::vector< float >, std::vector< double >, std::vector< std::string >> ValueArray
std::shared_ptr< ResultSet > ResultSetPtr
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:314
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
bool is_time() const
Definition: sqltypes.h:494
std::string to_string(char const *&&v)
void convert_column(ResultSetPtr result, size_t col, size_t entry_count, std::shared_ptr< Array > &out)
std::pair< key_t, void * > get_shm(size_t shmsz)
static constexpr int64_t kMilliSecsPerSec
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
std::shared_ptr< arrow::RecordBatch > getArrowBatch(const std::shared_ptr< arrow::Schema > &schema) const
#define ARROW_RECORDBATCH_MAKE
bool is_integer() const
Definition: sqltypes.h:490
std::vector< char > df_handle
std::shared_ptr< arrow::Field > makeField(const std::string name, const SQLTypeInfo &target_type) const
size_t append(FILE *f, const size_t size, int8_t *buf)
Appends the specified number of bytes to the end of the file f from buf.
Definition: File.cpp:159
SerializedArrowOutput getSerializedArrowOutput(arrow::ipc::DictionaryFieldMapper *mapper) const
bool is_boolean() const
Definition: sqltypes.h:495
int get_precision() const
Definition: sqltypes.h:317
key_t get_and_copy_to_shm(const std::shared_ptr< Buffer > &data)
static void deallocateArrowResultBuffer(const ArrowResult &result, const ExecutorDeviceType device_type, const size_t device_id, std::shared_ptr< Data_Namespace::DataMgr > &data_mgr)
std::shared_ptr< arrow::DataType > get_arrow_type(const SQLTypeInfo &sql_type, const ExecutorDeviceType device_type)
Definition: sqltypes.h:51
Definition: sqltypes.h:52
int64_t sm_size
std::pair< key_t, std::shared_ptr< Buffer > > get_shm_buffer(size_t size)
int64_t df_size
std::string get_type_name() const
Definition: sqltypes.h:417
#define IS_INTEGER(T)
Definition: sqltypes.h:239
Definition: sqltypes.h:40
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:323
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
char * f
int64_t get_epoch_days_from_seconds(const int64_t seconds)
void initializeColumnBuilder(ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const std::shared_ptr< arrow::Field > &field) const
bool is_dict_encoded_string() const
Definition: sqltypes.h:525
Definition: sqltypes.h:44
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:321
int cpu_threads()
Definition: thread_count.h:24
void create_or_append_validity(const ScalarTargetValue &value, const SQLTypeInfo &col_type, std::shared_ptr< std::vector< bool >> &null_bitmap, const size_t max_size)
#define VLOG(n)
Definition: Logger.h:291
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:156
std::shared_ptr< arrow::RecordBatch > convertToArrow() const