OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
InsertDataLoader.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019, OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <algorithm>
18 #include <numeric>
19 #include <vector>
20 
21 #include "../Shared/shard_key.h"
22 #include "InsertDataLoader.h"
24 
25 namespace Fragmenter_Namespace {
26 
28  std::vector<std::vector<uint8_t>> rawData;
29  std::vector<std::vector<std::string>> stringData;
30  std::vector<std::vector<ArrayDatum>> arrayData;
31 };
32 
/**
 * Buckets row indices by destination shard table.
 *
 * Buckets are numbered 0 .. shardCount * leafCount - 1, and each row is placed
 * in the bucket selected by its shard key.
 *
 * @param shardCount number of shards of the logical table
 * @param leafCount  number of leaves the shards are spread across
 * @param rowCount   number of key values readable through `src`
 * @param src        shard key values, one per row
 * @return one vector of row indices per bucket, each in ascending row order
 * @throws std::runtime_error if shardCount * leafCount is zero
 */
template <typename SRC>
std::vector<std::vector<size_t>> computeRowIndicesOfShards(size_t shardCount,
                                                           size_t leafCount,
                                                           size_t rowCount,
                                                           SRC* src) {
  const auto numShardTables = shardCount * leafCount;
  // Guard the remainder below: a zero divisor is undefined behavior.
  if (numShardTables == 0) {
    throw std::runtime_error("Cannot compute shards: shard table count is zero");
  }

  std::vector<std::vector<size_t>> rowIndicesOfShards(numShardTables);

  for (size_t row = 0; row < rowCount; row++) {
    // Unsigned keys are never negative, so a single remainder suffices;
    // signed keys are mapped through SHARD_FOR_KEY instead.
    const auto shardId = (std::is_unsigned<SRC>::value)
                             ? src[row] % numShardTables
                             : SHARD_FOR_KEY(src[row], numShardTables);
    rowIndicesOfShards[shardId].push_back(row);
  }

  return rowIndicesOfShards;
}
53 
/**
 * Returns the position of the first element of `vec` equal to `val`.
 *
 * The value must be present; a missing value fails the CHECK.
 *
 * @param vec vector to search (not modified)
 * @param val value to look for
 * @return zero-based index of the first match
 */
template <typename T>
size_t indexOf(const std::vector<T>& vec, T val) {
  const auto it = std::find(vec.begin(), vec.end(), val);
  CHECK(it != vec.end());
  return static_cast<size_t>(std::distance(vec.begin(), it));
}
60 
62  return (cd->columnType.is_geometry()) ||
63  (cd->columnType.is_string() &&
65 }
66 
68  return cd->columnType.is_array();
69 }
70 
72  const ColumnDescriptor* cd) {
73  switch (cd->columnType.get_type()) {
74  case kPOINT:
75  case kLINESTRING:
76  case kPOLYGON:
77  case kMULTIPOLYGON:
78  case kARRAY:
79  throw std::runtime_error("geo and array columns have variable length elements");
80  case kBOOLEAN:
81  case kTINYINT:
82  case kSMALLINT:
83  case kINT:
84  case kBIGINT:
85  case kNUMERIC:
86  case kDECIMAL:
87  case kFLOAT:
88  case kDOUBLE:
89  case kTIMESTAMP:
90  case kTIME:
91  case kINTERVAL_DAY_TIME:
93  case kDATE:
94  return cd->columnType.get_logical_size();
95  case kTEXT:
96  case kVARCHAR:
97  case kCHAR:
99  throw std::runtime_error(
100  "non encoded string columns have variable length elements");
101  }
102  return cd->columnType.get_size();
103  default:
104  throw std::runtime_error("not supported column type: " + cd->columnName + " (" +
105  cd->columnType.get_type_name() + ")");
106  }
107 }
108 
109 std::vector<std::vector<size_t>> computeRowIndicesOfShards(
110  const Catalog_Namespace::Catalog& cat,
111  size_t leafCount,
112  InsertData& insert_data) {
113  const auto* td = cat.getMetadataForTable(insert_data.tableId);
114  const auto* shard_cd = cat.getShardColumnMetadataForTable(td);
115  auto shardDataBlockIndex = indexOf(insert_data.columnIds, shard_cd->columnId);
116  DataBlockPtr& shardDataBlock = insert_data.data[shardDataBlockIndex];
117  auto rowCount = insert_data.numRows;
118  auto shardCount = td->nShards;
119 
120  CHECK(!isStringVectorData(shard_cd));
121  CHECK(!isDatumVectorData(shard_cd));
122 
123  switch (sizeOfRawColumn(cat, shard_cd)) {
124  case 1:
126  shardCount,
127  leafCount,
128  rowCount,
129  reinterpret_cast<uint8_t*>(shardDataBlock.numbersPtr));
130  case 2:
132  shardCount,
133  leafCount,
134  rowCount,
135  reinterpret_cast<uint16_t*>(shardDataBlock.numbersPtr));
136  case 4:
138  shardCount,
139  leafCount,
140  rowCount,
141  reinterpret_cast<uint32_t*>(shardDataBlock.numbersPtr));
142  case 8:
144  shardCount,
145  leafCount,
146  rowCount,
147  reinterpret_cast<uint64_t*>(shardDataBlock.numbersPtr));
148  }
149  throw std::runtime_error("Unexpected data block element size");
150 }
151 
/**
 * Gathers the elements selected by `rowIndices` from `src` into `dst`.
 *
 * After the call, dst[i] == src[rowIndices[i]] for every i. `dst` must have
 * room for rowIndices.size() elements, and every listed index must be valid
 * for `src`. An empty `rowIndices` copies nothing.
 *
 * @param rowIndices source positions to copy, in output order
 * @param src        source elements (read only)
 * @param dst        destination buffer, written densely from index 0
 */
