OmniSciDB  fe05a0c208
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ArrayNoneEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 #ifndef ARRAY_NONE_ENCODER_H
25 #define ARRAY_NONE_ENCODER_H
26 
27 #include "Logger/Logger.h"
28 
29 #include <cassert>
30 #include <cstring>
31 #include <memory>
32 #include <mutex>
33 #include <string>
34 #include <vector>
35 #include "AbstractBuffer.h"
36 #include "ChunkMetadata.h"
37 #include "Encoder.h"
38 
40 
41 class ArrayNoneEncoder : public Encoder {
42  public:
44  : Encoder(buffer)
45  , has_nulls(false)
47  , index_buf(nullptr)
48  , last_offset(-1) {}
49 
50  size_t getNumElemsForBytesInsertData(const std::vector<ArrayDatum>* srcData,
51  const int start_idx,
52  const size_t numAppendElems,
53  const size_t byteLimit,
54  const bool replicating = false) {
55  size_t dataSize = 0;
56 
57  size_t n = start_idx;
58  for (; n < start_idx + numAppendElems; n++) {
59  size_t len = (*srcData)[replicating ? 0 : n].length;
60  if (dataSize + len > byteLimit) {
61  break;
62  }
63  dataSize += len;
64  }
65  return n - start_idx;
66  }
67 
68  std::shared_ptr<ChunkMetadata> appendData(int8_t*& src_data,
69  const size_t num_elems_to_append,
70  const SQLTypeInfo& ti,
71  const bool replicating = false,
72  const int64_t offset = -1) override {
73  UNREACHABLE(); // should never be called for arrays
74  return nullptr;
75  }
76 
// Appends numAppendElems arrays from srcData (starting at start_idx) to this chunk.
//
// Two buffers are written:
//   * index_buf - one ArrayOffsetT per array holding the running end offset of that
//     array in the data buffer (preceded by one initial offset on the first append);
//     the stored offset is negated when the array is NULL.
//   * buffer_   - the concatenated payload bytes of the non-NULL arrays.
// Afterwards the element statistics and num_elems_ are updated and a freshly filled
// ChunkMetadata is returned.
//
// replicating: when true, element 0 of srcData is used for every appended position.
//
// NOTE(review): this listing skips source lines 111 and 116 (the numbering jumps
// 110 -> 112 and 115 -> 117), so the tail of the index_buf->read(...) call and the
// body of the `last_offset < 0` branch are not visible here.
// NOTE(review): the first-append check inspects (*srcData)[0] rather than
// (*srcData)[start_idx] - confirm callers always pass start_idx == 0 for the first
// append into an empty chunk.
77  std::shared_ptr<ChunkMetadata> appendData(const std::vector<ArrayDatum>* srcData,
78  const int start_idx,
79  const size_t numAppendElems,
80  const bool replicating) {
81  CHECK(index_buf != nullptr); // index_buf must be set before this.
82  size_t index_size = numAppendElems * sizeof(ArrayOffsetT);
83  if (num_elems_ == 0) {
84  index_size += sizeof(ArrayOffsetT); // plus one for the initial offset
85  }
86  index_buf->reserve(index_size);
87 
88  bool first_elem_padded = false;
89  ArrayOffsetT initial_offset = 0;
90  if (num_elems_ == 0) {
91  if ((*srcData)[0].is_null || (*srcData)[0].length <= 1) {
92  // Covers following potentially problematic first arrays:
93  // (1) NULL array, issue - can't encode a NULL with 0 initial offset
94  // otherwise, if first array is not NULL:
95  // (2) length=1 array - could be followed by a {}*/NULL, covers tinyint,bool
96  // (3) empty array - could be followed by {}*/NULL, or {}*|{x}|{}*|NULL, etc.
97  initial_offset = DEFAULT_NULL_PADDING_SIZE;
98  first_elem_padded = true;
99  }
100  index_buf->append((int8_t*)&initial_offset,
101  sizeof(ArrayOffsetT)); // write the initial offset
102  last_offset = initial_offset;
103  } else {
104  // Valid last_offset is never negative
105  // always need to read a valid last offset from buffer/disk
106  // b/c now due to vacuum "last offset" may go backward and if
107  // index chunk was not reloaded last_offset would go way off!
108  index_buf->read((int8_t*)&last_offset,
109  sizeof(ArrayOffsetT),
110  index_buf->size() - sizeof(ArrayOffsetT),
112  CHECK(last_offset != -1);
113  // If the loaded offset is negative it means the last value was a NULL array,
114  // convert to a valid last offset
115  if (last_offset < 0) {
117  }
118  }
119  // Data starts at an 8 byte offset when the first encoded array was padded
// (NULL, empty, or length-1 first array - see first_elem_padded above)
120  size_t data_size = (first_elem_padded) ? DEFAULT_NULL_PADDING_SIZE : 0;
121  for (size_t n = start_idx; n < start_idx + numAppendElems; n++) {
122  // NULL arrays don't take any space so don't add to the data size
123  if ((*srcData)[replicating ? 0 : n].is_null) {
124  continue;
125  }
126  data_size += (*srcData)[replicating ? 0 : n].length;
127  }
128  buffer_->reserve(data_size);
129 
// One staging buffer, capped at MAX_INPUT_BUF_SIZE, is reused first for batches of
// offsets and then for batches of payload bytes.
130  size_t inbuf_size =
131  std::min(std::max(index_size, data_size), (size_t)MAX_INPUT_BUF_SIZE);
132  auto gc_inbuf = std::make_unique<int8_t[]>(inbuf_size);
133  auto inbuf = gc_inbuf.get();
// Pass 1: compute the running end offsets and flush them to index_buf in batches of
// at most inbuf_size / sizeof(ArrayOffsetT) entries.
134  for (size_t num_appended = 0; num_appended < numAppendElems;) {
135  ArrayOffsetT* p = (ArrayOffsetT*)inbuf;
136  size_t i;
137  for (i = 0; num_appended < numAppendElems && i < inbuf_size / sizeof(ArrayOffsetT);
138  i++, num_appended++) {
139  p[i] =
140  last_offset + (*srcData)[replicating ? 0 : num_appended + start_idx].length;
141  last_offset = p[i];
142  if ((*srcData)[replicating ? 0 : num_appended + start_idx].is_null) {
143  // Record array NULLness in the index buffer
144  p[i] = -p[i];
145  }
146  }
147  index_buf->append(inbuf, i * sizeof(ArrayOffsetT));
148  }
149 
150  // Pad buffer_ with 8 bytes if the first encoded array was padded
// (the padding bytes are whatever the staging buffer currently holds)
151  if (first_elem_padded) {
152  auto padding_size = DEFAULT_NULL_PADDING_SIZE;
153  buffer_->append(inbuf, padding_size);
154  }
// Pass 2: copy array payloads into the staging buffer and flush it to buffer_ each
// time the next array would not fit.
155  for (size_t num_appended = 0; num_appended < numAppendElems;) {
156  size_t size = 0;
157  for (int i = start_idx + num_appended;
158  num_appended < numAppendElems && size < inbuf_size;
159  i++, num_appended++) {
160  if ((*srcData)[replicating ? 0 : i].is_null) {
161  continue; // NULL arrays don't take up any space in the data buffer
162  }
163  size_t len = (*srcData)[replicating ? 0 : i].length;
164  if (len > inbuf_size) {
165  // for large arrays, flush what is staged and append the array on its own
166  if (size > 0) {
167  buffer_->append(inbuf, size);
168  }
169  size = 0;
170  buffer_->append((*srcData)[replicating ? 0 : i].pointer, len);
171  num_appended++;
172  break;
173  } else if (size + len > inbuf_size) {
174  break;
175  }
176  char* dest = (char*)inbuf + size;
177  if (len > 0) {
178  std::memcpy((void*)dest, (void*)(*srcData)[replicating ? 0 : i].pointer, len);
179  size += len;
180  }
181  }
182  if (size > 0) {
183  buffer_->append(inbuf, size);
184  }
185  }
186  // make sure buffer_ is flushed even if no new data is appended to it
187  // (e.g. empty arrays) because the metadata needs to be flushed.
188  if (!buffer_->isDirty()) {
189  buffer_->setDirty();
190  }
191 
192  // keep Chunk statistics with array elements
193  for (size_t n = start_idx; n < start_idx + numAppendElems; n++) {
194  update_elem_stats((*srcData)[replicating ? 0 : n]);
195  }
196  num_elems_ += numAppendElems;
197  auto chunk_metadata = std::make_shared<ChunkMetadata>();
198  getMetadata(chunk_metadata);
199  return chunk_metadata;
200  }
201 
202  void getMetadata(const std::shared_ptr<ChunkMetadata>& chunkMetadata) override {
203  Encoder::getMetadata(chunkMetadata); // call on parent class
204  chunkMetadata->fillChunkStats(elem_min, elem_max, has_nulls);
205  }
206 
207  // Only called from the executor for synthesized meta-information.
208  std::shared_ptr<ChunkMetadata> getMetadata(const SQLTypeInfo& ti) override {
209  auto chunk_metadata = std::make_shared<ChunkMetadata>(
210  ti, 0, 0, ChunkStats{elem_min, elem_max, has_nulls});
211  return chunk_metadata;
212  }
213 
214  void updateStats(const int64_t, const bool) override { CHECK(false); }
215 
216  void updateStats(const double, const bool) override { CHECK(false); }
217 
218  void updateStats(const int8_t* const src_data, const size_t num_elements) override {
219  CHECK(false);
220  }
221 
222  void updateStats(const std::vector<std::string>* const src_data,
223  const size_t start_idx,
224  const size_t num_elements) override {
225  UNREACHABLE();
226  }
227 
228  void updateStats(const std::vector<ArrayDatum>* const src_data,
229  const size_t start_idx,
230  const size_t num_elements) override {
231  for (size_t n = start_idx; n < start_idx + num_elements; n++) {
232  update_elem_stats((*src_data)[n]);
233  }
234  }
235 
236  void reduceStats(const Encoder&) override { CHECK(false); }
237 
238  void writeMetadata(FILE* f) override {
239  // assumes pointer is already in right place
240  fwrite((int8_t*)&num_elems_, sizeof(size_t), 1, f);
241  fwrite((int8_t*)&elem_min, sizeof(Datum), 1, f);
242  fwrite((int8_t*)&elem_max, sizeof(Datum), 1, f);
243  fwrite((int8_t*)&has_nulls, sizeof(bool), 1, f);
244  fwrite((int8_t*)&initialized, sizeof(bool), 1, f);
245  }
246 
247  void readMetadata(FILE* f) override {
248  // assumes pointer is already in right place
249  fread((int8_t*)&num_elems_, sizeof(size_t), 1, f);
250  fread((int8_t*)&elem_min, sizeof(Datum), 1, f);
251  fread((int8_t*)&elem_max, sizeof(Datum), 1, f);
252  fread((int8_t*)&has_nulls, sizeof(bool), 1, f);
253  fread((int8_t*)&initialized, sizeof(bool), 1, f);
254  }
255 
256  void copyMetadata(const Encoder* copyFromEncoder) override {
257  num_elems_ = copyFromEncoder->getNumElems();
258  auto array_encoder = dynamic_cast<const ArrayNoneEncoder*>(copyFromEncoder);
259  elem_min = array_encoder->elem_min;
260  elem_max = array_encoder->elem_max;
261  has_nulls = array_encoder->has_nulls;
262  initialized = array_encoder->initialized;
263  }
264 
265  AbstractBuffer* getIndexBuf() const { return index_buf; }
266 
267  bool resetChunkStats(const ChunkStats& stats) override {
268  auto elem_type = buffer_->getSqlType().get_elem_type();
269  if (DatumEqual(elem_min, stats.min, elem_type) &&
270  DatumEqual(elem_max, stats.max, elem_type) && has_nulls == stats.has_nulls) {
271  return false;
272  }
273  elem_min = stats.min;
274  elem_max = stats.max;
275  has_nulls = stats.has_nulls;
276  return true;
277  }
278 
279  void resetChunkStats() override {
280  has_nulls = false;
281  initialized = false;
282  }
283 
286  bool has_nulls;
289  std::unique_lock<std::mutex> lock(EncoderMutex_);
290  index_buf = buf;
291  }
292 
293  static constexpr size_t DEFAULT_NULL_PADDING_SIZE{8};
294 
295  private:
296  std::mutex EncoderMutex_;
299 
// Folds a single array into the chunk-level element statistics.
//
//  * has_nulls is set when the array itself is NULL or when any element equals the
//    NULL sentinel of the element type (NULL_INT, NULL_FLOAT, ...).
//  * elem_min / elem_max are tracked in the Datum member selected by the column's
//    subtype (buffer_->getSqlType().get_subtype()).
//  * While `initialized` is false, min/max are first set to a placeholder pair with
//    min > max (1/0, or true/false for booleans); the first non-NULL element then
//    overwrites both and sets `initialized`.
//  * NULL or zero-length arrays leave min/max unchanged beyond that placeholder.
//  * array.length is treated as a byte count: the element count is
//    length / sizeof(element type).
//  * Any subtype without a case reaches UNREACHABLE().
//
// NOTE(review): this listing skips source line 496 (numbering jumps 495 -> 497) at
// the start of the kCHAR/kVARCHAR/kTEXT case, so one statement of that case is not
// visible here.
300  void update_elem_stats(const ArrayDatum& array) {
301  if (array.is_null) {
302  has_nulls = true;
303  }
304  switch (buffer_->getSqlType().get_subtype()) {
305  case kBOOLEAN: {
306  if (!initialized) {
307  elem_min.boolval = true;
308  elem_max.boolval = false;
309  }
310  if (array.is_null || array.length == 0) {
311  break;
312  }
313  const bool* bool_array = (bool*)array.pointer;
314  for (size_t i = 0; i < array.length / sizeof(bool); i++) {
315  if ((int8_t)bool_array[i] == NULL_BOOLEAN) {
316  has_nulls = true;
317  } else if (initialized) {
318  elem_min.boolval = std::min(elem_min.boolval, bool_array[i]);
319  elem_max.boolval = std::max(elem_max.boolval, bool_array[i]);
320  } else {
321  elem_min.boolval = bool_array[i];
322  elem_max.boolval = bool_array[i];
323  initialized = true;
324  }
325  }
326  break;
327  }
328  case kINT: {
329  if (!initialized) {
330  elem_min.intval = 1;
331  elem_max.intval = 0;
332  }
333  if (array.is_null || array.length == 0) {
334  break;
335  }
336  const int32_t* int_array = (int32_t*)array.pointer;
337  for (size_t i = 0; i < array.length / sizeof(int32_t); i++) {
338  if (int_array[i] == NULL_INT) {
339  has_nulls = true;
340  } else if (initialized) {
341  elem_min.intval = std::min(elem_min.intval, int_array[i]);
342  elem_max.intval = std::max(elem_max.intval, int_array[i]);
343  } else {
344  elem_min.intval = int_array[i];
345  elem_max.intval = int_array[i];
346  initialized = true;
347  }
348  }
349  break;
350  }
351  case kSMALLINT: {
352  if (!initialized) {
353  elem_min.smallintval = 1;
354  elem_max.smallintval = 0;
355  }
356  if (array.is_null || array.length == 0) {
357  break;
358  }
359  const int16_t* int_array = (int16_t*)array.pointer;
360  for (size_t i = 0; i < array.length / sizeof(int16_t); i++) {
361  if (int_array[i] == NULL_SMALLINT) {
362  has_nulls = true;
363  } else if (initialized) {
364  elem_min.smallintval = std::min(elem_min.smallintval, int_array[i]);
365  elem_max.smallintval = std::max(elem_max.smallintval, int_array[i]);
366  } else {
367  elem_min.smallintval = int_array[i];
368  elem_max.smallintval = int_array[i];
369  initialized = true;
370  }
371  }
372  break;
373  }
374  case kTINYINT: {
375  if (!initialized) {
376  elem_min.tinyintval = 1;
377  elem_max.tinyintval = 0;
378  }
379  if (array.is_null || array.length == 0) {
380  break;
381  }
382  const int8_t* int_array = (int8_t*)array.pointer;
383  for (size_t i = 0; i < array.length / sizeof(int8_t); i++) {
384  if (int_array[i] == NULL_TINYINT) {
385  has_nulls = true;
386  } else if (initialized) {
387  elem_min.tinyintval = std::min(elem_min.tinyintval, int_array[i]);
388  elem_max.tinyintval = std::max(elem_max.tinyintval, int_array[i]);
389  } else {
390  elem_min.tinyintval = int_array[i];
391  elem_max.tinyintval = int_array[i];
392  initialized = true;
393  }
394  }
395  break;
396  }
// 64-bit integer storage: BIGINT, NUMERIC and DECIMAL elements are all read as int64_t.
397  case kBIGINT:
398  case kNUMERIC:
399  case kDECIMAL: {
400  if (!initialized) {
401  elem_min.bigintval = 1;
402  elem_max.bigintval = 0;
403  }
404  if (array.is_null || array.length == 0) {
405  break;
406  }
407  const int64_t* int_array = (int64_t*)array.pointer;
408  for (size_t i = 0; i < array.length / sizeof(int64_t); i++) {
409  if (int_array[i] == NULL_BIGINT) {
410  has_nulls = true;
411  } else if (initialized) {
412  elem_min.bigintval = std::min(elem_min.bigintval, int_array[i]);
413  elem_max.bigintval = std::max(elem_max.bigintval, int_array[i]);
414  } else {
415  elem_min.bigintval = int_array[i];
416  elem_max.bigintval = int_array[i];
417  initialized = true;
418  }
419  }
420  break;
421  }
422  case kFLOAT: {
423  if (!initialized) {
424  elem_min.floatval = 1.0;
425  elem_max.floatval = 0.0;
426  }
427  if (array.is_null || array.length == 0) {
428  break;
429  }
430  const float* flt_array = (float*)array.pointer;
431  for (size_t i = 0; i < array.length / sizeof(float); i++) {
432  if (flt_array[i] == NULL_FLOAT) {
433  has_nulls = true;
434  } else if (initialized) {
435  elem_min.floatval = std::min(elem_min.floatval, flt_array[i]);
436  elem_max.floatval = std::max(elem_max.floatval, flt_array[i]);
437  } else {
438  elem_min.floatval = flt_array[i];
439  elem_max.floatval = flt_array[i];
440  initialized = true;
441  }
442  }
443  break;
444  }
445  case kDOUBLE: {
446  if (!initialized) {
447  elem_min.doubleval = 1.0;
448  elem_max.doubleval = 0.0;
449  }
450  if (array.is_null || array.length == 0) {
451  break;
452  }
453  const double* dbl_array = (double*)array.pointer;
454  for (size_t i = 0; i < array.length / sizeof(double); i++) {
455  if (dbl_array[i] == NULL_DOUBLE) {
456  has_nulls = true;
457  } else if (initialized) {
458  elem_min.doubleval = std::min(elem_min.doubleval, dbl_array[i]);
459  elem_max.doubleval = std::max(elem_max.doubleval, dbl_array[i]);
460  } else {
461  elem_min.doubleval = dbl_array[i];
462  elem_max.doubleval = dbl_array[i];
463  initialized = true;
464  }
465  }
466  break;
467  }
// Time-like elements are read as int64_t and tracked in bigintval, using NULL_BIGINT
// as the NULL sentinel.
468  case kTIME:
469  case kTIMESTAMP:
470  case kDATE: {
471  if (!initialized) {
472  elem_min.bigintval = 1;
473  elem_max.bigintval = 0;
474  }
475  if (array.is_null || array.length == 0) {
476  break;
477  }
478  const auto tm_array = reinterpret_cast<int64_t*>(array.pointer);
479  for (size_t i = 0; i < array.length / sizeof(int64_t); i++) {
480  if (tm_array[i] == NULL_BIGINT) {
481  has_nulls = true;
482  } else if (initialized) {
483  elem_min.bigintval = std::min(elem_min.bigintval, tm_array[i]);
484  elem_max.bigintval = std::max(elem_max.bigintval, tm_array[i]);
485  } else {
486  elem_min.bigintval = tm_array[i];
487  elem_max.bigintval = tm_array[i];
488  initialized = true;
489  }
490  }
491  break;
492  }
// String-typed elements are read as int32_t values and tracked in intval.
// NOTE(review): presumably these are dictionary-encoded string ids - confirm against
// the statement on the line missing from this listing.
493  case kCHAR:
494  case kVARCHAR:
495  case kTEXT: {
497  if (!initialized) {
498  elem_min.intval = 1;
499  elem_max.intval = 0;
500  }
501  if (array.is_null || array.length == 0) {
502  break;
503  }
504  const int32_t* int_array = (int32_t*)array.pointer;
505  for (size_t i = 0; i < array.length / sizeof(int32_t); i++) {
506  if (int_array[i] == NULL_INT) {
507  has_nulls = true;
508  } else if (initialized) {
509  elem_min.intval = std::min(elem_min.intval, int_array[i]);
510  elem_max.intval = std::max(elem_max.intval, int_array[i]);
511  } else {
512  elem_min.intval = int_array[i];
513  elem_max.intval = int_array[i];
514  initialized = true;
515  }
516  }
517  break;
518  }
519  default:
520  UNREACHABLE();
521  }
522  };
523 
524 }; // class ArrayNoneEncoder
525 
526 #endif // ARRAY_NONE_ENCODER_H
int8_t tinyintval
Definition: sqltypes.h:206
void update_elem_stats(const ArrayDatum &array)
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:315
#define CHECK_EQ(x, y)
Definition: Logger.h:211
size_t getNumElemsForBytesInsertData(const std::vector< ArrayDatum > *srcData, const int start_idx, const size_t numAppendElems, const size_t byteLimit, const bool replicating=false)
#define NULL_DOUBLE
size_t num_elems_
Definition: Encoder.h:237
Definition: sqltypes.h:48
std::shared_ptr< ChunkMetadata > appendData(const std::vector< ArrayDatum > *srcData, const int start_idx, const size_t numAppendElems, const bool replicating)
#define NULL_FLOAT
#define MAX_INPUT_BUF_SIZE
Definition: Encoder.h:36
void updateStats(const double, const bool) override
#define NULL_BIGINT
std::mutex EncoderMutex_
bool boolval
Definition: sqltypes.h:205
void updateStats(const int8_t *const src_data, const size_t num_elements) override
#define UNREACHABLE()
Definition: Logger.h:247
bool has_nulls
Definition: ChunkMetadata.h:28
int32_t intval
Definition: sqltypes.h:208
#define NULL_INT
virtual void read(int8_t *const dst, const size_t num_bytes, const size_t offset=0, const MemoryLevel dst_buffer_type=CPU_LEVEL, const int dst_device_id=-1)=0
virtual void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata)
Definition: Encoder.cpp:227
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:202
float floatval
Definition: sqltypes.h:210
CONSTEXPR DEVICE bool is_null(const T &value)
Data_Namespace::AbstractBuffer * buffer_
Definition: Encoder.h:239
bool DatumEqual(const Datum a, const Datum b, const SQLTypeInfo &ti)
Definition: Datum.cpp:306
void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata) override
void updateStats(const std::vector< ArrayDatum > *const src_data, const size_t start_idx, const size_t num_elements) override
void resetChunkStats() override
int64_t bigintval
Definition: sqltypes.h:209
size_t getNumElems() const
Definition: Encoder.h:233
bool resetChunkStats(const ChunkStats &stats) override
Resets the chunk-level stats (min, max, nulls) using the new values from the argument.
int16_t smallintval
Definition: sqltypes.h:207
An AbstractBuffer is a unit of data management for a data manager.
ArrayNoneEncoder(AbstractBuffer *buffer)
#define NULL_BOOLEAN
void setIndexBuffer(AbstractBuffer *buf)
AbstractBuffer * index_buf
Definition: sqltypes.h:51
Definition: sqltypes.h:52
void readMetadata(FILE *f) override
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:322
static constexpr size_t DEFAULT_NULL_PADDING_SIZE
int32_t ArrayOffsetT
Definition: sqltypes.h:938
std::shared_ptr< ChunkMetadata > appendData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &ti, const bool replicating=false, const int64_t offset=-1) override
std::shared_ptr< ChunkMetadata > getMetadata(const SQLTypeInfo &ti) override
Definition: sqltypes.h:40
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
SQLTypeInfo getSqlType() const
#define NULL_TINYINT
void reduceStats(const Encoder &) override
bool g_enable_watchdog false
Definition: Execute.cpp:76
ArrayOffsetT last_offset
#define CHECK(condition)
Definition: Logger.h:203
#define NULL_SMALLINT
char * f
void writeMetadata(FILE *f) override
Definition: sqltypes.h:44
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:713
virtual void reserve(size_t num_bytes)=0
AbstractBuffer * getIndexBuf() const
void updateStats(const int64_t, const bool) override
void copyMetadata(const Encoder *copyFromEncoder) override
void updateStats(const std::vector< std::string > *const src_data, const size_t start_idx, const size_t num_elements) override
double doubleval
Definition: sqltypes.h:211