1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
 *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
23 #pragma once
#include <cstddef>
#include <cstdint>
#include <deque>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "DataMgr/Chunk/Chunk.h"
#include "DataMgr/MemoryLevel.h"
#include "FragmentDefaultValues.h"
#include "Shared/types.h"
// Forward declarations. In the visible part of this header these types are only
// used through pointers (an Executor* parameter of updateColumns and a DataMgr*
// constructor parameter), so their full definitions are not needed here.
class Executor;

namespace Data_Namespace {
class DataMgr;
}
45 namespace Fragmenter_Namespace {
55  public:
59  const std::vector<int> chunkKeyPrefix,
60  std::vector<Chunk_NS::Chunk>& chunkVec,
61  Data_Namespace::DataMgr* dataMgr,
63  const int physicalTableId,
64  const int shard,
65  const size_t maxFragmentRows = DEFAULT_FRAGMENT_ROWS,
66  const size_t maxChunkSize = DEFAULT_MAX_CHUNK_SIZE,
67  const size_t pageSize = DEFAULT_PAGE_SIZE /*default 1MB*/,
68  const size_t maxRows = DEFAULT_MAX_ROWS,
70  const bool uses_foreign_storage = false);
  /// Destructor. Declared `override`, so the base fragmenter's destructor is virtual
  /// and deleting through a base pointer is well-defined.
  ~InsertOrderFragmenter() override;
  /// Returns the number of fragments owned by this fragmenter (base-interface override).
  /// NOTE(review): presumably the size of the fragment-info container — confirm in the .cpp.
  size_t getNumFragments() override;
86  // virtual void getFragmentsForQuery(QueryInfo &queryInfo, const void *predicate = 0);
  /// Inserts the rows described by insert_data_struct (base-interface override).
  /// The argument is a non-const reference, so the implementation may modify it.
  /// NOTE(review): presumably the checkpointing counterpart of
  /// insertDataNoCheckpoint() — confirm in the implementation.
  void insertData(InsertData& insert_data_struct) override;
  /// Inserts already-built chunks (read-only argument; base-interface override).
  /// NOTE(review): presumably the checkpointing counterpart of
  /// insertChunksNoCheckpoint() — confirm in the implementation.
  void insertChunks(const InsertChunks& insert_chunk) override;
  /// Variant of insertData(). By its name it presumably skips the checkpoint step,
  /// leaving that to the caller — TODO confirm in the implementation.
  void insertDataNoCheckpoint(InsertData& insert_data_struct) override;
  /// Variant of insertChunks(). By its name it presumably skips the checkpoint step —
  /// TODO confirm in the implementation.
  void insertChunksNoCheckpoint(const InsertChunks& insert_chunk) override;
  /// Drops fragments so the table is reduced to at most maxRows rows
  /// (base-interface override).
  /// NOTE(review): which fragments are dropped is decided in the implementation
  /// (presumably the oldest, given insert-order fragmentation) — confirm. See also the
  /// private dropFragmentsToSizeNoInsertLock().
  void dropFragmentsToSize(const size_t maxRows) override;
107  const int fragment_id,
108  const std::shared_ptr<ChunkMetadata> metadata) override;
  /// Updates chunk statistics of column cd using stats_map, which is keyed by
  /// fragment id. stats_map is a non-const reference and may be modified.
  /// NOTE(review): the meaning of an empty memory_level is defined by the
  /// implementation — confirm.
  void updateChunkStats(const ColumnDescriptor* cd,
                        std::unordered_map</*fragment_id*/ int, ChunkStats>& stats_map,
                        std::optional<Data_Namespace::MemoryLevel> memory_level) override;
  /// Looks up the FragmentInfo for fragment_id. Although the method is const, it
  /// hands out a non-const pointer.
  /// NOTE(review): the behavior for an unknown id (nullptr vs. error) is not visible
  /// here — confirm.
  FragmentInfo* getFragmentInfo(const int fragment_id) const override;
