#include "../Fragmenter/Fragmenter.h"
  // copy_table_info() hands back a field-wise copy of a cached TableInfo.
  return table_info_copy;
}
Fragmenter_Namespace::TableInfo build_table_info(
    const std::vector<const TableDescriptor*>& shard_tables) {
  size_t total_number_of_tuples{0};
  Fragmenter_Namespace::TableInfo table_info_all_shards;
  for (const TableDescriptor* shard_table : shard_tables) {
    CHECK(shard_table->fragmenter);
    const auto& shard_metainfo = shard_table->fragmenter->getFragmentsForQuery();
    total_number_of_tuples += shard_metainfo.getPhysicalNumTuples();
    table_info_all_shards.fragments.reserve(table_info_all_shards.fragments.size() +
                                            shard_metainfo.fragments.size());
    table_info_all_shards.fragments.insert(table_info_all_shards.fragments.end(),
                                           shard_metainfo.fragments.begin(),
                                           shard_metainfo.fragments.end());
  }
  table_info_all_shards.setPhysicalNumTuples(total_number_of_tuples);
  return table_info_all_shards;
}
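// For a sharded table the per-shard fragment lists are concatenated and the physical
// tuple counts are summed, so the rest of the engine can treat the logical table as a
// single TableInfo.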
Fragmenter_Namespace::TableInfo InputTableInfoCache::getTableInfo(const int table_id) {
  const auto it = cache_.find(table_id);
  if (it != cache_.end()) {
    // Cache hit: hand back a copy so callers can mutate their TableInfo freely.
    const auto& table_info = it->second;
    return copy_table_info(table_info);
  }
  const auto cat = executor_->getCatalog();
  const auto td = cat->getMetadataForTable(table_id);
  const auto shard_tables = cat->getPhysicalTablesDescriptors(td);
  auto table_info = build_table_info(shard_tables);
  cache_.emplace(table_id, copy_table_info(table_info));
  return copy_table_info(table_info);
}
Fragmenter_Namespace::TableInfo synthesize_table_info(const ResultSetPtr& rows) {
  std::vector<Fragmenter_Namespace::FragmentInfo> result;
  if (rows) {
    // Expose the intermediate result as a single synthesized fragment.
    result.resize(1);
    auto& fragment = result.front();
    fragment.fragmentId = 0;
    fragment.deviceIds.resize(3);
    fragment.resultSet = rows.get();
    fragment.resultSetMutex.reset(new std::mutex());
  }
  Fragmenter_Namespace::TableInfo table_info;
  table_info.fragments = result;
  return table_info;
}
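// An intermediate result therefore looks like a one-fragment table whose data lives in
// the ResultSet; the per-fragment mutex guards the lazy row-count computation done in
// FragmentInfo::getNumTuples() further down.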
void collect_table_infos(std::vector<InputTableInfo>& table_infos,
                         const std::vector<InputDescriptor>& input_descs,
                         Executor* executor) {
  const auto temporary_tables = executor->getTemporaryTables();
  const auto cat = executor->getCatalog();
  std::unordered_map<int, size_t> info_cache;
  for (const auto& input_desc : input_descs) {
    const auto table_id = input_desc.getTableId();
    const auto cached_index_it = info_cache.find(table_id);
    if (cached_index_it != info_cache.end()) {
      // The same table already appeared in this input list: reuse its info.
      CHECK_LT(cached_index_it->second, table_infos.size());
      table_infos.push_back(
          {table_id, copy_table_info(table_infos[cached_index_it->second].info)});
      continue;
    }
    if (input_desc.getSourceType() == InputSourceType::RESULT) {
      // Negative table ids name the result of a previous query step.
      CHECK(temporary_tables);
      const auto it = temporary_tables->find(table_id);
      LOG_IF(FATAL, it == temporary_tables->end())
          << "Failed to find previous query result for node " << -table_id;
      table_infos.push_back({table_id, synthesize_table_info(it->second)});
    } else {
      table_infos.push_back({table_id, executor->getTableInfo(table_id)});
    }
    CHECK(!table_infos.empty());
    info_cache.insert(std::make_pair(table_id, table_infos.size() - 1));
  }
}
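// Each distinct table id is resolved at most once per call; repeated occurrences in
// input_descs reuse the first entry through info_cache and copy_table_info().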
ChunkMetadataMap synthesize_metadata(const ResultSet* rows) {
  ChunkMetadataMap metadata_map;
  // One set of buffer-less encoders per worker; they only accumulate column stats.
  std::vector<std::vector<std::unique_ptr<Encoder>>> dummy_encoders;
  const size_t worker_count = use_parallel_algorithms(*rows) ? cpu_threads() : 1;
  for (size_t worker_idx = 0; worker_idx < worker_count; ++worker_idx) {
    dummy_encoders.emplace_back();
    for (size_t i = 0; i < rows->colCount(); ++i) {
      const auto& col_ti = rows->getColType(i);
      dummy_encoders.back().emplace_back(Encoder::Create(nullptr, col_ti));
    }
  }
  const auto do_work = [rows](const std::vector<TargetValue>& crt_row,
                              std::vector<std::unique_ptr<Encoder>>& dummy_encoders) {
    for (size_t i = 0; i < rows->colCount(); ++i) {
      const auto& col_ti = rows->getColType(i);
      const auto& col_val = crt_row[i];
      const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
      CHECK(scalar_col_val);
      if (uses_int_meta(col_ti)) {
        // Integer-like types (ints, decimals, times, booleans, dict-encoded strings).
        const auto i64_p = boost::get<int64_t>(scalar_col_val);
        CHECK(i64_p);
        dummy_encoders[i]->updateStats(*i64_p, *i64_p == inline_int_null_val(col_ti));
      } else if (col_ti.is_fp()) {
        switch (col_ti.get_type()) {
          case kFLOAT: {
            const auto float_p = boost::get<float>(scalar_col_val);
            CHECK(float_p);
            dummy_encoders[i]->updateStats(*float_p,
                                           *float_p == inline_fp_null_val(col_ti));
            break;
          }
          case kDOUBLE: {
            const auto double_p = boost::get<double>(scalar_col_val);
            CHECK(double_p);
            dummy_encoders[i]->updateStats(*double_p,
                                           *double_p == inline_fp_null_val(col_ti));
            break;
          }
          default:
            CHECK(false);
        }
      } else {
        throw std::runtime_error(col_ti.get_type_name() +
                                 " is not supported in temporary table.");
      }
    }
  };
  if (use_parallel_algorithms(*rows)) {
    // Partition the entries evenly across the workers and gather stats in parallel.
    std::vector<std::future<void>> compute_stats_threads;
    const auto entry_count = rows->entryCount();
    for (size_t i = 0,
                start_entry = 0,
                stride = (entry_count + worker_count - 1) / worker_count;
         i < worker_count && start_entry < entry_count;
         ++i, start_entry += stride) {
      const auto end_entry = std::min(start_entry + stride, entry_count);
      compute_stats_threads.push_back(std::async(
          std::launch::async,
          [rows, &do_work, &dummy_encoders](
              const size_t start, const size_t end, const size_t worker_idx) {
            for (size_t i = start; i < end; ++i) {
              const auto crt_row = rows->getRowAtNoTranslations(i);
              if (!crt_row.empty()) {
                do_work(crt_row, dummy_encoders[worker_idx]);
              }
            }
          },
          start_entry,
          end_entry,
          i));
    }
    for (auto& child : compute_stats_threads) {
      child.wait();
    }
    for (auto& child : compute_stats_threads) {
      child.get();
    }
  } else {
    while (true) {
      auto crt_row = rows->getNextRow(false, false);
      if (crt_row.empty()) {
        break;
      }
      do_work(crt_row, dummy_encoders[0]);
    }
    rows->moveToBegin();
  }
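  // Both paths leave one encoder set per worker; worker 0's encoders act as the
  // accumulator in the reduction below.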
  // Fold every worker's per-column stats into worker 0's encoders.
  for (size_t worker_idx = 1; worker_idx < worker_count; ++worker_idx) {
    CHECK_LT(worker_idx, dummy_encoders.size());
    const auto& worker_encoders = dummy_encoders[worker_idx];
    for (size_t i = 0; i < rows->colCount(); ++i) {
      dummy_encoders[0][i]->reduceStats(*worker_encoders[i]);
    }
  }
  for (size_t i = 0; i < rows->colCount(); ++i) {
    metadata_map.emplace(i, dummy_encoders[0][i]->getMetadata(rows->getColType(i)));
  }
  return metadata_map;
}
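// Illustrative sketch (not part of the original file): the synthesized metadata is
// consumed through the same accessors as metadata loaded from storage. Here `fragment`
// stands for a FragmentInfo that wraps a synthesized result set:
//
//   const ChunkMetadataMap& meta = fragment.getChunkMetadataMap();
//   for (const auto& [col_idx, chunk_metadata] : meta) {
//     // chunk_metadata carries the element count and the min/max/null stats
//     // accumulated by the dummy encoders above.
//   }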
size_t get_frag_count_of_table(const int table_id, Executor* executor) {
  const auto temporary_tables = executor->getTemporaryTables();
  CHECK(temporary_tables);
  auto it = temporary_tables->find(table_id);
  if (it != temporary_tables->end()) {
    // Intermediate results are exposed as exactly one synthesized fragment.
    return size_t(1);
  }
  const auto table_info = executor->getTableInfo(table_id);
  return table_info.fragments.size();
}
std::vector<InputTableInfo> get_table_infos(
    const std::vector<InputDescriptor>& input_descs,
    Executor* executor) {
  std::vector<InputTableInfo> table_infos;
  collect_table_infos(table_infos, input_descs, executor);
  return table_infos;
}

std::vector<InputTableInfo> get_table_infos(const RelAlgExecutionUnit& ra_exe_unit,
                                            Executor* executor) {
  INJECT_TIMER(get_table_infos);
  std::vector<InputTableInfo> table_infos;
  collect_table_infos(table_infos, ra_exe_unit.input_descs, executor);
  return table_infos;
}
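// Illustrative sketch (not part of the original file): a typical call site, assuming
// an Executor* and the query's input descriptors are already in scope.
//
//   const auto table_infos = get_table_infos(input_descs, executor);
//   for (const auto& table_info : table_infos) {
//     // table_info.info.fragments drives per-fragment work assignment.
//   }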
size_t Fragmenter_Namespace::FragmentInfo::getNumTuples() const {
  std::unique_ptr<std::lock_guard<std::mutex>> lock;
  if (resultSetMutex) {
    lock.reset(new std::lock_guard<std::mutex>(*resultSetMutex));
  }
  CHECK_EQ(!!resultSet, !!resultSetMutex);
  if (resultSet && !synthesizedNumTuplesIsValid) {
    numTuples = resultSet->rowCount();
    synthesizedNumTuplesIsValid = true;
  }
  return numTuples;
}

size_t Fragmenter_Namespace::TableInfo::getNumTuples() const {
  if (!fragments.empty() && fragments.front().resultSet) {
    return fragments.front().getNumTuples();
  }
  return numTuples;
}

size_t Fragmenter_Namespace::TableInfo::getNumTuplesUpperBound() const {
  if (!fragments.empty() && fragments.front().resultSet) {
    return fragments.front().resultSet->entryCount();
  }
  return numTuples;
}

size_t Fragmenter_Namespace::TableInfo::getFragmentNumTuplesUpperBound() const {
  if (!fragments.empty() && fragments.front().resultSet) {
    return fragments.front().resultSet->entryCount();
  }
  size_t fragment_num_tuples_upper_bound = 0;
  for (const auto& fragment : fragments) {
    fragment_num_tuples_upper_bound =
        std::max(fragment.getNumTuples(), fragment_num_tuples_upper_bound);
  }
  return fragment_num_tuples_upper_bound;
}
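// getNumTuples() is exact but may have to materialize a row count on a synthesized
// result set, while the *UpperBound() variants fall back to entryCount(), which can
// over-count (unused entries in the result buffer) but never under-counts.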