OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Chunk.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "DataMgr/Chunk/Chunk.h"
28 #include "Shared/toString.h"
29 
30 namespace Chunk_NS {
31 std::shared_ptr<Chunk> Chunk::getChunk(const ColumnDescriptor* cd,
32  DataMgr* data_mgr,
33  const ChunkKey& key,
34  const MemoryLevel memoryLevel,
35  const int deviceId,
36  const size_t numBytes,
37  const size_t numElems,
38  const bool pinnable) {
39  std::shared_ptr<Chunk> chunkp = std::make_shared<Chunk>(Chunk(cd, pinnable));
40  chunkp->getChunkBuffer(data_mgr, key, memoryLevel, deviceId, numBytes, numElems);
41  return chunkp;
42 }
43 
44 std::shared_ptr<Chunk> Chunk::getChunk(const ColumnDescriptor* cd,
45  AbstractBuffer* data_buffer,
46  AbstractBuffer* index_buffer,
47  const bool pinnable) {
48  std::shared_ptr<Chunk> chunkp = std::make_shared<Chunk>(Chunk(cd, pinnable));
49  chunkp->setChunkBuffer(data_buffer, index_buffer);
50  return chunkp;
51 }
52 
54  const ChunkKey& key,
55  const MemoryLevel mem_level,
56  const int device_id) {
59  ChunkKey subKey = key;
60  ChunkKey indexKey(subKey);
61  indexKey.push_back(1);
62  ChunkKey dataKey(subKey);
63  dataKey.push_back(2);
64  return data_mgr->isBufferOnDevice(indexKey, mem_level, device_id) &&
65  data_mgr->isBufferOnDevice(dataKey, mem_level, device_id);
66  } else {
67  return data_mgr->isBufferOnDevice(key, mem_level, device_id);
68  }
69 }
70 
71 void Chunk::setChunkBuffer(AbstractBuffer* buffer, AbstractBuffer* index_buffer) {
74  CHECK(index_buffer);
75  buffer_ = buffer;
76  index_buf_ = index_buffer;
77  switch (column_desc_->columnType.get_type()) {
78  case kARRAY: {
79  auto array_encoder = dynamic_cast<ArrayNoneEncoder*>(buffer_->getEncoder());
80  CHECK(array_encoder);
81  array_encoder->setIndexBuffer(index_buf_);
82  break;
83  }
84  case kTEXT:
85  case kVARCHAR:
86  case kCHAR: {
88  auto str_encoder = dynamic_cast<StringNoneEncoder*>(buffer_->getEncoder());
89  CHECK(str_encoder);
90  str_encoder->setIndexBuffer(index_buf_);
91  break;
92  }
93  case kPOINT:
94  case kMULTIPOINT:
95  case kLINESTRING:
96  case kMULTILINESTRING:
97  case kPOLYGON:
98  case kMULTIPOLYGON: {
99  auto str_encoder = dynamic_cast<StringNoneEncoder*>(buffer_->getEncoder());
100  CHECK(str_encoder);
101  str_encoder->setIndexBuffer(index_buf_);
102  break;
103  }
104  default:
105  UNREACHABLE();
106  }
107  } else {
108  buffer_ = buffer;
109  }
110 }
111 
113  const ChunkKey& key,
114  const MemoryLevel mem_level,
115  const int device_id,
116  const size_t num_bytes,
117  const size_t num_elems) {
120  ChunkKey data_key = key;
121  data_key.push_back(1);
122  ChunkKey index_key = key;
123  index_key.push_back(2);
125  data_mgr->getChunkBuffer(data_key, mem_level, device_id, num_bytes),
126  data_mgr->getChunkBuffer(
127  index_key, mem_level, device_id, (num_elems + 1) * sizeof(StringOffsetT)));
128 
129  } else {
130  setChunkBuffer(data_mgr->getChunkBuffer(key, mem_level, device_id, num_bytes),
131  nullptr);
132  }
133 }
134 
136  const ChunkKey& key,
137  const MemoryLevel mem_level,
138  const int device_id,
139  const size_t page_size) {
142  ChunkKey subKey = key;
143  subKey.push_back(1); // 1 for the main buffer_
144  buffer_ = data_mgr->createChunkBuffer(subKey, mem_level, device_id, page_size);
145  subKey.pop_back();
146  subKey.push_back(2); // 2 for the index buffer_
147  index_buf_ = data_mgr->createChunkBuffer(subKey, mem_level, device_id, page_size);
148  } else {
149  buffer_ = data_mgr->createChunkBuffer(key, mem_level, device_id, page_size);
150  }
151 }
152 
154  const int8_t* index_data,
155  const std::vector<size_t>& selected_idx,
156  const size_t byte_limit) {
160  index_data, selected_idx, byte_limit);
161 }
162 
164  const size_t num_elems,
165  const size_t start_idx,
166  const size_t byte_limit,
167  const bool replicating) {
169  switch (column_desc_->columnType.get_type()) {
170  case kARRAY: {
171  if (column_desc_->columnType.get_size() > 0) {
172  FixedLengthArrayNoneEncoder* array_encoder =
174  return array_encoder->getNumElemsForBytesInsertData(
175  src_data.arraysPtr, start_idx, num_elems, byte_limit, replicating);
176  }
177  ArrayNoneEncoder* array_encoder =
178  dynamic_cast<ArrayNoneEncoder*>(buffer_->getEncoder());
179  return array_encoder->getNumElemsForBytesInsertData(
180  src_data.arraysPtr, start_idx, num_elems, byte_limit, replicating);
181  }
182  case kTEXT:
183  case kVARCHAR:
184  case kCHAR: {
186  StringNoneEncoder* str_encoder =
187  dynamic_cast<StringNoneEncoder*>(buffer_->getEncoder());
188  return str_encoder->getNumElemsForBytesInsertData(
189  src_data.stringsPtr, start_idx, num_elems, byte_limit, replicating);
190  }
191  case kPOINT:
192  case kMULTIPOINT:
193  case kLINESTRING:
194  case kMULTILINESTRING:
195  case kPOLYGON:
196  case kMULTIPOLYGON: {
197  StringNoneEncoder* str_encoder =
198  dynamic_cast<StringNoneEncoder*>(buffer_->getEncoder());
199  return str_encoder->getNumElemsForBytesInsertData(
200  src_data.stringsPtr, start_idx, num_elems, byte_limit, replicating);
201  }
202  default:
203  CHECK(false);
204  return 0;
205  }
206 }
207 
208 std::shared_ptr<ChunkMetadata> Chunk::appendEncodedDataAtIndices(
209  const Chunk& src_chunk,
210  const std::vector<size_t>& selected_idx) {
211  const auto& ti = column_desc_->columnType;
212  int8_t* data_buffer_ptr = src_chunk.getBuffer()->getMemoryPtr();
213  const int8_t* index_buffer_ptr =
214  ti.is_varlen_indeed() ? src_chunk.getIndexBuf()->getMemoryPtr() : nullptr;
217  index_buffer_ptr, data_buffer_ptr, selected_idx);
218 }
219 
220 std::shared_ptr<ChunkMetadata> Chunk::appendEncodedData(const Chunk& src_chunk,
221  const size_t num_elements,
222  const size_t start_idx) {
223  const auto& ti = column_desc_->columnType;
224  int8_t* data_buffer_ptr = src_chunk.getBuffer()->getMemoryPtr();
225  const int8_t* index_buffer_ptr =
226  ti.is_varlen_indeed() ? src_chunk.getIndexBuf()->getMemoryPtr() : nullptr;
229  index_buffer_ptr, data_buffer_ptr, start_idx, num_elements);
230 }
231 
232 std::shared_ptr<ChunkMetadata> Chunk::appendData(DataBlockPtr& src_data,
233  const size_t num_elems,
234  const size_t start_idx,
235  const bool replicating) {
236  const auto& ti = column_desc_->columnType;
237  if (ti.is_varlen()) {
238  switch (ti.get_type()) {
239  case kARRAY: {
240  if (ti.get_size() > 0) {
241  FixedLengthArrayNoneEncoder* array_encoder =
243  return array_encoder->appendData(
244  src_data.arraysPtr, start_idx, num_elems, replicating);
245  }
246  ArrayNoneEncoder* array_encoder =
247  dynamic_cast<ArrayNoneEncoder*>(buffer_->getEncoder());
248  return array_encoder->appendData(
249  src_data.arraysPtr, start_idx, num_elems, replicating);
250  }
251  case kTEXT:
252  case kVARCHAR:
253  case kCHAR: {
254  CHECK_EQ(kENCODING_NONE, ti.get_compression());
255  StringNoneEncoder* str_encoder =
256  dynamic_cast<StringNoneEncoder*>(buffer_->getEncoder());
257  return str_encoder->appendData(
258  src_data.stringsPtr, start_idx, num_elems, replicating);
259  }
260  case kPOINT:
261  case kMULTIPOINT:
262  case kLINESTRING:
263  case kMULTILINESTRING:
264  case kPOLYGON:
265  case kMULTIPOLYGON: {
266  StringNoneEncoder* str_encoder =
267  dynamic_cast<StringNoneEncoder*>(buffer_->getEncoder());
268  return str_encoder->appendData(
269  src_data.stringsPtr, start_idx, num_elems, replicating);
270  }
271  default:
272  CHECK(false);
273  }
274  }
275  return buffer_->getEncoder()->appendData(
276  src_data.numbersPtr, num_elems, ti, replicating);
277 }
278 
280  if (pinnable_) {
281  if (buffer_) {
282  buffer_->unPin();
283  }
284  if (index_buf_) {
285  index_buf_->unPin();
286  }
287  }
288 }
289 
294  switch (column_desc_->columnType.get_type()) {
295  case kARRAY: {
296  ArrayNoneEncoder* array_encoder =
297  dynamic_cast<ArrayNoneEncoder*>(buffer_->getEncoder());
298  array_encoder->setIndexBuffer(index_buf_);
299  break;
300  }
301  case kTEXT:
302  case kVARCHAR:
303  case kCHAR: {
305  StringNoneEncoder* str_encoder =
306  dynamic_cast<StringNoneEncoder*>(buffer_->getEncoder());
307  str_encoder->setIndexBuffer(index_buf_);
308  break;
309  }
310  case kPOINT:
311  case kMULTIPOINT:
312  case kLINESTRING:
313  case kMULTILINESTRING:
314  case kPOLYGON:
315  case kMULTIPOLYGON: {
316  StringNoneEncoder* str_encoder =
317  dynamic_cast<StringNoneEncoder*>(buffer_->getEncoder());
318  str_encoder->setIndexBuffer(index_buf_);
319  break;
320  }
321  default:
322  CHECK(false);
323  }
324  }
325 }
326 
327 ChunkIter Chunk::begin_iterator(const std::shared_ptr<ChunkMetadata>& chunk_metadata,
328  int start_idx,
329  int skip) const {
330  ChunkIter it;
332  it.skip = skip;
334  if (it.skip_size < 0) { // if it's variable length
335  it.current_pos = it.start_pos =
336  index_buf_->getMemoryPtr() + start_idx * sizeof(StringOffsetT);
339  } else {
340  it.current_pos = it.start_pos = buffer_->getMemoryPtr() + start_idx * it.skip_size;
341  it.end_pos = buffer_->getMemoryPtr() + buffer_->size();
342  it.second_buf = nullptr;
343  }
344  it.num_elems = chunk_metadata->numElements;
345  return it;
346 }
347 
349  const std::list<const ColumnDescriptor*>& colDescs,
350  std::vector<Chunk>& chunkVec) {
351  for (auto cd : colDescs) {
352  chunkVec.emplace_back(cd);
353  }
354 }
355 
356 std::string Chunk::toString() const {
357  return ::typeName(this) + "(buffer=" + ::toString(buffer_) +
358  ", index_buf=" + ::toString(index_buf_) +
359  ", column_desc=" + ::toString(column_desc_) + ")";
360 }
361 } // namespace Chunk_NS
AbstractBuffer * getIndexBuf() const
Definition: Chunk.h:148
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< int > ChunkKey
Definition: types.h:36
size_t getNumElemsForBytesInsertData(const std::vector< ArrayDatum > *srcData, const int start_idx, const size_t numAppendElems, const size_t byteLimit, const bool replicating=false)
int8_t * start_pos
Definition: ChunkIter.h:34
HOST DEVICE int get_size() const
Definition: sqltypes.h:403
int8_t * current_pos
Definition: ChunkIter.h:33
SQLTypeInfo type_info
Definition: ChunkIter.h:31
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:234
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:235
const ColumnDescriptor * column_desc_
Definition: Chunk.h:165
bool is_varlen() const
Definition: sqltypes.h:629
virtual int8_t * getMemoryPtr()=0
std::shared_ptr< ChunkMetadata > appendData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &ti, const bool replicating=false, const int64_t offset=-1) override
#define UNREACHABLE()
Definition: Logger.h:338
void getChunkBuffer(DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId=0, const size_t num_bytes=0, const size_t num_elems=0)
Definition: Chunk.cpp:112
std::shared_ptr< ChunkMetadata > appendEncodedDataAtIndices(const Chunk &src_chunk, const std::vector< size_t > &selected_idx)
Definition: Chunk.cpp:208
void setChunkBuffer(AbstractBuffer *buffer, AbstractBuffer *index_buffer)
Definition: Chunk.cpp:71
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
void initEncoder(const SQLTypeInfo &tmp_sql_type)
size_t getNumElemsForBytesInsertData(const std::vector< std::string > *srcData, const int start_idx, const size_t numAppendElems, const size_t byteLimit, const bool replicating=false)
int32_t StringOffsetT
Definition: sqltypes.h:1493
virtual size_t getNumElemsForBytesEncodedDataAtIndices(const int8_t *index_data, const std::vector< size_t > &selected_idx, const size_t byte_limit)=0
bool is_fixlen_array() const
Definition: sqltypes.h:589
int8_t * end_pos
Definition: ChunkIter.h:35
size_t num_elems
Definition: ChunkIter.h:38
AbstractBuffer * buffer_
Definition: Chunk.h:163
std::shared_ptr< ChunkMetadata > appendData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &ti, const bool replicating=false, const int64_t offset=-1) override
void unpinBuffer()
Definition: Chunk.cpp:279
bool isChunkOnDevice(DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int device_id)
Definition: Chunk.cpp:53
An AbstractBuffer is a unit of data management for a data manager.
specifies the content in-memory of a row in the column metadata table
void setIndexBuffer(AbstractBuffer *buf)
int skip_size
Definition: ChunkIter.h:37
Definition: sqltypes.h:79
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:399
AbstractBuffer * getBuffer() const
Definition: Chunk.h:146
std::shared_ptr< ChunkMetadata > appendData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &ti, const bool replicating=false, const int64_t offset=-1) override
bool isBufferOnDevice(const ChunkKey &key, const MemoryLevel memLevel, const int deviceId)
Definition: DataMgr.cpp:489
AbstractBuffer * getChunkBuffer(const ChunkKey &key, const MemoryLevel memoryLevel, const int deviceId=0, const size_t numBytes=0)
Definition: DataMgr.cpp:511
int8_t * second_buf
Definition: ChunkIter.h:32
Definition: sqltypes.h:68
std::string typeName(const T *v)
Definition: toString.h:106
void initEncoder()
Definition: Chunk.cpp:290
unencoded fixed length array encoder
int skip
Definition: ChunkIter.h:36
#define CHECK(condition)
Definition: Logger.h:291
size_t getNumElemsForBytesEncodedDataAtIndices(const int8_t *index_data, const std::vector< size_t > &selected_idx, const size_t byte_limit)
Definition: Chunk.cpp:153
void setIndexBuffer(AbstractBuffer *buf)
std::shared_ptr< ChunkMetadata > appendEncodedData(const Chunk &src_chunk, const size_t num_elements, const size_t start_idx)
Definition: Chunk.cpp:220
For unencoded strings.
std::string toString() const
Definition: Chunk.cpp:356
AbstractBuffer * createChunkBuffer(const ChunkKey &key, const MemoryLevel memoryLevel, const int deviceId=0, const size_t page_size=0)
Definition: DataMgr.cpp:502
size_t getNumElemsForBytesInsertData(const DataBlockPtr &src_data, const size_t num_elems, const size_t start_idx, const size_t byte_limit, const bool replicating=false)
Definition: Chunk.cpp:163
ChunkIter begin_iterator(const std::shared_ptr< ChunkMetadata > &, int start_idx=0, int skip=1) const
Definition: Chunk.cpp:327
void createChunkBuffer(DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId=0, const size_t page_size=0)
Definition: Chunk.cpp:135
SQLTypeInfo columnType
static std::shared_ptr< Chunk > getChunk(const ColumnDescriptor *cd, DataMgr *data_mgr, const ChunkKey &key, const MemoryLevel mem_level, const int deviceId, const size_t num_bytes, const size_t num_elems, const bool pinnable=true)
Definition: Chunk.cpp:31
virtual std::shared_ptr< ChunkMetadata > appendEncodedDataAtIndices(const int8_t *index_data, int8_t *data, const std::vector< size_t > &selected_idx)=0
AbstractBuffer * index_buf_
Definition: Chunk.h:164
int8_t * numbersPtr
Definition: sqltypes.h:233
unencoded array encoder
size_t getNumElemsForBytesInsertData(const std::vector< ArrayDatum > *srcData, const int start_idx, const size_t numAppendElems, const size_t byteLimit, const bool replicating=false)
Chunk(bool pinnable=true)
Definition: Chunk.h:43
virtual std::shared_ptr< ChunkMetadata > appendEncodedData(const int8_t *index_data, int8_t *data, const size_t start_idx, const size_t num_elements)=0
bool pinnable_
Definition: Chunk.h:168
std::shared_ptr< ChunkMetadata > appendData(DataBlockPtr &srcData, const size_t numAppendElems, const size_t startIdx, const bool replicating=false)
Definition: Chunk.cpp:232
static void translateColumnDescriptorsToChunkVec(const std::list< const ColumnDescriptor * > &colDescs, std::vector< Chunk > &chunkVec)
Definition: Chunk.cpp:348
virtual std::shared_ptr< ChunkMetadata > appendData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &ti, const bool replicating=false, const int64_t offset=-1)=0