OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
FixedLengthArrayNoneEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #ifndef FIXED_LENGTH_ARRAY_NONE_ENCODER_H
24 #define FIXED_LENGTH_ARRAY_NONE_ENCODER_H
25 
26 #include "Logger/Logger.h"
27 
28 #include <cassert>
29 #include <cstring>
30 #include <memory>
31 #include <mutex>
32 #include <string>
33 #include <vector>
34 #include "AbstractBuffer.h"
35 #include "ChunkMetadata.h"
36 #include "Encoder.h"
37 
39 
41  public:
43  : Encoder(buffer), has_nulls(false), initialized(false), array_size(as) {}
44 
45  size_t getNumElemsForBytesEncodedDataAtIndices(const int8_t* index_data,
46  const std::vector<size_t>& selected_idx,
47  const size_t byte_limit) override {
48  size_t data_size = selected_idx.size() * array_size;
49  if (data_size > byte_limit) {
50  data_size = byte_limit;
51  }
52  return data_size / array_size;
53  }
54 
55  size_t getNumElemsForBytesInsertData(const std::vector<ArrayDatum>* srcData,
56  const int start_idx,
57  const size_t numAppendElems,
58  const size_t byteLimit,
59  const bool replicating = false) {
60  size_t dataSize = numAppendElems * array_size;
61  if (dataSize > byteLimit) {
62  dataSize = byteLimit;
63  }
64  return dataSize / array_size;
65  }
66 
67  std::shared_ptr<ChunkMetadata> appendEncodedDataAtIndices(
68  const int8_t* index_data,
69  int8_t* data,
70  const std::vector<size_t>& selected_idx) override {
71  std::vector<ArrayDatum> data_subset;
72  data_subset.reserve(selected_idx.size());
73  for (const auto& index : selected_idx) {
74  auto current_data = data + array_size * (index);
75  data_subset.emplace_back(ArrayDatum(array_size,
76  current_data,
77  is_null_ignore_not_null(current_data),
78  DoNothingDeleter{}));
79  }
80  return appendData(&data_subset, 0, selected_idx.size(), false);
81  }
82 
83  std::shared_ptr<ChunkMetadata> appendEncodedData(const int8_t* index_data,
84  int8_t* data,
85  const size_t start_idx,
86  const size_t num_elements) override {
87  std::vector<ArrayDatum> data_subset;
88  data_subset.reserve(num_elements);
89  for (size_t count = 0; count < num_elements; ++count) {
90  auto current_data = data + array_size * (start_idx + count);
91  data_subset.emplace_back(
92  ArrayDatum(array_size, current_data, false, DoNothingDeleter{}));
93  }
94  return appendData(&data_subset, 0, num_elements, false);
95  }
96 
97  std::shared_ptr<ChunkMetadata> appendData(int8_t*& src_data,
98  const size_t num_elems_to_append,
99  const SQLTypeInfo& ti,
100  const bool replicating = false,
101  const int64_t offset = -1) override {
102  UNREACHABLE(); // should never be called for arrays
103  return nullptr;
104  }
105 
106  std::shared_ptr<ChunkMetadata> appendData(const std::vector<ArrayDatum>* srcData,
107  const int start_idx,
108  const size_t numAppendElems,
109  const bool replicating = false) {
110  const size_t existing_data_size = num_elems_ * array_size;
111  const size_t append_data_size = array_size * numAppendElems;
112  buffer_->reserve(existing_data_size + append_data_size);
113  std::vector<int8_t> append_buffer(append_data_size);
114  int8_t* append_ptr = append_buffer.data();
115 
116  // There was some worry about the change implemented to write the append data to an
117  // intermediate buffer, but testing on import and ctas of 20M points, we never append
118  // more than 1.6MB and 1MB of data at a time, respectively, so at least for fixed
119  // length types this should not be an issue (varlen types, which can be massive even
120  // for a single field/row, are a different story however)
121 
122  if (replicating) {
123  const size_t len = (*srcData)[0].length;
124  CHECK_EQ(len, array_size);
125  const int8_t* replicated_ptr = (*srcData)[0].pointer;
126  for (size_t i = 0; i < numAppendElems; ++i) {
127  std::memcpy(append_ptr + i * array_size, replicated_ptr, array_size);
128  }
129  } else {
130  for (size_t i = 0; i < numAppendElems; ++i) {
131  // Length of the appended array should be equal to the fixed length,
132  // all others should have been discarded, assert if something slips through
133  const size_t source_idx = start_idx + i;
134  const size_t len = (*srcData)[source_idx].length;
135  CHECK_EQ(len, array_size);
136  // NULL arrays have been filled with subtype's NULL sentinels,
137  // should be appended as regular data, same size
138  std::memcpy(
139  append_ptr + i * array_size, (*srcData)[source_idx].pointer, array_size);
140  }
141  }
142 
143  buffer_->append(append_ptr, append_data_size);
144 
145  if (replicating) {
146  updateStats(srcData, 0, 1);
147  } else {
148  updateStats(srcData, start_idx, numAppendElems);
149  }
150 
151  // make sure buffer_ is flushed even if no new data is appended to it
152  // (e.g. empty strings) because the metadata needs to be flushed.
153  if (!buffer_->isDirty()) {
154  buffer_->setDirty();
155  }
156 
157  num_elems_ += numAppendElems;
158  auto chunk_metadata = std::make_shared<ChunkMetadata>();
159  getMetadata(chunk_metadata);
160  return chunk_metadata;
161  }
162 
163  void getMetadata(const std::shared_ptr<ChunkMetadata>& chunkMetadata) override {
164  Encoder::getMetadata(chunkMetadata); // call on parent class
165  chunkMetadata->fillChunkStats(elem_min, elem_max, has_nulls);
166  }
167 
168  // Only called from the executor for synthesized meta-information.
169  std::shared_ptr<ChunkMetadata> getMetadata(const SQLTypeInfo& ti) override {
170  auto chunk_metadata = std::make_shared<ChunkMetadata>(
171  ti, 0, 0, ChunkStats{elem_min, elem_max, has_nulls});
172  return chunk_metadata;
173  }
174 
175  void updateStats(const int64_t, const bool) override { CHECK(false); }
176 
177  void updateStats(const double, const bool) override { CHECK(false); }
178 
179  void reduceStats(const Encoder&) override { CHECK(false); }
180 
181  void updateStats(const int8_t* const src_data, const size_t num_elements) override {
182  UNREACHABLE();
183  }
184 
185  void updateStats(const std::vector<std::string>* const src_data,
186  const size_t start_idx,
187  const size_t num_elements) override {
188  UNREACHABLE();
189  }
190 
191  void updateStats(const std::vector<ArrayDatum>* const src_data,
192  const size_t start_idx,
193  const size_t num_elements) override {
194  for (size_t n = start_idx; n < start_idx + num_elements; n++) {
195  update_elem_stats((*src_data)[n]);
196  }
197  }
198 
199  void writeMetadata(FILE* f) override {
200  // assumes pointer is already in right place
201  fwrite((int8_t*)&num_elems_, sizeof(size_t), 1, f);
202  fwrite((int8_t*)&elem_min, sizeof(Datum), 1, f);
203  fwrite((int8_t*)&elem_max, sizeof(Datum), 1, f);
204  fwrite((int8_t*)&has_nulls, sizeof(bool), 1, f);
205  fwrite((int8_t*)&initialized, sizeof(bool), 1, f);
206  }
207 
208  void readMetadata(FILE* f) override {
209  // assumes pointer is already in right place
210  fread((int8_t*)&num_elems_, sizeof(size_t), 1, f);
211  fread((int8_t*)&elem_min, sizeof(Datum), 1, f);
212  fread((int8_t*)&elem_max, sizeof(Datum), 1, f);
213  fread((int8_t*)&has_nulls, sizeof(bool), 1, f);
214  fread((int8_t*)&initialized, sizeof(bool), 1, f);
215  }
216 
217  void copyMetadata(const Encoder* copyFromEncoder) override {
218  num_elems_ = copyFromEncoder->getNumElems();
219  auto array_encoder =
220  dynamic_cast<const FixedLengthArrayNoneEncoder*>(copyFromEncoder);
221  elem_min = array_encoder->elem_min;
222  elem_max = array_encoder->elem_max;
223  has_nulls = array_encoder->has_nulls;
224  initialized = array_encoder->initialized;
225  }
226 
227  void updateMetadata(int8_t* array) {
229  }
230 
231  static bool is_null_ignore_not_null(const SQLTypeInfo& type, int8_t* array) {
232  switch (type.get_subtype()) {
233  case kBOOLEAN: {
234  return (array[0] == NULL_ARRAY_BOOLEAN);
235  }
236  case kINT: {
237  const int32_t* int_array = (int32_t*)array;
238  return (int_array[0] == NULL_ARRAY_INT);
239  }
240  case kSMALLINT: {
241  const int16_t* smallint_array = (int16_t*)array;
242  return (smallint_array[0] == NULL_ARRAY_SMALLINT);
243  }
244  case kTINYINT: {
245  const int8_t* tinyint_array = (int8_t*)array;
246  return (tinyint_array[0] == NULL_ARRAY_TINYINT);
247  }
248  case kBIGINT:
249  case kNUMERIC:
250  case kDECIMAL: {
251  const int64_t* bigint_array = (int64_t*)array;
252  return (bigint_array[0] == NULL_ARRAY_BIGINT);
253  }
254  case kFLOAT: {
255  const float* flt_array = (float*)array;
256  return (flt_array[0] == NULL_ARRAY_FLOAT);
257  }
258  case kDOUBLE: {
259  const double* dbl_array = (double*)array;
260  return (dbl_array[0] == NULL_ARRAY_DOUBLE);
261  }
262  case kTIME:
263  case kTIMESTAMP:
264  case kDATE: {
265  const int64_t* tm_array = reinterpret_cast<int64_t*>(array);
266  return (tm_array[0] == NULL_ARRAY_BIGINT);
267  }
268  case kCHAR:
269  case kVARCHAR:
270  case kTEXT: {
272  const int32_t* int_array = (int32_t*)array;
273  return (int_array[0] == NULL_ARRAY_INT);
274  }
275  default:
276  UNREACHABLE();
277  }
278  return false;
279  }
280 
281  static bool is_null(const SQLTypeInfo& type, int8_t* array) {
282  if (type.get_notnull()) {
283  return false;
284  }
285  return is_null_ignore_not_null(type, array);
286  }
287 
288  bool resetChunkStats(const ChunkStats& stats) override {
289  auto elem_type = buffer_->getSqlType().get_elem_type();
290  if (initialized && DatumEqual(elem_min, stats.min, elem_type) &&
291  DatumEqual(elem_max, stats.max, elem_type) && has_nulls == stats.has_nulls) {
292  return false;
293  }
294  elem_min = stats.min;
295  elem_max = stats.max;
296  has_nulls = stats.has_nulls;
297  return true;
298  }
299 
300  void resetChunkStats() override {
301  has_nulls = false;
302  initialized = false;
303  }
304 
307  bool has_nulls;
309 
310  private:
311  std::mutex EncoderMutex_;
312  std::mutex print_mutex_;
313  size_t array_size;
314 
315  bool is_null(int8_t* array) { return is_null(buffer_->getSqlType(), array); }
316 
317  bool is_null_ignore_not_null(int8_t* array) {
318  return is_null_ignore_not_null(buffer_->getSqlType(), array);
319  }
320 
321  void update_elem_stats(const ArrayDatum& array) {
322  if (array.is_null) {
323  has_nulls = true;
324  }
325  switch (buffer_->getSqlType().get_subtype()) {
326  case kBOOLEAN: {
327  if (!initialized) {
328  elem_min.boolval = true;
329  elem_max.boolval = false;
330  }
331  if (array.is_null) {
332  break;
333  }
334  const int8_t* bool_array = array.pointer;
335  for (size_t i = 0; i < array.length / sizeof(bool); i++) {
336  if (bool_array[i] == NULL_BOOLEAN) {
337  has_nulls = true;
338  } else if (initialized) {
339  elem_min.boolval = std::min(elem_min.boolval, bool_array[i]);
340  elem_max.boolval = std::max(elem_max.boolval, bool_array[i]);
341  } else {
342  elem_min.boolval = bool_array[i];
343  elem_max.boolval = bool_array[i];
344  initialized = true;
345  }
346  }
347  break;
348  }
349  case kINT: {
350  if (!initialized) {
351  elem_min.intval = 1;
352  elem_max.intval = 0;
353  }
354  if (array.is_null) {
355  break;
356  }
357  const int32_t* int_array = (int32_t*)array.pointer;
358  for (size_t i = 0; i < array.length / sizeof(int32_t); i++) {
359  if (int_array[i] == NULL_INT) {
360  has_nulls = true;
361  } else if (initialized) {
362  elem_min.intval = std::min(elem_min.intval, int_array[i]);
363  elem_max.intval = std::max(elem_max.intval, int_array[i]);
364  } else {
365  elem_min.intval = int_array[i];
366  elem_max.intval = int_array[i];
367  initialized = true;
368  }
369  }
370  break;
371  }
372  case kSMALLINT: {
373  if (!initialized) {
374  elem_min.smallintval = 1;
375  elem_max.smallintval = 0;
376  }
377  if (array.is_null) {
378  break;
379  }
380  const int16_t* smallint_array = (int16_t*)array.pointer;
381  for (size_t i = 0; i < array.length / sizeof(int16_t); i++) {
382  if (smallint_array[i] == NULL_SMALLINT) {
383  has_nulls = true;
384  } else if (initialized) {
385  elem_min.smallintval = std::min(elem_min.smallintval, smallint_array[i]);
386  elem_max.smallintval = std::max(elem_max.smallintval, smallint_array[i]);
387  } else {
388  elem_min.smallintval = smallint_array[i];
389  elem_max.smallintval = smallint_array[i];
390  initialized = true;
391  }
392  }
393  break;
394  }
395  case kTINYINT: {
396  if (!initialized) {
397  elem_min.tinyintval = 1;
398  elem_max.tinyintval = 0;
399  }
400  if (array.is_null) {
401  break;
402  }
403  const int8_t* tinyint_array = (int8_t*)array.pointer;
404  for (size_t i = 0; i < array.length / sizeof(int8_t); i++) {
405  if (tinyint_array[i] == NULL_TINYINT) {
406  has_nulls = true;
407  } else if (initialized) {
408  elem_min.tinyintval = std::min(elem_min.tinyintval, tinyint_array[i]);
409  elem_max.tinyintval = std::max(elem_max.tinyintval, tinyint_array[i]);
410  } else {
411  elem_min.tinyintval = tinyint_array[i];
412  elem_max.tinyintval = tinyint_array[i];
413  initialized = true;
414  }
415  }
416  break;
417  }
418  case kBIGINT:
419  case kNUMERIC:
420  case kDECIMAL: {
421  if (!initialized) {
422  elem_min.bigintval = 1;
423  elem_max.bigintval = 0;
424  }
425  if (array.is_null) {
426  break;
427  }
428  const int64_t* bigint_array = (int64_t*)array.pointer;
429  for (size_t i = 0; i < array.length / sizeof(int64_t); i++) {
430  if (bigint_array[i] == NULL_BIGINT) {
431  has_nulls = true;
432  } else if (initialized) {
433  decimal_overflow_validator_.validate(bigint_array[i]);
434  elem_min.bigintval = std::min(elem_min.bigintval, bigint_array[i]);
435  elem_max.bigintval = std::max(elem_max.bigintval, bigint_array[i]);
436  } else {
437  decimal_overflow_validator_.validate(bigint_array[i]);
438  elem_min.bigintval = bigint_array[i];
439  elem_max.bigintval = bigint_array[i];
440  initialized = true;
441  }
442  }
443  break;
444  }
445  case kFLOAT: {
446  if (!initialized) {
447  elem_min.floatval = 1.0;
448  elem_max.floatval = 0.0;
449  }
450  if (array.is_null) {
451  break;
452  }
453  const float* flt_array = (float*)array.pointer;
454  for (size_t i = 0; i < array.length / sizeof(float); i++) {
455  if (flt_array[i] == NULL_FLOAT) {
456  has_nulls = true;
457  } else if (initialized) {
458  elem_min.floatval = std::min(elem_min.floatval, flt_array[i]);
459  elem_max.floatval = std::max(elem_max.floatval, flt_array[i]);
460  } else {
461  elem_min.floatval = flt_array[i];
462  elem_max.floatval = flt_array[i];
463  initialized = true;
464  }
465  }
466  break;
467  }
468  case kDOUBLE: {
469  if (!initialized) {
470  elem_min.doubleval = 1.0;
471  elem_max.doubleval = 0.0;
472  }
473  if (array.is_null) {
474  break;
475  }
476  const double* dbl_array = (double*)array.pointer;
477  for (size_t i = 0; i < array.length / sizeof(double); i++) {
478  if (dbl_array[i] == NULL_DOUBLE) {
479  has_nulls = true;
480  } else if (initialized) {
481  elem_min.doubleval = std::min(elem_min.doubleval, dbl_array[i]);
482  elem_max.doubleval = std::max(elem_max.doubleval, dbl_array[i]);
483  } else {
484  elem_min.doubleval = dbl_array[i];
485  elem_max.doubleval = dbl_array[i];
486  initialized = true;
487  }
488  }
489  break;
490  }
491  case kTIME:
492  case kTIMESTAMP:
493  case kDATE: {
494  if (!initialized) {
495  elem_min.bigintval = 1;
496  elem_max.bigintval = 0;
497  }
498  if (array.is_null) {
499  break;
500  }
501  const int64_t* tm_array = reinterpret_cast<int64_t*>(array.pointer);
502  for (size_t i = 0; i < array.length / sizeof(int64_t); i++) {
503  if (tm_array[i] == NULL_BIGINT) {
504  has_nulls = true;
505  } else if (initialized) {
506  elem_min.bigintval = std::min(elem_min.bigintval, tm_array[i]);
507  elem_max.bigintval = std::max(elem_max.bigintval, tm_array[i]);
508  } else {
509  elem_min.bigintval = tm_array[i];
510  elem_max.bigintval = tm_array[i];
511  initialized = true;
512  }
513  }
514  break;
515  }
516  case kCHAR:
517  case kVARCHAR:
518  case kTEXT: {
520  if (!initialized) {
521  elem_min.intval = 1;
522  elem_max.intval = 0;
523  }
524  if (array.is_null) {
525  break;
526  }
527  const int32_t* int_array = (int32_t*)array.pointer;
528  for (size_t i = 0; i < array.length / sizeof(int32_t); i++) {
529  if (int_array[i] == NULL_INT) {
530  has_nulls = true;
531  } else if (initialized) {
532  elem_min.intval = std::min(elem_min.intval, int_array[i]);
533  elem_max.intval = std::max(elem_max.intval, int_array[i]);
534  } else {
535  elem_min.intval = int_array[i];
536  elem_max.intval = int_array[i];
537  initialized = true;
538  }
539  }
540  break;
541  }
542  default:
543  UNREACHABLE();
544  }
545  };
546 
547 }; // class FixedLengthArrayNoneEncoder
548 
549 #endif // FIXED_LENGTH_ARRAY_NONE_ENCODER_H
int8_t tinyintval
Definition: Datum.h:71
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:392
#define CHECK_EQ(x, y)
Definition: Logger.h:301
#define NULL_DOUBLE
void updateStats(const int8_t *const src_data, const size_t num_elements) override
size_t num_elems_
Definition: Encoder.h:288
Definition: sqltypes.h:76
#define NULL_ARRAY_INT
#define NULL_FLOAT
DecimalOverflowValidator decimal_overflow_validator_
Definition: Encoder.h:292
#define NULL_BIGINT
std::shared_ptr< ChunkMetadata > appendEncodedData(const int8_t *index_data, int8_t *data, const size_t start_idx, const size_t num_elements) override
#define NULL_ARRAY_SMALLINT
int8_t boolval
Definition: Datum.h:70
#define UNREACHABLE()
Definition: Logger.h:338
bool has_nulls
Definition: ChunkMetadata.h:30
dictionary stats
Definition: report.py:116
#define NULL_ARRAY_TINYINT
int32_t intval
Definition: Datum.h:73
#define NULL_INT
virtual void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata)
Definition: Encoder.cpp:231
void updateStats(const double, const bool) override
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:229
float floatval
Definition: Datum.h:75
FixedLengthArrayNoneEncoder(AbstractBuffer *buffer, size_t as)
void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata) override
std::shared_ptr< ChunkMetadata > appendData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &ti, const bool replicating=false, const int64_t offset=-1) override
Data_Namespace::AbstractBuffer * buffer_
Definition: Encoder.h:290
std::shared_ptr< ChunkMetadata > getMetadata(const SQLTypeInfo &ti) override
bool DatumEqual(const Datum a, const Datum b, const SQLTypeInfo &ti)
Definition: Datum.cpp:408
size_t getNumElemsForBytesEncodedDataAtIndices(const int8_t *index_data, const std::vector< size_t > &selected_idx, const size_t byte_limit) override
void copyMetadata(const Encoder *copyFromEncoder) override
int64_t bigintval
Definition: Datum.h:74
size_t getNumElems() const
Definition: Encoder.h:284
#define NULL_ARRAY_FLOAT
std::shared_ptr< ChunkMetadata > appendEncodedDataAtIndices(const int8_t *index_data, int8_t *data, const std::vector< size_t > &selected_idx) override
int16_t smallintval
Definition: Datum.h:72
An AbstractBuffer is a unit of data management for a data manager.
#define NULL_BOOLEAN
Definition: sqltypes.h:79
Definition: sqltypes.h:80
void updateStats(const std::vector< std::string > *const src_data, const size_t start_idx, const size_t num_elements) override
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:399
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
Definition: sqltypes.h:68
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
SQLTypeInfo getSqlType() const
#define NULL_TINYINT
#define NULL_ARRAY_DOUBLE
void update_elem_stats(const ArrayDatum &array)
void reduceStats(const Encoder &) override
bool g_enable_watchdog false
Definition: Execute.cpp:80
#define CHECK(condition)
Definition: Logger.h:291
void updateStats(const int64_t, const bool) override
#define NULL_SMALLINT
#define NULL_ARRAY_BIGINT
static bool is_null(const SQLTypeInfo &type, int8_t *array)
Definition: sqltypes.h:72
void updateStats(const std::vector< ArrayDatum > *const src_data, const size_t start_idx, const size_t num_elements) override
#define NULL_ARRAY_BOOLEAN
constexpr double n
Definition: Utm.h:38
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:398
static bool is_null_ignore_not_null(const SQLTypeInfo &type, int8_t *array)
size_t getNumElemsForBytesInsertData(const std::vector< ArrayDatum > *srcData, const int start_idx, const size_t numAppendElems, const size_t byteLimit, const bool replicating=false)
void validate(T value) const
Definition: Encoder.h:54
Definition: Datum.h:69
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:975
virtual void reserve(size_t num_bytes)=0
double doubleval
Definition: Datum.h:76
std::shared_ptr< ChunkMetadata > appendData(const std::vector< ArrayDatum > *srcData, const int start_idx, const size_t numAppendElems, const bool replicating=false)
bool resetChunkStats(const ChunkStats &stats) override
: Reset chunk level stats (min, max, nulls) using new values from the argument.