// Excerpt of ColumnFetcher and related helpers; elided source lines are
// marked with "// ...".
//
// columnarize_result(row_set_mem_owner, result, thread_idx, executor_id,
// frag_id): materializes a ResultSet into a ColumnarResults instance
// (signature per its declaration).
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    // ...
    const size_t thread_idx,
    const size_t executor_id,
    // ...
  std::vector<SQLTypeInfo> col_types;
  for (size_t i = 0; i < result->colCount(); ++i) {
    // ... (loop body elided; it fills col_types from the result's column types)
  }
  // ... (the ColumnarResults is presumably constructed with:)
      row_set_mem_owner, *result, result->colCount(), col_types, executor_id, thread_idx);

// Likely the body of getMemoryLevelString(Data_Namespace::MemoryLevel), which
// maps a memory level to a printable name.
  switch (memoryLevel) {
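// ColumnFetcher constructor; the signature is reconstructed from its
// declaration, the initializer list is from the source:
ColumnFetcher::ColumnFetcher(Executor* executor, const ColumnCacheMap& column_cache)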
    : executor_(executor), columnarized_table_cache_(column_cache) {}
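// ColumnFetcher::getOneColumnFragment() (static), documented as: "Gets one
// chunk's pointer and element count on either CPU or GPU."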
    const size_t thread_idx,
    std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
    // ...
  static std::mutex columnar_conversion_mutex;
  // ...
  const auto& catalog = *executor->getCatalog();
  // ...
  CHECK(!cd || !(cd->isVirtualCol));
  const int8_t* col_buff = nullptr;
  // ...
    ChunkKey chunk_key{catalog.getCurrentDB().dbId,
    // ...
        &catalog.getDataMgr(),
        // ...
        chunk_meta_it->second->numBytes,
        chunk_meta_it->second->numElements);
    chunks_owner.push_back(chunk);
    // ...
    auto ab = chunk->getBuffer();
    CHECK(ab->getMemoryPtr());
    col_buff = reinterpret_cast<int8_t*>(ab->getMemoryPtr());
    // ...
    std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex);
    // ...
    if (column_cache.empty() || !column_cache.count(table_id)) {
      column_cache.insert(std::make_pair(
          table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
    }
    auto& frag_id_to_result = column_cache[table_id];
    if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
      frag_id_to_result.insert(
          std::make_pair(frag_id,
                         // ...
                         executor->row_set_mem_owner_,
                         // ...
                         executor->executor_id_,
                         // ...
    }
    col_frag = column_cache[table_id][frag_id].get();
    // ...
        &catalog.getDataMgr(),
        // ...
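// ColumnFetcher::makeJoinColumn() (static), documented as: "Creates a
// JoinColumn struct containing an array of JoinChunk structs."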
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    // ...
    const size_t thread_idx,
    std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
    std::vector<std::shared_ptr<void>>& malloc_owner,
    // ...
  CHECK(!fragments.empty());
  // ...
  size_t col_chunks_buff_sz = sizeof(struct JoinChunk) * fragments.size();
  // ...
  auto col_chunks_buff = reinterpret_cast<int8_t*>(
      malloc_owner.emplace_back(checked_malloc(col_chunks_buff_sz), free).get());
  auto join_chunk_array = reinterpret_cast<struct JoinChunk*>(col_chunks_buff);
  // ...
  size_t num_chunks = 0;
  for (auto& frag : fragments) {
    // ...
        executor->checkNonKernelTimeInterrupted()) {
      // ...
    num_elems += elem_count;
    // ...
  }
  return {col_chunks_buff,
          // ...
          static_cast<size_t>(elem_sz)};
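// ColumnFetcher::getOneTableColumnFragment(): fetches one fragment of one
// column. Per the code below, variable-length columns are returned as a
// pointer to a ChunkIter (copied to the device for GPU queries), while
// fixed-width columns return the chunk buffer's memory pointer directly.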
    const std::map<int, const TableFragments*>& all_tables_fragments,
    std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
    std::list<ChunkIter>& chunk_iter_holder,
    // ...
  const auto fragments_it = all_tables_fragments.find(table_id);
  CHECK(fragments_it != all_tables_fragments.end());
  const auto fragments = fragments_it->second;
  const auto& fragment = (*fragments)[frag_id];
  if (fragment.isEmptyPhysicalFragment()) {
    // ...
  }
  std::shared_ptr<Chunk_NS::Chunk> chunk;
  auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
  CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
  // ...
  const auto col_type =
  // ...
  const bool is_real_string =
      col_type.is_string() && col_type.get_compression() == kENCODING_NONE;
  const bool is_varlen =
  // ...
      cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
  std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
  // ...
        chunk_meta_it->second->numBytes,
        chunk_meta_it->second->numElements);
    // ...
    chunk_holder.push_back(chunk);
    // ...
    CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
    chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
    auto& chunk_iter = chunk_iter_holder.back();
    // ...
      return reinterpret_cast<int8_t*>(&chunk_iter);
    // ...
      auto ab = chunk->getBuffer();
      // ...
      auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
      row_set_mem_owner->addVarlenInputBuffer(ab);
      // ...
          chunk_iter_gpu, reinterpret_cast<int8_t*>(&chunk_iter), sizeof(ChunkIter));
      return chunk_iter_gpu;
  // ...
    auto ab = chunk->getBuffer();
    CHECK(ab->getMemoryPtr());
    return ab->getMemoryPtr();
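// ColumnFetcher::getAllTableColumnFragments(): columnarizes each non-empty
// physical fragment of the column and merges the per-fragment results,
// apparently via ColumnarResults::mergeResults, caching the merged column.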
    const std::map<int, const TableFragments*>& all_tables_fragments,
    // ...
    const size_t thread_idx) const {
  const auto fragments_it = all_tables_fragments.find(table_id);
  CHECK(fragments_it != all_tables_fragments.end());
  const auto fragments = fragments_it->second;
  const auto frag_count = fragments->size();
  std::vector<std::unique_ptr<ColumnarResults>> column_frags;
  // ...
  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
    // ...
        executor_->checkNonKernelTimeInterrupted()) {
      // ...
    std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
    std::list<ChunkIter> chunk_iter_holder;
    const auto& fragment = (*fragments)[frag_id];
    if (fragment.isEmptyPhysicalFragment()) {
      // ...
    }
    auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
    CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
    // ...
        static_cast<int>(frag_id),
        // ...
        all_tables_fragments,
        // ...
    column_frags.push_back(
        std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
                                          // ...
                                          fragment.getNumTuples(),
                                          chunk_meta_it->second->sqlType,
                                          // ...
  }
  auto merged_results =
  // ...
  table_column = merged_results.get();
  // ...
  table_column = column_it->second.get();
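// The next fragment appears to be the tail of the public
// ColumnFetcher::getResultSetColumn(col_desc, memory_level, device_id,
// device_allocator, thread_idx) signature, per its declaration.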
    const size_t thread_idx) const {
  // ...
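// ColumnFetcher::linearizeColumnFragments(): merges all fragments of a
// multi-fragment variable-length column into one contiguous "linearized"
// chunk (a data buffer plus, for true varlen types, an index buffer), caching
// merged buffers and chunk iterators per column and device.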
    const std::map<int, const TableFragments*>& all_tables_fragments,
    std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
    std::list<ChunkIter>& chunk_iter_holder,
    // ...
    const size_t thread_idx) const {
  const auto fragments_it = all_tables_fragments.find(table_id);
  CHECK(fragments_it != all_tables_fragments.end());
  const auto fragments = fragments_it->second;
  const auto frag_count = fragments->size();
  // ...
  bool is_varlen_chunk = cd->columnType.is_varlen() && !cd->columnType.is_fixlen_array();
  size_t total_num_tuples = 0;
  size_t total_data_buf_size = 0;
  size_t total_idx_buf_size = 0;
  // ...
  // Reuse a chunk iterator already linearized for this device, if any.
  if (linearized_iter_it->second.find(device_id) !=
      linearized_iter_it->second.end()) {
    // ...
      return chunk_iter_gpu;
  // ...
  std::shared_ptr<Chunk_NS::Chunk> chunk;
  std::list<std::shared_ptr<Chunk_NS::Chunk>> local_chunk_holder;
  std::list<ChunkIter> local_chunk_iter_holder;
  std::list<size_t> local_chunk_num_tuples;
  // ...
  // First pass: load each non-empty fragment's chunk and tally buffer sizes.
  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
    const auto& fragment = (*fragments)[frag_id];
    if (fragment.isEmptyPhysicalFragment()) {
      // ...
    }
    auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
    CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
    // ...
        cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
    // ...
        chunk_meta_it->second->numBytes,
        chunk_meta_it->second->numElements);
    local_chunk_holder.push_back(chunk);
    auto chunk_iter = chunk->begin_iterator(chunk_meta_it->second);
    local_chunk_iter_holder.push_back(chunk_iter);
    local_chunk_num_tuples.push_back(fragment.getNumTuples());
    total_num_tuples += fragment.getNumTuples();
    total_data_buf_size += chunk->getBuffer()->size();
    std::ostringstream oss;
    oss << "Load chunk for col_name: " << chunk->getColumnDesc()->columnName
        << ", col_id: " << chunk->getColumnDesc()->columnId << ", Frag-" << frag_id
        << ", numTuples: " << fragment.getNumTuples()
        << ", data_size: " << chunk->getBuffer()->size();
    if (chunk->getIndexBuf()) {
      auto idx_buf_size = chunk->getIndexBuf()->size() - sizeof(ArrayOffsetT);
      oss << ", index_size: " << idx_buf_size;
      total_idx_buf_size += idx_buf_size;
    }
    VLOG(2) << oss.str();
  }
  // ...
  auto& col_ti = cd->columnType;
  // ...
  // Dispatch to the type-specific linearization routine.
  if (col_ti.is_array()) {
    if (col_ti.is_fixlen_array()) {
      VLOG(2) << "Linearize fixed-length multi-frag array column (col_id: "
              << cd->columnId << ", col_name: " << cd->columnName
              // ...
              << ", device_id: " << device_id << "): " << cd->columnType.to_string();
      // ...
          local_chunk_iter_holder,
          local_chunk_num_tuples,
          // ...
    } else {
      CHECK(col_ti.is_varlen_array());
      VLOG(2) << "Linearize variable-length multi-frag array column (col_id: "
              << cd->columnId << ", col_name: " << cd->columnName
              // ...
              << ", device_id: " << device_id << "): " << cd->columnType.to_string();
      // ...
          local_chunk_iter_holder,
          local_chunk_num_tuples,
          // ...
    }
  }
  if (col_ti.is_string() && !col_ti.is_dict_encoded_string()) {
    VLOG(2) << "Linearize variable-length multi-frag non-encoded text column (col_id: "
            << cd->columnId << ", col_name: " << cd->columnName
            // ...
            << ", device_id: " << device_id << "): " << cd->columnType.to_string();
    // ...
        local_chunk_iter_holder,
        local_chunk_num_tuples,
        // ...
  }
  // ...
  if (!col_ti.is_fixlen_array()) {
    // ...
  auto merged_data_buffer = res.first;
  auto merged_index_buffer = res.second;
  // ...
  // Wrap the merged buffers in a Chunk and build an iterator over it.
  auto merged_chunk = std::make_shared<Chunk_NS::Chunk>(
      merged_data_buffer, merged_index_buffer, cd, false);
  // ...
      *(local_chunk_iter_holder.rbegin()),
      // ...
  chunk_holder.push_back(merged_chunk);
  chunk_iter_holder.push_back(merged_chunk_iter);
  // ...
    auto merged_chunk_iter_ptr = reinterpret_cast<int8_t*>(&(chunk_iter_holder.back()));
    // ...
    return merged_chunk_iter_ptr;
  // ...
    CHECK(device_allocator);
    // ...
        chunk_iter_gpu, merged_chunk_iter_ptr, sizeof(ChunkIter));
    return chunk_iter_gpu;
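// ColumnFetcher::linearizeVarLenArrayColFrags(): appends every fragment's data
// and index buffers into merged buffers, then rewrites the appended offsets.
// Illustrative example (not from the source): if a fragment with offsets
// [0, 5, -5, 12] (negative entries mark null-padded rows) is appended when
// sum_data_buf_size is 100, the non-negative entries become [100, 105, _, 112]
// and the null entry is then repaired from its left neighbor as -105.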
    std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
    std::list<ChunkIter>& chunk_iter_holder,
    std::list<std::shared_ptr<Chunk_NS::Chunk>>& local_chunk_holder,
    std::list<ChunkIter>& local_chunk_iter_holder,
    std::list<size_t>& local_chunk_num_tuples,
    // ...
    const size_t total_data_buf_size,
    const size_t total_idx_buf_size,
    const size_t total_num_tuples,
    // ...
    const size_t thread_idx) const {
  // ...
  bool has_cached_merged_idx_buf = false;
  bool has_cached_merged_data_buf = false;
  // ...
  int64_t linearization_time_ms = 0;
  // ...
  // Recycle a merged data buffer cached for this column and device, or
  // allocate a new one.
  auto& cd_cache = cached_data_buf_cache_it->second;
  auto cached_data_buf_it = cd_cache.find(device_id);
  if (cached_data_buf_it != cd_cache.end()) {
    has_cached_merged_data_buf = true;
    merged_data_buffer = cached_data_buf_it->second;
    VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
    // ...
  } else {
    // ...
    VLOG(2) << "Allocate " << total_data_buf_size
            << " bytes of data buffer space for linearized chunks (memory_level: "
    // ...
    cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
  }
  // ...
    VLOG(2) << "Allocate " << total_data_buf_size
            << " bytes of data buffer space for linearized chunks (memory_level: "
    // ...
    m.insert(std::make_pair(device_id, merged_data_buffer));
  // ...
  // Likewise recycle or allocate the temporary CPU-side merged index buffer.
  auto cached_index_buf_it =
  // ...
    has_cached_merged_idx_buf = true;
    merged_index_buffer_in_cpu = cached_index_buf_it->second;
    // ...
        << "Recycle merged temporary idx buffer for linearized chunks (memory_level: "
    // ...
  } else {
    auto idx_buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
    merged_index_buffer_in_cpu =
    // ...
    VLOG(2) << "Allocate " << idx_buf_size
            << " bytes of temporary idx buffer space on CPU for linearized chunks";
    // ...
        std::make_pair(cd->columnId, merged_index_buffer_in_cpu));
  }
  // ...
  size_t sum_data_buf_size = 0;
  size_t cur_sum_num_tuples = 0;
  size_t total_idx_size_modifier = 0;
  auto chunk_holder_it = local_chunk_holder.begin();
  auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
  auto chunk_num_tuple_it = local_chunk_num_tuples.begin();
  bool null_padded_first_elem = false;
  bool null_padded_last_val = false;
  // ...
  // Pre-pass: verify that no merged offset overflows ArrayOffsetT (columns
  // whose chunks total more than 2GB are rejected below).
  for (; chunk_holder_it != local_chunk_holder.end();
       chunk_holder_it++, chunk_num_tuple_it++) {
    // ...
    auto target_chunk = chunk_holder_it->get();
    auto target_chunk_data_buffer = target_chunk->getBuffer();
    auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
    auto target_idx_buf_ptr =
        reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
    auto cur_chunk_num_tuples = *chunk_num_tuple_it;
    // ...
    size_t cur_idx = cur_chunk_num_tuples;
    // Skip trailing null padding (negative offsets) to find the last real offset.
    while (original_offset < 0) {
      original_offset = target_idx_buf_ptr[--cur_idx];
    }
    ArrayOffsetT new_offset = original_offset + sum_data_buf_size;
    if (new_offset < 0) {
      throw std::runtime_error(
          "Linearization of a variable-length column having chunk size larger than 2GB "
          "not supported yet");
    }
    sum_data_buf_size += target_chunk_data_buffer->size();
  }
  chunk_holder_it = local_chunk_holder.begin();
  chunk_num_tuple_it = local_chunk_num_tuples.begin();
  sum_data_buf_size = 0;
  // Main pass: append each chunk's buffers and fix up the copied offsets.
  for (; chunk_holder_it != local_chunk_holder.end();
       chunk_holder_it++, chunk_iter_holder_it++, chunk_num_tuple_it++) {
    // ...
        executor_->checkNonKernelTimeInterrupted()) {
      // ...
    auto target_chunk = chunk_holder_it->get();
    auto target_chunk_data_buffer = target_chunk->getBuffer();
    auto cur_chunk_num_tuples = *chunk_num_tuple_it;
    auto target_chunk_idx_buffer = target_chunk->getIndexBuf();
    auto target_idx_buf_ptr =
        reinterpret_cast<ArrayOffsetT*>(target_chunk_idx_buffer->getMemoryPtr());
    auto idx_buf_size = target_chunk_idx_buffer->size() - sizeof(ArrayOffsetT);
    auto target_data_buffer_start_ptr = target_chunk_data_buffer->getMemoryPtr();
    auto target_data_buffer_size = target_chunk_data_buffer->size();
    // ...
    if (cur_sum_num_tuples > 0 && target_idx_buf_ptr[0] > 0) {
      null_padded_first_elem = true;
      // ...
    }
    if (!has_cached_merged_data_buf) {
      merged_data_buffer->append(target_data_buffer_start_ptr,
                                 target_data_buffer_size,
      // ...
    }
    if (!has_cached_merged_idx_buf) {
      // ...
      merged_index_buffer_in_cpu->append(target_chunk_idx_buffer->getMemoryPtr(),
      // ...
      if (cur_sum_num_tuples > 0) {
        if (null_padded_last_val) {
          // ...
          idx_buf_ptr[cur_sum_num_tuples] = -sum_data_buf_size;
        }
        // ...
        std::vector<std::future<void>> conversion_threads;
        std::vector<std::vector<size_t>> null_padded_row_idx_vecs(worker_count,
                                                                  std::vector<size_t>());
        bool is_parallel_modification = false;
        std::vector<size_t> null_padded_row_idx_vec;
        // Worker: shift non-negative offsets by sum_data_buf_size and record
        // the row indices of null-padded (negative) entries for later repair.
        const auto do_work = [&cur_sum_num_tuples,
                              // ...
                              &null_padded_first_elem,
                              // ...
                              const bool is_parallel_modification,
                              std::vector<size_t>* null_padded_row_idx_vec) {
          for (size_t i = start; i < end; i++) {
            if (LIKELY(idx_buf_ptr[cur_sum_num_tuples + i] >= 0)) {
              if (null_padded_first_elem) {
                // ...
                idx_buf_ptr[cur_sum_num_tuples + i] -=
                // ...
              }
              idx_buf_ptr[cur_sum_num_tuples + i] += sum_data_buf_size;
            } else {
              // ...
              null_padded_row_idx_vec->push_back(cur_sum_num_tuples + i);
            }
          }
        };
        // ...
          is_parallel_modification = true;
          // ...
               makeIntervals(size_t(0), cur_chunk_num_tuples, worker_count)) {
            conversion_threads.push_back(
            // ...
                is_parallel_modification,
                &null_padded_row_idx_vecs[interval.index]));
          }
          for (auto& child : conversion_threads) {
            // ...
          }
          for (auto& v : null_padded_row_idx_vecs) {
            std::copy(v.begin(), v.end(), std::back_inserter(null_padded_row_idx_vec));
          }
        // ... (the serial path calls do_work directly:)
              cur_chunk_num_tuples,
              is_parallel_modification,
              &null_padded_row_idx_vec);
        // ...
        if (!null_padded_row_idx_vec.empty()) {
          // Repair null-padded entries from the preceding offset (negated when
          // the neighbor holds a real, positive offset).
          // ...
          std::sort(null_padded_row_idx_vec.begin(), null_padded_row_idx_vec.end());
          for (auto& padded_null_row_idx : null_padded_row_idx_vec) {
            if (idx_buf_ptr[padded_null_row_idx - 1] > 0) {
              idx_buf_ptr[padded_null_row_idx] = -idx_buf_ptr[padded_null_row_idx - 1];
            } else {
              idx_buf_ptr[padded_null_row_idx] = idx_buf_ptr[padded_null_row_idx - 1];
            }
          }
        }
      }
    }
    cur_sum_num_tuples += cur_chunk_num_tuples;
    sum_data_buf_size += target_chunk_data_buffer->size();
    if (target_idx_buf_ptr[*chunk_num_tuple_it] < 0) {
      null_padded_last_val = true;
    } else {
      null_padded_last_val = false;
    }
    if (null_padded_first_elem) {
      // ...
      null_padded_first_elem = false;
    }
    if (!has_cached_merged_idx_buf && cur_sum_num_tuples == total_num_tuples) {
      // Write the final sentinel offset: the total data size, adjusted by the
      // null-padding modifier.
      auto merged_index_buffer_ptr =
      // ...
      merged_index_buffer_ptr[total_num_tuples] =
          total_data_buf_size - total_idx_size_modifier;
      // ...
    }
  }
  // ...
  size_t buf_size = total_idx_buf_size + sizeof(ArrayOffsetT);
  // Copy helper for moving the merged index buffer to its destination:
  // ...
      int8_t* src, int8_t* dest, size_t buf_size, MemoryLevel memory_level) {
    // ...
    memcpy((void*)dest, src, buf_size);
  // ...
  // Recycle or allocate the final merged index buffer for this device.
  auto& merged_idx_buf_cache = merged_idx_buf_cache_it->second;
  auto merged_idx_buf_it = merged_idx_buf_cache.find(device_id);
  if (merged_idx_buf_it != merged_idx_buf_cache.end()) {
    merged_index_buffer = merged_idx_buf_it->second;
    // ...
  } else {
    merged_index_buffer = cat.getDataMgr().alloc(memory_level, device_id, buf_size);
    // ...
    merged_idx_buf_cache.insert(std::make_pair(device_id, merged_index_buffer));
  }
  // ...
    merged_index_buffer = cat.getDataMgr().alloc(memory_level, device_id, buf_size);
    // ...
    m.insert(std::make_pair(device_id, merged_index_buffer));
  // ...
    merged_index_buffer = merged_index_buffer_in_cpu;
  // ...
  CHECK(merged_index_buffer);
  linearization_time_ms += timer_stop(clock_begin);
  VLOG(2) << "Linearization has been successfully done, elapsed time: "
          << linearization_time_ms << " ms.";
  return {merged_data_buffer, merged_index_buffer};
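// ColumnFetcher::linearizeFixedLenArrayColFrags(): the fixed-length variant
// only concatenates the per-fragment data buffers; no index buffer is needed,
// hence the {merged_data_buffer, nullptr} result.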
    std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
    std::list<ChunkIter>& chunk_iter_holder,
    std::list<std::shared_ptr<Chunk_NS::Chunk>>& local_chunk_holder,
    std::list<ChunkIter>& local_chunk_iter_holder,
    std::list<size_t>& local_chunk_num_tuples,
    // ...
    const size_t total_data_buf_size,
    const size_t total_idx_buf_size,
    const size_t total_num_tuples,
    // ...
    const size_t thread_idx) const {
  int64_t linearization_time_ms = 0;
  // ...
  bool has_cached_merged_data_buf = false;
  // ...
  auto& cd_cache = cached_data_buf_cache_it->second;
  auto cached_data_buf_it = cd_cache.find(device_id);
  if (cached_data_buf_it != cd_cache.end()) {
    has_cached_merged_data_buf = true;
    merged_data_buffer = cached_data_buf_it->second;
    VLOG(2) << "Recycle merged data buffer for linearized chunks (memory_level: "
    // ...
  } else {
    // ...
    VLOG(2) << "Allocate " << total_data_buf_size
            << " bytes of data buffer space for linearized chunks (memory_level: "
    // ...
    cd_cache.insert(std::make_pair(device_id, merged_data_buffer));
  }
  // ...
    VLOG(2) << "Allocate " << total_data_buf_size
            << " bytes of data buffer space for linearized chunks (memory_level: "
    // ...
    m.insert(std::make_pair(device_id, merged_data_buffer));
  // ...
  if (!has_cached_merged_data_buf) {
    size_t sum_data_buf_size = 0;
    auto chunk_holder_it = local_chunk_holder.begin();
    auto chunk_iter_holder_it = local_chunk_iter_holder.begin();
    for (; chunk_holder_it != local_chunk_holder.end();
         chunk_holder_it++, chunk_iter_holder_it++) {
      // ...
      auto target_chunk = chunk_holder_it->get();
      auto target_chunk_data_buffer = target_chunk->getBuffer();
      merged_data_buffer->append(target_chunk_data_buffer->getMemoryPtr(),
                                 target_chunk_data_buffer->size(),
      // ...
      sum_data_buf_size += target_chunk_data_buffer->size();
    }
    // ...
    CHECK_EQ(total_data_buf_size, sum_data_buf_size);
  }
  linearization_time_ms += timer_stop(clock_begin);
  VLOG(2) << "Linearization has been successfully done, elapsed time: "
          << linearization_time_ms << " ms.";
  return {merged_data_buffer, nullptr};
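// ColumnFetcher::transferColumnIfNeeded() (static): returns the host column
// buffer as-is for CPU queries, or allocates device memory through the
// DeviceAllocator and copies the column over for GPU queries.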
  if (!columnar_results) {
    // ...
  }
  CHECK_LT(static_cast<size_t>(col_id), col_buffers.size());
  // ...
    const auto& col_ti = columnar_results->getColumnType(col_id);
    const auto num_bytes = columnar_results->size() * col_ti.get_size();
    CHECK(device_allocator);
    auto gpu_col_buffer = device_allocator->alloc(num_bytes);
    device_allocator->copyToDevice(gpu_col_buffer, col_buffers[col_id], num_bytes);
    return gpu_col_buffer;
  // ...
  return col_buffers[col_id];
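// ColumnFetcher::addMergedChunkIter(): registers a merged (linearized) chunk
// iterator for a column descriptor and device in the per-device iterator
// cache, logging whether the entry is brand new or an additional device entry.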
    const int device_id,
    int8_t* chunk_iter_ptr) const {
  // ...
    auto iter_device_it = chunk_iter_it->second.find(device_id);
    if (iter_device_it == chunk_iter_it->second.end()) {
      VLOG(2) << "Additional merged chunk_iter for col_desc (tbl: "
              // ...
              << "), device_id: " << device_id;
      chunk_iter_it->second.emplace(device_id, chunk_iter_ptr);
    }
  // ...
    iter_m.emplace(device_id, chunk_iter_ptr);
    VLOG(2) << "New merged chunk_iter for col_desc (tbl: "
            // ...
            << "), device_id: " << device_id;
// ColumnFetcher::getChunkiter(): returns a cached merged chunk iterator for
// the given column descriptor and device, if one was registered earlier.
    const int device_id) const {
  // ...
    auto dev_iter_map_it = linearized_chunk_iter_it->second.find(device_id);
    if (dev_iter_map_it != linearized_chunk_iter_it->second.end()) {
      VLOG(2) << "Recycle merged chunk_iter for col_desc (tbl: "
              // ...
              << "), device_id: " << device_id;
      return dev_iter_map_it->second;
    }
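// ColumnFetcher::prepareChunkIter(): builds a ChunkIter over the merged data
// buffer (and, for true varlen types, the merged index buffer), fixing up
// num_elems and skip from the source iterator.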
    bool is_true_varlen_type,
    const size_t total_num_tuples) const {
  // ...
  if (is_true_varlen_type) {
    // ...
  merged_chunk_iter.num_elems = total_num_tuples;
  merged_chunk_iter.skip = chunk_iter.skip;
  // ...
  return merged_chunk_iter;
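// ColumnFetcher::freeLinearizedBuf(): returns every cached linearized data
// and index buffer to the DataMgr; freeTemporaryCpuLinearizedIdxBuf() (below)
// does the same for the temporary CPU-side index buffers.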
  auto& data_mgr = cat.getDataMgr();
  // ...
    for (auto& kv2 : kv.second) {
      data_mgr.free(kv2.second);
    }
  // ...
    for (auto& kv2 : kv.second) {
      data_mgr.free(kv2.second);
    }
  // ...

// ColumnFetcher::freeTemporaryCpuLinearizedIdxBuf()
  auto& data_mgr = cat.getDataMgr();
  // ...
    data_mgr.free(kv.second);
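// ColumnFetcher::getResultSetColumn() (private overload): columnarizes the
// ResultSet once per (table, fragment), caches the resulting ColumnarResults,
// and then hands the column buffer to transferColumnIfNeeded().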
    const int device_id,
    // ...
    const size_t thread_idx) const {
  // ...
        table_id, std::unordered_map<int, std::shared_ptr<const ColumnarResults>>()));
  // ...
  if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
    frag_id_to_result.insert(
        std::make_pair(frag_id,
                       std::shared_ptr<const ColumnarResults>(
                           // ...
  // ...
  // The call target below is reconstructed from the matching
  // transferColumnIfNeeded() declaration:
  return transferColumnIfNeeded(
      result, col_id, executor_->getDataMgr(), memory_level, device_id, device_allocator);