OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
FsiChunkUtils.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
#include <functional>
#include <future>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <utility>
#include <vector>

#include "DataMgr/Chunk/Chunk.h"
#include "DataMgr/ChunkMetadata.h"
#include "Shared/distributed.h"
24 
25 namespace foreign_storage {
27  const ChunkKey& chunk_key,
28  const std::map<ChunkKey, std::shared_ptr<ChunkMetadata>>& chunk_metadata_map,
29  const std::map<ChunkKey, AbstractBuffer*>& buffers,
30  Chunk_NS::Chunk& chunk);
31 
// Construct default metadata for the given column type with num_elements elements
33 std::shared_ptr<ChunkMetadata> get_placeholder_metadata(const SQLTypeInfo& type,
34  size_t num_elements);
/*
  Splits up a set of items to be processed into multiple partitions, with the intention
  that each thread will process a separate part.

  Items are distributed in the set's iteration order. Every partition holds
  ceil(items.size() / max_threads) items, except possibly the last one, which holds
  whatever remains. At most max_threads partitions are returned, and no partition is
  ever empty; an empty input yields an empty list.

  A max_threads of 0 is treated as 1 (everything lands in a single partition) rather
  than dividing by zero.
 */
// TODO(Misiu): Change this to return a list of Views/Ranges when we support c++20.
template <typename T>
auto partition_for_threads(const std::set<T>& items, size_t max_threads) {
  std::list<std::set<T>> items_by_thread;
  if (items.empty()) {
    return items_by_thread;
  }
  // Guard against a zero divisor: zero workers makes no sense, so fall back to one.
  const size_t thread_count = max_threads == 0 ? 1 : max_threads;
  // Ceiling division written so that it cannot wrap around for a huge thread_count
  // (the "(n + t - 1) / t" form overflows when t is close to SIZE_MAX).
  const size_t items_per_thread =
      items.size() / thread_count + (items.size() % thread_count != 0 ? 1 : 0);
  // size_t counter: a 32-bit counter would wrap on very large inputs.
  size_t i = 0;
  for (const auto& item : items) {
    if (i++ % items_per_thread == 0) {
      items_by_thread.emplace_back();
    }
    // The source set is iterated in sorted order, so every new item belongs at the
    // end of the current partition; the hint makes the insertion amortized O(1).
    auto& current = items_by_thread.back();
    current.emplace_hint(current.end(), item);
  }
  return items_by_thread;
}
53 
/*
  Splits up a vector of items to be processed into multiple partitions, with the intention
  that each thread will process a separate part.

  The original element order is preserved: partitions are consecutive slices of the
  input. Every partition holds ceil(items.size() / max_threads) items, except possibly
  the last one, which holds whatever remains. At most max_threads partitions are
  returned, and no partition is ever empty; an empty input yields an empty list.

  A max_threads of 0 is treated as 1 (everything lands in a single partition) rather
  than dividing by zero.
 */
// TODO: refactor partition_for_threads to use Requires when we support c++20
template <typename T>
auto partition_for_threads(const std::vector<T>& items, size_t max_threads) {
  using Offset = typename std::vector<T>::difference_type;

  // Guard against a zero divisor: zero workers makes no sense, so fall back to one.
  const size_t thread_count = max_threads == 0 ? 1 : max_threads;
  // Ceiling division written so that it cannot wrap around for a huge thread_count
  // (the "(n + t - 1) / t" form overflows when t is close to SIZE_MAX).
  const size_t items_per_thread =
      items.size() / thread_count + (items.size() % thread_count != 0 ? 1 : 0);

  std::list<std::vector<T>> items_by_thread;
  auto chunk_begin = items.begin();
  size_t remaining = items.size();
  while (remaining > 0) {
    // The last slice may be shorter than the others.
    const size_t count = remaining < items_per_thread ? remaining : items_per_thread;
    const auto chunk_end = chunk_begin + static_cast<Offset>(count);
    // Range construction sizes each partition with a single allocation.
    items_by_thread.emplace_back(chunk_begin, chunk_end);
    chunk_begin = chunk_end;
    remaining -= count;
  }
  return items_by_thread;
}
72 
/*
  Partitions items via partition_for_threads(items, max_threads) and launches one
  asynchronous task (std::launch::async) per partition, each task invoking lambda on
  its own partition.

  Returns the futures of the launched tasks, in partition order. Each task owns its
  partition (it is moved into the task), so the tasks do not reference the items
  argument after this function returns. The callable is copied once per task.
 */
template <typename Container>
std::vector<std::future<void>> create_futures_for_workers(
    const Container& items,
    size_t max_threads,
    std::function<void(const Container&)> lambda) {
  auto partitions = partition_for_threads(items, max_threads);
  std::vector<std::future<void>> futures;
  futures.reserve(partitions.size());
  for (auto& partition : partitions) {
    // The local partition list is discarded when this function returns, so hand each
    // partition over to its task instead of deep-copying it.
    futures.emplace_back(std::async(std::launch::async, lambda, std::move(partition)));
  }

  return futures;
}
86 
88 
89 bool is_system_table_chunk_key(const ChunkKey& chunk_key);
90 
91 bool is_replicated_table_chunk_key(const ChunkKey& chunk_key);
92 
93 bool is_append_table_chunk_key(const ChunkKey& chunk_key);
94 
95 bool is_shardable_key(const ChunkKey& key);
96 
97 bool fragment_maps_to_leaf(const ChunkKey& key);
98 
99 bool key_does_not_shard_to_leaf(const ChunkKey& key);
100 } // namespace foreign_storage
std::vector< int > ChunkKey
Definition: types.h:36
bool is_system_table_chunk_key(const ChunkKey &chunk_key)
auto partition_for_threads(const std::set< T > &items, size_t max_threads)
Definition: FsiChunkUtils.h:41
bool is_append_table_chunk_key(const ChunkKey &chunk_key)
bool is_replicated_table_chunk_key(const ChunkKey &chunk_key)
bool key_does_not_shard_to_leaf(const ChunkKey &key)
std::shared_ptr< ChunkMetadata > get_placeholder_metadata(const SQLTypeInfo &type, size_t num_elements)
future< Result > async(Fn &&fn, Args &&...args)
std::vector< std::future< void > > create_futures_for_workers(const Container &items, size_t max_threads, std::function< void(const Container &)> lambda)
Definition: FsiChunkUtils.h:74
void init_chunk_for_column(const ChunkKey &chunk_key, const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< ChunkKey, AbstractBuffer * > &buffers, Chunk_NS::Chunk &chunk)
bool fragment_maps_to_leaf(const ChunkKey &key)
bool is_shardable_key(const ChunkKey &key)
const foreign_storage::ForeignTable & get_foreign_table_for_key(const ChunkKey &key)