OmniSciDB  ca0c39ec8f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
InsertOrderFragmenter.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #pragma once
24 
25 #include <map>
26 #include <mutex>
27 #include <unordered_map>
28 #include <unordered_set>
29 #include <vector>
30 
31 #include "DataMgr/Chunk/Chunk.h"
32 #include "DataMgr/MemoryLevel.h"
33 #include "FragmentDefaultValues.h"
37 #include "Shared/types.h"
38 
39 class Executor;
40 
41 namespace Data_Namespace {
42 class DataMgr;
43 }
44 
45 namespace Fragmenter_Namespace {
46 
55  public:
57 
59  const std::vector<int> chunkKeyPrefix,
60  std::vector<Chunk_NS::Chunk>& chunkVec,
61  Data_Namespace::DataMgr* dataMgr,
63  const int physicalTableId,
64  const int shard,
65  const size_t maxFragmentRows = DEFAULT_FRAGMENT_ROWS,
66  const size_t maxChunkSize = DEFAULT_MAX_CHUNK_SIZE,
67  const size_t pageSize = DEFAULT_PAGE_SIZE /*default 1MB*/,
68  const size_t maxRows = DEFAULT_MAX_ROWS,
70  const bool uses_foreign_storage = false);
71 
72  ~InsertOrderFragmenter() override;
73 
78  size_t getNumFragments() override;
79 
86  // virtual void getFragmentsForQuery(QueryInfo &queryInfo, const void *predicate = 0);
88 
96  void insertData(InsertData& insert_data_struct) override;
97 
98  void insertChunks(const InsertChunks& insert_chunk) override;
99 
100  void insertDataNoCheckpoint(InsertData& insert_data_struct) override;
101 
102  void insertChunksNoCheckpoint(const InsertChunks& insert_chunk) override;
103 
104  void dropFragmentsToSize(const size_t maxRows) override;
105 
107  const int fragment_id,
108  const std::shared_ptr<ChunkMetadata> metadata) override;
109 
110  void updateChunkStats(const ColumnDescriptor* cd,
111  std::unordered_map</*fragment_id*/ int, ChunkStats>& stats_map,
112  std::optional<Data_Namespace::MemoryLevel> memory_level) override;
113 
114  FragmentInfo* getFragmentInfo(const int fragment_id) const override;
115 
119  inline int getFragmenterId() override { return chunkKeyPrefix_.back(); }  // id is the last element of chunkKeyPrefix_
120  inline std::vector<int> getChunkKeyPrefix() const { return chunkKeyPrefix_; }  // returns chunkKeyPrefix_ by value (a copy)
124  inline std::string getFragmenterType() override { return fragmenterType_; }  // fragmenter type as a string, from fragmenterType_
125  size_t getNumRows() override { return numTuples_; }  // reports the stored numTuples_ counter
126  void setNumRows(const size_t numTuples) override { numTuples_ = numTuples; }  // overwrites numTuples_ with the given value
127 
128  std::optional<ChunkUpdateStats> updateColumn(
129  const Catalog_Namespace::Catalog* catalog,
130  const TableDescriptor* td,
131  const ColumnDescriptor* cd,
132  const int fragment_id,
133  const std::vector<uint64_t>& frag_offsets,
134  const std::vector<ScalarTargetValue>& rhs_values,
135  const SQLTypeInfo& rhs_type,
136  const Data_Namespace::MemoryLevel memory_level,
137  UpdelRoll& updel_roll) override;
138 
139  void updateColumns(const Catalog_Namespace::Catalog* catalog,
140  const TableDescriptor* td,
141  const int fragmentId,
142  const std::vector<TargetMetaInfo> sourceMetaInfo,
143  const std::vector<const ColumnDescriptor*> columnDescriptors,
144  const RowDataProvider& sourceDataProvider,
145  const size_t indexOffFragmentOffsetColumn,
146  const Data_Namespace::MemoryLevel memoryLevel,
147  UpdelRoll& updelRoll,
148  Executor* executor) override;
149 
150  void updateColumn(const Catalog_Namespace::Catalog* catalog,
151  const TableDescriptor* td,
152  const ColumnDescriptor* cd,
153  const int fragment_id,
154  const std::vector<uint64_t>& frag_offsets,
155  const ScalarTargetValue& rhs_value,
156  const SQLTypeInfo& rhs_type,
157  const Data_Namespace::MemoryLevel memory_level,
158  UpdelRoll& updel_roll) override;
159 
160  void updateColumnMetadata(const ColumnDescriptor* cd,
161  FragmentInfo& fragment,
162  std::shared_ptr<Chunk_NS::Chunk> chunk,
163  const UpdateValuesStats& update_values_stats,
164  const SQLTypeInfo& rhs_type,
165  UpdelRoll& updel_roll) override;
166 
167  void updateMetadata(const Catalog_Namespace::Catalog* catalog,
168  const MetaDataKey& key,
169  UpdelRoll& updel_roll) override;
170 
171  void compactRows(const Catalog_Namespace::Catalog* catalog,
172  const TableDescriptor* td,
173  const int fragment_id,
174  const std::vector<uint64_t>& frag_offsets,
175  const Data_Namespace::MemoryLevel memory_level,
176  UpdelRoll& updel_roll) override;
177 
178  const std::vector<uint64_t> getVacuumOffsets(
179  const std::shared_ptr<Chunk_NS::Chunk>& chunk) override;
180 
182  const FragmentInfo& fragment,
183  const Data_Namespace::MemoryLevel memory_level);
184 
185  void dropColumns(const std::vector<int>& columnIds) override;
186 
187  bool hasDeletedRows(const int delete_column_id) override;
188 
189  void resetSizesFromFragments() override;
190 
191  protected:
192  std::vector<int> chunkKeyPrefix_;
193  std::map<int, Chunk_NS::Chunk>
195  std::vector<std::unique_ptr<Chunk_NS::Chunk>> tracked_in_memory_chunks_;
196  std::deque<std::unique_ptr<FragmentInfo>>
198  // int currentInsertBufferFragmentId_;
201  const int physicalTableId_;
202  const int shard_;
204  size_t pageSize_; /* Page size in bytes of each page making up a given chunk - passed to
205  BufferMgr in createChunk() */
206  size_t numTuples_;
209  size_t maxRows_;
210  std::string fragmenterType_;
212  fragmentInfoMutex_; // to prevent read-write conflicts for fragmentInfoVec_
214  insertMutex_; // to prevent race conditions on insert - only one insert statement
215  // should be going to a table at a time
220  std::unordered_map<int, size_t> varLenColInfo_;
221  std::shared_ptr<std::mutex> mutex_access_inmem_states;
222 
233  void deleteFragments(const std::vector<int>& dropFragIds);
234 
236  void getChunkMetadata();
237 
238  void lockInsertCheckpointData(const InsertData& insertDataStruct);
239  void insertDataImpl(InsertData& insert_data);
240  void insertChunksImpl(const InsertChunks& insert_chunk);
241  void addColumns(const InsertData& insertDataStruct);
242 
245  // FIX-ME: Temporary lock; needs removing.
246  mutable std::mutex temp_mutex_;
247 
248  FragmentInfo& getFragmentInfoFromId(const int fragment_id);
249 
250  auto vacuum_fixlen_rows(const FragmentInfo& fragment,
251  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
252  const std::vector<uint64_t>& frag_offsets);
253  auto vacuum_varlen_rows(const FragmentInfo& fragment,
254  const std::shared_ptr<Chunk_NS::Chunk>& chunk,
255  const std::vector<uint64_t>& frag_offsets);
256 
257  private:
258  bool isAddingNewColumns(const InsertData& insert_data) const;
259  void dropFragmentsToSizeNoInsertLock(const size_t max_rows);
261  void insertChunksIntoFragment(const InsertChunks& insert_chunks,
262  const std::optional<int> delete_column_id,
263  FragmentInfo* current_fragment,
264  const size_t num_rows_to_insert,
265  size_t& num_rows_inserted,
266  size_t& num_rows_left,
267  std::vector<size_t>& valid_row_indices,
268  const size_t start_fragment);
269 };
270 
271 } // namespace Fragmenter_Namespace
void insertChunksIntoFragment(const InsertChunks &insert_chunks, const std::optional< int > delete_column_id, FragmentInfo *current_fragment, const size_t num_rows_to_insert, size_t &num_rows_inserted, size_t &num_rows_left, std::vector< size_t > &valid_row_indices, const size_t start_fragment)
void dropFragmentsToSizeNoInsertLock(const size_t max_rows)
void updateMetadata(const Catalog_Namespace::Catalog *catalog, const MetaDataKey &key, UpdelRoll &updel_roll) override
void insertChunksImpl(const InsertChunks &insert_chunk)
void dropColumns(const std::vector< int > &columnIds) override
FragmentInfo & getFragmentInfoFromId(const int fragment_id)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:132
TableInfo getFragmentsForQuery() override
returns (inside a QueryInfo object) the ids and row sizes of all fragments
InsertOrderFragmenter & operator=(const InsertOrderFragmenter &)
std::pair< const TableDescriptor *, Fragmenter_Namespace::FragmentInfo * > MetaDataKey
Definition: UpdelRoll.h:41
std::deque< std::unique_ptr< FragmentInfo > > fragmentInfoVec_
void dropFragmentsToSize(const size_t maxRows) override
Will truncate table to less than maxRows by dropping fragments.
InsertOrderFragmenter(const std::vector< int > chunkKeyPrefix, std::vector< Chunk_NS::Chunk > &chunkVec, Data_Namespace::DataMgr *dataMgr, Catalog_Namespace::Catalog *catalog, const int physicalTableId, const int shard, const size_t maxFragmentRows=DEFAULT_FRAGMENT_ROWS, const size_t maxChunkSize=DEFAULT_MAX_CHUNK_SIZE, const size_t pageSize=DEFAULT_PAGE_SIZE, const size_t maxRows=DEFAULT_MAX_ROWS, const Data_Namespace::MemoryLevel defaultInsertLevel=Data_Namespace::DISK_LEVEL, const bool uses_foreign_storage=false)
std::shared_ptr< std::mutex > mutex_access_inmem_states
#define DEFAULT_MAX_CHUNK_SIZE
std::optional< ChunkUpdateStats > updateColumn(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const ColumnDescriptor *cd, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const std::vector< ScalarTargetValue > &rhs_values, const SQLTypeInfo &rhs_type, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
void updateColumns(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragmentId, const std::vector< TargetMetaInfo > sourceMetaInfo, const std::vector< const ColumnDescriptor * > columnDescriptors, const RowDataProvider &sourceDataProvider, const size_t indexOffFragmentOffsetColumn, const Data_Namespace::MemoryLevel memoryLevel, UpdelRoll &updelRoll, Executor *executor) override
High-level representation of SQL values.
The InsertOrderFragmenter is a child class of AbstractFragmenter, and fragments data in insert order...
auto vacuum_fixlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
void insertData(InsertData &insert_data_struct) override
appends data onto the most recently occurring fragment, creating a new one if necessary ...
void updateColumnChunkMetadata(const ColumnDescriptor *cd, const int fragment_id, const std::shared_ptr< ChunkMetadata > metadata) override
Updates the metadata for a column chunk.
void insertChunks(const InsertChunks &insert_chunk) override
Insert chunks into minimal number of fragments.
#define DEFAULT_MAX_ROWS
std::string getFragmenterType() override
get fragmenter's type (as string)
const std::vector< uint64_t > getVacuumOffsets(const std::shared_ptr< Chunk_NS::Chunk > &chunk) override
void addColumns(const InsertData &insertDataStruct)
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:86
void deleteFragments(const std::vector< int > &dropFragIds)
void setNumRows(const size_t numTuples) override
specifies the content in-memory of a row in the column metadata table
FragmentInfo * createNewFragment(const Data_Namespace::MemoryLevel memory_level=Data_Namespace::DISK_LEVEL)
creates new fragment, calling createChunk() method of BufferMgr to make a new chunk for each column o...
#define DEFAULT_PAGE_SIZE
Executor(const ExecutorId id, Data_Namespace::DataMgr *data_mgr, const size_t block_size_x, const size_t grid_size_x, const size_t max_gpu_slab_size, const std::string &debug_dir, const std::string &debug_file)
Definition: Execute.cpp:244
void lockInsertCheckpointData(const InsertData &insertDataStruct)
void insertChunksNoCheckpoint(const InsertChunks &insert_chunk) override
Insert chunks into minimal number of fragments; no locks or checkpoints taken.
void compactRows(const Catalog_Namespace::Catalog *catalog, const TableDescriptor *td, const int fragment_id, const std::vector< uint64_t > &frag_offsets, const Data_Namespace::MemoryLevel memory_level, UpdelRoll &updel_roll) override
FragmentInfo * getFragmentInfo(const int fragment_id) const override
Retrieve the fragment info object for an individual fragment for editing.
#define DEFAULT_FRAGMENT_ROWS
std::vector< std::unique_ptr< Chunk_NS::Chunk > > tracked_in_memory_chunks_
void updateChunkStats(const ColumnDescriptor *cd, std::unordered_map< int, ChunkStats > &stats_map, std::optional< Data_Namespace::MemoryLevel > memory_level) override
Update chunk stats.
auto getChunksForAllColumns(const TableDescriptor *td, const FragmentInfo &fragment, const Data_Namespace::MemoryLevel memory_level)
bool isAddingNewColumns(const InsertData &insert_data) const
std::unordered_map< int, size_t > varLenColInfo_
size_t getNumFragments() override
returns the number of fragments in a table
void updateColumnMetadata(const ColumnDescriptor *cd, FragmentInfo &fragment, std::shared_ptr< Chunk_NS::Chunk > chunk, const UpdateValuesStats &update_values_stats, const SQLTypeInfo &rhs_type, UpdelRoll &updel_roll) override
The data to be inserted using the fragment manager.
Definition: Fragmenter.h:68
void insertDataNoCheckpoint(InsertData &insert_data_struct) override
Given data wrapped in an InsertData struct, inserts it into the correct partitions. No locks and check...
std::shared_timed_mutex shared_mutex
int getFragmenterId() override
get fragmenter's id
std::map< int, Chunk_NS::Chunk > columnMap_
auto vacuum_varlen_rows(const FragmentInfo &fragment, const std::shared_ptr< Chunk_NS::Chunk > &chunk, const std::vector< uint64_t > &frag_offsets)
bool hasDeletedRows(const int delete_column_id) override
Iterates through chunk metadata to return whether any rows have been deleted.
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
Definition: TargetValue.h:180