119  inline int getFragmenterId() override { return chunkKeyPrefix_.back(); }
120  inline std::vector<int> getChunkKeyPrefix() const { return chunkKeyPrefix_; }
124  inline std::string getFragmenterType() override { return fragmenterType_; }
125  size_t getNumRows() override { return numTuples_; }
126  void setNumRows(const size_t numTuples) override { numTuples_ = numTuples; }
  /// Updates column cd of fragment fragment_id at the row offsets in frag_offsets,
  /// using the values in rhs_values (of type rhs_type) at the given memory_level.
  /// The change is recorded in updel_roll.
  /// NOTE(review): presumably rhs_values is parallel to frag_offsets (one value per
  /// offset). When the returned optional is empty is defined by the
  /// implementation — confirm both.
  std::optional<ChunkUpdateStats> updateColumn(
      const Catalog_Namespace::Catalog* catalog,
      const TableDescriptor* td,
      const ColumnDescriptor* cd,
      const int fragment_id,
      const std::vector<uint64_t>& frag_offsets,
      const std::vector<ScalarTargetValue>& rhs_values,
      const SQLTypeInfo& rhs_type,
      const Data_Namespace::MemoryLevel memory_level,
      UpdelRoll& updel_roll) override;
  /// Updates several columns (columnDescriptors) of fragment fragmentId with values
  /// read from sourceDataProvider, recording changes in updelRoll.
  /// NOTE(review): indexOffFragmentOffsetColumn presumably selects the provider
  /// column that carries fragment row offsets — confirm.
  /// NOTE(review): sourceMetaInfo and columnDescriptors are passed by value (copied
  /// per call). Changing that requires changing the base-class signature too, since
  /// this is an override.
  void updateColumns(const Catalog_Namespace::Catalog* catalog,
                     const TableDescriptor* td,
                     const int fragmentId,
                     const std::vector<TargetMetaInfo> sourceMetaInfo,
                     const std::vector<const ColumnDescriptor*> columnDescriptors,
                     const RowDataProvider& sourceDataProvider,
                     const size_t indexOffFragmentOffsetColumn,
                     const Data_Namespace::MemoryLevel memoryLevel,
                     UpdelRoll& updelRoll,
                     Executor* executor) override;
  /// Single-value overload of updateColumn(): takes one rhs_value instead of a vector.
  /// NOTE(review): presumably writes rhs_value at every offset in frag_offsets —
  /// confirm in the implementation.
  void updateColumn(const Catalog_Namespace::Catalog* catalog,
                    const TableDescriptor* td,
                    const ColumnDescriptor* cd,
                    const int fragment_id,
                    const std::vector<uint64_t>& frag_offsets,
                    const ScalarTargetValue& rhs_value,
                    const SQLTypeInfo& rhs_type,
                    const Data_Namespace::MemoryLevel memory_level,
                    UpdelRoll& updel_roll) override;
  /// Updates metadata of column cd in fragment (a non-const reference, so it may be
  /// modified) from update_values_stats, for the given chunk.
  /// NOTE(review): whether the metadata is applied immediately or staged in
  /// updel_roll is decided by the implementation — confirm.
  void updateColumnMetadata(const ColumnDescriptor* cd,
                            FragmentInfo& fragment,
                            std::shared_ptr<Chunk_NS::Chunk> chunk,
                            const UpdateValuesStats& update_values_stats,
                            const SQLTypeInfo& rhs_type,
                            UpdelRoll& updel_roll) override;
  /// Updates metadata identified by key using the state accumulated in updel_roll.
  /// NOTE(review): the exact scope of "metadata" here is defined by the
  /// implementation — confirm.
  void updateMetadata(const Catalog_Namespace::Catalog* catalog,
                      const MetaDataKey& key,
                      UpdelRoll& updel_roll) override;
  /// Compacts fragment fragment_id with respect to the row offsets in frag_offsets,
  /// recording changes in updel_roll.
  /// NOTE(review): presumably removes (vacuums) the rows at frag_offsets; see
  /// vacuum_fixlen_rows / vacuum_varlen_rows — confirm.
  void compactRows(const Catalog_Namespace::Catalog* catalog,
                   const TableDescriptor* td,
                   const int fragment_id,
                   const std::vector<uint64_t>& frag_offsets,
                   const Data_Namespace::MemoryLevel memory_level,
                   UpdelRoll& updel_roll) override;
  /// Computes row offsets to vacuum from chunk.
  /// NOTE(review): presumably derived from a delete-marker chunk — confirm.
  /// NOTE(review): the `const` on a by-value return prevents callers from moving
  /// the result. It cannot be dropped here alone, because an override's return type
  /// must match the base declaration.
  const std::vector<uint64_t> getVacuumOffsets(
      const std::shared_ptr<Chunk_NS::Chunk>& chunk) override;
182  const FragmentInfo& fragment,
183  const Data_Namespace::MemoryLevel memory_level);
  /// Drops the columns whose ids are listed in columnIds (base-interface override).
  void dropColumns(const std::vector<int>& columnIds) override;
  /// Reports whether any row is marked deleted.
  /// NOTE(review): delete_column_id presumably names the delete-marker column —
  /// confirm.
  bool hasDeletedRows(const int delete_column_id) override;
  /// Recomputes size bookkeeping from the fragments themselves.
  /// NOTE(review): presumably recomputes numTuples_ — confirm in the implementation.
  void resetSizesFromFragments() override;
  /// Applies a type change to the given (non-geo) columns. This is a public,
  /// non-virtual member, not an override.
  void alterNonGeoColumnType(const std::list<const ColumnDescriptor*>& columns);
  /// Applies a geo type change. Each element of src_dst_column_pairs pairs a source
  /// column with the list of destination columns it maps to.
  /// NOTE(review): presumably the destinations are the physical columns of a geo
  /// type — confirm.
  void alterColumnGeoType(
      const std::list<
          std::pair<const ColumnDescriptor*, std::list<const ColumnDescriptor*>>>&
          src_dst_column_pairs);
198  protected:
  // Chunk key prefix for this fragmenter. getFragmenterId() returns its last element,
  // so it must be non-empty whenever that method is called.
  std::vector<int> chunkKeyPrefix_;
