OmniSciDB  bf83d84833
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ArrayNoneEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 #ifndef ARRAY_NONE_ENCODER_H
25 #define ARRAY_NONE_ENCODER_H
26 
27 #include "Logger/Logger.h"
28 
29 #include <cassert>
30 #include <cstring>
31 #include <memory>
32 #include <mutex>
33 #include <string>
34 #include <vector>
35 #include "AbstractBuffer.h"
36 #include "ChunkMetadata.h"
37 #include "Encoder.h"
38 
40 
41 class ArrayNoneEncoder : public Encoder {
42  public:
44  : Encoder(buffer)
45  , has_nulls(false)
47  , index_buf(nullptr)
48  , last_offset(-1) {}
49 
50  size_t getNumElemsForBytesInsertData(const std::vector<ArrayDatum>* srcData,
51  const int start_idx,
52  const size_t numAppendElems,
53  const size_t byteLimit,
54  const bool replicating = false) {
55  size_t dataSize = 0;
56 
57  size_t n = start_idx;
58  for (; n < start_idx + numAppendElems; n++) {
59  size_t len = (*srcData)[replicating ? 0 : n].length;
60  if (dataSize + len > byteLimit) {
61  break;
62  }
63  dataSize += len;
64  }
65  return n - start_idx;
66  }
67 
68  std::shared_ptr<ChunkMetadata> appendData(int8_t*& src_data,
69  const size_t num_elems_to_append,
70  const SQLTypeInfo& ti,
71  const bool replicating = false,
72  const int64_t offset = -1) override {
73  UNREACHABLE(); // should never be called for arrays
74  return nullptr;
75  }
76 
77  std::shared_ptr<ChunkMetadata> appendData(const std::vector<ArrayDatum>* srcData,
78  const int start_idx,
79  const size_t numAppendElems,
80  const bool replicating) {
81  CHECK(index_buf != nullptr); // index_buf must be set before this.
82  size_t index_size = numAppendElems * sizeof(ArrayOffsetT);
83  if (num_elems_ == 0) {
84  index_size += sizeof(ArrayOffsetT); // plus one for the initial offset
85  }
86  index_buf->reserve(index_size);
87 
88  bool first_elem_is_null = false;
89  ArrayOffsetT initial_offset = 0;
90  if (num_elems_ == 0) {
91  // If the very first ArrayDatum is NULL, initial offset will be set to 8
92  // so we could negate it and write it out to index buffer to convey NULLness
93  if ((*srcData)[0].is_null) {
94  initial_offset = 8;
95  first_elem_is_null = true;
96  }
97  index_buf->append((int8_t*)&initial_offset,
98  sizeof(ArrayOffsetT)); // write the inital offset
99  last_offset = initial_offset;
100  } else {
101  // Valid last_offset is never negative
102  // always need to read a valid last offset from buffer/disk
103  // b/c now due to vacuum "last offset" may go backward and if
104  // index chunk was not reloaded last_offset would go way off!
105  index_buf->read((int8_t*)&last_offset,
106  sizeof(ArrayOffsetT),
107  index_buf->size() - sizeof(ArrayOffsetT),
109  CHECK(last_offset != -1);
110  // If the loaded offset is negative it means the last value was a NULL array,
111  // convert to a valid last offset
112  if (last_offset < 0) {
114  }
115  }
116  // Need to start data from 8 byte offset if first array encoded is a NULL array
117  size_t data_size = (first_elem_is_null) ? 8 : 0;
118  for (size_t n = start_idx; n < start_idx + numAppendElems; n++) {
119  // NULL arrays don't take any space so don't add to the data size
120  if ((*srcData)[replicating ? 0 : n].is_null) {
121  continue;
122  }
123  data_size += (*srcData)[replicating ? 0 : n].length;
124  }
125  buffer_->reserve(data_size);
126 
127  size_t inbuf_size =
128  std::min(std::max(index_size, data_size), (size_t)MAX_INPUT_BUF_SIZE);
129  auto gc_inbuf = std::make_unique<int8_t[]>(inbuf_size);
130  auto inbuf = gc_inbuf.get();
131  for (size_t num_appended = 0; num_appended < numAppendElems;) {
132  ArrayOffsetT* p = (ArrayOffsetT*)inbuf;
133  size_t i;
134  for (i = 0; num_appended < numAppendElems && i < inbuf_size / sizeof(ArrayOffsetT);
135  i++, num_appended++) {
136  p[i] =
137  last_offset + (*srcData)[replicating ? 0 : num_appended + start_idx].length;
138  last_offset = p[i];
139  if ((*srcData)[replicating ? 0 : num_appended + start_idx].is_null) {
140  // Record array NULLness in the index buffer
141  p[i] = -p[i];
142  }
143  }
144  index_buf->append(inbuf, i * sizeof(ArrayOffsetT));
145  }
146 
147  // Pad buffer_ with 8 bytes if first encoded array is a NULL array
148  if (first_elem_is_null) {
149  buffer_->append(inbuf, 8);
150  }
151  for (size_t num_appended = 0; num_appended < numAppendElems;) {
152  size_t size = 0;
153  for (int i = start_idx + num_appended;
154  num_appended < numAppendElems && size < inbuf_size;
155  i++, num_appended++) {
156  if ((*srcData)[replicating ? 0 : i].is_null) {
157  continue; // NULL arrays don't take up any space in the data buffer
158  }
159  size_t len = (*srcData)[replicating ? 0 : i].length;
160  if (len > inbuf_size) {
161  // for large strings, append on its own
162  if (size > 0) {
163  buffer_->append(inbuf, size);
164  }
165  size = 0;
166  buffer_->append((*srcData)[replicating ? 0 : i].pointer, len);
167  num_appended++;
168  break;
169  } else if (size + len > inbuf_size) {
170  break;
171  }
172  char* dest = (char*)inbuf + size;
173  if (len > 0) {
174  std::memcpy((void*)dest, (void*)(*srcData)[replicating ? 0 : i].pointer, len);
175  size += len;
176  }
177  }
178  if (size > 0) {
179  buffer_->append(inbuf, size);
180  }
181  }
182  // make sure buffer_ is flushed even if no new data is appended to it
183  // (e.g. empty strings) because the metadata needs to be flushed.
184  if (!buffer_->isDirty()) {
185  buffer_->setDirty();
186  }
187 
188  // keep Chunk statistics with array elements
189  for (size_t n = start_idx; n < start_idx + numAppendElems; n++) {
190  update_elem_stats((*srcData)[replicating ? 0 : n]);
191  }
192  num_elems_ += numAppendElems;
193  auto chunk_metadata = std::make_shared<ChunkMetadata>();
194  getMetadata(chunk_metadata);
195  return chunk_metadata;
196  }
197 
198  void getMetadata(const std::shared_ptr<ChunkMetadata>& chunkMetadata) override {
199  Encoder::getMetadata(chunkMetadata); // call on parent class
200  chunkMetadata->fillChunkStats(elem_min, elem_max, has_nulls);
201  }
202 
203  // Only called from the executor for synthesized meta-information.
204  std::shared_ptr<ChunkMetadata> getMetadata(const SQLTypeInfo& ti) override {
205  auto chunk_metadata = std::make_shared<ChunkMetadata>(
206  ti, 0, 0, ChunkStats{elem_min, elem_max, has_nulls});
207  return chunk_metadata;
208  }
209 
210  void updateStats(const int64_t, const bool) override { CHECK(false); }
211 
212  void updateStats(const double, const bool) override { CHECK(false); }
213 
214  void updateStats(const int8_t* const src_data, const size_t num_elements) override {
215  CHECK(false);
216  }
217 
218  void updateStats(const std::vector<std::string>* const src_data,
219  const size_t start_idx,
220  const size_t num_elements) override {
221  UNREACHABLE();
222  }
223 
224  void updateStats(const std::vector<ArrayDatum>* const src_data,
225  const size_t start_idx,
226  const size_t num_elements) override {
227  for (size_t n = start_idx; n < start_idx + num_elements; n++) {
228  update_elem_stats((*src_data)[n]);
229  }
230  }
231 
232  void reduceStats(const Encoder&) override { CHECK(false); }
233 
234  void writeMetadata(FILE* f) override {
235  // assumes pointer is already in right place
236  fwrite((int8_t*)&num_elems_, sizeof(size_t), 1, f);
237  fwrite((int8_t*)&elem_min, sizeof(Datum), 1, f);
238  fwrite((int8_t*)&elem_max, sizeof(Datum), 1, f);
239  fwrite((int8_t*)&has_nulls, sizeof(bool), 1, f);
240  fwrite((int8_t*)&initialized, sizeof(bool), 1, f);
241  }
242 
243  void readMetadata(FILE* f) override {
244  // assumes pointer is already in right place
245  fread((int8_t*)&num_elems_, sizeof(size_t), 1, f);
246  fread((int8_t*)&elem_min, sizeof(Datum), 1, f);
247  fread((int8_t*)&elem_max, sizeof(Datum), 1, f);
248  fread((int8_t*)&has_nulls, sizeof(bool), 1, f);
249  fread((int8_t*)&initialized, sizeof(bool), 1, f);
250  }
251 
252  void copyMetadata(const Encoder* copyFromEncoder) override {
253  num_elems_ = copyFromEncoder->getNumElems();
254  auto array_encoder = dynamic_cast<const ArrayNoneEncoder*>(copyFromEncoder);
255  elem_min = array_encoder->elem_min;
256  elem_max = array_encoder->elem_max;
257  has_nulls = array_encoder->has_nulls;
258  initialized = array_encoder->initialized;
259  }
260 
261  AbstractBuffer* getIndexBuf() const { return index_buf; }
262 
263  bool resetChunkStats(const ChunkStats& stats) override {
264  auto elem_type = buffer_->getSqlType().get_elem_type();
265  if (DatumEqual(elem_min, stats.min, elem_type) &&
266  DatumEqual(elem_max, stats.max, elem_type) && has_nulls == stats.has_nulls) {
267  return false;
268  }
269  elem_min = stats.min;
270  elem_max = stats.max;
271  has_nulls = stats.has_nulls;
272  return true;
273  }
274 
277  bool has_nulls;
280  std::unique_lock<std::mutex> lock(EncoderMutex_);
281  index_buf = buf;
282  }
283 
284  private:
285  std::mutex EncoderMutex_;
288 
289  void update_elem_stats(const ArrayDatum& array) {
290  if (array.is_null) {
291  has_nulls = true;
292  }
293  switch (buffer_->getSqlType().get_subtype()) {
294  case kBOOLEAN: {
295  if (!initialized) {
296  elem_min.boolval = true;
297  elem_max.boolval = false;
298  }
299  if (array.is_null || array.length == 0) {
300  break;
301  }
302  const bool* bool_array = (bool*)array.pointer;
303  for (size_t i = 0; i < array.length / sizeof(bool); i++) {
304  if ((int8_t)bool_array[i] == NULL_BOOLEAN) {
305  has_nulls = true;
306  } else if (initialized) {
307  elem_min.boolval = std::min(elem_min.boolval, bool_array[i]);
308  elem_max.boolval = std::max(elem_max.boolval, bool_array[i]);
309  } else {
310  elem_min.boolval = bool_array[i];
311  elem_max.boolval = bool_array[i];
312  initialized = true;
313  }
314  }
315  break;
316  }
317  case kINT: {
318  if (!initialized) {
319  elem_min.intval = 1;
320  elem_max.intval = 0;
321  }
322  if (array.is_null || array.length == 0) {
323  break;
324  }
325  const int32_t* int_array = (int32_t*)array.pointer;
326  for (size_t i = 0; i < array.length / sizeof(int32_t); i++) {
327  if (int_array[i] == NULL_INT) {
328  has_nulls = true;
329  } else if (initialized) {
330  elem_min.intval = std::min(elem_min.intval, int_array[i]);
331  elem_max.intval = std::max(elem_max.intval, int_array[i]);
332  } else {
333  elem_min.intval = int_array[i];
334  elem_max.intval = int_array[i];
335  initialized = true;
336  }
337  }
338  break;
339  }
340  case kSMALLINT: {
341  if (!initialized) {
342  elem_min.smallintval = 1;
343  elem_max.smallintval = 0;
344  }
345  if (array.is_null || array.length == 0) {
346  break;
347  }
348  const int16_t* int_array = (int16_t*)array.pointer;
349  for (size_t i = 0; i < array.length / sizeof(int16_t); i++) {
350  if (int_array[i] == NULL_SMALLINT) {
351  has_nulls = true;
352  } else if (initialized) {
353  elem_min.smallintval = std::min(elem_min.smallintval, int_array[i]);
354  elem_max.smallintval = std::max(elem_max.smallintval, int_array[i]);
355  } else {
356  elem_min.smallintval = int_array[i];
357  elem_max.smallintval = int_array[i];
358  initialized = true;
359  }
360  }
361  break;
362  }
363  case kTINYINT: {
364  if (!initialized) {
365  elem_min.tinyintval = 1;
366  elem_max.tinyintval = 0;
367  }
368  if (array.is_null || array.length == 0) {
369  break;
370  }
371  const int8_t* int_array = (int8_t*)array.pointer;
372  for (size_t i = 0; i < array.length / sizeof(int8_t); i++) {
373  if (int_array[i] == NULL_TINYINT) {
374  has_nulls = true;
375  } else if (initialized) {
376  elem_min.tinyintval = std::min(elem_min.tinyintval, int_array[i]);
377  elem_max.tinyintval = std::max(elem_max.tinyintval, int_array[i]);
378  } else {
379  elem_min.tinyintval = int_array[i];
380  elem_max.tinyintval = int_array[i];
381  initialized = true;
382  }
383  }
384  break;
385  }
386  case kBIGINT:
387  case kNUMERIC:
388  case kDECIMAL: {
389  if (!initialized) {
390  elem_min.bigintval = 1;
391  elem_max.bigintval = 0;
392  }
393  if (array.is_null || array.length == 0) {
394  break;
395  }
396  const int64_t* int_array = (int64_t*)array.pointer;
397  for (size_t i = 0; i < array.length / sizeof(int64_t); i++) {
398  if (int_array[i] == NULL_BIGINT) {
399  has_nulls = true;
400  } else if (initialized) {
401  elem_min.bigintval = std::min(elem_min.bigintval, int_array[i]);
402  elem_max.bigintval = std::max(elem_max.bigintval, int_array[i]);
403  } else {
404  elem_min.bigintval = int_array[i];
405  elem_max.bigintval = int_array[i];
406  initialized = true;
407  }
408  }
409  break;
410  }
411  case kFLOAT: {
412  if (!initialized) {
413  elem_min.floatval = 1.0;
414  elem_max.floatval = 0.0;
415  }
416  if (array.is_null || array.length == 0) {
417  break;
418  }
419  const float* flt_array = (float*)array.pointer;
420  for (size_t i = 0; i < array.length / sizeof(float); i++) {
421  if (flt_array[i] == NULL_FLOAT) {
422  has_nulls = true;
423  } else if (initialized) {
424  elem_min.floatval = std::min(elem_min.floatval, flt_array[i]);
425  elem_max.floatval = std::max(elem_max.floatval, flt_array[i]);
426  } else {
427  elem_min.floatval = flt_array[i];
428  elem_max.floatval = flt_array[i];
429  initialized = true;
430  }
431  }
432  break;
433  }
434  case kDOUBLE: {
435  if (!initialized) {
436  elem_min.doubleval = 1.0;
437  elem_max.doubleval = 0.0;
438  }
439  if (array.is_null || array.length == 0) {
440  break;
441  }
442  const double* dbl_array = (double*)array.pointer;
443  for (size_t i = 0; i < array.length / sizeof(double); i++) {
444  if (dbl_array[i] == NULL_DOUBLE) {
445  has_nulls = true;
446  } else if (initialized) {
447  elem_min.doubleval = std::min(elem_min.doubleval, dbl_array[i]);
448  elem_max.doubleval = std::max(elem_max.doubleval, dbl_array[i]);
449  } else {
450  elem_min.doubleval = dbl_array[i];
451  elem_max.doubleval = dbl_array[i];
452  initialized = true;
453  }
454  }
455  break;
456  }
457  case kTIME:
458  case kTIMESTAMP:
459  case kDATE: {
460  if (!initialized) {
461  elem_min.bigintval = 1;
462  elem_max.bigintval = 0;
463  }
464  if (array.is_null || array.length == 0) {
465  break;
466  }
467  const auto tm_array = reinterpret_cast<int64_t*>(array.pointer);
468  for (size_t i = 0; i < array.length / sizeof(int64_t); i++) {
469  if (tm_array[i] == NULL_BIGINT) {
470  has_nulls = true;
471  } else if (initialized) {
472  elem_min.bigintval = std::min(elem_min.bigintval, tm_array[i]);
473  elem_max.bigintval = std::max(elem_max.bigintval, tm_array[i]);
474  } else {
475  elem_min.bigintval = tm_array[i];
476  elem_max.bigintval = tm_array[i];
477  initialized = true;
478  }
479  }
480  break;
481  }
482  case kCHAR:
483  case kVARCHAR:
484  case kTEXT: {
486  if (!initialized) {
487  elem_min.intval = 1;
488  elem_max.intval = 0;
489  }
490  if (array.is_null || array.length == 0) {
491  break;
492  }
493  const int32_t* int_array = (int32_t*)array.pointer;
494  for (size_t i = 0; i < array.length / sizeof(int32_t); i++) {
495  if (int_array[i] == NULL_INT) {
496  has_nulls = true;
497  } else if (initialized) {
498  elem_min.intval = std::min(elem_min.intval, int_array[i]);
499  elem_max.intval = std::max(elem_max.intval, int_array[i]);
500  } else {
501  elem_min.intval = int_array[i];
502  elem_max.intval = int_array[i];
503  initialized = true;
504  }
505  }
506  break;
507  }
508  default:
509  UNREACHABLE();
510  }
511  };
512 
513 }; // class ArrayNoneEncoder
514 
515 #endif // ARRAY_NONE_ENCODER_H
int8_t tinyintval
Definition: sqltypes.h:203
void update_elem_stats(const ArrayDatum &array)
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:312
#define CHECK_EQ(x, y)
Definition: Logger.h:205
size_t getNumElemsForBytesInsertData(const std::vector< ArrayDatum > *srcData, const int start_idx, const size_t numAppendElems, const size_t byteLimit, const bool replicating=false)
#define NULL_DOUBLE
size_t num_elems_
Definition: Encoder.h:232
Definition: sqltypes.h:48
std::shared_ptr< ChunkMetadata > appendData(const std::vector< ArrayDatum > *srcData, const int start_idx, const size_t numAppendElems, const bool replicating)
#define NULL_FLOAT
#define MAX_INPUT_BUF_SIZE
Definition: Encoder.h:36
void updateStats(const double, const bool) override
#define NULL_BIGINT
std::mutex EncoderMutex_
bool boolval
Definition: sqltypes.h:202
void updateStats(const int8_t *const src_data, const size_t num_elements) override
#define UNREACHABLE()
Definition: Logger.h:241
bool has_nulls
Definition: ChunkMetadata.h:28
int32_t intval
Definition: sqltypes.h:205
#define NULL_INT
virtual void read(int8_t *const dst, const size_t num_bytes, const size_t offset=0, const MemoryLevel dst_buffer_type=CPU_LEVEL, const int dst_device_id=-1)=0
virtual void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata)
Definition: Encoder.cpp:227
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:199
float floatval
Definition: sqltypes.h:207
CONSTEXPR DEVICE bool is_null(const T &value)
Data_Namespace::AbstractBuffer * buffer_
Definition: Encoder.h:234
bool DatumEqual(const Datum a, const Datum b, const SQLTypeInfo &ti)
Definition: Datum.cpp:190
void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata) override
void updateStats(const std::vector< ArrayDatum > *const src_data, const size_t start_idx, const size_t num_elements) override
int64_t bigintval
Definition: sqltypes.h:206
size_t getNumElems() const
Definition: Encoder.h:228
bool resetChunkStats(const ChunkStats &stats) override
: Reset chunk level stats (min, max, nulls) using new values from the argument.
int16_t smallintval
Definition: sqltypes.h:204
An AbstractBuffer is a unit of data management for a data manager.
ArrayNoneEncoder(AbstractBuffer *buffer)
#define NULL_BOOLEAN
void setIndexBuffer(AbstractBuffer *buf)
AbstractBuffer * index_buf
Definition: sqltypes.h:51
Definition: sqltypes.h:52
void readMetadata(FILE *f) override
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:319
int32_t ArrayOffsetT
Definition: sqltypes.h:920
std::shared_ptr< ChunkMetadata > appendData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &ti, const bool replicating=false, const int64_t offset=-1) override
std::shared_ptr< ChunkMetadata > getMetadata(const SQLTypeInfo &ti) override
Definition: sqltypes.h:40
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
SQLTypeInfo getSqlType() const
#define NULL_TINYINT
void reduceStats(const Encoder &) override
bool g_enable_watchdog false
Definition: Execute.cpp:76
ArrayOffsetT last_offset
#define CHECK(condition)
Definition: Logger.h:197
#define NULL_SMALLINT
void writeMetadata(FILE *f) override
Definition: sqltypes.h:44
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:697
virtual void reserve(size_t num_bytes)=0
AbstractBuffer * getIndexBuf() const
void updateStats(const int64_t, const bool) override
void copyMetadata(const Encoder *copyFromEncoder) override
void updateStats(const std::vector< std::string > *const src_data, const size_t start_idx, const size_t num_elements) override
double doubleval
Definition: sqltypes.h:208