OmniSciDB  8fa3bf436f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
InsertDataLoader.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019, OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <algorithm>
18 #include <numeric>
19 #include <vector>
20 
21 #include "../Shared/shard_key.h"
22 #include "Geospatial/Types.h"
23 #include "InsertDataLoader.h"
25 
26 namespace Fragmenter_Namespace {
27 
29  std::vector<std::vector<uint8_t>> rawData;
30  std::vector<std::vector<std::string>> stringData;
31  std::vector<std::vector<ArrayDatum>> arrayData;
32 };
33 
/**
 * Buckets row indices by the shard table their shard-key value maps to.
 *
 * The result holds shard_count * leaf_count buckets. Bucket i receives, in
 * ascending order, the indices of the rows whose key maps to shard table i.
 *
 * @param shard_count          number of shards of the logical table
 * @param leaf_count           number of leaves; multiplies the bucket count
 * @param row_count            number of rows to distribute
 * @param src                  shard-key values; src[row] is read for every row,
 *                             or only src[0] when duplicated_key_value is true
 * @param duplicated_key_value when true, src[0] is the key for every row, so all
 *                             rows land in a single bucket
 * @return row_indices_of_shards[shard_table] = indices of the rows for that table
 */
template <typename SRC>
std::vector<std::vector<size_t>> computeRowIndicesOfShards(size_t shard_count,
                                                           size_t leaf_count,
                                                           size_t row_count,
                                                           SRC* src,
                                                           bool duplicated_key_value) {
  const auto n_shard_tables = shard_count * leaf_count;
  std::vector<std::vector<size_t>> row_indices_of_shards(n_shard_tables);

  if (row_count == 0) {
    // Nothing to distribute. Returning here also avoids reading src[0] (and a
    // remainder by zero shard tables) in the duplicated-key branch below.
    return row_indices_of_shards;
  }

  // Unsigned keys are never negative, so a plain remainder suffices; other key
  // types are mapped through SHARD_FOR_KEY.
  const auto shard_of = [n_shard_tables](SRC key) {
    return (std::is_unsigned<SRC>::value) ? key % n_shard_tables
                                          : SHARD_FOR_KEY(key, n_shard_tables);
  };

  if (duplicated_key_value) {
    // One key value stands for every row: all rows go to the same bucket.
    auto& bucket = row_indices_of_shards[shard_of(src[0])];
    bucket.resize(row_count);
    std::iota(bucket.begin(), bucket.end(), size_t{0});
  } else {
    for (size_t row = 0; row < row_count; ++row) {
      row_indices_of_shards[shard_of(src[row])].push_back(row);
    }
  }

  return row_indices_of_shards;
}
63 
/**
 * Returns the position of the first element of vec that equals val.
 *
 * The vector is only read, so it is taken by const reference. This also lets
 * callers pass const vectors and temporaries.
 * CHECK-fails if val is not present in vec.
 */
