OmniSciDB  eee9fa949c
ResultSet.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
25 #include "ResultSet.h"
26 
29 #include "Execute.h"
30 #include "GpuMemUtils.h"
31 #include "InPlaceSort.h"
33 #include "RuntimeFunctions.h"
34 #include "Shared/SqlTypesLayout.h"
35 #include "Shared/checked_alloc.h"
36 #include "Shared/likely.h"
37 #include "Shared/thread_count.h"
38 
39 #include <algorithm>
40 #include <bitset>
41 #include <future>
42 #include <numeric>
43 
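// Computes one initialization value per target slot: COUNT-style aggregates start at
// zero, nullable aggregates start at the type's null bit pattern, and non-nullable
// aggregates use the 0xdeadbeef sentinel (non-aggregate targets are simply zeroed).
// AVG reserves an extra slot for its count, and SAMPLE on geometry or varlen types
// reserves extra slots for the additional physical columns.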
44 ResultSetStorage::ResultSetStorage(const std::vector<TargetInfo>& targets,
45  const QueryMemoryDescriptor& query_mem_desc,
46  int8_t* buff,
47  const bool buff_is_provided)
48  : targets_(targets)
49  , query_mem_desc_(query_mem_desc)
50  , buff_(buff)
51  , buff_is_provided_(buff_is_provided) {
52  for (const auto& target_info : targets_) {
53  if (target_info.agg_kind == kCOUNT ||
54  target_info.agg_kind == kAPPROX_COUNT_DISTINCT) {
55  target_init_vals_.push_back(0);
56  continue;
57  }
58  if (!target_info.sql_type.get_notnull()) {
59  int64_t init_val =
60  null_val_bit_pattern(target_info.sql_type, takes_float_argument(target_info));
61  target_init_vals_.push_back(target_info.is_agg ? init_val : 0);
62  } else {
63  target_init_vals_.push_back(target_info.is_agg ? 0xdeadbeef : 0);
64  }
65  if (target_info.agg_kind == kAVG) {
66  target_init_vals_.push_back(0);
67  } else if (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_geometry()) {
68  for (int i = 1; i < 2 * target_info.sql_type.get_physical_coord_cols(); i++) {
69  target_init_vals_.push_back(0);
70  }
71  } else if (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()) {
72  target_init_vals_.push_back(0);
73  }
74  }
75 }
76 
77 int8_t* ResultSetStorage::getUnderlyingBuffer() const {
78  return buff_;
79 }
80 
81 void ResultSet::keepFirstN(const size_t n) {
82  CHECK_EQ(-1, cached_row_count_);
83  keep_first_ = n;
84 }
85 
86 void ResultSet::dropFirstN(const size_t n) {
87  CHECK_EQ(-1, cached_row_count_);
88  drop_first_ = n;
89 }
90 
91 ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
92  const ExecutorDeviceType device_type,
93  const QueryMemoryDescriptor& query_mem_desc,
94  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
95  const Executor* executor)
96  : targets_(targets)
97  , device_type_(device_type)
98  , device_id_(-1)
99  , query_mem_desc_(query_mem_desc)
100  , crt_row_buff_idx_(0)
101  , fetched_so_far_(0)
102  , drop_first_(0)
103  , keep_first_(0)
104  , row_set_mem_owner_(row_set_mem_owner)
105  , queue_time_ms_(0)
106  , render_time_ms_(0)
107  , executor_(executor)
108  , estimator_buffer_(nullptr)
109  , host_estimator_buffer_(nullptr)
110  , data_mgr_(nullptr)
111  , separate_varlen_storage_valid_(false)
112  , just_explain_(false)
113  , cached_row_count_(-1)
114  , geo_return_type_(GeoReturnType::WktString) {}
115 
116 ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
117  const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
118  const std::vector<std::vector<const int8_t*>>& col_buffers,
119  const std::vector<std::vector<int64_t>>& frag_offsets,
120  const std::vector<int64_t>& consistent_frag_sizes,
121  const ExecutorDeviceType device_type,
122  const int device_id,
123  const QueryMemoryDescriptor& query_mem_desc,
124  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
125  const Executor* executor)
126  : targets_(targets)
127  , device_type_(device_type)
128  , device_id_(device_id)
129  , query_mem_desc_(query_mem_desc)
130  , crt_row_buff_idx_(0)
131  , fetched_so_far_(0)
132  , drop_first_(0)
133  , keep_first_(0)
134  , row_set_mem_owner_(row_set_mem_owner)
135  , queue_time_ms_(0)
136  , render_time_ms_(0)
137  , executor_(executor)
138  , lazy_fetch_info_(lazy_fetch_info)
139  , col_buffers_{col_buffers}
140  , frag_offsets_{frag_offsets}
141  , consistent_frag_sizes_{consistent_frag_sizes}
142  , estimator_buffer_(nullptr)
143  , host_estimator_buffer_(nullptr)
144  , data_mgr_(nullptr)
145  , separate_varlen_storage_valid_(false)
146  , just_explain_(false)
147  , cached_row_count_(-1)
148  , geo_return_type_(GeoReturnType::WktString) {}
149 
150 ResultSet::ResultSet(const std::shared_ptr<const Analyzer::Estimator> estimator,
151  const ExecutorDeviceType device_type,
152  const int device_id,
153  Data_Namespace::DataMgr* data_mgr)
154  : device_type_(device_type)
155  , device_id_(device_id)
156  , query_mem_desc_{}
157  , crt_row_buff_idx_(0)
158  , estimator_(estimator)
159  , estimator_buffer_(nullptr)
160  , host_estimator_buffer_(nullptr)
161  , data_mgr_(data_mgr)
162  , separate_varlen_storage_valid_(false)
163  , just_explain_(false)
164  , cached_row_count_(-1)
165  , geo_return_type_(GeoReturnType::WktString) {
166  if (device_type == ExecutorDeviceType::GPU) {
167  estimator_buffer_ =
168  CudaAllocator::alloc(data_mgr_, estimator_->getBufferSize(), device_id_);
169  data_mgr->getCudaMgr()->zeroDeviceMem(
170  estimator_buffer_, estimator_->getBufferSize(), device_id_);
171  } else {
172  host_estimator_buffer_ =
173  static_cast<int8_t*>(checked_calloc(estimator_->getBufferSize(), 1));
174  }
175 }
176 
177 ResultSet::ResultSet(const std::string& explanation)
178  : device_type_(ExecutorDeviceType::CPU)
179  , device_id_(-1)
180  , fetched_so_far_(0)
181  , queue_time_ms_(0)
182  , render_time_ms_(0)
183  , estimator_buffer_(nullptr)
184  , host_estimator_buffer_(nullptr)
185  , separate_varlen_storage_valid_(false)
186  , explanation_(explanation)
187  , just_explain_(true)
188  , cached_row_count_(-1)
189  , geo_return_type_(GeoReturnType::WktString) {}
190 
191 ResultSet::ResultSet(int64_t queue_time_ms,
192  int64_t render_time_ms,
193  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
194  : device_type_(ExecutorDeviceType::CPU)
195  , device_id_(-1)
196  , fetched_so_far_(0)
197  , row_set_mem_owner_(row_set_mem_owner)
198  , queue_time_ms_(queue_time_ms)
199  , render_time_ms_(render_time_ms)
200  , estimator_buffer_(nullptr)
201  , host_estimator_buffer_(nullptr)
202  , separate_varlen_storage_valid_(false)
203  , just_explain_(true)
204  , cached_row_count_(-1)
205  , geo_return_type_(GeoReturnType::WktString) {}
206 
207 ResultSet::~ResultSet() {
208  if (storage_) {
209  CHECK(storage_->getUnderlyingBuffer());
210  if (!storage_->buff_is_provided_) {
211  free(storage_->getUnderlyingBuffer());
212  }
213  }
214  for (auto& storage : appended_storage_) {
215  if (storage && !storage->buff_is_provided_) {
216  free(storage->getUnderlyingBuffer());
217  }
218  }
219  if (host_estimator_buffer_) {
220  CHECK(device_type_ == ExecutorDeviceType::CPU || estimator_buffer_);
221  free(host_estimator_buffer_);
222  }
223 }
224 
225 ExecutorDeviceType ResultSet::getDeviceType() const {
226  return device_type_;
227 }
228 
229 const ResultSetStorage* ResultSet::allocateStorage() const {
230  CHECK(!storage_);
231  auto buff = static_cast<int8_t*>(
232  checked_malloc(query_mem_desc_.getBufferSizeBytes(device_type_)));
233  storage_.reset(new ResultSetStorage(targets_, query_mem_desc_, buff, false));
234  return storage_.get();
235 }
236 
237 const ResultSetStorage* ResultSet::allocateStorage(
238  int8_t* buff,
239  const std::vector<int64_t>& target_init_vals) const {
240  CHECK(buff);
241  CHECK(!storage_);
242  storage_.reset(new ResultSetStorage(targets_, query_mem_desc_, buff, true));
243  storage_->target_init_vals_ = target_init_vals;
244  return storage_.get();
245 }
246 
247 const ResultSetStorage* ResultSet::allocateStorage(
248  const std::vector<int64_t>& target_init_vals) const {
249  CHECK(!storage_);
250  auto buff = static_cast<int8_t*>(
251  checked_malloc(query_mem_desc_.getBufferSizeBytes(device_type_)));
252  storage_.reset(new ResultSetStorage(targets_, query_mem_desc_, buff, false));
253  storage_->target_init_vals_ = target_init_vals;
254  return storage_.get();
255 }
256 
257 size_t ResultSet::getCurrentRowBufferIndex() const {
258  if (crt_row_buff_idx_ == 0) {
259  throw std::runtime_error("current row buffer iteration index is undefined");
260  }
261  return crt_row_buff_idx_ - 1;
262 }
263 
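// Takes ownership of `that`'s storage and concatenates its chunks, column buffers,
// fragment offsets, chunk iterators and (when separate varlen storage is in use) its
// serialized varlen buffers onto this result set, growing the entry count accordingly.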
264 void ResultSet::append(ResultSet& that) {
265  CHECK_EQ(-1, cached_row_count_);
266  if (!that.storage_) {
267  return;
268  }
269  appended_storage_.push_back(std::move(that.storage_));
270  query_mem_desc_.setEntryCount(
271  query_mem_desc_.getEntryCount() +
272  appended_storage_.back()->query_mem_desc_.getEntryCount());
273  chunks_.insert(chunks_.end(), that.chunks_.begin(), that.chunks_.end());
274  col_buffers_.insert(
275  col_buffers_.end(), that.col_buffers_.begin(), that.col_buffers_.end());
276  frag_offsets_.insert(
277  frag_offsets_.end(), that.frag_offsets_.begin(), that.frag_offsets_.end());
278  consistent_frag_sizes_.insert(consistent_frag_sizes_.end(),
279  that.consistent_frag_sizes_.begin(),
280  that.consistent_frag_sizes_.end());
281  chunk_iters_.insert(
282  chunk_iters_.end(), that.chunk_iters_.begin(), that.chunk_iters_.end());
283  if (separate_varlen_storage_valid_) {
284  CHECK(that.separate_varlen_storage_valid_);
285  serialized_varlen_buffer_.insert(serialized_varlen_buffer_.end(),
286  that.serialized_varlen_buffer_.begin(),
287  that.serialized_varlen_buffer_.end());
288  }
289  for (auto& buff : that.literal_buffers_) {
290  literal_buffers_.push_back(std::move(buff));
291  }
292 }
293 
294 const ResultSetStorage* ResultSet::getStorage() const {
295  return storage_.get();
296 }
297 
298 size_t ResultSet::colCount() const {
299  return just_explain_ ? 1 : targets_.size();
300 }
301 
302 SQLTypeInfo ResultSet::getColType(const size_t col_idx) const {
303  if (just_explain_) {
304  return SQLTypeInfo(kTEXT, false);
305  }
306  CHECK_LT(col_idx, targets_.size());
307  return targets_[col_idx].agg_kind == kAVG ? SQLTypeInfo(kDOUBLE, false)
308  : targets_[col_idx].sql_type;
309 }
310 
311 size_t ResultSet::rowCount(const bool force_parallel) const {
312  if (just_explain_) {
313  return 1;
314  }
315  if (!permutation_.empty()) {
316  return permutation_.size();
317  }
318  if (cached_row_count_ != -1) {
319  CHECK_GE(cached_row_count_, 0);
320  return cached_row_count_;
321  }
322  if (!storage_) {
323  return 0;
324  }
325  if (force_parallel || entryCount() > 20000) {
326  return parallelRowCount();
327  }
328  std::lock_guard<std::mutex> lock(row_iteration_mutex_);
329  moveToBegin();
330  size_t row_count{0};
331  while (true) {
332  auto crt_row = getNextRowUnlocked(false, false);
333  if (crt_row.empty()) {
334  break;
335  }
336  ++row_count;
337  }
338  moveToBegin();
339  return row_count;
340 }
341 
342 void ResultSet::setCachedRowCount(const size_t row_count) const {
343  CHECK(cached_row_count_ == -1 || cached_row_count_ == static_cast<ssize_t>(row_count));
344  cached_row_count_ = row_count;
345 }
346 
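// Counts non-empty entries in parallel: the entry range is split into one stride per
// worker thread, each stride is counted via std::async, and the partial counts are
// summed. Any LIMIT/OFFSET (keep_first_ / drop_first_) is applied to the total.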
347 size_t ResultSet::parallelRowCount() const {
348  size_t row_count{0};
349  const size_t worker_count = cpu_threads();
350  std::vector<std::future<size_t>> counter_threads;
351  for (size_t i = 0,
352  start_entry = 0,
353  stride = (entryCount() + worker_count - 1) / worker_count;
354  i < worker_count && start_entry < entryCount();
355  ++i, start_entry += stride) {
356  const auto end_entry = std::min(start_entry + stride, entryCount());
357  counter_threads.push_back(std::async(
358  std::launch::async,
359  [this](const size_t start, const size_t end) {
360  size_t row_count{0};
361  for (size_t i = start; i < end; ++i) {
362  if (!isRowAtEmpty(i)) {
363  ++row_count;
364  }
365  }
366  return row_count;
367  },
368  start_entry,
369  end_entry));
370  }
371  for (auto& child : counter_threads) {
372  child.wait();
373  }
374  for (auto& child : counter_threads) {
375  row_count += child.get();
376  }
377  if (keep_first_ + drop_first_) {
378  const auto limited_row_count = std::min(keep_first_ + drop_first_, row_count);
379  return limited_row_count < drop_first_ ? 0 : limited_row_count - drop_first_;
380  }
381  return row_count;
382 }
383 
384 bool ResultSet::definitelyHasNoRows() const {
385  return !storage_ && !estimator_ && !just_explain_;
386 }
387 
388 const QueryMemoryDescriptor& ResultSet::getQueryMemDesc() const {
389  CHECK(storage_);
390  return storage_->query_mem_desc_;
391 }
392 
393 const std::vector<TargetInfo>& ResultSet::getTargetInfos() const {
394  return targets_;
395 }
396 
397 const std::vector<int64_t>& ResultSet::getTargetInitVals() const {
398  CHECK(storage_);
399  return storage_->target_init_vals_;
400 }
401 
402 int8_t* ResultSet::getDeviceEstimatorBuffer() const {
403  CHECK(device_type_ == ExecutorDeviceType::GPU);
404  return estimator_buffer_;
405 }
406 
407 int8_t* ResultSet::getHostEstimatorBuffer() const {
408  return host_estimator_buffer_;
409 }
410 
411 void ResultSet::syncEstimatorBuffer() const {
412  CHECK(device_type_ == ExecutorDeviceType::GPU);
413  CHECK(!host_estimator_buffer_);
414  CHECK_EQ(size_t(0), estimator_->getBufferSize() % sizeof(int64_t));
415  host_estimator_buffer_ =
416  static_cast<int8_t*>(checked_calloc(estimator_->getBufferSize(), 1));
417  copy_from_gpu(data_mgr_,
418  host_estimator_buffer_,
419  reinterpret_cast<CUdeviceptr>(estimator_buffer_),
420  estimator_->getBufferSize(),
421  device_id_);
422 }
423 
424 void ResultSet::setQueueTime(const int64_t queue_time) {
425  queue_time_ms_ = queue_time;
426 }
427 
428 int64_t ResultSet::getQueueTime() const {
429  return queue_time_ms_;
430 }
431 
432 int64_t ResultSet::getRenderTime() const {
433  return render_time_ms_;
434 }
435 
436 void ResultSet::moveToBegin() const {
437  crt_row_buff_idx_ = 0;
438  fetched_so_far_ = 0;
439 }
440 
441 bool ResultSet::isTruncated() const {
442  return keep_first_ + drop_first_;
443 }
444 
445 bool ResultSet::isExplain() const {
446  return just_explain_;
447 }
448 
449 int ResultSet::getDeviceId() const {
450  return device_id_;
451 }
452 
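// Returns a copy of the descriptor with every group-by key column widened to 8 bytes;
// for row-wise output the padded slots are re-aligned to match the new key widths.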
453 QueryMemoryDescriptor ResultSet::fixupQueryMemoryDescriptor(
454  const QueryMemoryDescriptor& query_mem_desc) {
455  auto query_mem_desc_copy = query_mem_desc;
456  query_mem_desc_copy.resetGroupColWidths(
457  std::vector<int8_t>(query_mem_desc_copy.getGroupbyColCount(), 8));
458  if (query_mem_desc.didOutputColumnar()) {
459  return query_mem_desc_copy;
460  }
461  query_mem_desc_copy.alignPaddedSlots();
462  return query_mem_desc_copy;
463 }
464 
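// Sort strategy: with CUDA available, a fast baseline sort may handle the query
// outright; otherwise use the GPU radix sort when the descriptor requests it (falling
// back to the CPU on out-of-memory), a heap-based top-n when there is a single
// order-by key with a limit (parallelized above 100k entries), and a full
// comparator-based sort of the non-empty permutation in the general case. Watchdog
// checks reject inputs that would be too slow to sort.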
465 void ResultSet::sort(const std::list<Analyzer::OrderEntry>& order_entries,
466  const size_t top_n) {
467  CHECK_EQ(-1, cached_row_count_);
468  CHECK(!targets_.empty());
469 #ifdef HAVE_CUDA
470  if (canUseFastBaselineSort(order_entries, top_n)) {
471  baselineSort(order_entries, top_n);
472  return;
473  }
474 #endif // HAVE_CUDA
475  if (query_mem_desc_.sortOnGpu()) {
476  try {
477  radixSortOnGpu(order_entries);
478  } catch (const OutOfMemory&) {
479  LOG(WARNING) << "Out of GPU memory during sort, finish on CPU";
480  radixSortOnCpu(order_entries);
481  } catch (const std::bad_alloc&) {
482  LOG(WARNING) << "Out of GPU memory during sort, finish on CPU";
483  radixSortOnCpu(order_entries);
484  }
485  return;
486  }
487  // This check isn't strictly required, but allows the index buffer to be 32-bit.
488  if (query_mem_desc_.getEntryCount() > std::numeric_limits<uint32_t>::max()) {
489  throw RowSortException("Sorting more than 4B elements not supported");
490  }
491 
492  CHECK(permutation_.empty());
493 
494  const bool use_heap{order_entries.size() == 1 && top_n};
495  if (use_heap && entryCount() > 100000) {
496  if (g_enable_watchdog && (entryCount() > 20000000)) {
497  throw WatchdogException("Sorting the result would be too slow");
498  }
499  parallelTop(order_entries, top_n);
500  return;
501  }
502 
503  if (g_enable_watchdog && (entryCount() > Executor::baseline_threshold)) {
504  throw WatchdogException("Sorting the result would be too slow");
505  }
506 
507  permutation_ = initPermutationBuffer(0, 1);
508 
509  auto compare = createComparator(order_entries, use_heap);
510 
511  if (use_heap) {
512  topPermutation(permutation_, top_n, compare);
513  } else {
514  sortPermutation(compare);
515  }
516 }
517 
518 #ifdef HAVE_CUDA
519 void ResultSet::baselineSort(const std::list<Analyzer::OrderEntry>& order_entries,
520  const size_t top_n) {
521  // If we only have one GPU, it's usually faster to do a multi-threaded radix sort on the CPU.
522  if (getGpuCount() > 1) {
523  try {
524  doBaselineSort(ExecutorDeviceType::GPU, order_entries, top_n);
525  } catch (...) {
526  doBaselineSort(ExecutorDeviceType::CPU, order_entries, top_n);
527  }
528  } else {
529  doBaselineSort(ExecutorDeviceType::CPU, order_entries, top_n);
530  }
531 }
532 #endif // HAVE_CUDA
533 
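// Builds the list of non-empty entry indices, visiting every `step`-th entry starting
// at `start`. parallelTop() calls this with step == cpu_threads() so that each worker
// owns a disjoint stride, e.g. with step == 4: worker 0 sees entries 0, 4, 8, ...,
// worker 1 sees entries 1, 5, 9, ..., and so on.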
534 std::vector<uint32_t> ResultSet::initPermutationBuffer(const size_t start,
535  const size_t step) {
536  CHECK_NE(size_t(0), step);
537  std::vector<uint32_t> permutation;
538  const auto total_entries = query_mem_desc_.getEntryCount();
539  permutation.reserve(total_entries / step);
540  for (size_t i = start; i < total_entries; i += step) {
541  const auto storage_lookup_result = findStorage(i);
542  const auto lhs_storage = storage_lookup_result.storage_ptr;
543  const auto off = storage_lookup_result.fixedup_entry_idx;
544  CHECK(lhs_storage);
545  if (!lhs_storage->isEmptyEntry(off)) {
546  permutation.push_back(i);
547  }
548  }
549  return permutation;
550 }
551 
552 const std::vector<uint32_t>& ResultSet::getPermutationBuffer() const {
553  return permutation_;
554 }
555 
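// Parallel top-n: each worker thread builds one strided permutation and reduces it to
// its local top-n with topPermutation(); the per-thread winners are then concatenated
// and a final topPermutation() pass selects the global top-n.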
556 void ResultSet::parallelTop(const std::list<Analyzer::OrderEntry>& order_entries,
557  const size_t top_n) {
558  const size_t step = cpu_threads();
559  std::vector<std::vector<uint32_t>> strided_permutations(step);
560  std::vector<std::future<void>> init_futures;
561  for (size_t start = 0; start < step; ++start) {
562  init_futures.emplace_back(
563  std::async(std::launch::async, [this, start, step, &strided_permutations] {
564  strided_permutations[start] = initPermutationBuffer(start, step);
565  }));
566  }
567  for (auto& init_future : init_futures) {
568  init_future.wait();
569  }
570  for (auto& init_future : init_futures) {
571  init_future.get();
572  }
573  auto compare = createComparator(order_entries, true);
574  std::vector<std::future<void>> top_futures;
575  for (auto& strided_permutation : strided_permutations) {
576  top_futures.emplace_back(
577  std::async(std::launch::async, [&strided_permutation, &compare, top_n] {
578  topPermutation(strided_permutation, top_n, compare);
579  }));
580  }
581  for (auto& top_future : top_futures) {
582  top_future.wait();
583  }
584  for (auto& top_future : top_futures) {
585  top_future.get();
586  }
587  permutation_.reserve(strided_permutations.size() * top_n);
588  for (const auto& strided_permutation : strided_permutations) {
589  permutation_.insert(
590  permutation_.end(), strided_permutation.begin(), strided_permutation.end());
591  }
592  topPermutation(permutation_, top_n, compare);
593 }
594 
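// Translates a global entry index into a (storage index, local entry index) pair,
// checking the main storage first and then walking the appended storages.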
595 std::pair<size_t, size_t> ResultSet::getStorageIndex(const size_t entry_idx) const {
596  size_t fixedup_entry_idx = entry_idx;
597  auto entry_count = storage_->query_mem_desc_.getEntryCount();
598  const bool is_rowwise_layout = !storage_->query_mem_desc_.didOutputColumnar();
599  if (fixedup_entry_idx < entry_count) {
600  return {0, fixedup_entry_idx};
601  }
602  fixedup_entry_idx -= entry_count;
603  for (size_t i = 0; i < appended_storage_.size(); ++i) {
604  const auto& desc = appended_storage_[i]->query_mem_desc_;
605  CHECK_NE(is_rowwise_layout, desc.didOutputColumnar());
606  entry_count = desc.getEntryCount();
607  if (fixedup_entry_idx < entry_count) {
608  return {i + 1, fixedup_entry_idx};
609  }
610  fixedup_entry_idx -= entry_count;
611  }
612  UNREACHABLE() << "entry_idx = " << entry_idx << ", query_mem_desc_.getEntryCount() = "
613  << query_mem_desc_.getEntryCount();
614  return {};
615 }
616 
617 ResultSet::StorageLookupResult ResultSet::findStorage(const size_t entry_idx) const {
618  size_t stg_idx;
619  size_t fixedup_entry_idx;
620  std::tie(stg_idx, fixedup_entry_idx) = getStorageIndex(entry_idx);
621  return {stg_idx ? appended_storage_[stg_idx - 1].get() : storage_.get(),
622  fixedup_entry_idx,
623  stg_idx};
624 }
625 
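// Compares two entry indices column by column following order_entries_: NULLs honor
// nulls_first, dictionary-encoded strings compare by their decoded values, count
// distinct targets compare by set size, and floating-point slots are reinterpreted at
// the width they were stored with. When use_heap_ is set the comparisons are inverted
// so that the max-heap built in topPermutation() surfaces the requested top rows first.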
626 template <typename BUFFER_ITERATOR_TYPE>
627 bool ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::operator()(
628  const uint32_t lhs,
629  const uint32_t rhs) const {
630  // NB: The compare function must define a strict weak ordering, otherwise
631  // std::sort will trigger a segmentation fault (or corrupt memory).
632  const auto lhs_storage_lookup_result = result_set_->findStorage(lhs);
633  const auto rhs_storage_lookup_result = result_set_->findStorage(rhs);
634  const auto lhs_storage = lhs_storage_lookup_result.storage_ptr;
635  const auto rhs_storage = rhs_storage_lookup_result.storage_ptr;
636  const auto fixedup_lhs = lhs_storage_lookup_result.fixedup_entry_idx;
637  const auto fixedup_rhs = rhs_storage_lookup_result.fixedup_entry_idx;
638  for (const auto order_entry : order_entries_) {
639  CHECK_GE(order_entry.tle_no, 1);
640  const auto& agg_info = result_set_->targets_[order_entry.tle_no - 1];
641  const auto entry_ti = get_compact_type(agg_info);
642  bool float_argument_input = takes_float_argument(agg_info);
643  // Need to determine whether the float value has been stored as a 4-byte float
644  // or compacted into a wider (typically 8-byte) slot; in the distributed case
645  // the floats really are 4 bytes wide.
646  // TODO: takes_float_argument() above is widely used; check whether this problem
647  // exists elsewhere.
648  if (entry_ti.get_type() == kFLOAT) {
649  const auto is_col_lazy =
650  !result_set_->lazy_fetch_info_.empty() &&
651  result_set_->lazy_fetch_info_[order_entry.tle_no - 1].is_lazily_fetched;
652  if (result_set_->query_mem_desc_.getPaddedSlotWidthBytes(order_entry.tle_no - 1) ==
653  sizeof(float)) {
654  float_argument_input =
655  result_set_->query_mem_desc_.didOutputColumnar() ? !is_col_lazy : true;
656  }
657  }
658  const auto lhs_v = buffer_itr_.getColumnInternal(lhs_storage->buff_,
659  fixedup_lhs,
660  order_entry.tle_no - 1,
661  lhs_storage_lookup_result);
662  const auto rhs_v = buffer_itr_.getColumnInternal(rhs_storage->buff_,
663  fixedup_rhs,
664  order_entry.tle_no - 1,
665  rhs_storage_lookup_result);
666  if (UNLIKELY(isNull(entry_ti, lhs_v, float_argument_input) &&
667  isNull(entry_ti, rhs_v, float_argument_input))) {
668  return false;
669  }
670  if (UNLIKELY(isNull(entry_ti, lhs_v, float_argument_input) &&
671  !isNull(entry_ti, rhs_v, float_argument_input))) {
672  return use_heap_ ? !order_entry.nulls_first : order_entry.nulls_first;
673  }
674  if (UNLIKELY(isNull(entry_ti, rhs_v, float_argument_input) &&
675  !isNull(entry_ti, lhs_v, float_argument_input))) {
676  return use_heap_ ? order_entry.nulls_first : !order_entry.nulls_first;
677  }
678  const bool use_desc_cmp = use_heap_ ? !order_entry.is_desc : order_entry.is_desc;
679  if (LIKELY(lhs_v.isInt())) {
680  CHECK(rhs_v.isInt());
681  if (UNLIKELY(entry_ti.is_string() &&
682  entry_ti.get_compression() == kENCODING_DICT)) {
683  CHECK_EQ(4, entry_ti.get_logical_size());
684  const auto string_dict_proxy = result_set_->executor_->getStringDictionaryProxy(
685  entry_ti.get_comp_param(), result_set_->row_set_mem_owner_, false);
686  auto lhs_str = string_dict_proxy->getString(lhs_v.i1);
687  auto rhs_str = string_dict_proxy->getString(rhs_v.i1);
688  if (lhs_str == rhs_str) {
689  continue;
690  }
691  return use_desc_cmp ? lhs_str > rhs_str : lhs_str < rhs_str;
692  }
693  if (UNLIKELY(is_distinct_target(result_set_->targets_[order_entry.tle_no - 1]))) {
694  const auto lhs_sz = count_distinct_set_size(
695  lhs_v.i1,
696  result_set_->query_mem_desc_.getCountDistinctDescriptor(order_entry.tle_no -
697  1));
698  const auto rhs_sz = count_distinct_set_size(
699  rhs_v.i1,
700  result_set_->query_mem_desc_.getCountDistinctDescriptor(order_entry.tle_no -
701  1));
702  if (lhs_sz == rhs_sz) {
703  continue;
704  }
705  return use_desc_cmp ? lhs_sz > rhs_sz : lhs_sz < rhs_sz;
706  }
707  if (lhs_v.i1 == rhs_v.i1) {
708  continue;
709  }
710  if (entry_ti.is_fp()) {
711  if (float_argument_input) {
712  const auto lhs_dval = *reinterpret_cast<const float*>(may_alias_ptr(&lhs_v.i1));
713  const auto rhs_dval = *reinterpret_cast<const float*>(may_alias_ptr(&rhs_v.i1));
714  return use_desc_cmp ? lhs_dval > rhs_dval : lhs_dval < rhs_dval;
715  } else {
716  const auto lhs_dval =
717  *reinterpret_cast<const double*>(may_alias_ptr(&lhs_v.i1));
718  const auto rhs_dval =
719  *reinterpret_cast<const double*>(may_alias_ptr(&rhs_v.i1));
720  return use_desc_cmp ? lhs_dval > rhs_dval : lhs_dval < rhs_dval;
721  }
722  }
723  return use_desc_cmp ? lhs_v.i1 > rhs_v.i1 : lhs_v.i1 < rhs_v.i1;
724  } else {
725  if (lhs_v.isPair()) {
726  CHECK(rhs_v.isPair());
727  const auto lhs =
728  pair_to_double({lhs_v.i1, lhs_v.i2}, entry_ti, float_argument_input);
729  const auto rhs =
730  pair_to_double({rhs_v.i1, rhs_v.i2}, entry_ti, float_argument_input);
731  if (lhs == rhs) {
732  continue;
733  }
734  return use_desc_cmp ? lhs > rhs : lhs < rhs;
735  } else {
736  CHECK(lhs_v.isStr() && rhs_v.isStr());
737  const auto lhs = lhs_v.strVal();
738  const auto rhs = rhs_v.strVal();
739  if (lhs == rhs) {
740  continue;
741  }
742  return use_desc_cmp ? lhs > rhs : lhs < rhs;
743  }
744  }
745  }
746  return false;
747 }
748 
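// Heap-based partial sort: builds a heap over the permutation with `compare` and pops
// at most n entries, leaving to_sort holding the top n indices in output order.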
749 void ResultSet::topPermutation(
750  std::vector<uint32_t>& to_sort,
751  const size_t n,
752  const std::function<bool(const uint32_t, const uint32_t)> compare) {
753  std::make_heap(to_sort.begin(), to_sort.end(), compare);
754  std::vector<uint32_t> permutation_top;
755  permutation_top.reserve(n);
756  for (size_t i = 0; i < n && !to_sort.empty(); ++i) {
757  permutation_top.push_back(to_sort.front());
758  std::pop_heap(to_sort.begin(), to_sort.end(), compare);
759  to_sort.pop_back();
760  }
761  to_sort.swap(permutation_top);
762 }
763 
764 void ResultSet::sortPermutation(
765  const std::function<bool(const uint32_t, const uint32_t)> compare) {
766  std::sort(permutation_.begin(), permutation_.end(), compare);
767 }
768 
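// Copies the group-by buffer to the device, sorts it in place on the GPU by the
// order-by column, and copies the reordered buffers back to the host.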
769 void ResultSet::radixSortOnGpu(
770  const std::list<Analyzer::OrderEntry>& order_entries) const {
771  auto data_mgr = &executor_->catalog_->getDataMgr();
772  const int device_id{0};
773  CudaAllocator cuda_allocator(data_mgr, device_id);
774  std::vector<int64_t*> group_by_buffers(executor_->blockSize());
775  group_by_buffers[0] = reinterpret_cast<int64_t*>(storage_->getUnderlyingBuffer());
776  auto dev_group_by_buffers =
777  create_dev_group_by_buffers(&cuda_allocator,
778  group_by_buffers,
779  query_mem_desc_,
780  executor_->blockSize(),
781  executor_->gridSize(),
782  device_id,
784  -1,
785  true,
786  true,
787  false,
788  nullptr);
789  inplace_sort_gpu(
790  order_entries, query_mem_desc_, dev_group_by_buffers, data_mgr, device_id);
791  copy_group_by_buffers_from_gpu(
792  data_mgr,
793  group_by_buffers,
794  query_mem_desc_.getBufferSizeBytes(ExecutorDeviceType::GPU),
795  dev_group_by_buffers.second,
796  query_mem_desc_,
797  executor_->blockSize(),
798  executor_->gridSize(),
799  device_id,
800  false);
801 }
802 
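// CPU fallback for the GPU radix sort: sorts the single order-by column (capturing the
// permutation in idx_buff), then applies that permutation to the key buffer and to
// every remaining slot column.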
803 void ResultSet::radixSortOnCpu(
804  const std::list<Analyzer::OrderEntry>& order_entries) const {
805  CHECK(!query_mem_desc_.hasKeylessHash());
806  std::vector<int64_t> tmp_buff(query_mem_desc_.getEntryCount());
807  std::vector<int32_t> idx_buff(query_mem_desc_.getEntryCount());
808  CHECK_EQ(size_t(1), order_entries.size());
809  auto buffer_ptr = storage_->getUnderlyingBuffer();
810  for (const auto& order_entry : order_entries) {
811  const auto target_idx = order_entry.tle_no - 1;
812  const auto sortkey_val_buff = reinterpret_cast<int64_t*>(
813  buffer_ptr + query_mem_desc_.getColOffInBytes(target_idx));
814  const auto chosen_bytes = query_mem_desc_.getPaddedSlotWidthBytes(target_idx);
815  sort_groups_cpu(sortkey_val_buff,
816  &idx_buff[0],
817  query_mem_desc_.getEntryCount(),
818  order_entry.is_desc,
819  chosen_bytes);
820  apply_permutation_cpu(reinterpret_cast<int64_t*>(buffer_ptr),
821  &idx_buff[0],
822  query_mem_desc_.getEntryCount(),
823  &tmp_buff[0],
824  sizeof(int64_t));
825  for (size_t target_idx = 0; target_idx < query_mem_desc_.getSlotCount();
826  ++target_idx) {
827  if (static_cast<int>(target_idx) == order_entry.tle_no - 1) {
828  continue;
829  }
830  const auto chosen_bytes = query_mem_desc_.getPaddedSlotWidthBytes(target_idx);
831  const auto satellite_val_buff = reinterpret_cast<int64_t*>(
832  buffer_ptr + query_mem_desc_.getColOffInBytes(target_idx));
833  apply_permutation_cpu(satellite_val_buff,
834  &idx_buff[0],
835  query_mem_desc_.getEntryCount(),
836  &tmp_buff[0],
837  chosen_bytes);
838  }
839  }
840 }
841 
842 void ResultSetStorage::addCountDistinctSetPointerMapping(const int64_t remote_ptr,
843  const int64_t ptr) {
844  const auto it_ok = count_distinct_sets_mapping_.emplace(remote_ptr, ptr);
845  CHECK(it_ok.second);
846 }
847 
848 int64_t ResultSetStorage::mappedPtr(const int64_t remote_ptr) const {
849  const auto it = count_distinct_sets_mapping_.find(remote_ptr);
850  // Because completely-zero bitmaps are removed during a distributed transfer, some
851  // remote pointers may not exist. Return 0 if no pointer is found.
852  if (it == count_distinct_sets_mapping_.end()) {
853  return int64_t(0);
854  }
855  return it->second;
856 }
857 
858 size_t ResultSet::getLimit() {
859  return keep_first_;
860 }
861 
862 std::shared_ptr<const std::vector<std::string>> ResultSet::getStringDictionaryPayloadCopy(
863  const int dict_id) const {
864  CHECK(executor_);
865  const auto sdp =
866  executor_->getStringDictionaryProxy(dict_id, row_set_mem_owner_, false);
867  return sdp->getDictionary()->copyStrings();
868 }
869 
870 bool can_use_parallel_algorithms(const ResultSet& rows) {
871  return !rows.isTruncated();
872 }
873 
874 bool use_parallel_algorithms(const ResultSet& rows) {
875  return can_use_parallel_algorithms(rows) && rows.entryCount() >= 20000;
876 }
877 
885 bool ResultSet::isDirectColumnarConversionPossible() const {
886  if (!g_enable_direct_columnarization) {
887  return false;
888  } else if (query_mem_desc_.didOutputColumnar()) {
889  return permutation_.empty() && (query_mem_desc_.getQueryDescriptionType() ==
890  QueryDescriptionType::Projection ||
891  (query_mem_desc_.getQueryDescriptionType() ==
892  QueryDescriptionType::GroupByPerfectHash ||
893  query_mem_desc_.getQueryDescriptionType() ==
894  QueryDescriptionType::GroupByBaselineHash));
895  } else {
896  return permutation_.empty() && (query_mem_desc_.getQueryDescriptionType() ==
897  QueryDescriptionType::GroupByPerfectHash ||
898  query_mem_desc_.getQueryDescriptionType() ==
899  QueryDescriptionType::GroupByBaselineHash);
900  }
901 }
902 
903 // returns a bitmap (and total number) of all single slot targets
904 std::tuple<std::vector<bool>, size_t> ResultSet::getSingleSlotTargetBitmap() const {
905  std::vector<bool> target_bitmap(targets_.size(), true);
906  size_t num_single_slot_targets = 0;
907  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
908  const auto& sql_type = targets_[target_idx].sql_type;
909  if (targets_[target_idx].is_agg && targets_[target_idx].agg_kind == kAVG) {
910  target_bitmap[target_idx] = false;
911  } else if (sql_type.is_varlen()) {
912  target_bitmap[target_idx] = false;
913  } else {
914  num_single_slot_targets++;
915  }
916  }
917  return std::make_tuple(std::move(target_bitmap), num_single_slot_targets);
918 }
919 
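// Like getSingleSlotTargetBitmap(), but additionally filters out single-slot targets
// that the direct columnar conversion path does not support (count distinct targets
// and float SAMPLE aggregates).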
928 std::tuple<std::vector<bool>, size_t> ResultSet::getSupportedSingleSlotTargetBitmap()
929  const {
930  CHECK(isDirectColumnarConversionPossible());
931  auto [single_slot_targets, num_single_slot_targets] = getSingleSlotTargetBitmap();
932 
933  for (size_t target_idx = 0; target_idx < single_slot_targets.size(); target_idx++) {
934  const auto& target = targets_[target_idx];
935  if (single_slot_targets[target_idx] &&
936  (is_distinct_target(target) ||
937  (target.is_agg && target.agg_kind == kSAMPLE && target.sql_type == kFLOAT))) {
938  single_slot_targets[target_idx] = false;
939  num_single_slot_targets--;
940  }
941  }
942  CHECK_GE(num_single_slot_targets, size_t(0));
943  return std::make_tuple(std::move(single_slot_targets), num_single_slot_targets);
944 }
945 
946 // returns the starting slot index for all targets in the result set
947 std::vector<size_t> ResultSet::getSlotIndicesForTargetIndices() const {
948  std::vector<size_t> slot_indices(targets_.size(), 0);
949  size_t slot_index = 0;
950  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
951  slot_indices[target_idx] = slot_index;
952  slot_index = advance_slot(slot_index, targets_[target_idx], false);
953  }
954  return slot_indices;
955 }