ResultSet.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /**
18  * @file    ResultSet.cpp
19  * @author  Alex Suhan <alex@mapd.com>
20  * @brief   Basic constructors and methods of the row set interface.
21  *
22  * Copyright (c) 2014 MapD Technologies, Inc.  All rights reserved.
23  */
24 
25 #include "ResultSet.h"
26 
29 #include "Execute.h"
30 #include "GpuMemUtils.h"
31 #include "InPlaceSort.h"
33 #include "RuntimeFunctions.h"
34 #include "Shared/SqlTypesLayout.h"
35 #include "Shared/checked_alloc.h"
36 #include "Shared/likely.h"
37 #include "Shared/thread_count.h"
38 
39 #include <algorithm>
40 #include <bitset>
41 #include <future>
42 #include <numeric>
43 
44 ResultSetStorage::ResultSetStorage(const std::vector<TargetInfo>& targets,
45  const QueryMemoryDescriptor& query_mem_desc,
46  int8_t* buff,
47  const bool buff_is_provided)
48  : targets_(targets)
49  , query_mem_desc_(query_mem_desc)
50  , buff_(buff)
51  , buff_is_provided_(buff_is_provided) {
52  for (const auto& target_info : targets_) {
53  if (target_info.agg_kind == kCOUNT ||
54  target_info.agg_kind == kAPPROX_COUNT_DISTINCT) {
55  target_init_vals_.push_back(0);
56  continue;
57  }
58  if (!target_info.sql_type.get_notnull()) {
59  int64_t init_val =
60  null_val_bit_pattern(target_info.sql_type, takes_float_argument(target_info));
61  target_init_vals_.push_back(target_info.is_agg ? init_val : 0);
62  } else {
63  target_init_vals_.push_back(target_info.is_agg ? 0xdeadbeef : 0);
64  }
65  if (target_info.agg_kind == kAVG) {
66  target_init_vals_.push_back(0);
67  } else if (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_geometry()) {
68  for (int i = 1; i < 2 * target_info.sql_type.get_physical_coord_cols(); i++) {
69  target_init_vals_.push_back(0);
70  }
71  } else if (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()) {
72  target_init_vals_.push_back(0);
73  }
74  }
75 }
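// Note on the initialization loop above: target_init_vals_ carries one sentinel per
// physical result slot, not per target. COUNT-style aggregates start at 0, nullable
// aggregates start at the type's null bit pattern, and non-nullable aggregates use the
// 0xdeadbeef marker; AVG, SAMPLE-on-geometry and SAMPLE-on-varlen targets append extra
// zero entries so the vector stays aligned with their multi-slot layout. Illustrative
// example (not from the source): targets {COUNT(*), AVG(nullable_x)} would yield
// {0, <null bit pattern>, 0}.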
76 
77 int8_t* ResultSetStorage::getUnderlyingBuffer() const {
78  return buff_;
79 }
80 
81 void ResultSet::keepFirstN(const size_t n) {
82  CHECK_EQ(-1, cached_row_count_);
83  keep_first_ = n;
84 }
85 
86 void ResultSet::dropFirstN(const size_t n) {
87  CHECK_EQ(-1, cached_row_count_);
88  drop_first_ = n;
89 }
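// Illustrative usage sketch (hypothetical caller, not part of this file): LIMIT/OFFSET
// semantics can be expressed with the two setters above, e.g. for LIMIT 10 OFFSET 5:
//   result_set->dropFirstN(5);   // skip the first 5 rows
//   result_set->keepFirstN(10);  // then return at most 10 rows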
90 
91 ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
92  const ExecutorDeviceType device_type,
93  const QueryMemoryDescriptor& query_mem_desc,
94  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
95  const Executor* executor)
96  : targets_(targets)
97  , device_type_(device_type)
98  , device_id_(-1)
99  , query_mem_desc_(query_mem_desc)
100  , crt_row_buff_idx_(0)
101  , fetched_so_far_(0)
102  , drop_first_(0)
103  , keep_first_(0)
104  , row_set_mem_owner_(row_set_mem_owner)
105  , queue_time_ms_(0)
106  , render_time_ms_(0)
107  , executor_(executor)
108  , estimator_buffer_(nullptr)
109  , host_estimator_buffer_(nullptr)
110  , data_mgr_(nullptr)
111  , separate_varlen_storage_valid_(false)
112  , just_explain_(false)
113  , cached_row_count_(-1)
114  , geo_return_type_(GeoReturnType::WktString) {}
115 
116 ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
117  const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
118  const std::vector<std::vector<const int8_t*>>& col_buffers,
119  const std::vector<std::vector<int64_t>>& frag_offsets,
120  const std::vector<int64_t>& consistent_frag_sizes,
121  const ExecutorDeviceType device_type,
122  const int device_id,
123  const QueryMemoryDescriptor& query_mem_desc,
124  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
125  const Executor* executor)
126  : targets_(targets)
127  , device_type_(device_type)
128  , device_id_(device_id)
129  , query_mem_desc_(query_mem_desc)
130  , crt_row_buff_idx_(0)
131  , fetched_so_far_(0)
132  , drop_first_(0)
133  , keep_first_(0)
134  , row_set_mem_owner_(row_set_mem_owner)
135  , queue_time_ms_(0)
136  , render_time_ms_(0)
137  , executor_(executor)
138  , lazy_fetch_info_(lazy_fetch_info)
139  , col_buffers_{col_buffers}
140  , frag_offsets_{frag_offsets}
141  , consistent_frag_sizes_{consistent_frag_sizes}
142  , estimator_buffer_(nullptr)
143  , host_estimator_buffer_(nullptr)
144  , data_mgr_(nullptr)
145  , separate_varlen_storage_valid_(false)
146  , just_explain_(false)
147  , cached_row_count_(-1)
148  , geo_return_type_(GeoReturnType::WktString) {}
149 
150 ResultSet::ResultSet(const std::shared_ptr<const Analyzer::Estimator> estimator,
151  const ExecutorDeviceType device_type,
152  const int device_id,
153  Data_Namespace::DataMgr* data_mgr)
154  : device_type_(device_type)
155  , device_id_(device_id)
156  , query_mem_desc_{}
157  , crt_row_buff_idx_(0)
158  , estimator_(estimator)
159  , estimator_buffer_(nullptr)
160  , host_estimator_buffer_(nullptr)
161  , data_mgr_(data_mgr)
162  , separate_varlen_storage_valid_(false)
163  , just_explain_(false)
164  , cached_row_count_(-1)
165  , geo_return_type_(GeoReturnType::WktString) {
166  if (device_type == ExecutorDeviceType::GPU) {
167  estimator_buffer_ =
168  CudaAllocator::alloc(data_mgr_, estimator_->getBufferSize(), device_id_);
169  data_mgr->getCudaMgr()->zeroDeviceMem(
170  estimator_buffer_, estimator_->getBufferSize(), device_id_);
171  } else {
172  host_estimator_buffer_ =
173  static_cast<int8_t*>(checked_calloc(estimator_->getBufferSize(), 1));
174  }
175 }
176 
177 ResultSet::ResultSet(const std::string& explanation)
178  : device_type_(ExecutorDeviceType::CPU)
179  , device_id_(-1)
180  , fetched_so_far_(0)
181  , queue_time_ms_(0)
182  , render_time_ms_(0)
183  , estimator_buffer_(nullptr)
184  , host_estimator_buffer_(nullptr)
185  , separate_varlen_storage_valid_(false)
186  , explanation_(explanation)
187  , just_explain_(true)
188  , cached_row_count_(-1)
189  , geo_return_type_(GeoReturnType::WktString) {}
190 
191 ResultSet::ResultSet(int64_t queue_time_ms,
192  int64_t render_time_ms,
193  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
194  : device_type_(ExecutorDeviceType::CPU)
195  , device_id_(-1)
196  , fetched_so_far_(0)
197  , queue_time_ms_(queue_time_ms)
198  , render_time_ms_(render_time_ms)
199  , estimator_buffer_(nullptr)
200  , host_estimator_buffer_(nullptr)
201  , separate_varlen_storage_valid_(false)
202  , just_explain_(true)
203  , cached_row_count_(-1)
204  , geo_return_type_(GeoReturnType::WktString){};
205 
206 ResultSet::~ResultSet() {
207  if (storage_) {
208  CHECK(storage_->getUnderlyingBuffer());
209  if (!storage_->buff_is_provided_) {
210  free(storage_->getUnderlyingBuffer());
211  }
212  }
213  for (auto& storage : appended_storage_) {
214  if (storage && !storage->buff_is_provided_) {
215  free(storage->getUnderlyingBuffer());
216  }
217  }
218  if (host_estimator_buffer_) {
219  CHECK(device_type_ == ExecutorDeviceType::CPU || estimator_buffer_);
220  free(host_estimator_buffer_);
221  }
222 }
223 
224 ExecutorDeviceType ResultSet::getDeviceType() const {
225  return device_type_;
226 }
227 
228 const ResultSetStorage* ResultSet::allocateStorage() const {
229  CHECK(!storage_);
230  auto buff = static_cast<int8_t*>(
231  checked_malloc(query_mem_desc_.getBufferSizeBytes(device_type_)));
232  storage_.reset(new ResultSetStorage(targets_, query_mem_desc_, buff, false));
233  return storage_.get();
234 }
235 
236 const ResultSetStorage* ResultSet::allocateStorage(
237  int8_t* buff,
238  const std::vector<int64_t>& target_init_vals) const {
239  CHECK(buff);
240  CHECK(!storage_);
241  storage_.reset(new ResultSetStorage(targets_, query_mem_desc_, buff, true));
242  storage_->target_init_vals_ = target_init_vals;
243  return storage_.get();
244 }
245 
246 const ResultSetStorage* ResultSet::allocateStorage(
247  const std::vector<int64_t>& target_init_vals) const {
248  CHECK(!storage_);
249  auto buff = static_cast<int8_t*>(
250  checked_malloc(query_mem_desc_.getBufferSizeBytes(device_type_)));
251  storage_.reset(new ResultSetStorage(targets_, query_mem_desc_, buff, false));
252  storage_->target_init_vals_ = target_init_vals;
253  return storage_.get();
254 }
255 
256 size_t ResultSet::getCurrentRowBufferIndex() const {
257  if (crt_row_buff_idx_ == 0) {
258  throw std::runtime_error("current row buffer iteration index is undefined");
259  }
260  return crt_row_buff_idx_ - 1;
261 }
262 
263 void ResultSet::append(ResultSet& that) {
264  CHECK_EQ(-1, cached_row_count_);
265  if (!that.storage_) {
266  return;
267  }
268  appended_storage_.push_back(std::move(that.storage_));
269  query_mem_desc_.setEntryCount(
270  query_mem_desc_.getEntryCount() +
271  appended_storage_.back()->query_mem_desc_.getEntryCount());
272  chunks_.insert(chunks_.end(), that.chunks_.begin(), that.chunks_.end());
273  col_buffers_.insert(
274  col_buffers_.end(), that.col_buffers_.begin(), that.col_buffers_.end());
275  frag_offsets_.insert(
276  frag_offsets_.end(), that.frag_offsets_.begin(), that.frag_offsets_.end());
277  consistent_frag_sizes_.insert(consistent_frag_sizes_.end(),
278  that.consistent_frag_sizes_.begin(),
279  that.consistent_frag_sizes_.end());
280  chunk_iters_.insert(
281  chunk_iters_.end(), that.chunk_iters_.begin(), that.chunk_iters_.end());
282  if (separate_varlen_storage_valid_) {
283  CHECK(that.separate_varlen_storage_valid_);
284  serialized_varlen_buffer_.insert(serialized_varlen_buffer_.end(),
285  that.serialized_varlen_buffer_.begin(),
286  that.serialized_varlen_buffer_.end());
287  }
288  for (auto& buff : that.literal_buffers_) {
289  literal_buffers_.push_back(std::move(buff));
290  }
291 }
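// append() takes ownership of the other result set's storage and concatenates the
// supporting buffers (chunks, column buffers, fragment offsets, literal buffers);
// the merged entry count is simply the sum of both descriptors' entry counts, and
// any cached row count must still be invalid (-1) when appending.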
292 
293 const ResultSetStorage* ResultSet::getStorage() const {
294  return storage_.get();
295 }
296 
297 size_t ResultSet::colCount() const {
298  return just_explain_ ? 1 : targets_.size();
299 }
300 
301 SQLTypeInfo ResultSet::getColType(const size_t col_idx) const {
302  if (just_explain_) {
303  return SQLTypeInfo(kTEXT, false);
304  }
305  CHECK_LT(col_idx, targets_.size());
306  return targets_[col_idx].agg_kind == kAVG ? SQLTypeInfo(kDOUBLE, false)
307  : targets_[col_idx].sql_type;
308 }
309 
310 size_t ResultSet::rowCount(const bool force_parallel) const {
311  if (just_explain_) {
312  return 1;
313  }
314  if (!permutation_.empty()) {
315  return permutation_.size();
316  }
317  if (cached_row_count_ != -1) {
318  CHECK_GE(cached_row_count_, 0);
319  return cached_row_count_;
320  }
321  if (!storage_) {
322  return 0;
323  }
324  if (force_parallel || entryCount() > 20000) {
325  return parallelRowCount();
326  }
327  std::lock_guard<std::mutex> lock(row_iteration_mutex_);
328  moveToBegin();
329  size_t row_count{0};
330  while (true) {
331  auto crt_row = getNextRowUnlocked(false, false);
332  if (crt_row.empty()) {
333  break;
334  }
335  ++row_count;
336  }
337  moveToBegin();
338  return row_count;
339 }
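// Row-count strategy, as implemented above: explain plans report exactly one row, a
// sort permutation already knows its size, and a cached count is reused when present;
// otherwise the entries are scanned, switching to parallelRowCount() once entryCount()
// exceeds 20000.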
340 
341 void ResultSet::setCachedRowCount(const size_t row_count) const {
342  CHECK(cached_row_count_ == -1 || cached_row_count_ == static_cast<ssize_t>(row_count));
343  cached_row_count_ = row_count;
344 }
345 
346 size_t ResultSet::parallelRowCount() const {
347  size_t row_count{0};
348  const size_t worker_count = cpu_threads();
349  std::vector<std::future<size_t>> counter_threads;
350  for (size_t i = 0,
351  start_entry = 0,
352  stride = (entryCount() + worker_count - 1) / worker_count;
353  i < worker_count && start_entry < entryCount();
354  ++i, start_entry += stride) {
355  const auto end_entry = std::min(start_entry + stride, entryCount());
356  counter_threads.push_back(std::async(
357  std::launch::async,
358  [this](const size_t start, const size_t end) {
359  size_t row_count{0};
360  for (size_t i = start; i < end; ++i) {
361  if (!isRowAtEmpty(i)) {
362  ++row_count;
363  }
364  }
365  return row_count;
366  },
367  start_entry,
368  end_entry));
369  }
370  for (auto& child : counter_threads) {
371  child.wait();
372  }
373  for (auto& child : counter_threads) {
374  row_count += child.get();
375  }
376  if (keep_first_ + drop_first_) {
377  const auto limited_row_count = std::min(keep_first_ + drop_first_, row_count);
378  return limited_row_count < drop_first_ ? 0 : limited_row_count - drop_first_;
379  }
380  return row_count;
381 }
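// The workers above split [0, entryCount()) into contiguous ranges of
// stride = ceil(entryCount() / worker_count) entries and count the non-empty ones.
// Illustrative arithmetic: with entryCount() == 100000 and 8 CPU threads the stride is
// 12500, so thread i scans entries [12500 * i, 12500 * (i + 1)).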
382 
383 bool ResultSet::definitelyHasNoRows() const {
384  return !storage_ && !estimator_ && !just_explain_;
385 }
386 
387 const QueryMemoryDescriptor& ResultSet::getQueryMemDesc() const {
388  CHECK(storage_);
389  return storage_->query_mem_desc_;
390 }
391 
392 const std::vector<TargetInfo>& ResultSet::getTargetInfos() const {
393  return targets_;
394 }
395 
396 const std::vector<int64_t>& ResultSet::getTargetInitVals() const {
397  CHECK(storage_);
398  return storage_->target_init_vals_;
399 }
400 
401 int8_t* ResultSet::getDeviceEstimatorBuffer() const {
402  CHECK(device_type_ == ExecutorDeviceType::GPU);
403  return estimator_buffer_;
404 }
405 
406 int8_t* ResultSet::getHostEstimatorBuffer() const {
407  return host_estimator_buffer_;
408 }
409 
410 void ResultSet::syncEstimatorBuffer() const {
411  CHECK(device_type_ == ExecutorDeviceType::GPU);
412  CHECK(!host_estimator_buffer_);
413  CHECK_EQ(size_t(0), estimator_->getBufferSize() % sizeof(int64_t));
414  host_estimator_buffer_ =
415  static_cast<int8_t*>(checked_calloc(estimator_->getBufferSize(), 1));
416  copy_from_gpu(data_mgr_,
417  host_estimator_buffer_,
418  reinterpret_cast<CUdeviceptr>(estimator_buffer_),
419  estimator_->getBufferSize(),
420  device_id_);
421 }
422 
423 void ResultSet::setQueueTime(const int64_t queue_time) {
424  queue_time_ms_ = queue_time;
425 }
426 
427 int64_t ResultSet::getQueueTime() const {
428  return queue_time_ms_;
429 }
430 
431 int64_t ResultSet::getRenderTime() const {
432  return render_time_ms_;
433 }
434 
435 void ResultSet::moveToBegin() const {
436  crt_row_buff_idx_ = 0;
437  fetched_so_far_ = 0;
438 }
439 
440 bool ResultSet::isTruncated() const {
441  return keep_first_ + drop_first_;
442 }
443 
444 bool ResultSet::isExplain() const {
445  return just_explain_;
446 }
447 
448 int ResultSet::getDeviceId() const {
449  return device_id_;
450 }
451 
452 QueryMemoryDescriptor ResultSet::fixupQueryMemoryDescriptor(
453  const QueryMemoryDescriptor& query_mem_desc) {
454  auto query_mem_desc_copy = query_mem_desc;
455  query_mem_desc_copy.resetGroupColWidths(
456  std::vector<int8_t>(query_mem_desc_copy.getGroupbyColCount(), 8));
457  if (query_mem_desc.didOutputColumnar()) {
458  return query_mem_desc_copy;
459  }
460  query_mem_desc_copy.alignPaddedSlots();
461  return query_mem_desc_copy;
462 }
463 
464 void ResultSet::sort(const std::list<Analyzer::OrderEntry>& order_entries,
465  const size_t top_n) {
466  CHECK_EQ(-1, cached_row_count_);
467  CHECK(!targets_.empty());
468 #ifdef HAVE_CUDA
469  if (canUseFastBaselineSort(order_entries, top_n)) {
470  baselineSort(order_entries, top_n);
471  return;
472  }
473 #endif // HAVE_CUDA
474  if (query_mem_desc_.sortOnGpu()) {
475  try {
476  radixSortOnGpu(order_entries);
477  } catch (const OutOfMemory&) {
478  LOG(WARNING) << "Out of GPU memory during sort, finish on CPU";
479  radixSortOnCpu(order_entries);
480  } catch (const std::bad_alloc&) {
481  LOG(WARNING) << "Out of GPU memory during sort, finish on CPU";
482  radixSortOnCpu(order_entries);
483  }
484  return;
485  }
486  // This check isn't strictly required, but allows the index buffer to be 32-bit.
487  if (query_mem_desc_.getEntryCount() > std::numeric_limits<uint32_t>::max()) {
488  throw RowSortException("Sorting more than 4B elements not supported");
489  }
490 
491  CHECK(permutation_.empty());
492 
493  const bool use_heap{order_entries.size() == 1 && top_n};
494  if (use_heap && entryCount() > 100000) {
495  if (g_enable_watchdog && (entryCount() > 20000000)) {
496  throw WatchdogException("Sorting the result would be too slow");
497  }
498  parallelTop(order_entries, top_n);
499  return;
500  }
501 
502  if (g_enable_watchdog && (entryCount() > Executor::baseline_threshold)) {
503  throw WatchdogException("Sorting the result would be too slow");
504  }
505 
506  permutation_ = initPermutationBuffer(0, 1);
507 
508  auto compare = createComparator(order_entries, use_heap);
509 
510  if (use_heap) {
511  topPermutation(permutation_, top_n, compare);
512  } else {
513  sortPermutation(compare);
514  }
515 }
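// Sort dispatch, as implemented above: CUDA builds may take the fast baseline sort;
// GPU-resident buffers use device radix sort with a CPU fallback on out-of-memory;
// a single ORDER BY key combined with LIMIT uses a top-n heap (parallelized beyond
// 100000 entries); everything else materializes a permutation and fully sorts it.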
516 
517 #ifdef HAVE_CUDA
518 void ResultSet::baselineSort(const std::list<Analyzer::OrderEntry>& order_entries,
519  const size_t top_n) {
520  // If we only have one GPU, it's usually faster to do multi-threaded radix sort on CPU
521  if (getGpuCount() > 1) {
522  try {
523  doBaselineSort(ExecutorDeviceType::GPU, order_entries, top_n);
524  } catch (...) {
525  doBaselineSort(ExecutorDeviceType::CPU, order_entries, top_n);
526  }
527  } else {
528  doBaselineSort(ExecutorDeviceType::CPU, order_entries, top_n);
529  }
530 }
531 #endif // HAVE_CUDA
532 
533 std::vector<uint32_t> ResultSet::initPermutationBuffer(const size_t start,
534  const size_t step) {
535  CHECK_NE(size_t(0), step);
536  std::vector<uint32_t> permutation;
537  const auto total_entries = query_mem_desc_.getEntryCount();
538  permutation.reserve(total_entries / step);
539  for (size_t i = start; i < total_entries; i += step) {
540  const auto storage_lookup_result = findStorage(i);
541  const auto lhs_storage = storage_lookup_result.storage_ptr;
542  const auto off = storage_lookup_result.fixedup_entry_idx;
543  CHECK(lhs_storage);
544  if (!lhs_storage->isEmptyEntry(off)) {
545  permutation.push_back(i);
546  }
547  }
548  return permutation;
549 }
550 
551 const std::vector<uint32_t>& ResultSet::getPermutationBuffer() const {
552  return permutation_;
553 }
554 
555 void ResultSet::parallelTop(const std::list<Analyzer::OrderEntry>& order_entries,
556  const size_t top_n) {
557  const size_t step = cpu_threads();
558  std::vector<std::vector<uint32_t>> strided_permutations(step);
559  std::vector<std::future<void>> init_futures;
560  for (size_t start = 0; start < step; ++start) {
561  init_futures.emplace_back(
562  std::async(std::launch::async, [this, start, step, &strided_permutations] {
563  strided_permutations[start] = initPermutationBuffer(start, step);
564  }));
565  }
566  for (auto& init_future : init_futures) {
567  init_future.wait();
568  }
569  for (auto& init_future : init_futures) {
570  init_future.get();
571  }
572  auto compare = createComparator(order_entries, true);
573  std::vector<std::future<void>> top_futures;
574  for (auto& strided_permutation : strided_permutations) {
575  top_futures.emplace_back(
576  std::async(std::launch::async, [&strided_permutation, &compare, top_n] {
577  topPermutation(strided_permutation, top_n, compare);
578  }));
579  }
580  for (auto& top_future : top_futures) {
581  top_future.wait();
582  }
583  for (auto& top_future : top_futures) {
584  top_future.get();
585  }
586  permutation_.reserve(strided_permutations.size() * top_n);
587  for (const auto& strided_permutation : strided_permutations) {
588  permutation_.insert(
589  permutation_.end(), strided_permutation.begin(), strided_permutation.end());
590  }
591  topPermutation(permutation_, top_n, compare);
592 }
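// parallelTop() sketch: every thread builds a strided permutation of the non-empty
// entries and reduces it to its own top_n via topPermutation(); the per-thread winners
// are then concatenated and reduced once more to yield the global top_n.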
593 
594 std::pair<ssize_t, size_t> ResultSet::getStorageIndex(const size_t entry_idx) const {
595  size_t fixedup_entry_idx = entry_idx;
596  auto entry_count = storage_->query_mem_desc_.getEntryCount();
597  const bool is_rowwise_layout = !storage_->query_mem_desc_.didOutputColumnar();
598  if (fixedup_entry_idx < entry_count) {
599  return {0, fixedup_entry_idx};
600  }
601  fixedup_entry_idx -= entry_count;
602  for (size_t i = 0; i < appended_storage_.size(); ++i) {
603  const auto& desc = appended_storage_[i]->query_mem_desc_;
604  CHECK_NE(is_rowwise_layout, desc.didOutputColumnar());
605  entry_count = desc.getEntryCount();
606  if (fixedup_entry_idx < entry_count) {
607  return {i + 1, fixedup_entry_idx};
608  }
609  fixedup_entry_idx -= entry_count;
610  }
611  CHECK(false);
612  return {-1, entry_idx};
613 }
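// getStorageIndex() maps a global entry index to a (storage index, local index) pair:
// storage index 0 is storage_, index i >= 1 is appended_storage_[i - 1]. Illustrative
// example: with entry counts {1000, 1000}, entry_idx 1500 resolves to {1, 500}.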
614 
615 ResultSet::StorageLookupResult ResultSet::findStorage(const size_t entry_idx) const {
616  ssize_t stg_idx{-1};
617  size_t fixedup_entry_idx{entry_idx};
618  std::tie(stg_idx, fixedup_entry_idx) = getStorageIndex(entry_idx);
619  CHECK_LE(ssize_t(0), stg_idx);
620  return {stg_idx ? appended_storage_[stg_idx - 1].get() : storage_.get(),
621  fixedup_entry_idx,
622  static_cast<size_t>(stg_idx)};
623 }
624 
625 template <typename BUFFER_ITERATOR_TYPE>
626 bool ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::operator()(
627  const uint32_t lhs,
628  const uint32_t rhs) const {
629  // NB: The compare function must define a strict weak ordering, otherwise
630  // std::sort will trigger a segmentation fault (or corrupt memory).
631  const auto lhs_storage_lookup_result = result_set_->findStorage(lhs);
632  const auto rhs_storage_lookup_result = result_set_->findStorage(rhs);
633  const auto lhs_storage = lhs_storage_lookup_result.storage_ptr;
634  const auto rhs_storage = rhs_storage_lookup_result.storage_ptr;
635  const auto fixedup_lhs = lhs_storage_lookup_result.fixedup_entry_idx;
636  const auto fixedup_rhs = rhs_storage_lookup_result.fixedup_entry_idx;
637  for (const auto order_entry : order_entries_) {
638  CHECK_GE(order_entry.tle_no, 1);
639  const auto& agg_info = result_set_->targets_[order_entry.tle_no - 1];
640  const auto entry_ti = get_compact_type(agg_info);
641  bool float_argument_input = takes_float_argument(agg_info);
642  // Need to determine if the float value has been stored as a float
643  // or if it has been compacted to a different (often larger, 8-byte) width;
644  // in the distributed case the floats are actually 4 bytes.
645  // TODO: takes_float_argument() is widely used; check whether this problem
646  // exists elsewhere.
647  if (entry_ti.get_type() == kFLOAT) {
648  const auto is_col_lazy =
649  !result_set_->lazy_fetch_info_.empty() &&
650  result_set_->lazy_fetch_info_[order_entry.tle_no - 1].is_lazily_fetched;
651  if (result_set_->query_mem_desc_.getPaddedSlotWidthBytes(order_entry.tle_no - 1) ==
652  sizeof(float)) {
653  float_argument_input =
654  result_set_->query_mem_desc_.didOutputColumnar() ? !is_col_lazy : true;
655  }
656  }
657  const auto lhs_v = buffer_itr_.getColumnInternal(lhs_storage->buff_,
658  fixedup_lhs,
659  order_entry.tle_no - 1,
660  lhs_storage_lookup_result);
661  const auto rhs_v = buffer_itr_.getColumnInternal(rhs_storage->buff_,
662  fixedup_rhs,
663  order_entry.tle_no - 1,
664  rhs_storage_lookup_result);
665  if (UNLIKELY(isNull(entry_ti, lhs_v, float_argument_input) &&
666  isNull(entry_ti, rhs_v, float_argument_input))) {
667  return false;
668  }
669  if (UNLIKELY(isNull(entry_ti, lhs_v, float_argument_input) &&
670  !isNull(entry_ti, rhs_v, float_argument_input))) {
671  return use_heap_ ? !order_entry.nulls_first : order_entry.nulls_first;
672  }
673  if (UNLIKELY(isNull(entry_ti, rhs_v, float_argument_input) &&
674  !isNull(entry_ti, lhs_v, float_argument_input))) {
675  return use_heap_ ? order_entry.nulls_first : !order_entry.nulls_first;
676  }
677  const bool use_desc_cmp = use_heap_ ? !order_entry.is_desc : order_entry.is_desc;
678  if (LIKELY(lhs_v.isInt())) {
679  CHECK(rhs_v.isInt());
680  if (UNLIKELY(entry_ti.is_string() &&
681  entry_ti.get_compression() == kENCODING_DICT)) {
682  CHECK_EQ(4, entry_ti.get_logical_size());
683  const auto string_dict_proxy = result_set_->executor_->getStringDictionaryProxy(
684  entry_ti.get_comp_param(), result_set_->row_set_mem_owner_, false);
685  auto lhs_str = string_dict_proxy->getString(lhs_v.i1);
686  auto rhs_str = string_dict_proxy->getString(rhs_v.i1);
687  if (lhs_str == rhs_str) {
688  continue;
689  }
690  return use_desc_cmp ? lhs_str > rhs_str : lhs_str < rhs_str;
691  }
692  if (UNLIKELY(is_distinct_target(result_set_->targets_[order_entry.tle_no - 1]))) {
693  const auto lhs_sz = count_distinct_set_size(
694  lhs_v.i1,
695  result_set_->query_mem_desc_.getCountDistinctDescriptor(order_entry.tle_no -
696  1));
697  const auto rhs_sz = count_distinct_set_size(
698  rhs_v.i1,
699  result_set_->query_mem_desc_.getCountDistinctDescriptor(order_entry.tle_no -
700  1));
701  if (lhs_sz == rhs_sz) {
702  continue;
703  }
704  return use_desc_cmp ? lhs_sz > rhs_sz : lhs_sz < rhs_sz;
705  }
706  if (lhs_v.i1 == rhs_v.i1) {
707  continue;
708  }
709  if (entry_ti.is_fp()) {
710  if (float_argument_input) {
711  const auto lhs_dval = *reinterpret_cast<const float*>(may_alias_ptr(&lhs_v.i1));
712  const auto rhs_dval = *reinterpret_cast<const float*>(may_alias_ptr(&rhs_v.i1));
713  return use_desc_cmp ? lhs_dval > rhs_dval : lhs_dval < rhs_dval;
714  } else {
715  const auto lhs_dval =
716  *reinterpret_cast<const double*>(may_alias_ptr(&lhs_v.i1));
717  const auto rhs_dval =
718  *reinterpret_cast<const double*>(may_alias_ptr(&rhs_v.i1));
719  return use_desc_cmp ? lhs_dval > rhs_dval : lhs_dval < rhs_dval;
720  }
721  }
722  return use_desc_cmp ? lhs_v.i1 > rhs_v.i1 : lhs_v.i1 < rhs_v.i1;
723  } else {
724  if (lhs_v.isPair()) {
725  CHECK(rhs_v.isPair());
726  const auto lhs =
727  pair_to_double({lhs_v.i1, lhs_v.i2}, entry_ti, float_argument_input);
728  const auto rhs =
729  pair_to_double({rhs_v.i1, rhs_v.i2}, entry_ti, float_argument_input);
730  if (lhs == rhs) {
731  continue;
732  }
733  return use_desc_cmp ? lhs > rhs : lhs < rhs;
734  } else {
735  CHECK(lhs_v.isStr() && rhs_v.isStr());
736  const auto lhs = lhs_v.strVal();
737  const auto rhs = rhs_v.strVal();
738  if (lhs == rhs) {
739  continue;
740  }
741  return use_desc_cmp ? lhs > rhs : lhs < rhs;
742  }
743  }
744  }
745  return false;
746 }
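// Comparator notes: the function above must define a strict weak ordering, so equal
// keys fall through to the next ORDER BY entry and ultimately return false. When
// use_heap_ is set, the descending flag and the nulls_first flag are inverted because
// topPermutation() keeps the best candidate at the front of the heap.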
747 
748 void ResultSet::topPermutation(
749  std::vector<uint32_t>& to_sort,
750  const size_t n,
751  const std::function<bool(const uint32_t, const uint32_t)> compare) {
752  std::make_heap(to_sort.begin(), to_sort.end(), compare);
753  std::vector<uint32_t> permutation_top;
754  permutation_top.reserve(n);
755  for (size_t i = 0; i < n && !to_sort.empty(); ++i) {
756  permutation_top.push_back(to_sort.front());
757  std::pop_heap(to_sort.begin(), to_sort.end(), compare);
758  to_sort.pop_back();
759  }
760  to_sort.swap(permutation_top);
761 }
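// topPermutation() is heap-based selection: one make_heap over all candidates followed
// by n pops, roughly O(N + n log N) comparisons instead of a full O(N log N) sort,
// which is why sort() prefers it for LIMIT queries.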
762 
763 void ResultSet::sortPermutation(
764  const std::function<bool(const uint32_t, const uint32_t)> compare) {
765  std::sort(permutation_.begin(), permutation_.end(), compare);
766 }
767 
768 void ResultSet::radixSortOnGpu(
769  const std::list<Analyzer::OrderEntry>& order_entries) const {
770  auto data_mgr = &executor_->catalog_->getDataMgr();
771  const int device_id{0};
772  CudaAllocator cuda_allocator(data_mgr, device_id);
773  std::vector<int64_t*> group_by_buffers(executor_->blockSize());
774  group_by_buffers[0] = reinterpret_cast<int64_t*>(storage_->getUnderlyingBuffer());
775  auto dev_group_by_buffers =
776  create_dev_group_by_buffers(&cuda_allocator,
777  group_by_buffers,
778  query_mem_desc_,
779  executor_->blockSize(),
780  executor_->gridSize(),
781  device_id,
782  ExecutorDispatchMode::KernelPerFragment,
783  -1,
784  true,
785  true,
786  false,
787  nullptr);
788  inplace_sort_gpu(
789  order_entries, query_mem_desc_, dev_group_by_buffers, data_mgr, device_id);
790  copy_group_by_buffers_from_gpu(
791  data_mgr,
792  group_by_buffers,
793  query_mem_desc_.getBufferSizeBytes(ExecutorDeviceType::GPU),
794  dev_group_by_buffers.second,
795  query_mem_desc_,
796  executor_->blockSize(),
797  executor_->gridSize(),
798  device_id,
799  false);
800 }
801 
802 void ResultSet::radixSortOnCpu(
803  const std::list<Analyzer::OrderEntry>& order_entries) const {
804  CHECK(!query_mem_desc_.hasKeylessHash());
805  std::vector<int64_t> tmp_buff(query_mem_desc_.getEntryCount());
806  std::vector<int32_t> idx_buff(query_mem_desc_.getEntryCount());
807  CHECK_EQ(size_t(1), order_entries.size());
808  auto buffer_ptr = storage_->getUnderlyingBuffer();
809  for (const auto& order_entry : order_entries) {
810  const auto target_idx = order_entry.tle_no - 1;
811  const auto sortkey_val_buff = reinterpret_cast<int64_t*>(
812  buffer_ptr + query_mem_desc_.getColOffInBytes(target_idx));
813  const auto chosen_bytes = query_mem_desc_.getPaddedSlotWidthBytes(target_idx);
814  sort_groups_cpu(sortkey_val_buff,
815  &idx_buff[0],
816  query_mem_desc_.getEntryCount(),
817  order_entry.is_desc,
818  chosen_bytes);
819  apply_permutation_cpu(reinterpret_cast<int64_t*>(buffer_ptr),
820  &idx_buff[0],
821  query_mem_desc_.getEntryCount(),
822  &tmp_buff[0],
823  sizeof(int64_t));
824  for (size_t target_idx = 0; target_idx < query_mem_desc_.getSlotCount();
825  ++target_idx) {
826  if (static_cast<int>(target_idx) == order_entry.tle_no - 1) {
827  continue;
828  }
829  const auto chosen_bytes = query_mem_desc_.getPaddedSlotWidthBytes(target_idx);
830  const auto satellite_val_buff = reinterpret_cast<int64_t*>(
831  buffer_ptr + query_mem_desc_.getColOffInBytes(target_idx));
832  apply_permutation_cpu(satellite_val_buff,
833  &idx_buff[0],
834  query_mem_desc_.getEntryCount(),
835  &tmp_buff[0],
836  chosen_bytes);
837  }
838  }
839 }
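// radixSortOnCpu() sketch: the single ORDER BY key column is sorted in place, the row
// permutation it produces (idx_buff) is applied to the group-by keys, and the same
// permutation is then replayed on every remaining slot so all columns stay consistent.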
840 
841 void ResultSetStorage::addCountDistinctSetPointerMapping(const int64_t remote_ptr,
842  const int64_t ptr) {
843  const auto it_ok = count_distinct_sets_mapping_.emplace(remote_ptr, ptr);
844  CHECK(it_ok.second);
845 }
846 
847 int64_t ResultSetStorage::mappedPtr(const int64_t remote_ptr) const {
848  const auto it = count_distinct_sets_mapping_.find(remote_ptr);
849  // Due to the removal of completely zero bitmaps in a distributed transfer there will be
850  // remote ptrs that do not exist. Return 0 if no pointer is found.
851  if (it == count_distinct_sets_mapping_.end()) {
852  return int64_t(0);
853  }
854  return it->second;
855 }
856 
857 size_t ResultSet::getLimit() {
858  return keep_first_;
859 }
860 
861 std::shared_ptr<const std::vector<std::string>> ResultSet::getStringDictionaryPayloadCopy(
862  const int dict_id) const {
863  CHECK(executor_);
864  const auto sdp =
865  executor_->getStringDictionaryProxy(dict_id, row_set_mem_owner_, false);
866  return sdp->getDictionary()->copyStrings();
867 }
868 
869 bool can_use_parallel_algorithms(const ResultSet& rows) {
870  return !rows.isTruncated();
871 }
872 
873 bool use_parallel_algorithms(const ResultSet& rows) {
874  return can_use_parallel_algorithms(rows) && rows.entryCount() >= 20000;
875 }
876 
884 bool ResultSet::isDirectColumnarConversionPossible() const {
885  if (!g_enable_direct_columnarization) {
886  return false;
887  } else if (query_mem_desc_.didOutputColumnar()) {
888  return permutation_.empty() && (query_mem_desc_.getQueryDescriptionType() ==
889  QueryDescriptionType::Projection ||
890  (query_mem_desc_.getQueryDescriptionType() ==
891  QueryDescriptionType::GroupByPerfectHash ||
892  query_mem_desc_.getQueryDescriptionType() ==
893  QueryDescriptionType::GroupByBaselineHash));
894  } else {
895  return permutation_.empty() && (query_mem_desc_.getQueryDescriptionType() ==
896  QueryDescriptionType::GroupByPerfectHash ||
897  query_mem_desc_.getQueryDescriptionType() ==
898  QueryDescriptionType::GroupByBaselineHash);
899  }
900 }
901 
902 // returns a bitmap (and total number) of all single slot targets
903 std::tuple<std::vector<bool>, size_t> ResultSet::getSingleSlotTargetBitmap() const {
904  std::vector<bool> target_bitmap(targets_.size(), true);
905  size_t num_single_slot_targets = 0;
906  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
907  const auto& sql_type = targets_[target_idx].sql_type;
908  if (targets_[target_idx].is_agg && targets_[target_idx].agg_kind == kAVG) {
909  target_bitmap[target_idx] = false;
910  } else if (sql_type.is_varlen()) {
911  target_bitmap[target_idx] = false;
912  } else {
913  num_single_slot_targets++;
914  }
915  }
916  return std::make_tuple(std::move(target_bitmap), num_single_slot_targets);
917 }
918 
927 std::tuple<std::vector<bool>, size_t> ResultSet::getSupportedSingleSlotTargetBitmap()
928  const {
929  CHECK(isDirectColumnarConversionPossible());
930  auto [single_slot_targets, num_single_slot_targets] = getSingleSlotTargetBitmap();
931 
932  for (size_t target_idx = 0; target_idx < single_slot_targets.size(); target_idx++) {
933  const auto& target = targets_[target_idx];
934  if (single_slot_targets[target_idx] &&
935  (is_distinct_target(target) ||
936  (target.is_agg && target.agg_kind == kSAMPLE && target.sql_type == kFLOAT))) {
937  single_slot_targets[target_idx] = false;
938  num_single_slot_targets--;
939  }
940  }
941  CHECK_GE(num_single_slot_targets, size_t(0));
942  return std::make_tuple(std::move(single_slot_targets), num_single_slot_targets);
943 }
944 
945 // returns the starting slot index for all targets in the result set
946 std::vector<size_t> ResultSet::getSlotIndicesForTargetIndices() const {
947  std::vector<size_t> slot_indices(targets_.size(), 0);
948  size_t slot_index = 0;
949  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
950  slot_indices[target_idx] = slot_index;
951  slot_index = advance_slot(slot_index, targets_[target_idx], false);
952  }
953  return slot_indices;
954 }
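// Slot-index example (illustrative): AVG occupies two physical slots (sum and count),
// so targets {SUM(x), AVG(y), MAX(z)} map to starting slots {0, 1, 3} under the
// advance_slot() rules used above.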