template <typename T>
size_t indexOf(const std::vector<T>& vec, T val) {
  const auto it = std::find(vec.cbegin(), vec.cend(), val);
  CHECK(it != vec.cend());
  return static_cast<size_t>(std::distance(vec.cbegin(), it));
}
70 
72  return (cd->columnType.is_geometry()) ||
73  (cd->columnType.is_string() &&
75 }
76 
78  return cd->columnType.is_array();
79 }
80 
82  const ColumnDescriptor* cd) {
83  switch (cd->columnType.get_type()) {
84  case kPOINT:
85  case kLINESTRING:
86  case kPOLYGON:
87  case kMULTIPOLYGON:
88  case kARRAY:
89  throw std::runtime_error("geo and array columns have variable length elements");
90  case kBOOLEAN:
91  case kTINYINT:
92  case kSMALLINT:
93  case kINT:
94  case kBIGINT:
95  case kNUMERIC:
96  case kDECIMAL:
97  case kFLOAT:
98  case kDOUBLE:
99  case kTIMESTAMP:
100  case kTIME:
101  case kINTERVAL_DAY_TIME:
103  case kDATE:
104  return cd->columnType.get_logical_size();
105  case kTEXT:
106  case kVARCHAR:
107  case kCHAR:
109  throw std::runtime_error(
110  "non encoded string columns have variable length elements");
111  }
112  return cd->columnType.get_size();
113  default:
114  throw std::runtime_error("not supported column type: " + cd->columnName + " (" +
115  cd->columnType.get_type_name() + ")");
116  }
117 }
118 
119 std::vector<std::vector<size_t>> computeRowIndicesOfShards(
121  size_t leafCount,
122  InsertData& insert_data) {
123  const auto* td = cat.getMetadataForTable(insert_data.tableId);
124  const auto* shard_cd = cat.getShardColumnMetadataForTable(td);
125  auto shardDataBlockIndex = indexOf(insert_data.columnIds, shard_cd->columnId);
126  DataBlockPtr& shardDataBlock = insert_data.data[shardDataBlockIndex];
127  auto rowCount = insert_data.numRows;
128  auto shardCount = td->nShards;
129 
130  CHECK(!isStringVectorData(shard_cd));
131  CHECK(!isDatumVectorData(shard_cd));
132 
133  CHECK(insert_data.is_default.size() == insert_data.columnIds.size());
134  bool is_default = insert_data.is_default[shardDataBlockIndex];
135  switch (sizeOfRawColumn(cat, shard_cd)) {
136  case 1:
138  shardCount,
139  leafCount,
140  rowCount,
141  reinterpret_cast<uint8_t*>(shardDataBlock.numbersPtr),
142  is_default);
143  case 2:
145  shardCount,
146  leafCount,
147  rowCount,
148  reinterpret_cast<uint16_t*>(shardDataBlock.numbersPtr),
149  is_default);
150  case 4:
152  shardCount,
153  leafCount,
154  rowCount,
155  reinterpret_cast<uint32_t*>(shardDataBlock.numbersPtr),
156  is_default);
157  case 8:
159  shardCount,
160  leafCount,
161  rowCount,
162  reinterpret_cast<uint64_t*>(shardDataBlock.numbersPtr),
163  is_default);
164  }
165  throw std::runtime_error("Unexpected data block element size");
166 }
167 
/**
 * Gathers selected rows of one column: dst[i] = src[rowIndices[i]] for each i.
 *
 * @param rowIndices source positions to copy, in output order (repeats allowed)
 * @param src        source elements; only read, so a const buffer is accepted
 * @param dst        destination; must have room for rowIndices.size() elements
 */