200  std::map<int, Chunk_NS::Chunk>
202  std::vector<std::unique_ptr<Chunk_NS::Chunk>> tracked_in_memory_chunks_;
203  std::deque<std::unique_ptr<FragmentInfo>>
205  // int currentInsertBufferFragmentId_;
  // Immutable identity of the table/shard this fragmenter serves.
  const int physicalTableId_;
  const int shard_;
  size_t pageSize_; /* Page size in bytes of each page making up a given chunk - passed to
                       BufferMgr in createChunk() */
  // Cached tuple count; read by getNumRows() and overwritten by setNumRows().
  size_t numTuples_;
  // NOTE(review): presumably the table-wide row cap used by dropFragmentsToSize —
  // confirm.
  size_t maxRows_;
  // Returned (by value) from getFragmenterType().
  std::string fragmenterType_;
219  fragmentInfoMutex_; // to prevent read-write conflicts for fragmentInfoVec_
221  insertMutex_; // to prevent race conditions on insert - only one insert statement
222  // should be going to a table at a time
  // NOTE(review): presumably maps a variable-length column id to a size/offset —
  // confirm against the implementation.
  std::unordered_map<int, size_t> varLenColInfo_;
  // NOTE(review): held via shared_ptr; presumably guards the in-memory chunk state
  // (e.g. tracked_in_memory_chunks_) — confirm.
  std::shared_ptr<std::mutex> mutex_access_inmem_states;
  /// Deletes the fragments whose ids are listed in dropFragIds.
  void deleteFragments(const std::vector<int>& dropFragIds);
  /// Despite the "get" name this returns void, so it works by side effect.
  /// NOTE(review): presumably loads chunk metadata into the fragment bookkeeping —
  /// confirm.
  void getChunkMetadata();
  /// NOTE(review): by its name, presumably inserts insertDataStruct under the insert
  /// lock and then checkpoints — confirm in the implementation.
  void lockInsertCheckpointData(const InsertData& insertDataStruct);
  /// Shared implementation behind the insertData* entry points. Takes a non-const
  /// reference, so insert_data may be modified.
  /// NOTE(review): the dispatch from the public entry points is inferred from names —
  /// confirm.
  void insertDataImpl(InsertData& insert_data);
  /// Shared implementation behind the insertChunks* entry points.
  /// NOTE(review): inferred from names — confirm.
  void insertChunksImpl(const InsertChunks& insert_chunk);
  /// Adds columns described by insertDataStruct.
  /// NOTE(review): presumably used when isAddingNewColumns() is true — confirm.
  void addColumns(const InsertData& insertDataStruct);
  // FIX-ME: Temporary lock; needs removing.
  // Declared mutable so it can be locked from const member functions.
  mutable std::mutex temp_mutex_;
  /// Returns a mutable reference to the FragmentInfo for fragment_id. Because it
  /// returns a reference, it cannot signal "not found" with nullptr.
  /// NOTE(review): the behavior for an unknown id is defined by the implementation —
  /// confirm.
  FragmentInfo& getFragmentInfoFromId(const int fragment_id);
  /// Vacuums the rows at frag_offsets from a fixed-length chunk of fragment.
  /// NOTE(review): the return type is deduced (`auto`) and not spelled out here, so
  /// this can only be called after its definition in the same translation unit.
  auto vacuum_fixlen_rows(const FragmentInfo& fragment,
                          const std::shared_ptr<Chunk_NS::Chunk>& chunk,
                          const std::vector<uint64_t>& frag_offsets);
  /// Vacuums the rows at frag_offsets from a variable-length chunk of fragment.
  /// NOTE(review): the return type is deduced (`auto`) and not spelled out here, so
  /// this can only be called after its definition in the same translation unit.
  auto vacuum_varlen_rows(const FragmentInfo& fragment,
                          const std::shared_ptr<Chunk_NS::Chunk>& chunk,
                          const std::vector<uint64_t>& frag_offsets);
264  private:
  /// Reports whether insert_data refers to columns not yet known to this fragmenter
  /// (const; no state change).
  /// NOTE(review): inferred from the name — confirm.
  bool isAddingNewColumns(const InsertData& insert_data) const;
  /// Lock-free variant of dropFragmentsToSize().
  /// NOTE(review): by its name the caller presumably must already hold insertMutex_ —
  /// confirm before calling it from new code.
  void dropFragmentsToSizeNoInsertLock(const size_t max_rows);
  /// Inserts up to num_rows_to_insert rows from insert_chunks into current_fragment.
  /// num_rows_inserted, num_rows_left and valid_row_indices are in/out parameters
  /// (non-const references) updated by the call.
  /// NOTE(review): delete_column_id is optional (presumably the delete-marker column
  /// when present). The role of start_fragment is defined by the implementation —
  /// confirm both.
  void insertChunksIntoFragment(const InsertChunks& insert_chunks,
                                const std::optional<int> delete_column_id,
                                FragmentInfo* current_fragment,
                                const size_t num_rows_to_insert,
                                size_t& num_rows_inserted,
                                size_t& num_rows_left,
                                std::vector<size_t>& valid_row_indices,
                                const size_t start_fragment);
276 };
278 } // namespace Fragmenter_Namespace
