OmniSciDB fe05a0c208
ParquetDataWrapper.cpp
/*
 * Copyright 2020 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ParquetDataWrapper.h"

#include <queue>
#include <regex>

#include <arrow/filesystem/localfs.h>
#include <boost/filesystem.hpp>
#include "Catalog/Catalog.h"
#include "FsiJsonUtils.h"
#include "ImportExport/Importer.h"
#include "LazyParquetChunkLoader.h"
#include "MetadataPlaceholder.h"
#include "ParquetShared.h"
#include "Utils/DdlUtils.h"

namespace foreign_storage {

namespace {
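// Folds `reduce_from`'s chunk metadata into `reduce_to`: byte/element counts are
// summed, null flags are OR'd, and min/max stats are merged through a scratch
// encoder. String and geometry columns carry no stats beyond `has_nulls`.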
void reduce_metadata(std::shared_ptr<ChunkMetadata> reduce_to,
                     std::shared_ptr<ChunkMetadata> reduce_from) {
  CHECK(reduce_to->sqlType == reduce_from->sqlType);
  reduce_to->numBytes += reduce_from->numBytes;
  reduce_to->numElements += reduce_from->numElements;
  reduce_to->chunkStats.has_nulls |= reduce_from->chunkStats.has_nulls;

  auto column_type = reduce_to->sqlType;
  column_type = column_type.is_array() ? column_type.get_elem_type() : column_type;
  // Metadata reduction is done at metadata scan time; both string and geometry
  // columns have no valid stats to reduce beyond `has_nulls`.
  if (column_type.is_string() || column_type.is_geometry()) {
    // Reset to an invalid range, as formerly valid metadata
    // needs to be invalidated during an append for these types.
    reduce_to->chunkStats.max = reduce_from->chunkStats.max;
    reduce_to->chunkStats.min = reduce_from->chunkStats.min;
    return;
  }

  ForeignStorageBuffer buffer_to;
  buffer_to.initEncoder(column_type);
  auto encoder_to = buffer_to.getEncoder();
  encoder_to->resetChunkStats(reduce_to->chunkStats);

  ForeignStorageBuffer buffer_from;
  buffer_from.initEncoder(column_type);
  auto encoder_from = buffer_from.getEncoder();
  encoder_from->resetChunkStats(reduce_from->chunkStats);

  encoder_to->reduceStats(*encoder_from);
  auto updated_metadata = std::make_shared<ChunkMetadata>();
  encoder_to->getMetadata(updated_metadata);
  reduce_to->chunkStats = updated_metadata->chunkStats;
}
}  // namespace

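// The default constructor leaves the wrapper unbound; the main constructor ties
// it to a database/table pair and selects the filesystem implied by the foreign
// server's storage type (only local file storage is currently handled).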
ParquetDataWrapper::ParquetDataWrapper() : db_id_(-1), foreign_table_(nullptr) {}

ParquetDataWrapper::ParquetDataWrapper(const int db_id, const ForeignTable* foreign_table)
    : db_id_(db_id)
    , foreign_table_(foreign_table)
    , last_fragment_index_(0)
    , last_fragment_row_count_(0)
    , total_row_count_(0)
    , last_row_group_(0)
    , is_restored_(false)
    , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
    , file_reader_cache_(std::make_unique<FileReaderMap>()) {
  auto& server_options = foreign_table->foreign_server->options;
  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
    file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
  } else {
    UNREACHABLE();
  }
}

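// Clears all metadata-scan bookkeeping (fragment map, row group and row
// counters, cached file readers) so a scan can restart from scratch.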
void ParquetDataWrapper::resetParquetMetadata() {
  fragment_to_row_group_interval_map_.clear();
  fragment_to_row_group_interval_map_[0] = {};

  last_row_group_ = 0;
  last_fragment_index_ = 0;
  last_fragment_row_count_ = 0;
  total_row_count_ = 0;
  file_reader_cache_->clear();
}

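// Returns the logical and physical columns whose IDs fall inside the given
// column interval.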
std::list<const ColumnDescriptor*> ParquetDataWrapper::getColumnsToInitialize(
    const Interval<ColumnType>& column_interval) {
  const auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  const auto& columns = schema_->getLogicalAndPhysicalColumns();
  auto column_start = column_interval.start;
  auto column_end = column_interval.end;
  std::list<const ColumnDescriptor*> columns_to_init;
  for (const auto column : columns) {
    auto column_id = column->columnId;
    if (column_id >= column_start && column_id <= column_end) {
      columns_to_init.push_back(column);
    }
  }
  return columns_to_init;
}

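// Wires the supplied buffers into chunks (data plus index buffers for varlen
// columns), initializes encoders, and optionally restores cached stats and
// reserves space based on previously scanned metadata.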
void ParquetDataWrapper::initializeChunkBuffers(
    const int fragment_index,
    const Interval<ColumnType>& column_interval,
    const ChunkToBufferMap& required_buffers,
    const bool reserve_buffers_and_set_stats) {
  for (const auto column : getColumnsToInitialize(column_interval)) {
    Chunk_NS::Chunk chunk{column};
    ChunkKey data_chunk_key;
    if (column->columnType.is_varlen_indeed()) {
      data_chunk_key = {
          db_id_, foreign_table_->tableId, column->columnId, fragment_index, 1};
      CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
      auto data_buffer = required_buffers.at(data_chunk_key);
      chunk.setBuffer(data_buffer);

      ChunkKey index_chunk_key{
          db_id_, foreign_table_->tableId, column->columnId, fragment_index, 2};
      CHECK(required_buffers.find(index_chunk_key) != required_buffers.end());
      auto index_buffer = required_buffers.at(index_chunk_key);
      chunk.setIndexBuffer(index_buffer);
    } else {
      data_chunk_key = {
          db_id_, foreign_table_->tableId, column->columnId, fragment_index};
      CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
      auto data_buffer = required_buffers.at(data_chunk_key);
      chunk.setBuffer(data_buffer);
    }
    chunk.initEncoder();
    if (reserve_buffers_and_set_stats) {
      const auto metadata_it = chunk_metadata_map_.find(data_chunk_key);
      CHECK(metadata_it != chunk_metadata_map_.end());
      auto buffer = chunk.getBuffer();
      auto& metadata = metadata_it->second;
      auto encoder = buffer->getEncoder();
      encoder->resetChunkStats(metadata->chunkStats);
      encoder->setNumElems(metadata->numElements);
      if (column->columnType.is_string() &&
          column->columnType.get_compression() == kENCODING_NONE) {
        auto index_buffer = chunk.getIndexBuf();
        index_buffer->reserve(sizeof(StringOffsetT) * (metadata->numElements + 1));
      } else if (!column->columnType.is_fixlen_array() && column->columnType.is_array()) {
        auto index_buffer = chunk.getIndexBuf();
        index_buffer->reserve(sizeof(ArrayOffsetT) * (metadata->numElements + 1));
      } else {
        size_t num_bytes_to_reserve =
            metadata->numElements * column->columnType.get_size();
        buffer->reserve(num_bytes_to_reserve);
      }
    }
  }
}

void ParquetDataWrapper::finalizeFragmentMap() {
  fragment_to_row_group_interval_map_[last_fragment_index_].back().end_index =
      last_row_group_;
}

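// Closes the current fragment's final row group interval and opens a new
// fragment starting at `row_group` of `file_path`.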
void ParquetDataWrapper::addNewFragment(int row_group, const std::string& file_path) {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());

  last_fragment_entry->second.back().end_index = last_row_group_;
  last_fragment_index_++;
  last_fragment_row_count_ = 0;
  fragment_to_row_group_interval_map_[last_fragment_index_].emplace_back(
      RowGroupInterval{file_path, row_group});
}

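// Returns true if `file_path` is not the file of the current fragment's most
// recent row group interval (or if no interval has been recorded yet).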
bool ParquetDataWrapper::isNewFile(const std::string& file_path) const {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());

  // The entry for the first fragment starts out as an empty vector
  if (last_fragment_entry->second.empty()) {
    CHECK_EQ(last_fragment_index_, 0);
    return true;
  } else {
    return (last_fragment_entry->second.back().file_path != file_path);
  }
}

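// Starts a new row group interval for `file_path` within the current fragment,
// closing the previous file's interval if one exists.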
void ParquetDataWrapper::addNewFile(const std::string& file_path) {
  const auto last_fragment_entry =
      fragment_to_row_group_interval_map_.find(last_fragment_index_);
  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());

  // The entry for the first fragment starts out as an empty vector
  if (last_fragment_entry->second.empty()) {
    CHECK_EQ(last_fragment_index_, 0);
  } else {
    last_fragment_entry->second.back().end_index = last_row_group_;
  }
  last_fragment_entry->second.emplace_back(RowGroupInterval{file_path, 0});
}

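// Determines which files still require a metadata scan. In append mode, verifies
// that previously processed files were not removed and, for single-file tables,
// detects in-place appends; otherwise everything is rescanned from a clean slate.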
void ParquetDataWrapper::fetchChunkMetadata() {
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  std::set<std::string> new_file_paths;
  auto processed_file_paths = getProcessedFilePaths();
  if (foreign_table_->isAppendMode() && !processed_file_paths.empty()) {
    auto all_file_paths = getAllFilePaths();
    for (const auto& file_path : processed_file_paths) {
      if (all_file_paths.find(file_path) == all_file_paths.end()) {
        throw_removed_file_error(file_path);
      }
    }

    for (const auto& file_path : all_file_paths) {
      if (processed_file_paths.find(file_path) == processed_file_paths.end()) {
        new_file_paths.emplace(file_path);
      }
    }

    // Single file append
    // If an append occurs with multiple files, then we assume any existing files
    // have not been altered. If an append occurs on a single file, then we check
    // to see if it has changed.
    if (new_file_paths.empty() && all_file_paths.size() == 1) {
      CHECK_EQ(processed_file_paths.size(), static_cast<size_t>(1));
      const auto& file_path = *all_file_paths.begin();
      CHECK_EQ(*processed_file_paths.begin(), file_path);

      // Since an existing file is being appended to, we need to update the cached
      // FileReader, as the existing one will be out of date.
      auto reader = file_reader_cache_->insert(file_path, file_system_);
      size_t row_count = reader->parquet_reader()->metadata()->num_rows();

      if (row_count < total_row_count_) {
        throw_removed_row_error(file_path);
      } else if (row_count > total_row_count_) {
        new_file_paths = all_file_paths;
        chunk_metadata_map_.clear();
        resetParquetMetadata();
      }
    }
  } else {
    CHECK(chunk_metadata_map_.empty());
    new_file_paths = getAllFilePaths();
    resetParquetMetadata();
  }

  if (!new_file_paths.empty()) {
    metadataScanFiles(new_file_paths);
  }
}

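// Returns the file paths already covered by the fragment-to-row-group map,
// i.e. files processed by a previous metadata scan.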
std::set<std::string> ParquetDataWrapper::getProcessedFilePaths() {
  std::set<std::string> file_paths;
  for (const auto& entry : fragment_to_row_group_interval_map_) {
    for (const auto& row_group_interval : entry.second) {
      file_paths.emplace(row_group_interval.file_path);
    }
  }
  return file_paths;
}

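// Lists all files under the table's base path, recursing into subdirectories;
// a failed directory listing is treated as the base path naming a single file.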
std::set<std::string> ParquetDataWrapper::getAllFilePaths() {
  auto timer = DEBUG_TIMER(__func__);
  std::set<std::string> file_paths;
  arrow::fs::FileSelector file_selector{};
  std::string base_path = getFullFilePath(foreign_table_);
  file_selector.base_dir = base_path;
  file_selector.recursive = true;

  auto file_info_result = file_system_->GetFileInfo(file_selector);
  if (!file_info_result.ok()) {
    // This is expected when `base_path` points to a single file.
    file_paths.emplace(base_path);
  } else {
    auto& file_info_vector = file_info_result.ValueOrDie();
    for (const auto& file_info : file_info_vector) {
      if (file_info.type() == arrow::fs::FileType::File) {
        file_paths.emplace(file_info.path());
      }
    }
    if (file_paths.empty()) {
      throw std::runtime_error{"No file found at given path \"" + base_path + "\"."};
    }
  }
  return file_paths;
}

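// Scans row group metadata for the given files, assigning row groups to
// fragments as maxFragRows fills up and folding per-row-group chunk metadata
// into chunk_metadata_map_ via reduce_metadata().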
void ParquetDataWrapper::metadataScanFiles(const std::set<std::string>& file_paths) {
  LazyParquetChunkLoader chunk_loader(file_system_, file_reader_cache_.get());
  auto row_group_metadata = chunk_loader.metadataScan(file_paths, *schema_);
  auto column_interval =
      Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
                           schema_->getLogicalAndPhysicalColumns().back()->columnId};

  for (const auto& row_group_metadata_item : row_group_metadata) {
    const auto& column_chunk_metadata = row_group_metadata_item.column_chunk_metadata;
    CHECK(static_cast<int>(column_chunk_metadata.size()) ==
          schema_->numLogicalAndPhysicalColumns());
    auto column_chunk_metadata_iter = column_chunk_metadata.begin();
    const int64_t import_row_count = (*column_chunk_metadata_iter)->numElements;
    int row_group = row_group_metadata_item.row_group_index;
    const auto& file_path = row_group_metadata_item.file_path;
    if (moveToNextFragment(import_row_count)) {
      addNewFragment(row_group, file_path);
    } else if (isNewFile(file_path)) {
      CHECK_EQ(row_group, 0);
      addNewFile(file_path);
    }
    last_row_group_ = row_group;

    for (int column_id = column_interval.start; column_id <= column_interval.end;
         column_id++, column_chunk_metadata_iter++) {
      CHECK(column_chunk_metadata_iter != column_chunk_metadata.end());
      const auto column_descriptor = schema_->getColumnDescriptor(column_id);

      const auto& type_info = column_descriptor->columnType;
      ChunkKey chunk_key{
          db_id_, foreign_table_->tableId, column_id, last_fragment_index_};
      ChunkKey data_chunk_key = chunk_key;
      if (type_info.is_varlen_indeed()) {
        data_chunk_key.emplace_back(1);
      }
      std::shared_ptr<ChunkMetadata> chunk_metadata = *column_chunk_metadata_iter;
      if (chunk_metadata_map_.find(data_chunk_key) == chunk_metadata_map_.end()) {
        chunk_metadata_map_[data_chunk_key] = chunk_metadata;
      } else {
        reduce_metadata(chunk_metadata_map_[data_chunk_key], chunk_metadata);
      }
    }
    last_fragment_row_count_ += import_row_count;
    total_row_count_ += import_row_count;
  }
  finalizeFragmentMap();
}

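// Returns true if appending `new_rows_count` rows would push the current
// fragment past the table's maxFragRows limit.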
bool ParquetDataWrapper::moveToNextFragment(size_t new_rows_count) const {
  return (last_fragment_row_count_ + new_rows_count) >
         static_cast<size_t>(foreign_table_->maxFragRows);
}

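// Refreshes chunk metadata (scanning any new files) and copies the resulting
// map into the output vector.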
void ParquetDataWrapper::populateChunkMetadata(
    ChunkMetadataVector& chunk_metadata_vector) {
  fetchChunkMetadata();
  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
    chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
  }
}

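// Loads all chunks for one logical column of a fragment and refreshes the
// cached metadata; dictionary-encoded string and geometry columns also get
// their min/max stats updated from the freshly loaded data.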
void ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader(
    const int logical_column_id,
    const int fragment_id,
    const ChunkToBufferMap& required_buffers) {
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id_);
  CHECK(catalog);
  const ColumnDescriptor* logical_column =
      schema_->getColumnDescriptor(logical_column_id);
  auto parquet_column_index = schema_->getParquetColumnIndex(logical_column_id);

  const Interval<ColumnType> column_interval = {
      logical_column_id,
      logical_column_id + logical_column->columnType.get_physical_cols()};
  initializeChunkBuffers(fragment_id, column_interval, required_buffers, true);

  const auto& row_group_intervals = fragment_to_row_group_interval_map_.at(fragment_id);

  const bool is_dictionary_encoded_string_column =
      logical_column->columnType.is_dict_encoded_string() ||
      (logical_column->columnType.is_array() &&
       logical_column->columnType.get_elem_type().is_dict_encoded_string());

  StringDictionary* string_dictionary = nullptr;
  if (is_dictionary_encoded_string_column) {
    auto dict_descriptor = catalog->getMetadataForDictUnlocked(
        logical_column->columnType.get_comp_param(), true);
    CHECK(dict_descriptor);
    string_dictionary = dict_descriptor->stringDict.get();
  }

  std::list<Chunk_NS::Chunk> chunks;
  for (int column_id = column_interval.start; column_id <= column_interval.end;
       ++column_id) {
    auto column_descriptor = schema_->getColumnDescriptor(column_id);
    Chunk_NS::Chunk chunk{column_descriptor};
    if (column_descriptor->columnType.is_varlen_indeed()) {
      ChunkKey data_chunk_key = {
          db_id_, foreign_table_->tableId, column_id, fragment_id, 1};
      auto buffer = required_buffers.at(data_chunk_key);
      chunk.setBuffer(buffer);
      ChunkKey index_chunk_key = {
          db_id_, foreign_table_->tableId, column_id, fragment_id, 2};
      auto index_buffer = required_buffers.at(index_chunk_key);
      chunk.setIndexBuffer(index_buffer);
    } else {
      ChunkKey chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
      auto buffer = required_buffers.at(chunk_key);
      chunk.setBuffer(buffer);
    }
    chunks.emplace_back(chunk);
  }

  LazyParquetChunkLoader chunk_loader(file_system_, file_reader_cache_.get());
  auto metadata = chunk_loader.loadChunk(
      row_group_intervals, parquet_column_index, chunks, string_dictionary);
  auto fragmenter = foreign_table_->fragmenter;

  auto metadata_iter = metadata.begin();
  for (int column_id = column_interval.start; column_id <= column_interval.end;
       ++column_id, ++metadata_iter) {
    auto column = schema_->getColumnDescriptor(column_id);
    ChunkKey data_chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
    if (column->columnType.is_varlen_indeed()) {
      data_chunk_key.emplace_back(1);
    }
    CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());

    // Allocate a new shared_ptr for the metadata so we don't modify the old one,
    // which may be in use by the executor.
    auto cached_metadata_previous = chunk_metadata_map_.at(data_chunk_key);
    chunk_metadata_map_.at(data_chunk_key) = std::make_shared<ChunkMetadata>();
    auto cached_metadata = chunk_metadata_map_.at(data_chunk_key);
    *cached_metadata = *cached_metadata_previous;

    CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
    cached_metadata->numBytes = required_buffers.at(data_chunk_key)->size();

    // For certain types, update the metadata statistics; this should update the
    // fragmenter, the cache, and the internal chunk_metadata_map_.
    if (is_dictionary_encoded_string_column || logical_column->columnType.is_geometry()) {
      CHECK(metadata_iter != metadata.end());
      auto& chunk_metadata_ptr = *metadata_iter;
      cached_metadata->chunkStats.max = chunk_metadata_ptr->chunkStats.max;
      cached_metadata->chunkStats.min = chunk_metadata_ptr->chunkStats.min;

      // Update stats on the buffer so they are saved in the cache
      required_buffers.at(data_chunk_key)
          ->getEncoder()
          ->resetChunkStats(cached_metadata->chunkStats);
    }

    if (fragmenter) {
      fragmenter->updateColumnChunkMetadata(column, fragment_id, cached_metadata);
    }
  }
}

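// Loads required and optional buffers together, parallelized across logical
// column/fragment pairs with up to g_max_import_threads async tasks.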
void ParquetDataWrapper::populateChunkBuffers(const ChunkToBufferMap& required_buffers,
                                              const ChunkToBufferMap& optional_buffers) {
  ChunkToBufferMap buffers_to_load;
  buffers_to_load.insert(required_buffers.begin(), required_buffers.end());
  buffers_to_load.insert(optional_buffers.begin(), optional_buffers.end());

  CHECK(!buffers_to_load.empty());

  std::set<ForeignStorageMgr::ParallelismHint> col_frag_hints;
  for (const auto& [chunk_key, buffer] : buffers_to_load) {
    CHECK_EQ(buffer->size(), static_cast<size_t>(0));
    col_frag_hints.emplace(
        schema_->getLogicalColumn(chunk_key[CHUNK_KEY_COLUMN_IDX])->columnId,
        chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
  }

  auto hints_per_thread = partition_for_threads(col_frag_hints, g_max_import_threads);

  std::vector<std::future<void>> futures;
  for (const auto& hint_set : hints_per_thread) {
    futures.emplace_back(std::async(std::launch::async, [&, hint_set, this] {
      for (const auto& [col_id, frag_id] : hint_set) {
        loadBuffersUsingLazyParquetChunkLoader(col_id, frag_id, buffers_to_load);
      }
    }));
  }

  for (auto& future : futures) {
    future.get();
  }
}

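// JSON (de)serialization helpers for RowGroupInterval, used when persisting
// and restoring the wrapper's internal state.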
void set_value(rapidjson::Value& json_val,
               const RowGroupInterval& value,
               rapidjson::Document::AllocatorType& allocator) {
  json_val.SetObject();
  json_utils::add_value_to_object(json_val, value.file_path, "file_path", allocator);
  json_utils::add_value_to_object(json_val, value.start_index, "start_index", allocator);
  json_utils::add_value_to_object(json_val, value.end_index, "end_index", allocator);
}

void get_value(const rapidjson::Value& json_val, RowGroupInterval& value) {
  CHECK(json_val.IsObject());
  json_utils::get_value_from_object(json_val, value.file_path, "file_path");
  json_utils::get_value_from_object(json_val, value.start_index, "start_index");
  json_utils::get_value_from_object(json_val, value.end_index, "end_index");
}

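// Persists the wrapper's scan bookkeeping as JSON so an equivalent wrapper can
// be reconstructed later by restoreDataWrapperInternals().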
void ParquetDataWrapper::serializeDataWrapperInternals(
    const std::string& file_path) const {
  rapidjson::Document d;
  d.SetObject();

  json_utils::add_value_to_object(d,
                                  fragment_to_row_group_interval_map_,
                                  "fragment_to_row_group_interval_map",
                                  d.GetAllocator());
  json_utils::add_value_to_object(d, last_row_group_, "last_row_group", d.GetAllocator());
  json_utils::add_value_to_object(
      d, last_fragment_index_, "last_fragment_index", d.GetAllocator());
  json_utils::add_value_to_object(
      d, last_fragment_row_count_, "last_fragment_row_count", d.GetAllocator());
  json_utils::add_value_to_object(
      d, total_row_count_, "total_row_count", d.GetAllocator());

  json_utils::write_to_file(d, file_path);
}

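// Rebuilds internal state from a previously serialized JSON file plus a cached
// chunk metadata vector.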
void ParquetDataWrapper::restoreDataWrapperInternals(
    const std::string& file_path,
    const ChunkMetadataVector& chunk_metadata_vector) {
  auto d = json_utils::read_from_file(file_path);
  CHECK(d.IsObject());

  json_utils::get_value_from_object(
      d, fragment_to_row_group_interval_map_, "fragment_to_row_group_interval_map");
  json_utils::get_value_from_object(d, last_row_group_, "last_row_group");
  json_utils::get_value_from_object(d, last_fragment_index_, "last_fragment_index");
  json_utils::get_value_from_object(
      d, last_fragment_row_count_, "last_fragment_row_count");
  json_utils::get_value_from_object(d, total_row_count_, "total_row_count");

  CHECK(chunk_metadata_map_.empty());
  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_vector) {
    chunk_metadata_map_[chunk_key] = chunk_metadata;
  }
  is_restored_ = true;
}

bool ParquetDataWrapper::isRestored() const {
  return is_restored_;
}

}  // namespace foreign_storage