template <typename T>
void copyColumnDataOfShard(const std::vector<size_t>& rowIndices,
                           const T* src,
                           T* dst) {
  for (size_t row = 0; row < rowIndices.size(); ++row) {
    dst[row] = src[rowIndices[row]];
  }
}
159 
161  int columnId;
163 };
164 
166  ShardDataOwner& dataOwner,
167  const std::vector<size_t>& rowIndices,
168  const ColumnDescriptor* pCol,
169  size_t columnIndex,
170  DataBlockPtr dataBlock) {
171  DataBlockPtr ret;
172  if (isStringVectorData(pCol)) {
173  auto& data = dataOwner.stringData[columnIndex];
174  data.resize(rowIndices.size());
175  copyColumnDataOfShard(rowIndices, &(*(dataBlock.stringsPtr))[0], &data[0]);
176  ret.stringsPtr = &data;
177 
178  } else if (isDatumVectorData(pCol)) {
179  auto& data = dataOwner.arrayData[columnIndex];
180  data.resize(rowIndices.size());
181  copyColumnDataOfShard(rowIndices, &(*(dataBlock.arraysPtr))[0], &data[0]);
182  ret.arraysPtr = &data;
183 
184  } else {
185  auto rawArrayElementSize = sizeOfRawColumn(cat, pCol);
186  auto& data = dataOwner.rawData[columnIndex];
187  data.resize(rowIndices.size() * rawArrayElementSize);
188 
189  switch (rawArrayElementSize) {
190  case 1: {
191  copyColumnDataOfShard(rowIndices,
192  reinterpret_cast<uint8_t*>(dataBlock.numbersPtr),
193  reinterpret_cast<uint8_t*>(&data[0]));
194  break;
195  }
196  case 2: {
197  copyColumnDataOfShard(rowIndices,
198  reinterpret_cast<uint16_t*>(dataBlock.numbersPtr),
199  reinterpret_cast<uint16_t*>(&data[0]));
200  break;
201  }
202  case 4: {
203  copyColumnDataOfShard(rowIndices,
204  reinterpret_cast<uint32_t*>(dataBlock.numbersPtr),
205  reinterpret_cast<uint32_t*>(&data[0]));
206  break;
207  }
208  case 8: {
209  copyColumnDataOfShard(rowIndices,
210  reinterpret_cast<uint64_t*>(dataBlock.numbersPtr),
211  reinterpret_cast<uint64_t*>(&data[0]));
212  break;
213  }
214  default:
215  throw std::runtime_error("Unexpected data block element size");
216  }
217 
218  ret.numbersPtr = reinterpret_cast<int8_t*>(&data[0]);
219  }
220 
221  return {pCol->columnId, ret};
222 }
223 
225  ShardDataOwner& dataOwner,
226  InsertData& insert_data,
227  int shardTableIndex,
228  const std::vector<size_t>& rowIndices) {
229  const auto* td = cat.getMetadataForTable(insert_data.tableId);
230  const auto* ptd = cat.getPhysicalTablesDescriptors(td)[shardTableIndex];
231 
232  InsertData shardData;
233  shardData.databaseId = insert_data.databaseId;
234  shardData.tableId = ptd->tableId;
235  shardData.numRows = rowIndices.size();
236 
237  std::vector<const ColumnDescriptor*> pCols;
238  std::vector<int> lCols;
239 
240  {
241  auto logicalColumns = cat.getAllColumnMetadataForTable(td->tableId, true, true, true);
242  for (const auto& cd : logicalColumns) {
243  lCols.push_back(cd->columnId);
244  }
245 
246  auto physicalColumns =
247  cat.getAllColumnMetadataForTable(ptd->tableId, true, true, true);
248  for (const auto& cd : physicalColumns) {
249  pCols.push_back(cd);
250  }
251  }
252 
253  for (size_t col = 0; col < insert_data.columnIds.size(); col++) {
254  dataOwner.arrayData.emplace_back();
255  dataOwner.rawData.emplace_back();
256  dataOwner.stringData.emplace_back();
257  }
258 
259  auto copycat = [&cat, &dataOwner, &rowIndices, &lCols, &pCols, &insert_data](int col) {
260  const auto lColId = insert_data.columnIds[col];
261  const auto pCol = pCols[indexOf(lCols, lColId)];
262  return copyColumnDataOfShard(
263  cat, dataOwner, rowIndices, pCol, col, insert_data.data[col]);
264  };
265 
266  std::vector<std::future<BlockWithColumnId>> worker_threads;
267  for (size_t col = 0; col < insert_data.columnIds.size(); col++) {
268  worker_threads.push_back(std::async(std::launch::async, copycat, col));
269  }
270 
271  for (auto& child : worker_threads) {
272  child.wait();
273  }
274 
275  for (auto& child : worker_threads) {
276  auto shardColumnData = child.get();
277  shardData.columnIds.push_back(shardColumnData.columnId);
278  shardData.data.push_back(shardColumnData.block);
279  }
280 
281  return shardData;
282 }
283 
285  InsertData& insert_data) {
286  const auto& cat = session_info.getCatalog();
287  const auto* td = cat.getMetadataForTable(insert_data.tableId);
288 
289  CHECK(td);
290  if (td->nShards == 0) {
291  connector_.insertDataToLeaf(session_info, current_leaf_index_, insert_data);
292  } else {
293  // we have a sharded target table, start spreading to physical tables
294  auto rowIndicesOfShards =
295  computeRowIndicesOfShards(cat, connector_.leafCount(), insert_data);
296 
297  auto insertShardData =
298  [this, &session_info, &insert_data, &cat, &td, &rowIndicesOfShards](
299  size_t shardId) {
300  const auto shard_tables = cat.getPhysicalTablesDescriptors(td);
301  auto stardTableIdx = shardId % td->nShards;
302  auto shardLeafIdx = shardId / td->nShards;
303 
304  const auto& rowIndicesOfShard = rowIndicesOfShards[shardId];
305  ShardDataOwner shardDataOwner;
306 
307  InsertData shardData = copyDataOfShard(
308  cat, shardDataOwner, insert_data, stardTableIdx, rowIndicesOfShard);
309  connector_.insertDataToLeaf(session_info, shardLeafIdx, shardData);
310  };
311 
312  std::vector<std::future<void>> worker_threads;
313  for (size_t shardId = 0; shardId < rowIndicesOfShards.size(); shardId++) {
314  if (rowIndicesOfShards[shardId].size() > 0) {
315  worker_threads.push_back(
316  std::async(std::launch::async, insertShardData, shardId));
317  }
318  }
319  for (auto& child : worker_threads) {
320  child.wait();
321  }
322  for (auto& child : worker_threads) {
323  child.get();
324  }
325  }
326 
327  moveToNextLeaf();
328 }
329 
330 } // namespace Fragmenter_Namespace
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:81
Definition: sqltypes.h:52
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:334
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:141
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:142
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logicalTableDesc) const
Definition: Catalog.cpp:2897
int64_t * src
HOST DEVICE int get_size() const
Definition: sqltypes.h:336
const ColumnDescriptor * getShardColumnMetadataForTable(const TableDescriptor *td) const
Definition: Catalog.cpp:2881
bool isStringVectorData(const ColumnDescriptor *cd)
std::string get_type_name() const
Definition: sqltypes.h:429
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:61
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:63
CHECK(cgen_state)
bool is_array() const
Definition: sqltypes.h:485
int get_logical_size() const
Definition: sqltypes.h:337
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:326
virtual void insertDataToLeaf(const Catalog_Namespace::SessionInfo &parent_session_info, const size_t leaf_idx, Fragmenter_Namespace::InsertData &insert_data)=0
std::vector< std::vector< uint8_t > > rawData
specifies the content in-memory of a row in the column metadata table
bool is_geometry() const
Definition: sqltypes.h:489
size_t indexOf(std::vector< T > &vec, T val)
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:1581
Definition: sqltypes.h:55
Definition: sqltypes.h:56
std::vector< DataBlockPtr > data
per-column data blocks for the row(s) being inserted, in the same order as columnIds
Definition: Fragmenter.h:64
Catalog & getCatalog() const
Definition: SessionInfo.h:90
void copyColumnDataOfShard(const std::vector< size_t > &rowIndices, T *src, T *dst)
void insertData(const Catalog_Namespace::SessionInfo &session_info, InsertData &insert_data)
std::vector< std::vector< ArrayDatum > > arrayData
Definition: sqltypes.h:44
bool isDatumVectorData(const ColumnDescriptor *cd)
bool is_string() const
Definition: sqltypes.h:477
std::vector< std::vector< std::string > > stringData
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
std::vector< std::vector< size_t > > computeRowIndicesOfShards(size_t shardCount, size_t leafCount, size_t rowCount, SRC *src)
size_t sizeOfRawColumn(const Catalog_Namespace::Catalog &cat, const ColumnDescriptor *cd)
Definition: sqltypes.h:48
SQLTypeInfo columnType
InsertData copyDataOfShard(const Catalog_Namespace::Catalog &cat, ShardDataOwner &dataOwner, InsertData &insert_data, int shardTableIndex, const std::vector< size_t > &rowIndices)
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
int8_t * numbersPtr
Definition: sqltypes.h:140
std::vector< int > columnIds
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:62
std::string columnName
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20