OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignStorageMgr.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <shared_mutex>
20 
22 #include "ForeignDataWrapper.h"
24 
25 using namespace Data_Namespace;
26 
27 namespace Catalog_Namespace {
28 class Catalog;
29 }
30 
31 class PostEvictionRefreshException : public std::runtime_error {
32  public:
36  PostEvictionRefreshException(const std::runtime_error& exception)
37  : std::runtime_error(""), original_exception_(exception){};
38 
39  std::runtime_error getOriginalException() { return original_exception_; }
40 
41  private:
42  std::runtime_error original_exception_;
43 };
44 
45 namespace foreign_storage {
46 bool is_append_table_chunk_key(const ChunkKey& chunk_key);
47 
49  public:
50  ChunkSizeValidator(const ChunkKey& chunk_key);
51 
52  void validateChunkSize(const AbstractBuffer* buffer) const;
53 
54  void validateChunkSizes(const ChunkToBufferMap& buffers) const;
55 
56  void throwChunkSizeViolatedError(const int64_t actual_chunk_size,
57  const int column_id = -1) const;
58 
59  private:
61  int64_t max_chunk_size_;
62  std::shared_ptr<Catalog_Namespace::Catalog> catalog_;
65 };
66 
67 bool set_comp(const ChunkKey& left, const ChunkKey& right);
68 
69 // For testing purposes only
71  public:
72  virtual void setParentWrapper(
73  std::shared_ptr<ForeignDataWrapper> parent_data_wrapper) = 0;
74 
75  virtual void unsetParentWrapper() = 0;
76 };
77 
78 class ForeignStorageMgr : public AbstractBufferMgr {
79  public:
81 
82  ~ForeignStorageMgr() override {}
83 
84  AbstractBuffer* createBuffer(const ChunkKey& chunk_key,
85  const size_t page_size,
86  const size_t initial_size) override;
87  void deleteBuffer(const ChunkKey& chunk_key, const bool purge) override;
88  void deleteBuffersWithPrefix(const ChunkKey& chunk_key_prefix,
89  const bool purge) override;
90  AbstractBuffer* getBuffer(const ChunkKey& chunk_key, const size_t num_bytes) override;
91  void fetchBuffer(const ChunkKey& chunk_key,
92  AbstractBuffer* destination_buffer,
93  const size_t num_bytes) override;
94  AbstractBuffer* putBuffer(const ChunkKey& chunk_key,
95  AbstractBuffer* source_buffer,
96  const size_t num_bytes) override;
97  /*
98  Obtains chunk-metadata relating to a prefix. Will create and use new
99  datawrappers if none are found for the given prefix.
100  */
101  void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector& chunk_metadata,
102  const ChunkKey& chunk_key_prefix) override;
103  bool isBufferOnDevice(const ChunkKey& chunk_key) override;
104  std::string printSlabs() override;
105  size_t getMaxSize() override;
106  size_t getInUseSize() override;
107  size_t getAllocated() override;
108  bool isAllocationCapped() override;
109  void checkpoint() override;
110  void checkpoint(const int db_id, const int tb_id) override;
111  AbstractBuffer* alloc(const size_t num_bytes) override;
112  void free(AbstractBuffer* buffer) override;
113  MgrType getMgrType() override;
114  std::string getStringMgrType() override;
115  size_t getNumChunks() override;
116  void removeTableRelatedDS(const int db_id, const int table_id) override;
117  bool hasDataWrapperForChunk(const ChunkKey& chunk_key) const;
118  virtual bool createDataWrapperIfNotExists(const ChunkKey& chunk_key);
119 
120  // For testing, is datawrapper state recovered from disk
121  bool isDatawrapperRestored(const ChunkKey& chunk_key);
122  void setDataWrapper(const ChunkKey& table_key,
123  std::shared_ptr<MockForeignDataWrapper> data_wrapper);
124  std::shared_ptr<ForeignDataWrapper> getDataWrapper(const ChunkKey& chunk_key) const;
125 
126  virtual void refreshTable(const ChunkKey& table_key, const bool evict_cached_entries);
127 
128  using ParallelismHint = std::pair<int, int>;
129  void setParallelismHints(
130  const std::map<ChunkKey, std::set<ParallelismHint>>& hints_per_table);
131  virtual size_t maxFetchSize(int32_t db_id) const;
132  virtual bool hasMaxFetchSize() const;
133 
134  protected:
135  virtual void eraseDataWrapper(const ChunkKey& table_key);
136  void updateFragmenterMetadata(const ChunkToBufferMap&) const;
137  void createDataWrapperUnlocked(int32_t db, int32_t tb);
138  bool fetchBufferIfTempBufferMapEntryExists(const ChunkKey& chunk_key,
139  AbstractBuffer* destination_buffer,
140  const size_t num_bytes);
141  ChunkToBufferMap allocateTempBuffersForChunks(const std::set<ChunkKey>& chunk_keys);
142  void clearTempChunkBufferMapEntriesForTable(const ChunkKey& table_key);
143  void clearTempChunkBufferMapEntriesForTableUnlocked(const ChunkKey& table_key);
144 
145  std::set<ChunkKey> getOptionalChunkKeySetAndNormalizeCache(
146  const ChunkKey& chunk_key,
147  const std::set<ChunkKey>& required_chunk_keys,
148  const ForeignDataWrapper::ParallelismLevel parallelism_level);
149 
150  std::pair<std::set<ChunkKey, decltype(set_comp)*>,
151  std::set<ChunkKey, decltype(set_comp)*>>
152  getPrefetchSets(const ChunkKey& chunk_key,
153  const std::set<ChunkKey>& required_chunk_keys,
154  const ForeignDataWrapper::ParallelismLevel parallelism_level) const;
155 
156  virtual std::set<ChunkKey> getOptionalKeysWithinSizeLimit(
157  const ChunkKey& chunk_key,
158  const std::set<ChunkKey, decltype(set_comp)*>& same_fragment_keys,
159  const std::set<ChunkKey, decltype(set_comp)*>& diff_fragment_keys) const;
160 
161  virtual bool isChunkCached(const ChunkKey& chunk_key) const;
162 
163  virtual void evictChunkFromCache(const ChunkKey& chunk_key);
164 
165  static void checkIfS3NeedsToBeEnabled(const ChunkKey& chunk_key);
166 
168  std::map<ChunkKey, std::shared_ptr<ForeignDataWrapper>> data_wrapper_map_;
169 
170  // Some operations in FSM delete and re-create wrappers (refreshing a table, for
171  // instance). If we have mocked these wrappers, then we should preserve the mock and
172  // re-use it if we re-create the wrapper.
173  std::map<ChunkKey, std::shared_ptr<MockForeignDataWrapper>> mocked_wrapper_map_;
174 
175  // TODO: Remove below map, which is used to temporarily hold chunk buffers,
176  // when buffer mgr interface is updated to accept multiple buffers in one call
177  std::map<ChunkKey, std::unique_ptr<AbstractBuffer>> temp_chunk_buffer_map_;
179 
181  std::map<ChunkKey, std::set<ParallelismHint>> parallelism_hints_per_table_;
182 };
183 
184 std::vector<ChunkKey> get_column_key_vec(const ChunkKey& destination_chunk_key);
185 std::set<ChunkKey> get_column_key_set(const ChunkKey& destination_chunk_key);
186 size_t get_max_chunk_size(const ChunkKey& key);
187 bool contains_fragment_key(const std::set<ChunkKey>& key_set, const ChunkKey& target_key);
188 bool is_table_enabled_on_node(const ChunkKey& key);
189 } // namespace foreign_storage
bool set_comp(const ChunkKey &left, const ChunkKey &right)
std::vector< int > ChunkKey
Definition: types.h:36
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
std::shared_ptr< Catalog_Namespace::Catalog > catalog_
bool is_append_table_chunk_key(const ChunkKey &chunk_key)
std::set< ChunkKey > get_column_key_set(const ChunkKey &destination_chunk_key)
const ColumnDescriptor * column_
std::runtime_error getOriginalException()
bool is_table_enabled_on_node(const ChunkKey &key)
PostEvictionRefreshException(const std::runtime_error &exception)
bool contains_fragment_key(const std::set< ChunkKey > &key_set, const ChunkKey &target_key)
std::shared_mutex temp_chunk_buffer_map_mutex_
std::map< ChunkKey, std::set< ParallelismHint > > parallelism_hints_per_table_
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
An AbstractBuffer is a unit of data management for a data manager.
specifies the content in-memory of a row in the column metadata table
std::map< ChunkKey, std::shared_ptr< ForeignDataWrapper > > data_wrapper_map_
size_t get_max_chunk_size(const ChunkKey &key)
std::runtime_error original_exception_
std::map< ChunkKey, std::shared_ptr< MockForeignDataWrapper > > mocked_wrapper_map_
std::map< ChunkKey, std::unique_ptr< AbstractBuffer > > temp_chunk_buffer_map_
std::shared_timed_mutex shared_mutex
std::vector< ChunkKey > get_column_key_vec(const ChunkKey &destination_chunk_key)