// Excerpt of the ColumnFetcher implementation; elided source lines are marked "// ...".
//
// Fragment of columnarize_result(): collects the logical SQL type of every column in
// the result set and builds a ColumnarResults over the whole result.
std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
// ...
const size_t thread_idx,
const size_t executor_id,
// ...
std::vector<SQLTypeInfo> col_types;
for (size_t i = 0; i < result->colCount(); ++i) {
  // ...
row_set_mem_owner, *result, result->colCount(), col_types, executor_id, thread_idx);
// Fragment of getMemoryLevelString(): maps a MemoryLevel value to its name for logging.
switch (memoryLevel) {
// ColumnFetcher constructor: keeps a pointer to the owning Executor and copies the
// provided column cache.
    : executor_(executor), columnarized_table_cache_(column_cache) {}
// Fragment of ColumnFetcher::getOneColumnFragment(): gets one chunk's pointer and
// element count on either CPU or GPU. Physical columns are read through
// Chunk_NS::Chunk::getChunk(); columns backed by a temporary result set are
// columnarized under a mutex and cached per fragment in the ColumnCacheMap.
const size_t thread_idx,
std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
// ...
static std::mutex columnar_conversion_mutex;
// ...
CHECK(!cd || !(cd->isVirtualCol));
const int8_t* col_buff = nullptr;
// ...
    executor->getDataMgr(),
    // ...
    chunk_meta_it->second->numBytes,
    chunk_meta_it->second->numElements);
chunks_owner.push_back(chunk);
// ...
auto ab = chunk->getBuffer();
CHECK(ab->getMemoryPtr());
col_buff = reinterpret_cast<int8_t*>(ab->getMemoryPtr());
// ...
std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex);
// ...
if (column_cache.empty() || !column_cache.count(table_key)) {
  column_cache.insert(std::make_pair(
      // ...
      std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
// ...
auto& frag_id_to_result = column_cache[table_key];
if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
  frag_id_to_result.insert(std::make_pair(
      // ...
      executor->row_set_mem_owner_,
      // ...
      executor->executor_id_,
      // ...
col_frag = column_cache[table_key][frag_id].get();
// ...
executor->getDataMgr(),
// Fragment of ColumnFetcher::makeJoinColumn(): creates a JoinColumn backed by an
// array of JoinChunk structs, one entry per fragment of the hash-join column.
const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
// ...
const size_t thread_idx,
std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
std::vector<std::shared_ptr<void>>& malloc_owner,
// ...
CHECK(!fragments.empty());
// ...
size_t col_chunks_buff_sz = sizeof(struct JoinChunk) * fragments.size();
// ...
auto col_chunks_buff = reinterpret_cast<int8_t*>(
    malloc_owner.emplace_back(checked_malloc(col_chunks_buff_sz), free).get());
auto join_chunk_array = reinterpret_cast<struct JoinChunk*>(col_chunks_buff);
// ...
size_t num_chunks = 0;
for (auto& frag : fragments) {
  // ...
      executor->checkNonKernelTimeInterrupted()) {
  // ...
  num_elems += elem_count;
// ...
return {col_chunks_buff,
        // ...
        static_cast<size_t>(elem_sz)};
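
The JoinChunk array built here is one contiguous allocation holding a {column buffer pointer, element count} record per fragment. A standalone sketch of that layout, using simplified stand-in types (JoinChunkSketch and the sample fragments are illustrative, not the engine's definitions):

#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <vector>

struct JoinChunkSketch {  // stand-in for the engine's JoinChunk
  const int8_t* col_buff;
  size_t num_elems;
};

int main() {
  std::vector<std::vector<int8_t>> fragments = {{1, 2, 3}, {4, 5}, {6}};
  // One record per fragment, all packed into a single malloc'd buffer.
  const size_t col_chunks_buff_sz = sizeof(JoinChunkSketch) * fragments.size();
  auto* join_chunk_array = static_cast<JoinChunkSketch*>(std::malloc(col_chunks_buff_sz));
  size_t num_elems = 0;
  for (size_t i = 0; i < fragments.size(); ++i) {
    join_chunk_array[i] = {fragments[i].data(), fragments[i].size()};
    num_elems += fragments[i].size();
  }
  std::printf("chunks: %zu, total elements: %zu\n", fragments.size(), num_elems);
  std::free(join_chunk_array);
  return 0;
}
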
// Fragment of ColumnFetcher::getOneTableColumnFragment(): fetches a single fragment
// of a physical table column. Fixed-width columns return the chunk buffer's memory
// pointer directly; variable-length columns (none-encoded strings, arrays) return a
// pointer to a ChunkIter, copied to the device when the memory level is GPU.
const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
std::list<ChunkIter>& chunk_iter_holder,
// ...
const auto fragments_it = all_tables_fragments.find(table_key);
CHECK(fragments_it != all_tables_fragments.end());
const auto fragments = fragments_it->second;
const auto& fragment = (*fragments)[frag_id];
if (fragment.isEmptyPhysicalFragment()) {
  // ...
std::shared_ptr<Chunk_NS::Chunk> chunk;
auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
// ...
const auto col_type =
    // ...
const bool is_real_string =
    col_type.is_string() && col_type.get_compression() == kENCODING_NONE;
const bool is_varlen =
    // ...
    table_key.db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
// ...
    chunk_meta_it->second->numBytes,
    chunk_meta_it->second->numElements);
// ...
chunk_holder.push_back(chunk);
// ...
CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
auto& chunk_iter = chunk_iter_holder.back();
// ...
return reinterpret_cast<int8_t*>(&chunk_iter);
// ...
auto ab = chunk->getBuffer();
// ...
auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
row_set_mem_owner->addVarlenInputBuffer(ab);
// ...
    chunk_iter_gpu, reinterpret_cast<int8_t*>(&chunk_iter), sizeof(ChunkIter));
return chunk_iter_gpu;
// ...
auto ab = chunk->getBuffer();
CHECK(ab->getMemoryPtr());
return ab->getMemoryPtr();
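
The chunk key built above is just a vector of four integers identifying the chunk: {db_id, table_id, column_id, fragment_id}. A tiny illustration of that convention (the id values are made up):

#include <cstdio>
#include <vector>

using ChunkKey = std::vector<int>;  // {db_id, table_id, column_id, fragment_id}

int main() {
  const int db_id = 1, physical_table_id = 5, col_id = 3, fragment_id = 0;
  const ChunkKey chunk_key{db_id, physical_table_id, col_id, fragment_id};
  std::printf("components: %zu, column id: %d\n", chunk_key.size(), chunk_key[2]);
  return 0;
}
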
// Fragment of ColumnFetcher::getAllTableColumnFragments(): materializes a column
// across every fragment of a table by columnarizing each fragment and merging the
// per-fragment ColumnarResults; the merged column is cached so later calls can
// reuse it.
const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
// ...
const size_t thread_idx) const {
  const auto fragments_it = all_tables_fragments.find(table_key);
  CHECK(fragments_it != all_tables_fragments.end());
  const auto fragments = fragments_it->second;
  const auto frag_count = fragments->size();
  std::vector<std::unique_ptr<ColumnarResults>> column_frags;
  // ...
  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
    // ...
        executor_->checkNonKernelTimeInterrupted()) {
    // ...
    std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
    std::list<ChunkIter> chunk_iter_holder;
    const auto& fragment = (*fragments)[frag_id];
    if (fragment.isEmptyPhysicalFragment()) {
      // ...
    auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
    CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
    // ...
        static_cast<int>(frag_id),
        // ...
        all_tables_fragments,
        // ...
    column_frags.push_back(
        std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
                                          // ...
                                          fragment.getNumTuples(),
                                          chunk_meta_it->second->sqlType,
                                          // ...
  auto merged_results =
      // ...
  table_column = merged_results.get();
  // ...
  table_column = column_it->second.get();
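
getAllTableColumnFragments() ultimately needs the column as one contiguous buffer, so the per-fragment results are merged in fragment order (the engine's ColumnarResults::mergeResults does this over real columnar buffers). A simplified sketch of that merge for a single fixed-width column, with plain vectors standing in for the real buffers:

#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
  // One BIGINT column materialized fragment by fragment (values are made up).
  const std::vector<std::vector<int64_t>> fragment_columns = {{1, 2, 3}, {4, 5}, {6, 7, 8, 9}};

  // Merge: concatenate the per-fragment buffers in fragment order.
  std::vector<int64_t> merged_column;
  for (const auto& frag : fragment_columns) {
    merged_column.insert(merged_column.end(), frag.begin(), frag.end());
  }
  std::printf("merged %zu rows\n", merged_column.size());
  return 0;
}
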
// Signature tail of ColumnFetcher::getResultSetColumn() (public overload).
const size_t thread_idx) const {
// Fragment of ColumnFetcher::linearizeColumnFragments(): merges all fragments of a
// variable-length column (arrays or none-encoded text) into a single data buffer
// plus index buffer, wraps them in a merged Chunk/ChunkIter, caches the iterator
// per device, and returns a pointer to it (copied to the GPU when needed).
const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
std::list<ChunkIter>& chunk_iter_holder,
// ...
const size_t thread_idx) const {
  // ...
  const auto fragments_it = all_tables_fragments.find(table_key);
  CHECK(fragments_it != all_tables_fragments.end());
  const auto fragments = fragments_it->second;
  const auto frag_count = fragments->size();
  // ...
  bool is_varlen_chunk = cd->columnType.is_varlen() && !cd->columnType.is_fixlen_array();
  size_t total_num_tuples = 0;
  size_t total_data_buf_size = 0;
  size_t total_idx_buf_size = 0;
  // ...
  if (linearized_iter_it->second.find(device_id) !=
      linearized_iter_it->second.end()) {
    // ...
    return chunk_iter_gpu;
  // ...
  std::shared_ptr<Chunk_NS::Chunk> chunk;
  std::list<std::shared_ptr<Chunk_NS::Chunk>> local_chunk_holder;
  std::list<ChunkIter> local_chunk_iter_holder;
  std::list<size_t> local_chunk_num_tuples;
  // ...
  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
    const auto& fragment = (*fragments)[frag_id];
    if (fragment.isEmptyPhysicalFragment()) {
      // ...
    auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
    CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
    // ...
        table_key.db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
    // ...
        chunk_meta_it->second->numBytes,
        chunk_meta_it->second->numElements);
    local_chunk_holder.push_back(chunk);
    auto chunk_iter = chunk->begin_iterator(chunk_meta_it->second);
    local_chunk_iter_holder.push_back(chunk_iter);
    local_chunk_num_tuples.push_back(fragment.getNumTuples());
    total_num_tuples += fragment.getNumTuples();
    total_data_buf_size += chunk->getBuffer()->size();
    std::ostringstream oss;
    oss << "Load chunk for col_name: " << chunk->getColumnDesc()->columnName
        << ", col_id: " << chunk->getColumnDesc()->columnId << ", Frag-" << frag_id
        << ", numTuples: " << fragment.getNumTuples()
        << ", data_size: " << chunk->getBuffer()->size();
    if (chunk->getIndexBuf()) {
      auto idx_buf_size = chunk->getIndexBuf()->size() - sizeof(ArrayOffsetT);
      oss << ", index_size: " << idx_buf_size;
      total_idx_buf_size += idx_buf_size;
    // ...
    VLOG(2) << oss.str();
  // ...
  auto& col_ti = cd->columnType;
  // ...
  if (col_ti.is_array()) {
    if (col_ti.is_fixlen_array()) {
      VLOG(2) << "Linearize fixed-length multi-frag array column (col_id: "
              << cd->columnId << ", col_name: " << cd->columnName
              // ...
              << ", device_id: " << device_id << "): " << cd->columnType.to_string();
      // ...
                                            local_chunk_iter_holder,
                                            local_chunk_num_tuples,
      // ...
      CHECK(col_ti.is_varlen_array());
      VLOG(2) << "Linearize variable-length multi-frag array column (col_id: "
              << cd->columnId << ", col_name: " << cd->columnName
              // ...
              << ", device_id: " << device_id << "): " << cd->columnType.to_string();
      // ...
                                           local_chunk_iter_holder,
                                           local_chunk_num_tuples,
  // ...
  if (col_ti.is_string() && !col_ti.is_dict_encoded_string()) {
    VLOG(2) << "Linearize variable-length multi-frag non-encoded text column (col_id: "
            << cd->columnId << ", col_name: " << cd->columnName
            // ...
            << ", device_id: " << device_id << "): " << cd->columnType.to_string();
    // ...
                                         local_chunk_iter_holder,
                                         local_chunk_num_tuples,
  // ...
  if (!col_ti.is_fixlen_array()) {
    // ...
  auto merged_data_buffer = res.first;
  auto merged_index_buffer = res.second;
  // ...
  auto merged_chunk = std::make_shared<Chunk_NS::Chunk>(
      merged_data_buffer, merged_index_buffer, cd, false);
  // ...
      *(local_chunk_iter_holder.rbegin()),
  // ...
  chunk_holder.push_back(merged_chunk);
  chunk_iter_holder.push_back(merged_chunk_iter);
  // ...
  auto merged_chunk_iter_ptr = reinterpret_cast<int8_t*>(&(chunk_iter_holder.back()));
  // ...
  return merged_chunk_iter_ptr;
  // ...
  CHECK(device_allocator);
  // ...
      chunk_iter_gpu, merged_chunk_iter_ptr, sizeof(ChunkIter));
  return chunk_iter_gpu;
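
Before dispatching to the per-type linearizers, the merged buffers are sized by summing each fragment chunk: the merged data buffer needs the sum of the per-fragment data sizes, while each fragment's index contribution is counted minus one trailing ArrayOffsetT and a single end offset is added back for the merged index buffer. A worked sketch of that accounting, with made-up per-fragment sizes:

#include <cstdint>
#include <cstdio>
#include <vector>

using ArrayOffsetT = int32_t;

int main() {
  // Made-up per-fragment sizes: {data bytes, index-buffer bytes}.
  struct FragSizes { size_t data_bytes; size_t index_bytes; };
  const std::vector<FragSizes> frags = {{4096, 44}, {8192, 84}, {2048, 24}};

  size_t total_data_buf_size = 0;
  size_t total_idx_buf_size = 0;
  for (const auto& f : frags) {
    total_data_buf_size += f.data_bytes;
    // Count each fragment's index buffer minus its trailing end-offset entry.
    total_idx_buf_size += f.index_bytes - sizeof(ArrayOffsetT);
  }
  // The merged index buffer gets a single trailing end offset back.
  const size_t merged_idx_buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
  std::printf("data: %zu bytes, index: %zu bytes\n", total_data_buf_size, merged_idx_buf_size);
  return 0;
}
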
// Fragment of ColumnFetcher::linearizeVarLenArrayColFrags(): builds one merged data
// buffer and one merged index (offset) buffer for a variable-length array column.
// Data buffers are appended fragment by fragment; the per-fragment offsets are then
// rebased onto the merged buffer (in parallel for large chunks), negative offsets
// are preserved as the NULL encoding, and the temporary CPU index buffer is finally
// copied to the requested memory level.
std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
std::list<ChunkIter>& chunk_iter_holder,
std::list<std::shared_ptr<Chunk_NS::Chunk>>& local_chunk_holder,
std::list<ChunkIter>& local_chunk_iter_holder,
std::list<size_t>& local_chunk_num_tuples,
// ...
const size_t total_data_buf_size,
const size_t total_idx_buf_size,
const size_t total_num_tuples,
// ...
const size_t thread_idx) const {
  // ...
  bool has_cached_merged_idx_buf = false;
  bool has_cached_merged_data_buf = false;
  // ...
  int64_t linearization_time_ms = 0;
  // ...
  auto& cd_cache = cached_data_buf_cache_it->second;
  auto cached_data_buf_it = cd_cache.find(device_id);
  if (cached_data_buf_it != cd_cache.end()) {
    has_cached_merged_data_buf = true;
    merged_data_buffer = cached_data_buf_it->second;
    VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
    // ...
        executor_->getDataMgr()->alloc(memory_level, device_id, total_data_buf_size);
    VLOG(2) << "Allocate " << total_data_buf_size
            << " bytes of data buffer space for linearized chunks (memory_level: "
    // ...
    cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
  // ...
        executor_->getDataMgr()->alloc(memory_level, device_id, total_data_buf_size);
    VLOG(2) << "Allocate " << total_data_buf_size
            << " bytes of data buffer space for linearized chunks (memory_level: "
    // ...
    m.insert(std::make_pair(device_id, merged_data_buffer));
  // ...
  auto cached_index_buf_it =
      // ...
    has_cached_merged_idx_buf = true;
    merged_index_buffer_in_cpu = cached_index_buf_it->second;
    // ...
        << "Recycle merged temporary idx buffer for linearized chunks (memory_level: "
  // ...
    auto idx_buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
    merged_index_buffer_in_cpu =
        // ...
    VLOG(2) << "Allocate " << idx_buf_size
            << " bytes of temporary idx buffer space on CPU for linearized chunks";
    // ...
        std::make_pair(cd->columnId, merged_index_buffer_in_cpu));
  // ...
  size_t sum_data_buf_size = 0;
  size_t cur_sum_num_tuples = 0;
  size_t total_idx_size_modifier = 0;
  auto chunk_holder_it = local_chunk_holder.begin();
  auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
  auto chunk_num_tuple_it = local_chunk_num_tuples.begin();
  bool null_padded_first_elem = false;
  bool null_padded_last_val = false;
  // ...
  for (; chunk_holder_it != local_chunk_holder.end();
       chunk_holder_it++, chunk_num_tuple_it++) {
    // ...
    auto target_chunk = chunk_holder_it->get();
    auto target_chunk_data_buffer = target_chunk->getBuffer();
    auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
    auto target_idx_buf_ptr =
        reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
    auto cur_chunk_num_tuples = *chunk_num_tuple_it;
    // ...
    size_t cur_idx = cur_chunk_num_tuples;
    // ...
    while (original_offset < 0) {
      original_offset = target_idx_buf_ptr[--cur_idx];
    // ...
    ArrayOffsetT new_offset = original_offset + sum_data_buf_size;
    if (new_offset < 0) {
      throw std::runtime_error(
          "Linearization of a variable-length column having chunk size larger than 2GB "
          "not supported yet");
    // ...
    sum_data_buf_size += target_chunk_data_buffer->size();
  // ...
  chunk_holder_it = local_chunk_holder.begin();
  chunk_num_tuple_it = local_chunk_num_tuples.begin();
  sum_data_buf_size = 0;
  // ...
  for (; chunk_holder_it != local_chunk_holder.end();
       chunk_holder_it++, chunk_iter_holder_it++, chunk_num_tuple_it++) {
    // ...
        executor_->checkNonKernelTimeInterrupted()) {
    // ...
    auto target_chunk = chunk_holder_it->get();
    auto target_chunk_data_buffer = target_chunk->getBuffer();
    auto cur_chunk_num_tuples = *chunk_num_tuple_it;
    auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
    auto target_idx_buf_ptr =
        reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
    auto idx_buf_size = target_chunk_idx_buffer->size() - sizeof(ArrayOffsetT);
    auto target_data_buffer_start_ptr = target_chunk_data_buffer->getMemoryPtr();
    auto target_data_buffer_size = target_chunk_data_buffer->size();
    // ...
    if (cur_sum_num_tuples > 0 && target_idx_buf_ptr[0] > 0) {
      null_padded_first_elem = true;
    // ...
    if (!has_cached_merged_data_buf) {
      merged_data_buffer->append(target_data_buffer_start_ptr,
                                 target_data_buffer_size,
    // ...
    if (!has_cached_merged_idx_buf) {
      // ...
      merged_index_buffer_in_cpu->append(target_chunk_idx_buffer->getMemoryPtr(),
      // ...
      if (cur_sum_num_tuples > 0) {
        if (null_padded_last_val) {
          // ...
          idx_buf_ptr[cur_sum_num_tuples] = -sum_data_buf_size;
        // ...
        std::vector<std::future<void>> conversion_threads;
        std::vector<std::vector<size_t>> null_padded_row_idx_vecs(worker_count,
                                                                  std::vector<size_t>());
        bool is_parallel_modification = false;
        std::vector<size_t> null_padded_row_idx_vec;
        const auto do_work = [&cur_sum_num_tuples,
                              // ...
                              &null_padded_first_elem,
                              // ...
                              const bool is_parallel_modification,
                              std::vector<size_t>* null_padded_row_idx_vec) {
          for (size_t i = start; i < end; i++) {
            if (LIKELY(idx_buf_ptr[cur_sum_num_tuples + i] >= 0)) {
              if (null_padded_first_elem) {
                // ...
                idx_buf_ptr[cur_sum_num_tuples + i] -=
                // ...
              idx_buf_ptr[cur_sum_num_tuples + i] += sum_data_buf_size;
              // ...
              null_padded_row_idx_vec->push_back(cur_sum_num_tuples + i);
        // ...
          is_parallel_modification = true;
          // ...
               makeIntervals(size_t(0), cur_chunk_num_tuples, worker_count)) {
            conversion_threads.push_back(
                // ...
                           is_parallel_modification,
                           &null_padded_row_idx_vecs[interval.index]));
          // ...
          for (auto& child : conversion_threads) {
            // ...
          for (auto& v : null_padded_row_idx_vecs) {
            std::copy(v.begin(), v.end(), std::back_inserter(null_padded_row_idx_vec));
          // ...
                  cur_chunk_num_tuples,
                  is_parallel_modification,
                  &null_padded_row_idx_vec);
        // ...
        if (!null_padded_row_idx_vec.empty()) {
          // ...
          std::sort(null_padded_row_idx_vec.begin(), null_padded_row_idx_vec.end());
          for (auto& padded_null_row_idx : null_padded_row_idx_vec) {
            if (idx_buf_ptr[padded_null_row_idx - 1] > 0) {
              idx_buf_ptr[padded_null_row_idx] = -idx_buf_ptr[padded_null_row_idx - 1];
            // ...
              idx_buf_ptr[padded_null_row_idx] = idx_buf_ptr[padded_null_row_idx - 1];
    // ...
    cur_sum_num_tuples += cur_chunk_num_tuples;
    sum_data_buf_size += target_chunk_data_buffer->size();
    if (target_idx_buf_ptr[*chunk_num_tuple_it] < 0) {
      null_padded_last_val = true;
    // ...
      null_padded_last_val = false;
    // ...
    if (null_padded_first_elem) {
      // ...
      null_padded_first_elem = false;
    // ...
    if (!has_cached_merged_idx_buf && cur_sum_num_tuples == total_num_tuples) {
      auto merged_index_buffer_ptr =
          // ...
      merged_index_buffer_ptr[total_num_tuples] =
          total_data_buf_size -
          total_idx_size_modifier;
  // ...
  size_t buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
  // ...
      int8_t* src, int8_t* dest, size_t buf_size, MemoryLevel memory_level) {
    // ...
    memcpy((void*)dest, src, buf_size);
  // ...
  auto& merged_idx_buf_cache = merged_idx_buf_cache_it->second;
  auto merged_idx_buf_it = merged_idx_buf_cache.find(device_id);
  if (merged_idx_buf_it != merged_idx_buf_cache.end()) {
    merged_index_buffer = merged_idx_buf_it->second;
  // ...
    merged_index_buffer =
        executor_->getDataMgr()->alloc(memory_level, device_id, buf_size);
    // ...
    merged_idx_buf_cache.insert(std::make_pair(device_id, merged_index_buffer));
  // ...
    merged_index_buffer =
        executor_->getDataMgr()->alloc(memory_level, device_id, buf_size);
    // ...
    m.insert(std::make_pair(device_id, merged_index_buffer));
  // ...
    merged_index_buffer = merged_index_buffer_in_cpu;
  // ...
  CHECK(merged_index_buffer);
  linearization_time_ms += timer_stop(clock_begin);
  VLOG(2) << "Linearization has been successfully done, elapsed time: "
          << linearization_time_ms << " ms.";
  return {merged_data_buffer, merged_index_buffer};
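
The core of the variable-length merge above is offset rebasing: each fragment's index buffer stores byte offsets into that fragment's own data buffer, so after the data buffers are appended, every offset from fragment k must be shifted by the combined data size of fragments 0..k-1. Negative offsets are the NULL encoding and keep their sign, and a shift past 2GB is rejected (the runtime_error above). A deliberately simplified sketch of that arithmetic, not the engine's exact null fix-up logic:

#include <cstdint>
#include <cstdio>
#include <vector>

using ArrayOffsetT = int32_t;

int main() {
  // Two fragments with made-up offsets; a negative value marks a NULL row.
  const std::vector<std::vector<ArrayOffsetT>> frag_offsets = {{0, 4, -4, 10}, {0, 2, 6}};
  const std::vector<ArrayOffsetT> frag_data_sizes = {10, 6};

  std::vector<ArrayOffsetT> merged_offsets;
  ArrayOffsetT shift = 0;
  for (size_t f = 0; f < frag_offsets.size(); ++f) {
    // Each fragment after the first starts with a 0 offset that duplicates the
    // previous fragment's end offset, so skip it when concatenating.
    const size_t start = (f == 0) ? 0 : 1;
    for (size_t i = start; i < frag_offsets[f].size(); ++i) {
      const ArrayOffsetT off = frag_offsets[f][i];
      // Shift by the data already merged; keep the sign so NULLs stay NULLs.
      merged_offsets.push_back(off >= 0 ? off + shift : off - shift);
    }
    shift += frag_data_sizes[f];
  }
  for (const auto off : merged_offsets) {
    std::printf("%d ", static_cast<int>(off));
  }
  std::printf("\n");
  return 0;
}
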
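
When a chunk is large enough (the engine gates this behind g_enable_parallel_linearization), the offset fix-up runs in parallel: the tuple range is split into roughly equal intervals (cf. makeIntervals) and each interval is handed to a std::async task, then all tasks are joined. A minimal sketch of that split-and-join pattern, independent of the engine's Intervals helper:

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <future>
#include <vector>

int main() {
  const size_t num_tuples = 1000;
  const size_t worker_count = 4;
  std::vector<long long> offsets(num_tuples, 0);
  const long long shift = 128;  // stand-in for sum_data_buf_size

  // Split [0, num_tuples) into worker_count contiguous intervals.
  std::vector<std::future<void>> conversion_threads;
  const size_t step = (num_tuples + worker_count - 1) / worker_count;
  for (size_t begin = 0; begin < num_tuples; begin += step) {
    const size_t end = std::min(begin + step, num_tuples);
    conversion_threads.push_back(std::async(std::launch::async, [&offsets, begin, end, shift] {
      for (size_t i = begin; i < end; ++i) {
        offsets[i] += shift;  // each worker touches a disjoint slice
      }
    }));
  }
  for (auto& child : conversion_threads) {
    child.wait();
  }
  std::printf("first=%lld last=%lld\n", offsets.front(), offsets.back());
  return 0;
}
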
// Fragment of ColumnFetcher::linearizeFixedLenArrayColFrags(): fixed-length array
// columns need no index buffer, so linearization is a plain concatenation of the
// per-fragment data buffers into one (possibly cached) merged buffer.
std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
std::list<ChunkIter>& chunk_iter_holder,
std::list<std::shared_ptr<Chunk_NS::Chunk>>& local_chunk_holder,
std::list<ChunkIter>& local_chunk_iter_holder,
std::list<size_t>& local_chunk_num_tuples,
// ...
const size_t total_data_buf_size,
const size_t total_idx_buf_size,
const size_t total_num_tuples,
// ...
const size_t thread_idx) const {
  int64_t linearization_time_ms = 0;
  // ...
  bool has_cached_merged_data_buf = false;
  // ...
  auto& cd_cache = cached_data_buf_cache_it->second;
  auto cached_data_buf_it = cd_cache.find(device_id);
  if (cached_data_buf_it != cd_cache.end()) {
    has_cached_merged_data_buf = true;
    merged_data_buffer = cached_data_buf_it->second;
    VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
    // ...
        executor_->getDataMgr()->alloc(memory_level, device_id, total_data_buf_size);
    VLOG(2) << "Allocate " << total_data_buf_size
            << " bytes of data buffer space for linearized chunks (memory_level: "
    // ...
    cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
  // ...
        executor_->getDataMgr()->alloc(memory_level, device_id, total_data_buf_size);
    VLOG(2) << "Allocate " << total_data_buf_size
            << " bytes of data buffer space for linearized chunks (memory_level: "
    // ...
    m.insert(std::make_pair(device_id, merged_data_buffer));
  // ...
  if (!has_cached_merged_data_buf) {
    size_t sum_data_buf_size = 0;
    auto chunk_holder_it = local_chunk_holder.begin();
    auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
    for (; chunk_holder_it != local_chunk_holder.end();
         chunk_holder_it++, chunk_iter_holder_it++) {
      // ...
      auto target_chunk = chunk_holder_it->get();
      auto target_chunk_data_buffer = target_chunk->getBuffer();
      merged_data_buffer->append(target_chunk_data_buffer->getMemoryPtr(),
                                 target_chunk_data_buffer->size(),
      // ...
      sum_data_buf_size += target_chunk_data_buffer->size();
    // ...
    CHECK_EQ(total_data_buf_size, sum_data_buf_size);
  // ...
  linearization_time_ms += timer_stop(clock_begin);
  VLOG(2) << "Linearization has been successfully done, elapsed time: "
          << linearization_time_ms << " ms.";
  return {merged_data_buffer, nullptr};
// Fragment of ColumnFetcher::transferColumnIfNeeded(): returns the host column
// buffer directly for CPU execution, or copies it to the device through the
// DeviceAllocator for GPU execution.
if (!columnar_results) {
  // ...
CHECK_LT(static_cast<size_t>(col_id), col_buffers.size());
// ...
const auto& col_ti = columnar_results->getColumnType(col_id);
// ...
num_bytes = columnar_results->size() * col_ti.get_size();
// ...
CHECK(device_allocator);
auto gpu_col_buffer = device_allocator->alloc(num_bytes);
device_allocator->copyToDevice(gpu_col_buffer, col_buffers[col_id], num_bytes);
return gpu_col_buffer;
// ...
return col_buffers[col_id];
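
transferColumnIfNeeded() boils down to one decision: for CPU execution hand back the host buffer as-is, for GPU execution allocate device memory through the allocator and copy the bytes over. A self-contained sketch of that decision; the Allocator type below is a simplified stand-in, not the engine's DeviceAllocator:

#include <cstdint>
#include <cstring>
#include <vector>

struct Allocator {  // illustrative stand-in for a device allocator
  std::vector<std::vector<int8_t>> blocks;
  int8_t* alloc(size_t num_bytes) {
    blocks.emplace_back(num_bytes);
    return blocks.back().data();
  }
  void copyToDevice(int8_t* dst, const int8_t* src, size_t num_bytes) {
    std::memcpy(dst, src, num_bytes);  // a real allocator would issue a host-to-device copy
  }
};

const int8_t* transfer_if_needed(const int8_t* host_buf,
                                 size_t num_bytes,
                                 bool gpu_level,
                                 Allocator& allocator) {
  if (!gpu_level) {
    return host_buf;  // CPU level: the buffer is usable in place
  }
  int8_t* gpu_col_buffer = allocator.alloc(num_bytes);
  allocator.copyToDevice(gpu_col_buffer, host_buf, num_bytes);
  return gpu_col_buffer;
}

int main() {
  Allocator allocator;
  std::vector<int8_t> host_col(64, 7);
  const int8_t* cpu_view = transfer_if_needed(host_col.data(), host_col.size(), false, allocator);
  const int8_t* gpu_view = transfer_if_needed(host_col.data(), host_col.size(), true, allocator);
  return (cpu_view == host_col.data() && gpu_view != host_col.data()) ? 0 : 1;
}
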
// Fragment of ColumnFetcher::addMergedChunkIter(): registers a merged ChunkIter in
// the per-column, per-device iterator cache.
const int device_id,
int8_t* chunk_iter_ptr) const {
  // ...
  auto iter_device_it = chunk_iter_it->second.find(device_id);
  if (iter_device_it == chunk_iter_it->second.end()) {
    VLOG(2) << "Additional merged chunk_iter for col_desc (tbl: "
            // ...
            << "), device_id: " << device_id;
    chunk_iter_it->second.emplace(device_id, chunk_iter_ptr);
  // ...
  iter_m.emplace(device_id, chunk_iter_ptr);
  VLOG(2) << "New merged chunk_iter for col_desc (tbl: "
          // ...
          << "), device_id: " << device_id;
// Fragment of ColumnFetcher::getChunkiter(): looks up a previously merged ChunkIter
// for the given column and device so a linearized chunk can be recycled.
const int device_id) const {
  // ...
  auto dev_iter_map_it = linearized_chunk_iter_it->second.find(device_id);
  if (dev_iter_map_it != linearized_chunk_iter_it->second.end()) {
    VLOG(2) << "Recycle merged chunk_iter for col_desc (tbl: "
            // ...
            << "), device_id: " << device_id;
    return dev_iter_map_it->second;
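
Both the merged-chunk-iterator cache and the linearized buffer caches above are two-level maps: column descriptor -> (device_id -> pointer). A small sketch of that lookup-or-insert pattern, with a plain int standing in for the real InputColDescriptor key:

#include <cstdint>
#include <cstdio>
#include <unordered_map>
#include <utility>

// column key -> (device_id -> cached pointer); a stand-in for the engine's
// InputColDescriptor-keyed DeviceMergedChunkIterMap.
using DeviceIterMap = std::unordered_map<int, int8_t*>;
using IterCache = std::unordered_map<int, DeviceIterMap>;

int8_t* find_or_insert(IterCache& cache, int col_key, int device_id, int8_t* fresh) {
  auto col_it = cache.find(col_key);
  if (col_it != cache.end()) {
    auto dev_it = col_it->second.find(device_id);
    if (dev_it != col_it->second.end()) {
      return dev_it->second;  // recycle the cached iterator pointer
    }
    col_it->second.emplace(device_id, fresh);
    return fresh;
  }
  DeviceIterMap dev_map;
  dev_map.emplace(device_id, fresh);
  cache.emplace(col_key, std::move(dev_map));
  return fresh;
}

int main() {
  IterCache cache;
  int8_t dummy = 0;
  find_or_insert(cache, /*col_key=*/42, /*device_id=*/0, &dummy);
  std::printf("cached columns: %zu\n", cache.size());
  return 0;
}
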
// Fragment of ColumnFetcher::prepareChunkIter(): builds the ChunkIter for a merged
// chunk, pointing it at the merged data/index buffers and fixing up the element
// count for true variable-length types.
bool is_true_varlen_type,
const size_t total_num_tuples) const {
  // ...
  if (is_true_varlen_type) {
    // ...
  merged_chunk_iter.num_elems = total_num_tuples;
  merged_chunk_iter.skip = chunk_iter.skip;
  // ...
  return merged_chunk_iter;
// Fragments of the ColumnFetcher cache-cleanup helpers: walk the per-device caches
// of linearized data/index buffers (and the temporary CPU index buffers) and hand
// each buffer back to the DataMgr.
const auto data_mgr = executor_->getDataMgr();
// ...
for (auto& kv2 : kv.second) {
  data_mgr->free(kv2.second);
// ...
for (auto& kv2 : kv.second) {
  data_mgr->free(kv2.second);
// ...
const auto data_mgr = executor_->getDataMgr();
// ...
data_mgr->free(kv.second);
// Fragment of ColumnFetcher::getResultSetColumn(): columnarizes the backing result
// set once per fragment, caches the ColumnarResults in the column cache, and then
// returns the requested column buffer (transferring it to the device if needed).
const int device_id,
// ...
const size_t thread_idx) const {
  // ...
      table_key, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
  // ...
  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
    frag_id_to_result.insert(
        std::make_pair(frag_id,
                       std::shared_ptr<const ColumnarResults>(
  // ...
      result, col_id, executor_->getDataMgr(), memory_level, device_id, device_allocator);