OmniSciDB  b28c0d5765
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
FixedLengthEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef FIXED_LENGTH_ENCODER_H
18 #define FIXED_LENGTH_ENCODER_H
19 #include "Logger/Logger.h"
20 
21 #include <iostream>
22 #include <memory>
23 #include <stdexcept>
24 #include "AbstractBuffer.h"
25 #include "Encoder.h"
26 
27 #include <Shared/DatumFetchers.h>
28 #include <tbb/parallel_for.h>
29 #include <tbb/parallel_reduce.h>
30 #include <tuple>
31 #include "Shared/Iteration.h"
32 
33 template <typename T, typename V>
34 class FixedLengthEncoder : public Encoder {
35  public:
38  }
39 
40  size_t getNumElemsForBytesEncodedDataAtIndices(const int8_t* index_data,
41  const std::vector<size_t>& selected_idx,
42  const size_t byte_limit) override {
43  UNREACHABLE()
44  << "getNumElemsForBytesEncodedDataAtIndices unexpectedly called for non varlen"
45  " encoder";
46  return {};
47  }
48 
49  std::shared_ptr<ChunkMetadata> appendEncodedDataAtIndices(
50  const int8_t*,
51  int8_t* data,
52  const std::vector<size_t>& selected_idx) override {
53  std::shared_ptr<ChunkMetadata> chunk_metadata;
54  // NOTE: the use of `execute_over_contiguous_indices` is an optimization;
55  // it prevents having to copy or move the indexed data and instead performs
56  // an append over contiguous sections of indices.
58  selected_idx, [&](const size_t start_pos, const size_t end_pos) {
59  size_t elem_count = end_pos - start_pos;
60  chunk_metadata =
61  appendEncodedData(nullptr, data, selected_idx[start_pos], elem_count);
62  });
63  return chunk_metadata;
64  }
65 
66  std::shared_ptr<ChunkMetadata> appendEncodedData(const int8_t*,
67  int8_t* data,
68  const size_t start_idx,
69  const size_t num_elements) override {
70  auto current_data = data + start_idx * sizeof(V);
72  current_data, num_elements, SQLTypeInfo{}, false, -1, true);
73  }
74 
75  std::shared_ptr<ChunkMetadata> appendData(int8_t*& src_data,
76  const size_t num_elems_to_append,
77  const SQLTypeInfo& ti,
78  const bool replicating = false,
79  const int64_t offset = -1) override {
81  src_data, num_elems_to_append, ti, replicating, offset, false);
82  }
83 
84  void getMetadata(const std::shared_ptr<ChunkMetadata>& chunkMetadata) override {
85  Encoder::getMetadata(chunkMetadata); // call on parent class
86  chunkMetadata->fillChunkStats(dataMin, dataMax, has_nulls);
87  }
88 
89  // Only called from the executor for synthesized meta-information.
90  std::shared_ptr<ChunkMetadata> getMetadata(const SQLTypeInfo& ti) override {
91  auto chunk_metadata = std::make_shared<ChunkMetadata>(ti, 0, 0, ChunkStats{});
92  chunk_metadata->fillChunkStats(dataMin, dataMax, has_nulls);
93  return chunk_metadata;
94  }
95 
96  // Only called from the executor for synthesized meta-information.
97  void updateStats(const int64_t val, const bool is_null) override {
98  if (is_null) {
99  has_nulls = true;
100  } else {
101  const auto data = static_cast<T>(val);
102  dataMin = std::min(dataMin, data);
103  dataMax = std::max(dataMax, data);
104  }
105  }
106 
107  // Only called from the executor for synthesized meta-information.
108  void updateStats(const double val, const bool is_null) override {
109  if (is_null) {
110  has_nulls = true;
111  } else {
112  const auto data = static_cast<T>(val);
113  dataMin = std::min(dataMin, data);
114  dataMax = std::max(dataMax, data);
115  }
116  }
117 
118  void updateStats(const int8_t* const src_data, const size_t num_elements) override {
119  const T* unencoded_data = reinterpret_cast<const T*>(src_data);
120  for (size_t i = 0; i < num_elements; ++i) {
121  encodeDataAndUpdateStats(unencoded_data[i]);
122  }
123  }
124 
125  void updateStatsEncoded(const int8_t* const dst_data,
126  const size_t num_elements) override {
127  const V* data = reinterpret_cast<const V*>(dst_data);
128 
130  tbb::blocked_range(size_t(0), num_elements),
131  std::tuple(static_cast<V>(dataMin), static_cast<V>(dataMax), has_nulls),
132  [&](const auto& range, auto init) {
133  auto [min, max, nulls] = init;
134  for (size_t i = range.begin(); i < range.end(); i++) {
135  if (data[i] != std::numeric_limits<V>::min()) {
137  min = std::min(min, data[i]);
138  max = std::max(max, data[i]);
139  } else {
140  nulls = true;
141  }
142  }
143  return std::tuple(min, max, nulls);
144  },
145  [&](auto lhs, auto rhs) {
146  const auto [lhs_min, lhs_max, lhs_nulls] = lhs;
147  const auto [rhs_min, rhs_max, rhs_nulls] = rhs;
148  return std::tuple(std::min(lhs_min, rhs_min),
149  std::max(lhs_max, rhs_max),
150  lhs_nulls || rhs_nulls);
151  });
152  }
153 
154  void updateStats(const std::vector<std::string>* const src_data,
155  const size_t start_idx,
156  const size_t num_elements) override {
157  UNREACHABLE();
158  }
159 
160  void updateStats(const std::vector<ArrayDatum>* const src_data,
161  const size_t start_idx,
162  const size_t num_elements) override {
163  UNREACHABLE();
164  }
165 
166  // Only called from the executor for synthesized meta-information.
167  void reduceStats(const Encoder& that) override {
168  const auto that_typed = static_cast<const FixedLengthEncoder<T, V>&>(that);
169  if (that_typed.has_nulls) {
170  has_nulls = true;
171  }
172  dataMin = std::min(dataMin, that_typed.dataMin);
173  dataMax = std::max(dataMax, that_typed.dataMax);
174  }
175 
176  void copyMetadata(const Encoder* copyFromEncoder) override {
177  num_elems_ = copyFromEncoder->getNumElems();
178  auto castedEncoder =
179  reinterpret_cast<const FixedLengthEncoder<T, V>*>(copyFromEncoder);
180  dataMin = castedEncoder->dataMin;
181  dataMax = castedEncoder->dataMax;
182  has_nulls = castedEncoder->has_nulls;
183  }
184 
185  void writeMetadata(FILE* f) override {
186  // assumes pointer is already in right place
187  fwrite((int8_t*)&num_elems_, sizeof(size_t), 1, f);
188  fwrite((int8_t*)&dataMin, sizeof(T), 1, f);
189  fwrite((int8_t*)&dataMax, sizeof(T), 1, f);
190  fwrite((int8_t*)&has_nulls, sizeof(bool), 1, f);
191  }
192 
193  void readMetadata(FILE* f) override {
194  // assumes pointer is already in right place
195  fread((int8_t*)&num_elems_, sizeof(size_t), 1, f);
196  fread((int8_t*)&dataMin, 1, sizeof(T), f);
197  fread((int8_t*)&dataMax, 1, sizeof(T), f);
198  fread((int8_t*)&has_nulls, 1, sizeof(bool), f);
199  }
200 
201  bool resetChunkStats(const ChunkStats& stats) override {
202  const auto new_min = DatumFetcher::getDatumVal<T>(stats.min);
203  const auto new_max = DatumFetcher::getDatumVal<T>(stats.max);
204 
205  if (dataMin == new_min && dataMax == new_max && has_nulls == stats.has_nulls) {
206  return false;
207  }
208 
209  dataMin = new_min;
210  dataMax = new_max;
211  has_nulls = stats.has_nulls;
212  return true;
213  }
214 
215  void resetChunkStats() override {
216  dataMin = std::numeric_limits<T>::max();
217  dataMax = std::numeric_limits<T>::lowest();
218  has_nulls = false;
219  }
220 
223  bool has_nulls;
224 
225  private:
226  std::shared_ptr<ChunkMetadata> appendEncodedOrUnencodedData(
227  int8_t*& src_data,
228  const size_t num_elems_to_append,
229  const SQLTypeInfo& ti,
230  const bool replicating,
231  const int64_t offset,
232  const bool is_encoded) {
233  if (offset == 0 &&
234  num_elems_to_append >=
235  num_elems_) { // we're rewriting entire buffer so fully recompute metadata
236  resetChunkStats();
237  }
238 
239  CHECK(!is_encoded || !replicating); // do not support replicating of encoded data
240 
241  T* unencoded_data = reinterpret_cast<T*>(src_data);
242  std::vector<V> encoded_data;
243  V* data_to_write = nullptr;
244  if (!is_encoded) {
245  encoded_data.resize(num_elems_to_append);
246  data_to_write = encoded_data.data();
247  for (size_t i = 0; i < num_elems_to_append; ++i) {
248  size_t ri = replicating ? 0 : i;
249  encoded_data[i] = encodeDataAndUpdateStats(unencoded_data[ri]);
250  }
251  } else {
252  data_to_write = reinterpret_cast<V*>(src_data);
253  for (size_t i = 0; i < num_elems_to_append; ++i) {
254  updateStatsWithAlreadyEncoded(data_to_write[i]);
255  }
256  }
257 
258  // assume always CPU_BUFFER?
259  if (offset == -1) {
260  auto append_data_size = num_elems_to_append * sizeof(V);
261  buffer_->reserve(buffer_->size() + append_data_size);
262  num_elems_ += num_elems_to_append;
263  buffer_->append(reinterpret_cast<int8_t*>(data_to_write), append_data_size);
264  if (!replicating) {
265  src_data += num_elems_to_append * sizeof(T);
266  }
267  } else {
268  num_elems_ = offset + num_elems_to_append;
269  CHECK(!replicating);
270  CHECK_GE(offset, 0);
271  buffer_->write(reinterpret_cast<int8_t*>(data_to_write),
272  num_elems_to_append * sizeof(V),
273  static_cast<size_t>(offset));
274  }
275  auto chunk_metadata = std::make_shared<ChunkMetadata>();
276  getMetadata(chunk_metadata);
277  return chunk_metadata;
278  }
279 
280  void updateStatsWithAlreadyEncoded(const V& encoded_data) {
281  if (encoded_data == std::numeric_limits<V>::min()) {
282  has_nulls = true;
283  } else {
284  dataMin = std::min<T>(dataMin, encoded_data);
285  dataMax = std::max<T>(dataMax, encoded_data);
286  }
287  }
288 
289  V encodeDataAndUpdateStats(const T& unencoded_data) {
290  V encoded_data = static_cast<V>(unencoded_data);
291  if (unencoded_data != encoded_data) {
292  decimal_overflow_validator_.validate(unencoded_data);
293  LOG(ERROR) << "Fixed encoding failed, Unencoded: " +
294  std::to_string(unencoded_data) +
295  " encoded: " + std::to_string(encoded_data);
296  } else {
297  T data = unencoded_data;
298  if (data == std::numeric_limits<V>::min()) {
299  has_nulls = true;
300  } else {
302  dataMin = std::min(dataMin, data);
303  dataMax = std::max(dataMax, data);
304  }
305  }
306  return encoded_data;
307  }
308 }; // FixedLengthEncoder
309 
310 #endif // FIXED_LENGTH_ENCODER_H
void updateStats(const int8_t *const src_data, const size_t num_elements) override
size_t num_elems_
Definition: Encoder.h:288
std::shared_ptr< ChunkMetadata > appendData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &ti, const bool replicating=false, const int64_t offset=-1) override
DecimalOverflowValidator decimal_overflow_validator_
Definition: Encoder.h:292
#define LOG(tag)
Definition: Logger.h:216
std::shared_ptr< ChunkMetadata > appendEncodedDataAtIndices(const int8_t *, int8_t *data, const std::vector< size_t > &selected_idx) override
#define UNREACHABLE()
Definition: Logger.h:266
#define CHECK_GE(x, y)
Definition: Logger.h:235
bool has_nulls
Definition: ChunkMetadata.h:30
std::shared_ptr< ChunkMetadata > appendEncodedData(const int8_t *, int8_t *data, const size_t start_idx, const size_t num_elements) override
void resetChunkStats() override
void updateStats(const int64_t val, const bool is_null) override
void updateStats(const std::vector< std::string > *const src_data, const size_t start_idx, const size_t num_elements) override
std::shared_ptr< ChunkMetadata > appendEncodedOrUnencodedData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &ti, const bool replicating, const int64_t offset, const bool is_encoded)
constexpr double f
Definition: Utm.h:31
std::string to_string(char const *&&v)
virtual void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata)
Definition: Encoder.cpp:231
void execute_over_contiguous_indices(const std::vector< size_t > &indices, std::function< void(const size_t, const size_t)> to_execute)
Definition: Iteration.h:22
CONSTEXPR DEVICE bool is_null(const T &value)
Data_Namespace::AbstractBuffer * buffer_
Definition: Encoder.h:290
void init(LogOptions const &log_opts)
Definition: Logger.cpp:308
void copyMetadata(const Encoder *copyFromEncoder) override
void readMetadata(FILE *f) override
size_t getNumElems() const
Definition: Encoder.h:284
void updateStats(const double val, const bool is_null) override
V encodeDataAndUpdateStats(const T &unencoded_data)
An AbstractBuffer is a unit of data management for a data manager.
void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata) override
virtual void write(int8_t *src, const size_t num_bytes, const size_t offset=0, const MemoryLevel src_buffer_type=CPU_LEVEL, const int src_device_id=-1)=0
Value parallel_reduce(const blocked_range< Int > &range, const Value &identity, const RealBody &real_body, const Reduction &reduction, const Partitioner &p=Partitioner())
Parallel iteration with reduction.
void updateStats(const std::vector< ArrayDatum > *const src_data, const size_t start_idx, const size_t num_elements) override
bool resetChunkStats(const ChunkStats &stats) override
Resets chunk-level stats (min, max, nulls) using new values from the argument.
void updateStatsEncoded(const int8_t *const dst_data, const size_t num_elements) override
FixedLengthEncoder(Data_Namespace::AbstractBuffer *buffer)
void writeMetadata(FILE *f) override
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
#define CHECK(condition)
Definition: Logger.h:222
size_t getNumElemsForBytesEncodedDataAtIndices(const int8_t *index_data, const std::vector< size_t > &selected_idx, const size_t byte_limit) override
void validate(T value) const
Definition: Encoder.h:54
virtual void reserve(size_t num_bytes)=0
void updateStatsWithAlreadyEncoded(const V &encoded_data)
void reduceStats(const Encoder &that) override
std::shared_ptr< ChunkMetadata > getMetadata(const SQLTypeInfo &ti) override