OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetArrayImportEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <parquet/types.h>
20 
22 #include "ImportExport/Importer.h"
23 #include "ParquetArrayEncoder.h"
25 
26 namespace foreign_storage {
28  public ParquetImportEncoder {
29  public:
31  std::shared_ptr<ParquetScalarEncoder> scalar_encoder,
32  const ColumnDescriptor* column_desciptor)
33  : ParquetArrayEncoder(data_buffer, scalar_encoder, column_desciptor)
35  dynamic_cast<TypedParquetStorageBuffer<ArrayDatum>*>(data_buffer))
36  , column_descriptor_(column_desciptor)
39  , invalid_indices_(nullptr) {
41  }
42 
// Checks the validation result recorded for the item at `encoded_index`: when
// `is_valid_item_[encoded_index]` is false, the array is flagged as invalid by
// setting `is_invalid_array_`. The flag is only ever raised here, never cleared.
// NOTE(review): listing line 44 is absent from this extract, so any statement
// that runs before the validity check is not visible here.
43  void appendArrayItem(const int64_t encoded_index) override {
45  if (!is_valid_item_[encoded_index]) {
46  is_invalid_array_ = true;
47  }
48  }
49 
50  void validateAndAppendData(const int16_t* def_levels,
51  const int16_t* rep_levels,
52  const int64_t values_read,
53  const int64_t levels_read,
54  int8_t* values,
55  const SQLTypeInfo& column_type, /* may not be used */
56  InvalidRowGroupIndices& invalid_indices) override {
57  // validate all elements
58  is_valid_item_.assign(values_read, true);
59  for (int64_t j = 0; j < values_read; ++j) {
60  try {
61  scalar_encoder_->validate(values, j, column_type);
62  } catch (const std::runtime_error& error) {
63  is_valid_item_[j] = false;
64  }
65  }
66  invalid_indices_ = &invalid_indices; // used in assembly algorithm
67  appendData(def_levels, rep_levels, values_read, levels_read, values);
68  }
69 
// Clears the invalid-array flag by setting `is_invalid_array_` back to false.
// NOTE(review): listing line 71 is absent from this extract, so any statement
// that precedes the flag reset is not visible here.
70  void resetLastArrayMetadata() override {
72  is_invalid_array_ = false;
73  }
74 
75  protected:
76  void appendArraysToBuffer() override {
77  // no-op as data is already written to buffer in `processLastArray`
78  }
79 
// Hook that handles the last array. The only logic visible here is a branch
// taken when the array has been flagged through `is_invalid_array_`.
// NOTE(review): listing lines 81, 83-84 and 86 are absent from this extract, so
// the statements before, inside and after that branch cannot be described from
// this view — confirm against the real header.
80  void processLastArray() override {
82  if (is_invalid_array_) {
85  }
87  }
88 
89  private:
90  ArrayDatum convertToArrayDatum(const int8_t* data, const size_t num_elements) {
91  const size_t num_bytes = num_elements * omnisci_data_type_byte_size_;
92  std::shared_ptr<int8_t> buffer(new int8_t[num_bytes],
93  std::default_delete<int8_t[]>());
94  memcpy(buffer.get(), data, num_bytes);
95  return ArrayDatum(num_bytes, buffer, false);
96  }
97 
100  }
101 
103  if (isLastArrayNull()) {
104  // append a null array offset
106  } else if (isLastArrayEmpty()) {
108  } else {
109  CHECK(data_buffer_bytes_.size() ==
114  .clear(); // can clear immediately, only one array buffered at a time
115  }
116  }
117 
119  const InvalidRowGroupIndices& invalid_indices) override {
120  if (invalid_indices.empty()) {
121  return;
122  }
123  array_datum_buffer_->eraseInvalidData(invalid_indices);
124  }
125 
126  std::vector<bool> is_valid_item_;
132 };
133 } // namespace foreign_storage
void eraseInvalidData(const FindContainer &invalid_indices)
virtual void appendArrayItem(const int64_t encoded_index)
static ArrayDatum composeNullArray(const SQLTypeInfo &ti)
Definition: Importer.cpp:395
void eraseInvalidIndicesInBuffer(const InvalidRowGroupIndices &invalid_indices) override
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:229
std::set< int64_t > InvalidRowGroupIndices
void appendArrayItem(const int64_t encoded_index) override
ParquetArrayImportEncoder(Data_Namespace::AbstractBuffer *data_buffer, std::shared_ptr< ParquetScalarEncoder > scalar_encoder, const ColumnDescriptor *column_desciptor)
An AbstractBuffer is a unit of data management for a data manager.
specifies the content in-memory of a row in the column metadata table
ArrayDatum convertToArrayDatum(const int8_t *data, const size_t num_elements)
TypedParquetStorageBuffer< ArrayDatum > * array_datum_buffer_
bool g_enable_watchdog false
Definition: Execute.cpp:80
#define CHECK(condition)
Definition: Logger.h:291
void validateAndAppendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values, const SQLTypeInfo &column_type, InvalidRowGroupIndices &invalid_indices) override
SQLTypeInfo columnType
unencoded array encoder
std::shared_ptr< ParquetScalarEncoder > scalar_encoder_