OmniSciDB  471d68cefb
ParquetDataWrapper.cpp
/*
 * Copyright 2020 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ParquetDataWrapper.h"

#include <queue>

#include <arrow/filesystem/localfs.h>
#include <boost/filesystem.hpp>

#include "FsiChunkUtils.h"
#include "FsiJsonUtils.h"
#include "LazyParquetChunkLoader.h"
#include "ParquetShared.h"
#include "Shared/file_path_util.h"
#include "Shared/misc.h"
#include "Utils/DdlUtils.h"

namespace foreign_storage {

namespace {
void reduce_metadata(std::shared_ptr<ChunkMetadata> reduce_to,
                     std::shared_ptr<ChunkMetadata> reduce_from) {
  CHECK(reduce_to->sqlType == reduce_from->sqlType);
  reduce_to->numBytes += reduce_from->numBytes;
  reduce_to->numElements += reduce_from->numElements;
  reduce_to->chunkStats.has_nulls |= reduce_from->chunkStats.has_nulls;

  auto column_type = reduce_to->sqlType;
  column_type = column_type.is_array() ? column_type.get_elem_type() : column_type;

  // Metadata reduction is done at metadata scan time; both string and geometry
  // columns have no valid stats to reduce beyond `has_nulls`.
  if (column_type.is_string() || column_type.is_geometry()) {
    // Reset to an invalid range, as formerly valid metadata
    // needs to be invalidated during an append for these types.
    reduce_to->chunkStats.max = reduce_from->chunkStats.max;
    reduce_to->chunkStats.min = reduce_from->chunkStats.min;
    return;
  }

  ForeignStorageBuffer buffer_to;
  buffer_to.initEncoder(column_type);
  auto encoder_to = buffer_to.getEncoder();
  encoder_to->resetChunkStats(reduce_to->chunkStats);

  ForeignStorageBuffer buffer_from;
  buffer_from.initEncoder(column_type);
  auto encoder_from = buffer_from.getEncoder();
  encoder_from->resetChunkStats(reduce_from->chunkStats);

  encoder_to->reduceStats(*encoder_from);
  auto updated_metadata = std::make_shared<ChunkMetadata>();
  encoder_to->getMetadata(updated_metadata);
  reduce_to->chunkStats = updated_metadata->chunkStats;
}
}  // namespace
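
// Illustrative sketch (not part of the original source): given two INTEGER chunk
// metadata items, reduce_metadata() accumulates the counts and widens the stats
// range via the encoder's reduceStats(). For example, reducing
//   {min: 1, max: 10, has_nulls: false}  with  {min: -5, max: 7, has_nulls: true}
// yields {min: -5, max: 10, has_nulls: true}, with numBytes and numElements summed.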

ParquetDataWrapper::ParquetDataWrapper() : db_id_(-1), foreign_table_(nullptr) {}

ParquetDataWrapper::ParquetDataWrapper(const int db_id, const ForeignTable* foreign_table)
    : db_id_(db_id)
    , foreign_table_(foreign_table)
    , last_fragment_index_(0)
    , last_fragment_row_count_(0)
    , total_row_count_(0)
    , last_row_group_(0)
    , is_restored_(false)
    , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
    , file_reader_cache_(std::make_unique<FileReaderMap>()) {
  auto& server_options = foreign_table->foreign_server->options;
  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
    file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
  } else {
    UNREACHABLE();
  }
}
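
// Descriptive note (added): only foreign servers whose STORAGE_TYPE_KEY option is
// LOCAL_FILE_STORAGE_TYPE are handled here; any other storage type aborts via
// UNREACHABLE(), so an arrow::fs::LocalFileSystem is the only backing filesystem.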

void ParquetDataWrapper::resetParquetMetadata() {
  fragment_to_row_group_interval_map_.clear();
  fragment_to_row_group_interval_map_[0] = {};

  last_row_group_ = 0;
  last_fragment_index_ = 0;
  last_fragment_row_count_ = 0;
  total_row_count_ = 0;
  file_reader_cache_->clear();
}

std::list<const ColumnDescriptor*> ParquetDataWrapper::getColumnsToInitialize(
    const Interval<ColumnType>& column_interval) {
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  const auto& columns = schema_->getLogicalAndPhysicalColumns();
  auto column_start = column_interval.start;
  auto column_end = column_interval.end;
  std::list<const ColumnDescriptor*> columns_to_init;
  for (const auto column : columns) {
    auto column_id = column->columnId;
    if (column_id >= column_start && column_id <= column_end) {
      columns_to_init.push_back(column);
    }
  }
  return columns_to_init;
}

void ParquetDataWrapper::initializeChunkBuffers(
    const int fragment_index,
    const Interval<ColumnType>& column_interval,
    const ChunkToBufferMap& required_buffers,
    const bool reserve_buffers_and_set_stats) {
  for (const auto column : getColumnsToInitialize(column_interval)) {
    Chunk_NS::Chunk chunk{column};
    ChunkKey data_chunk_key;
    if (column->columnType.is_varlen_indeed()) {
      data_chunk_key = {
          db_id_, foreign_table_->tableId, column->columnId, fragment_index, 1};
      CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
      auto data_buffer = shared::get_from_map(required_buffers, data_chunk_key);
      chunk.setBuffer(data_buffer);

      ChunkKey index_chunk_key{
          db_id_, foreign_table_->tableId, column->columnId, fragment_index, 2};
      CHECK(required_buffers.find(index_chunk_key) != required_buffers.end());
      auto index_buffer = shared::get_from_map(required_buffers, index_chunk_key);
      chunk.setIndexBuffer(index_buffer);
    } else {
      data_chunk_key = {
          db_id_, foreign_table_->tableId, column->columnId, fragment_index};
      CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
      auto data_buffer = shared::get_from_map(required_buffers, data_chunk_key);
      chunk.setBuffer(data_buffer);
    }
    chunk.initEncoder();
    if (reserve_buffers_and_set_stats) {
      const auto metadata_it = chunk_metadata_map_.find(data_chunk_key);
      CHECK(metadata_it != chunk_metadata_map_.end());
      auto buffer = chunk.getBuffer();
      auto& metadata = metadata_it->second;
      auto encoder = buffer->getEncoder();
      encoder->resetChunkStats(metadata->chunkStats);
      encoder->setNumElems(metadata->numElements);
      if (column->columnType.is_string() &&
          column->columnType.get_compression() == kENCODING_NONE) {
        auto index_buffer = chunk.getIndexBuf();
        index_buffer->reserve(sizeof(StringOffsetT) * (metadata->numElements + 1));
      } else if (!column->columnType.is_fixlen_array() && column->columnType.is_array()) {
        auto index_buffer = chunk.getIndexBuf();
        index_buffer->reserve(sizeof(ArrayOffsetT) * (metadata->numElements + 1));
      } else {
        size_t num_bytes_to_reserve =
            metadata->numElements * column->columnType.get_size();
        buffer->reserve(num_bytes_to_reserve);
      }
    }
  }
}
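
// Chunk-key layout used above (descriptive note, not original source): fixed-width
// columns use a four-component key {db_id, table_id, column_id, fragment_id}, while
// variable-length columns split into a data buffer keyed with a trailing 1 and an
// index (offsets) buffer keyed with a trailing 2, e.g.:
//   {1, 2, 3, 0, 1}  // data buffer for db 1, table 2, column 3, fragment 0
//   {1, 2, 3, 0, 2}  // the corresponding index buffer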

void ParquetDataWrapper::finalizeFragmentMap() {
  fragment_to_row_group_interval_map_[last_fragment_index_].back().end_index =
      last_row_group_;
}

void ParquetDataWrapper::addNewFragment(int row_group, const std::string& file_path) {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());

  last_fragment_entry->second.back().end_index = last_row_group_;
  last_fragment_index_++;
  last_fragment_row_count_ = 0;
  fragment_to_row_group_interval_map_[last_fragment_index_].emplace_back(
      RowGroupInterval{file_path, row_group});
}

bool ParquetDataWrapper::isNewFile(const std::string& file_path) const {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());

  // The entry for the first fragment starts out as an empty vector
  if (last_fragment_entry->second.empty()) {
    CHECK_EQ(last_fragment_index_, 0);
    return true;
  } else {
    return (last_fragment_entry->second.back().file_path != file_path);
  }
}

void ParquetDataWrapper::addNewFile(const std::string& file_path) {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());

  // The entry for the first fragment starts out as an empty vector
  if (last_fragment_entry->second.empty()) {
    CHECK_EQ(last_fragment_index_, 0);
  } else {
    last_fragment_entry->second.back().end_index = last_row_group_;
  }
  last_fragment_entry->second.emplace_back(RowGroupInterval{file_path, 0});
}
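
// Illustrative sketch of the bookkeeping above (assumed file names and indices,
// not original source): after scanning two files where f1.parquet fits entirely
// in fragment 0 and f2.parquet starts in fragment 0 but spills into fragment 1,
// fragment_to_row_group_interval_map_ could look like:
//   0 -> [{f1.parquet, 0, 4}, {f2.parquet, 0, 1}]
//   1 -> [{f2.parquet, 2, 5}]
// where each RowGroupInterval stores {file_path, start_index, end_index}.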
217 
220  CHECK(catalog);
221  std::vector<std::string> new_file_paths;
222  auto processed_file_paths = getProcessedFilePaths();
223  if (foreign_table_->isAppendMode() && !processed_file_paths.empty()) {
224  auto all_file_paths = getAllFilePaths();
225  for (const auto& file_path : processed_file_paths) {
226  if (!shared::contains(all_file_paths, file_path)) {
227  throw_removed_file_error(file_path);
228  }
229  }
230 
231  for (const auto& file_path : all_file_paths) {
232  if (!shared::contains(processed_file_paths, file_path)) {
233  new_file_paths.emplace_back(file_path);
234  }
235  }
236 
237  // Single file append
238  // If an append occurs with multiple files, then we assume any existing files have
239  // not been altered. If an append occurs on a single file, then we check to see if
240  // it has changed.
241  if (new_file_paths.empty() && all_file_paths.size() == 1) {
242  CHECK_EQ(processed_file_paths.size(), static_cast<size_t>(1));
243  const auto& file_path = *all_file_paths.begin();
244  CHECK_EQ(*processed_file_paths.begin(), file_path);
245 
246  // Since an existing file is being appended to we need to update the cached
247  // FileReader as the existing one will be out of date.
248  auto reader = file_reader_cache_->insert(file_path, file_system_);
249  size_t row_count = reader->parquet_reader()->metadata()->num_rows();
250 
251  if (row_count < total_row_count_) {
252  throw_removed_row_error(file_path);
253  } else if (row_count > total_row_count_) {
254  new_file_paths = all_file_paths;
255  chunk_metadata_map_.clear();
257  }
258  }
259  } else {
260  CHECK(chunk_metadata_map_.empty());
261  new_file_paths = getAllFilePaths();
263  }
264 
265  if (!new_file_paths.empty()) {
266  metadataScanFiles(new_file_paths);
267  }
268 }
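
// Append-mode summary (descriptive note, added): on a metadata refresh, a file
// that disappeared raises a removed-file error, brand-new files are queued for a
// metadata scan, and in the lone-file case a shrinking row count raises a
// removed-row error while a growing one discards cached metadata and rescans all
// paths from scratch.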

std::set<std::string> ParquetDataWrapper::getProcessedFilePaths() {
  std::set<std::string> file_paths;
  for (const auto& entry : fragment_to_row_group_interval_map_) {
    for (const auto& row_group_interval : entry.second) {
      file_paths.emplace(row_group_interval.file_path);
    }
  }
  return file_paths;
}

std::vector<std::string> ParquetDataWrapper::getAllFilePaths() {
  auto timer = DEBUG_TIMER(__func__);
  std::vector<std::string> found_file_paths;
  auto file_path = getFullFilePath(foreign_table_);
  const auto& regex_pattern = foreign_table_->getOption(REGEX_PATH_FILTER_KEY);
  const auto& sort_by = foreign_table_->getOption(FILE_SORT_ORDER_BY_KEY);
  const auto& sort_regex = foreign_table_->getOption(FILE_SORT_REGEX_KEY);

  auto& server_options = foreign_table_->foreign_server->options;
  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
    found_file_paths = shared::local_glob_filter_sort_files(
        file_path, regex_pattern, sort_by, sort_regex);
  } else {
    UNREACHABLE();
  }
  return found_file_paths;
}
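
// Illustrative usage (assumed option spellings, shown for context only): a
// foreign table can restrict and order the globbed paths, e.g.
//   WITH (file_path = 'data/*.parquet',
//         regex_path_filter = '.*2021.*',
//         file_sort_order_by = 'PATHNAME');
// local_glob_filter_sort_files() then applies the filter and ordering before the
// paths reach the metadata scan.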

void ParquetDataWrapper::metadataScanFiles(const std::vector<std::string>& file_paths) {
  LazyParquetChunkLoader chunk_loader(file_system_, file_reader_cache_.get());
  auto row_group_metadata = chunk_loader.metadataScan(file_paths, *schema_);
  auto column_interval =
      Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
                           schema_->getLogicalAndPhysicalColumns().back()->columnId};

  for (const auto& row_group_metadata_item : row_group_metadata) {
    const auto& column_chunk_metadata = row_group_metadata_item.column_chunk_metadata;
    CHECK(static_cast<int>(column_chunk_metadata.size()) ==
          schema_->numLogicalAndPhysicalColumns());
    auto column_chunk_metadata_iter = column_chunk_metadata.begin();
    const int64_t import_row_count = (*column_chunk_metadata_iter)->numElements;
    int row_group = row_group_metadata_item.row_group_index;
    const auto& file_path = row_group_metadata_item.file_path;
    if (moveToNextFragment(import_row_count)) {
      addNewFragment(row_group, file_path);
    } else if (isNewFile(file_path)) {
      CHECK_EQ(row_group, 0);
      addNewFile(file_path);
    }
    last_row_group_ = row_group;

    for (int column_id = column_interval.start; column_id <= column_interval.end;
         column_id++, column_chunk_metadata_iter++) {
      CHECK(column_chunk_metadata_iter != column_chunk_metadata.end());
      const auto column_descriptor = schema_->getColumnDescriptor(column_id);

      const auto& type_info = column_descriptor->columnType;
      ChunkKey chunk_key{
          db_id_, foreign_table_->tableId, column_id, last_fragment_index_};
      ChunkKey data_chunk_key = chunk_key;
      if (type_info.is_varlen_indeed()) {
        data_chunk_key.emplace_back(1);
      }
      std::shared_ptr<ChunkMetadata> chunk_metadata = *column_chunk_metadata_iter;
      if (chunk_metadata_map_.find(data_chunk_key) == chunk_metadata_map_.end()) {
        chunk_metadata_map_[data_chunk_key] = chunk_metadata;
      } else {
        reduce_metadata(chunk_metadata_map_[data_chunk_key], chunk_metadata);
      }
    }
    last_fragment_row_count_ += import_row_count;
    total_row_count_ += import_row_count;
  }
  finalizeFragmentMap();
}

bool ParquetDataWrapper::moveToNextFragment(size_t new_rows_count) const {
  return (last_fragment_row_count_ + new_rows_count) >
         static_cast<size_t>(foreign_table_->maxFragRows);
}
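
// Worked example (assumed numbers): with maxFragRows = 1,000,000, a fragment
// currently holding 900,000 rows, and an incoming row group of 200,000 rows,
// 900,000 + 200,000 > 1,000,000, so moveToNextFragment() returns true and the
// row group opens a new fragment instead of overflowing the current one.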

void ParquetDataWrapper::populateChunkMetadata(
    ChunkMetadataVector& chunk_metadata_vector) {
  fetchChunkMetadata();
  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
    chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
  }
}

void ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader(
    const int logical_column_id,
    const int fragment_id,
    const ChunkToBufferMap& required_buffers) {
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  const ColumnDescriptor* logical_column =
      schema_->getColumnDescriptor(logical_column_id);
  auto parquet_column_index = schema_->getParquetColumnIndex(logical_column_id);

  const Interval<ColumnType> column_interval = {
      logical_column_id,
      logical_column_id + logical_column->columnType.get_physical_cols()};
  initializeChunkBuffers(fragment_id, column_interval, required_buffers, true);

  const auto& row_group_intervals =
      shared::get_from_map(fragment_to_row_group_interval_map_, fragment_id);

  const bool is_dictionary_encoded_string_column =
      logical_column->columnType.is_dict_encoded_string() ||
      (logical_column->columnType.is_array() &&
       logical_column->columnType.get_elem_type().is_dict_encoded_string());

  StringDictionary* string_dictionary = nullptr;
  if (is_dictionary_encoded_string_column) {
    auto dict_descriptor =
        catalog->getMetadataForDict(logical_column->columnType.get_comp_param(), true);
    CHECK(dict_descriptor);
    string_dictionary = dict_descriptor->stringDict.get();
  }

  std::list<Chunk_NS::Chunk> chunks;
  for (int column_id = column_interval.start; column_id <= column_interval.end;
       ++column_id) {
    auto column_descriptor = schema_->getColumnDescriptor(column_id);
    Chunk_NS::Chunk chunk{column_descriptor};
    if (column_descriptor->columnType.is_varlen_indeed()) {
      ChunkKey data_chunk_key = {
          db_id_, foreign_table_->tableId, column_id, fragment_id, 1};
      auto buffer = shared::get_from_map(required_buffers, data_chunk_key);
      chunk.setBuffer(buffer);
      ChunkKey index_chunk_key = {
          db_id_, foreign_table_->tableId, column_id, fragment_id, 2};
      auto index_buffer = shared::get_from_map(required_buffers, index_chunk_key);
      chunk.setIndexBuffer(index_buffer);
    } else {
      ChunkKey chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
      auto buffer = shared::get_from_map(required_buffers, chunk_key);
      chunk.setBuffer(buffer);
    }
    chunks.emplace_back(chunk);
  }

  LazyParquetChunkLoader chunk_loader(file_system_, file_reader_cache_.get());
  auto metadata = chunk_loader.loadChunk(
      row_group_intervals, parquet_column_index, chunks, string_dictionary);
  auto fragmenter = foreign_table_->fragmenter;

  auto metadata_iter = metadata.begin();
  for (int column_id = column_interval.start; column_id <= column_interval.end;
       ++column_id, ++metadata_iter) {
    auto column = schema_->getColumnDescriptor(column_id);
    ChunkKey data_chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
    if (column->columnType.is_varlen_indeed()) {
      data_chunk_key.emplace_back(1);
    }
    CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());

    // Allocate a new shared_ptr for the metadata so we don't modify the old one,
    // which may still be in use by the executor.
    auto cached_metadata_previous =
        shared::get_from_map(chunk_metadata_map_, data_chunk_key);
    shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
        std::make_shared<ChunkMetadata>();
    auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
    *cached_metadata = *cached_metadata_previous;

    CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
    cached_metadata->numBytes =
        shared::get_from_map(required_buffers, data_chunk_key)->size();

    // For certain types, update the metadata statistics; this should update the
    // fragmenter, the cache, and the internal chunk_metadata_map_.
    if (is_dictionary_encoded_string_column || logical_column->columnType.is_geometry()) {
      CHECK(metadata_iter != metadata.end());
      auto& chunk_metadata_ptr = *metadata_iter;
      cached_metadata->chunkStats.max = chunk_metadata_ptr->chunkStats.max;
      cached_metadata->chunkStats.min = chunk_metadata_ptr->chunkStats.min;

      // Update the stats on the buffer so they are saved in the cache.
      shared::get_from_map(required_buffers, data_chunk_key)
          ->getEncoder()
          ->resetChunkStats(cached_metadata->chunkStats);
    }

    if (fragmenter) {
      fragmenter->updateColumnChunkMetadata(column, fragment_id, cached_metadata);
    }
  }
}
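
// Descriptive note (added): each call above lazily loads one logical column, plus
// any physical companion columns (e.g. for geometry types), for a single fragment,
// so Parquet data is only read for the row-group intervals mapped to that fragment.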

void ParquetDataWrapper::populateChunkBuffers(const ChunkToBufferMap& required_buffers,
                                              const ChunkToBufferMap& optional_buffers) {
  ChunkToBufferMap buffers_to_load;
  buffers_to_load.insert(required_buffers.begin(), required_buffers.end());
  buffers_to_load.insert(optional_buffers.begin(), optional_buffers.end());

  CHECK(!buffers_to_load.empty());

  std::set<ForeignStorageMgr::ParallelismHint> col_frag_hints;
  for (const auto& [chunk_key, buffer] : buffers_to_load) {
    CHECK_EQ(buffer->size(), static_cast<size_t>(0));
    col_frag_hints.emplace(
        schema_->getLogicalColumn(chunk_key[CHUNK_KEY_COLUMN_IDX])->columnId,
        chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
  }

  auto hints_per_thread = partition_for_threads(col_frag_hints, g_max_import_threads);

  std::vector<std::future<void>> futures;
  for (const auto& hint_set : hints_per_thread) {
    futures.emplace_back(std::async(std::launch::async, [&, hint_set, this] {
      for (const auto& [col_id, frag_id] : hint_set) {
        loadBuffersUsingLazyParquetChunkLoader(col_id, frag_id, buffers_to_load);
      }
    }));
  }

  for (auto& future : futures) {
    future.get();
  }
}
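
// Illustrative sketch (assumed values, not original source): with
// g_max_import_threads = 4 and six (column, fragment) hints,
// partition_for_threads() yields up to four hint sets, e.g.
//   {(1,0),(1,1)}, {(2,0),(2,1)}, {(3,0)}, {(3,1)}
// and each set is loaded on its own std::async task before the final
// future.get() loop acts as a completion barrier.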

void set_value(rapidjson::Value& json_val,
               const RowGroupInterval& value,
               rapidjson::Document::AllocatorType& allocator) {
  json_val.SetObject();
  json_utils::add_value_to_object(json_val, value.file_path, "file_path", allocator);
  json_utils::add_value_to_object(json_val, value.start_index, "start_index", allocator);
  json_utils::add_value_to_object(json_val, value.end_index, "end_index", allocator);
}

void get_value(const rapidjson::Value& json_val, RowGroupInterval& value) {
  CHECK(json_val.IsObject());
  json_utils::get_value_from_object(json_val, value.file_path, "file_path");
  json_utils::get_value_from_object(json_val, value.start_index, "start_index");
  json_utils::get_value_from_object(json_val, value.end_index, "end_index");
}

std::string ParquetDataWrapper::getSerializedDataWrapper() const {
  rapidjson::Document d;
  d.SetObject();

  json_utils::add_value_to_object(d,
                                  fragment_to_row_group_interval_map_,
                                  "fragment_to_row_group_interval_map",
                                  d.GetAllocator());
  json_utils::add_value_to_object(d, last_row_group_, "last_row_group", d.GetAllocator());
  json_utils::add_value_to_object(
      d, last_fragment_index_, "last_fragment_index", d.GetAllocator());
  json_utils::add_value_to_object(
      d, last_fragment_row_count_, "last_fragment_row_count", d.GetAllocator());
  json_utils::add_value_to_object(
      d, total_row_count_, "total_row_count", d.GetAllocator());
  return json_utils::write_to_string(d);
}
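
// Example payload (illustrative, field values assumed): the serialized wrapper
// state is a JSON document along the lines of
//   {
//     "fragment_to_row_group_interval_map":
//       {"0": [{"file_path": "data/f1.parquet", "start_index": 0, "end_index": 2}]},
//     "last_row_group": 2,
//     "last_fragment_index": 0,
//     "last_fragment_row_count": 150000,
//     "total_row_count": 150000
//   }
// which restoreDataWrapperInternals() below reads back field by field.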

void ParquetDataWrapper::restoreDataWrapperInternals(
    const std::string& file_path,
    const ChunkMetadataVector& chunk_metadata_vector) {
  auto d = json_utils::read_from_file(file_path);
  CHECK(d.IsObject());

  json_utils::get_value_from_object(
      d, fragment_to_row_group_interval_map_, "fragment_to_row_group_interval_map");
  json_utils::get_value_from_object(d, last_row_group_, "last_row_group");
  json_utils::get_value_from_object(d, last_fragment_index_, "last_fragment_index");
  json_utils::get_value_from_object(
      d, last_fragment_row_count_, "last_fragment_row_count");
  json_utils::get_value_from_object(d, total_row_count_, "total_row_count");

  CHECK(chunk_metadata_map_.empty());
  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_vector) {
    chunk_metadata_map_[chunk_key] = chunk_metadata;
  }
  is_restored_ = true;
}

bool ParquetDataWrapper::isRestored() const {
  return is_restored_;
}

}  // namespace foreign_storage