template <typename T>
void copyColumnDataOfShard(const std::vector<size_t>& rowIndices,
                           const T* src,
                           T* dst) {
  for (size_t row = 0; row < rowIndices.size(); ++row) {
    dst[row] = src[rowIndices[row]];
  }
}
175 
177  int columnId;
180 };
181 
183  ShardDataOwner& dataOwner,
184  const std::vector<size_t>& rowIndices,
185  const ColumnDescriptor* pCol,
186  size_t columnIndex,
187  DataBlockPtr dataBlock,
188  bool is_default) {
189  DataBlockPtr ret;
190  std::vector<size_t> single_row_idx({0ul});
191  const std::vector<size_t>& rows = is_default ? single_row_idx : rowIndices;
192  if (isStringVectorData(pCol)) {
193  auto& data = dataOwner.stringData[columnIndex];
194  data.resize(rows.size());
195  copyColumnDataOfShard(rows, &(*(dataBlock.stringsPtr))[0], &data[0]);
196  ret.stringsPtr = &data;
197 
198  } else if (isDatumVectorData(pCol)) {
199  auto& data = dataOwner.arrayData[columnIndex];
200  data.resize(rows.size());
201  copyColumnDataOfShard(rows, &(*(dataBlock.arraysPtr))[0], &data[0]);
202  ret.arraysPtr = &data;
203 
204  } else {
205  auto rawArrayElementSize = sizeOfRawColumn(cat, pCol);
206  auto& data = dataOwner.rawData[columnIndex];
207  data.resize(rows.size() * rawArrayElementSize);
208 
209  switch (rawArrayElementSize) {
210  case 1: {
212  reinterpret_cast<uint8_t*>(dataBlock.numbersPtr),
213  reinterpret_cast<uint8_t*>(&data[0]));
214  break;
215  }
216  case 2: {
218  reinterpret_cast<uint16_t*>(dataBlock.numbersPtr),
219  reinterpret_cast<uint16_t*>(&data[0]));
220  break;
221  }
222  case 4: {
224  reinterpret_cast<uint32_t*>(dataBlock.numbersPtr),
225  reinterpret_cast<uint32_t*>(&data[0]));
226  break;
227  }
228  case 8: {
230  reinterpret_cast<uint64_t*>(dataBlock.numbersPtr),
231  reinterpret_cast<uint64_t*>(&data[0]));
232  break;
233  }
234  default:
235  throw std::runtime_error("Unexpected data block element size");
236  }
237 
238  ret.numbersPtr = reinterpret_cast<int8_t*>(&data[0]);
239  }
240 
241  return {pCol->columnId, ret, is_default};
242 }
243 
245  ShardDataOwner& dataOwner,
246  InsertData& insert_data,
247  int shardTableIndex,
248  const std::vector<size_t>& rowIndices) {
249  const auto* td = cat.getMetadataForTable(insert_data.tableId);
250  const auto* ptd = cat.getPhysicalTablesDescriptors(td)[shardTableIndex];
251 
252  InsertData shardData;
253  shardData.databaseId = insert_data.databaseId;
254  shardData.tableId = ptd->tableId;
255  shardData.numRows = rowIndices.size();
256 
257  std::vector<const ColumnDescriptor*> pCols;
258  std::vector<int> lCols;
259 
260  {
261  auto logicalColumns = cat.getAllColumnMetadataForTable(td->tableId, true, true, true);
262  for (const auto& cd : logicalColumns) {
263  lCols.push_back(cd->columnId);
264  }
265 
266  auto physicalColumns =
267  cat.getAllColumnMetadataForTable(ptd->tableId, true, true, true);
268  for (const auto& cd : physicalColumns) {
269  pCols.push_back(cd);
270  }
271  }
272 
273  for (size_t col = 0; col < insert_data.columnIds.size(); col++) {
274  dataOwner.arrayData.emplace_back();
275  dataOwner.rawData.emplace_back();
276  dataOwner.stringData.emplace_back();
277  }
278 
279  auto copycat = [&cat, &dataOwner, &rowIndices, &lCols, &pCols, &insert_data](int col) {
280  const auto lColId = insert_data.columnIds[col];
281  const auto pCol = pCols[indexOf(lCols, lColId)];
282  return copyColumnDataOfShard(cat,
283  dataOwner,
284  rowIndices,
285  pCol,
286  col,
287  insert_data.data[col],
288  insert_data.is_default[col]);
289  };
290 
291  std::vector<std::future<BlockWithColumnId>> worker_threads;
292  for (size_t col = 0; col < insert_data.columnIds.size(); col++) {
293  worker_threads.push_back(std::async(std::launch::async, copycat, col));
294  }
295 
296  for (auto& child : worker_threads) {
297  child.wait();
298  }
299 
300  for (auto& child : worker_threads) {
301  auto shardColumnData = child.get();
302  shardData.columnIds.push_back(shardColumnData.columnId);
303  shardData.data.push_back(shardColumnData.block);
304  shardData.is_default.push_back(shardColumnData.is_default);
305  }
306 
307  return shardData;
308 }
309 
311  InsertData& insert_data) {
312  const auto& cat = session_info.getCatalog();
313  const auto* td = cat.getMetadataForTable(insert_data.tableId);
314 
315  CHECK(td);
316  if (td->nShards == 0) {
317  connector_.insertDataToLeaf(session_info, current_leaf_index_, insert_data);
318  } else {
319  // we have a sharded target table, start spreading to physical tables
320  auto rowIndicesOfShards =
322 
323  auto insertShardData =
324  [this, &session_info, &insert_data, &cat, &td, &rowIndicesOfShards](
325  size_t shardId) {
326  const auto shard_tables = cat.getPhysicalTablesDescriptors(td);
327  auto stardTableIdx = shardId % td->nShards;
328  auto shardLeafIdx = shardId / td->nShards;
329 
330  const auto& rowIndicesOfShard = rowIndicesOfShards[shardId];
331  ShardDataOwner shardDataOwner;
332 
333  InsertData shardData = copyDataOfShard(
334  cat, shardDataOwner, insert_data, stardTableIdx, rowIndicesOfShard);
335  connector_.insertDataToLeaf(session_info, shardLeafIdx, shardData);
336  };
337 
338  std::vector<std::future<void>> worker_threads;
339  for (size_t shardId = 0; shardId < rowIndicesOfShards.size(); shardId++) {
340  if (rowIndicesOfShards[shardId].size() > 0) {
341  worker_threads.push_back(
342  std::async(std::launch::async, insertShardData, shardId));
343  }
344  }
345  for (auto& child : worker_threads) {
346  child.wait();
347  }
348  for (auto& child : worker_threads) {
349  child.get();
350  }
351  }
352 
353  moveToNextLeaf();
354 }
355 } // namespace Fragmenter_Namespace
std::vector< std::vector< size_t > > computeRowIndicesOfShards(size_t shard_count, size_t leaf_count, size_t row_count, SRC *src, bool duplicated_key_value)
HOST DEVICE int get_size() const
Definition: sqltypes.h:324
std::string cat(Ts &&...args)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:102
Definition: sqltypes.h:48
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:221
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:222
std::vector< bool > is_default
Definition: Fragmenter.h:66
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:314
const ColumnDescriptor * getShardColumnMetadataForTable(const TableDescriptor *td) const
Definition: Catalog.cpp:4045
bool isStringVectorData(const ColumnDescriptor *cd)
int tableId
identifies the table into which the data is being inserted
Definition: Fragmenter.h:61
size_t numRows
the number of rows being inserted
Definition: Fragmenter.h:63
int get_logical_size() const
Definition: sqltypes.h:325
virtual void insertDataToLeaf(const Catalog_Namespace::SessionInfo &parent_session_info, const size_t leaf_idx, Fragmenter_Namespace::InsertData &insert_data)=0
std::vector< std::vector< uint8_t > > rawData
specifies the content in-memory of a row in the column metadata table
std::vector< const TableDescriptor * > getPhysicalTablesDescriptors(const TableDescriptor *logical_table_desc, bool populate_fragmenter=true) const
Definition: Catalog.cpp:4063
size_t indexOf(std::vector< T > &vec, T val)
Definition: sqltypes.h:51
Definition: sqltypes.h:52
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:322
std::vector< DataBlockPtr > data
the data blocks, one per column in columnIds, for the row(s) being inserted
Definition: Fragmenter.h:64
Catalog & getCatalog() const
Definition: SessionInfo.h:66
std::list< const ColumnDescriptor * > getAllColumnMetadataForTable(const int tableId, const bool fetchSystemColumns, const bool fetchVirtualColumns, const bool fetchPhysicalColumns) const
Returns a list of pointers to constant ColumnDescriptor structs for all the columns from a particular...
Definition: Catalog.cpp:1771
void copyColumnDataOfShard(const std::vector< size_t > &rowIndices, T *src, T *dst)
void insertData(const Catalog_Namespace::SessionInfo &session_info, InsertData &insert_data)
std::vector< std::vector< ArrayDatum > > arrayData
std::string get_type_name() const
Definition: sqltypes.h:417
Definition: sqltypes.h:40
bool isDatumVectorData(const ColumnDescriptor *cd)
std::vector< std::vector< std::string > > stringData
#define CHECK(condition)
Definition: Logger.h:203
bool is_geometry() const
Definition: sqltypes.h:501
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:59
size_t sizeOfRawColumn(const Catalog_Namespace::Catalog &cat, const ColumnDescriptor *cd)
Definition: sqltypes.h:44
SQLTypeInfo columnType
InsertData copyDataOfShard(const Catalog_Namespace::Catalog &cat, ShardDataOwner &dataOwner, InsertData &insert_data, int shardTableIndex, const std::vector< size_t > &rowIndices)
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
bool is_string() const
Definition: sqltypes.h:489
int8_t * numbersPtr
Definition: sqltypes.h:220
std::vector< int > columnIds
a vector of column ids for the row(s) being inserted
Definition: Fragmenter.h:62
std::string columnName
#define SHARD_FOR_KEY(key, num_shards)
Definition: shard_key.h:20
bool is_array() const
Definition: sqltypes.h:497