OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ColumnFetcher.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
23 
24 namespace std {
25 template <>
26 struct hash<std::vector<int>> {
27  size_t operator()(const std::vector<int>& vec) const {
28  return vec.size() ^ boost::hash_range(vec.begin(), vec.end());
29  }
30 };
31 
32 template <>
33 struct hash<std::pair<int, int>> {
34  size_t operator()(const std::pair<int, int>& p) const {
35  return boost::hash<std::pair<int, int>>()(p);
36  }
37 };
38 
39 } // namespace std
40 
/// Buffers and bookkeeping gathered by a column fetch.
/// Every member is a vector of vectors. NOTE(review): presumably the outer
/// index selects a fragment/kernel input and the inner index a column —
/// confirm against the code that fills this struct.
struct FetchResult {
  /// Raw (non-owning) pointers to fetched column data.
  std::vector<std::vector<int8_t const*>> col_buffers;
  /// Row counts corresponding to the fetched buffers.
  std::vector<std::vector<int64_t>> num_rows;
  /// Fragment offsets corresponding to the fetched buffers.
  std::vector<std::vector<uint64_t>> frag_offsets;
};
46 
// Pair of AbstractBuffer pointers; this is the return type of
// linearizeFixedLenArrayColFrags / linearizeVarLenArrayColFrags.
// NOTE(review): presumably `first` is the merged data buffer and `second` the
// merged index buffer, mirroring prepareChunkIter's
// (merged_data_buf, merged_index_buf) parameters — confirm in ColumnFetcher.cpp.
using MergedChunk = std::pair<AbstractBuffer*, AbstractBuffer*>;
48 
50  public:
51  ColumnFetcher(Executor* executor, const ColumnCacheMap& column_cache);
52 
54  static std::pair<const int8_t*, size_t> getOneColumnFragment(
55  Executor* executor,
56  const Analyzer::ColumnVar& hash_col,
57  const Fragmenter_Namespace::FragmentInfo& fragment,
58  const Data_Namespace::MemoryLevel effective_mem_lvl,
59  const int device_id,
60  DeviceAllocator* device_allocator,
61  const size_t thread_idx,
62  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
63  ColumnCacheMap& column_cache);
64 
67  Executor* executor,
68  const Analyzer::ColumnVar& hash_col,
69  const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
70  const Data_Namespace::MemoryLevel effective_mem_lvl,
71  const int device_id,
72  DeviceAllocator* device_allocator,
73  const size_t thread_idx,
74  std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
75  std::vector<std::shared_ptr<void>>& malloc_owner,
76  ColumnCacheMap& column_cache);
77 
78  const int8_t* getOneTableColumnFragment(
79  const shared::TableKey& table_key,
80  const int frag_id,
81  const int col_id,
82  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
83  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
84  std::list<ChunkIter>& chunk_iter_holder,
85  const Data_Namespace::MemoryLevel memory_level,
86  const int device_id,
87  DeviceAllocator* device_allocator) const;
88 
89  const int8_t* getAllTableColumnFragments(
90  const shared::TableKey& table_key,
91  const int col_id,
92  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
93  const Data_Namespace::MemoryLevel memory_level,
94  const int device_id,
95  DeviceAllocator* device_allocator,
96  const size_t thread_idx) const;
97 
98  const int8_t* getResultSetColumn(const InputColDescriptor* col_desc,
99  const Data_Namespace::MemoryLevel memory_level,
100  const int device_id,
101  DeviceAllocator* device_allocator,
102  const size_t thread_idx) const;
103 
104  const int8_t* linearizeColumnFragments(
105  const shared::TableKey& table_key,
106  const int col_id,
107  const std::map<shared::TableKey, const TableFragments*>& all_tables_fragments,
108  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
109  std::list<ChunkIter>& chunk_iter_holder,
110  const Data_Namespace::MemoryLevel memory_level,
111  const int device_id,
112  DeviceAllocator* device_allocator,
113  const size_t thread_idx) const;
114 
116  void freeLinearizedBuf();
117 
118  private:
119  static const int8_t* transferColumnIfNeeded(
120  const ColumnarResults* columnar_results,
121  const int col_id,
122  Data_Namespace::DataMgr* data_mgr,
123  const Data_Namespace::MemoryLevel memory_level,
124  const int device_id,
125  DeviceAllocator* device_allocator);
126 
128  int32_t db_id,
129  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
130  std::list<ChunkIter>& chunk_iter_holder,
131  std::list<std::shared_ptr<Chunk_NS::Chunk>>& local_chunk_holder,
132  std::list<ChunkIter>& local_chunk_iter_holder,
133  std::list<size_t>& local_chunk_num_tuples,
134  MemoryLevel memory_level,
135  const ColumnDescriptor* cd,
136  const int device_id,
137  const size_t total_data_buf_size,
138  const size_t total_idx_buf_size,
139  const size_t total_num_tuples,
140  DeviceAllocator* device_allocator,
141  const size_t thread_idx) const;
142 
144  int32_t db_id,
145  std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunk_holder,
146  std::list<ChunkIter>& chunk_iter_holder,
147  std::list<std::shared_ptr<Chunk_NS::Chunk>>& local_chunk_holder,
148  std::list<ChunkIter>& local_chunk_iter_holder,
149  std::list<size_t>& local_chunk_num_tuples,
150  MemoryLevel memory_level,
151  const ColumnDescriptor* cd,
152  const int device_id,
153  const size_t total_data_buf_size,
154  const size_t total_idx_buf_size,
155  const size_t total_num_tuples,
156  DeviceAllocator* device_allocator,
157  const size_t thread_idx) const;
158 
159  void addMergedChunkIter(const InputColDescriptor col_desc,
160  const int device_id,
161  int8_t* chunk_iter_ptr) const;
162 
163  const int8_t* getChunkiter(const InputColDescriptor col_desc,
164  const int device_id = 0) const;
165 
166  ChunkIter prepareChunkIter(AbstractBuffer* merged_data_buf,
167  AbstractBuffer* merged_index_buf,
168  ChunkIter& chunk_iter,
169  bool is_true_varlen_type,
170  const size_t total_num_tuples) const;
171 
172  const int8_t* getResultSetColumn(const ResultSetPtr& buffer,
173  const shared::TableKey& table_key,
174  const int col_id,
175  const Data_Namespace::MemoryLevel memory_level,
176  const int device_id,
177  DeviceAllocator* device_allocator,
178  const size_t thread_idx) const;
179 
180  Executor* executor_;
181  mutable std::mutex columnar_fetch_mutex_;
182  mutable std::mutex varlen_chunk_fetch_mutex_;
183  mutable std::mutex linearization_mutex_;
184  mutable std::mutex chunk_list_mutex_;
185  mutable std::mutex linearized_col_cache_mutex_;
187  mutable std::unordered_map<InputColDescriptor, std::unique_ptr<const ColumnarResults>>
189  using DeviceMergedChunkIterMap = std::unordered_map<int, int8_t*>;
190  using DeviceMergedChunkMap = std::unordered_map<int, AbstractBuffer*>;
191  mutable std::unordered_map<InputColDescriptor, DeviceMergedChunkIterMap>
193  mutable std::unordered_map<int, AbstractBuffer*>
195  mutable std::unordered_map<InputColDescriptor, DeviceMergedChunkMap>
197  mutable std::unordered_map<InputColDescriptor, DeviceMergedChunkMap>
199 
201  friend class TableFunctionExecutionContext; // TODO(adb)
202 };
std::unordered_map< int, int8_t * > DeviceMergedChunkIterMap
std::unordered_map< int, AbstractBuffer * > DeviceMergedChunkMap
std::mutex varlen_chunk_fetch_mutex_
void freeLinearizedBuf()
static JoinColumn makeJoinColumn(Executor *executor, const Analyzer::ColumnVar &hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, std::vector< std::shared_ptr< void >> &malloc_owner, ColumnCacheMap &column_cache)
Creates a JoinColumn struct containing an array of JoinChunk structs.
std::mutex columnar_fetch_mutex_
const int8_t * getResultSetColumn(const InputColDescriptor *col_desc, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
size_t operator()(const std::vector< int > &vec) const
Definition: ColumnFetcher.h:27
ColumnCacheMap columnarized_table_cache_
std::shared_ptr< ResultSet > ResultSetPtr
const int8_t * getChunkiter(const InputColDescriptor col_desc, const int device_id=0) const
Executor * executor_
const int8_t * getOneTableColumnFragment(const shared::TableKey &table_key, const int frag_id, const int col_id, const std::map< shared::TableKey, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator) const
std::mutex linearization_mutex_
std::unordered_map< InputColDescriptor, std::unique_ptr< const ColumnarResults > > columnarized_scan_table_cache_
const int8_t * getAllTableColumnFragments(const shared::TableKey &table_key, const int col_id, const std::map< shared::TableKey, const TableFragments * > &all_tables_fragments, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
std::unordered_map< InputColDescriptor, DeviceMergedChunkMap > linearized_data_buf_cache_
void addMergedChunkIter(const InputColDescriptor col_desc, const int device_id, int8_t *chunk_iter_ptr) const
std::unordered_map< int, AbstractBuffer * > linearlized_temporary_cpu_index_buf_cache_
Used by Fragmenter classes to store info about each fragment - the fragment id and number of tuples(r...
Definition: Fragmenter.h:86
std::mutex linearized_col_cache_mutex_
An AbstractBuffer is a unit of data management for a data manager.
Specifies the in-memory content of a row in the column metadata table.
ColumnFetcher(Executor *executor, const ColumnCacheMap &column_cache)
MergedChunk linearizeFixedLenArrayColFrags(int32_t db_id, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, std::list< std::shared_ptr< Chunk_NS::Chunk >> &local_chunk_holder, std::list< ChunkIter > &local_chunk_iter_holder, std::list< size_t > &local_chunk_num_tuples, MemoryLevel memory_level, const ColumnDescriptor *cd, const int device_id, const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, DeviceAllocator *device_allocator, const size_t thread_idx) const
std::mutex chunk_list_mutex_
void freeTemporaryCpuLinearizedIdxBuf()
size_t operator()(const std::pair< int, int > &p) const
Definition: ColumnFetcher.h:34
static const int8_t * transferColumnIfNeeded(const ColumnarResults *columnar_results, const int col_id, Data_Namespace::DataMgr *data_mgr, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator)
static std::pair< const int8_t *, size_t > getOneColumnFragment(Executor *executor, const Analyzer::ColumnVar &hash_col, const Fragmenter_Namespace::FragmentInfo &fragment, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, ColumnCacheMap &column_cache)
Gets one chunk's pointer and element count on either CPU or GPU.
MergedChunk linearizeVarLenArrayColFrags(int32_t db_id, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, std::list< std::shared_ptr< Chunk_NS::Chunk >> &local_chunk_holder, std::list< ChunkIter > &local_chunk_iter_holder, std::list< size_t > &local_chunk_num_tuples, MemoryLevel memory_level, const ColumnDescriptor *cd, const int device_id, const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, DeviceAllocator *device_allocator, const size_t thread_idx) const
const int8_t * linearizeColumnFragments(const shared::TableKey &table_key, const int col_id, const std::map< shared::TableKey, const TableFragments * > &all_tables_fragments, std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunk_holder, std::list< ChunkIter > &chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, DeviceAllocator *device_allocator, const size_t thread_idx) const
Abstract class for managing device memory allocations.
std::unordered_map< shared::TableKey, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
std::pair< AbstractBuffer *, AbstractBuffer * > MergedChunk
Definition: ColumnFetcher.h:47
std::vector< std::vector< const int8_t * > > col_buffers
Definition: ColumnFetcher.h:42
ChunkIter prepareChunkIter(AbstractBuffer *merged_data_buf, AbstractBuffer *merged_index_buf, ChunkIter &chunk_iter, bool is_true_varlen_type, const size_t total_num_tuples) const
std::vector< std::vector< int64_t > > num_rows
Definition: ColumnFetcher.h:43
std::unordered_map< InputColDescriptor, DeviceMergedChunkIterMap > linearized_multi_frag_chunk_iter_cache_
std::vector< std::vector< uint64_t > > frag_offsets
Definition: ColumnFetcher.h:44
std::unordered_map< InputColDescriptor, DeviceMergedChunkMap > linearized_idx_buf_cache_
Descriptor for the fragments required for an execution kernel.