OmniSciDB  8a228a1076
QueryDispatchQueue.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <condition_variable>
20 #include <future>
21 #include <mutex>
22 #include <queue>
23 #include <thread>
24 
30  public:
31  using Task = std::packaged_task<void(size_t)>;
32 
33  QueryDispatchQueue(const size_t parallel_executors_max) {
34  workers_.resize(parallel_executors_max);
35  for (size_t i = 0; i < workers_.size(); i++) {
36  // worker IDs are 1-indexed, leaving Executor 0 for non-dispatch queue worker tasks
37  workers_[i] = std::thread(&QueryDispatchQueue::worker, this, i + 1);
38  }
39  }
40 
46  void submit(std::shared_ptr<Task> task) {
47  std::unique_lock<decltype(queue_mutex_)> lock(queue_mutex_);
48 
49  LOG(INFO) << "Dispatching query with " << queue_.size() << " queries in the queue.";
50  queue_.push(task);
51  lock.unlock();
52  cv_.notify_all();
53  }
54 
56  {
57  std::lock_guard<decltype(queue_mutex_)> lock(queue_mutex_);
58  threads_should_exit_ = true;
59  }
60  cv_.notify_all();
61  for (auto& worker : workers_) {
62  worker.join();
63  }
64  }
65 
66  private:
67  void worker(const size_t worker_idx) {
68  std::unique_lock<std::mutex> lock(queue_mutex_);
69  while (true) {
70  cv_.wait(lock, [this] { return !queue_.empty() || threads_should_exit_; });
71 
73  return;
74  }
75 
76  if (!queue_.empty()) {
77  auto task = queue_.front();
78  queue_.pop();
79 
80  LOG(INFO) << "Running query and returning control. There are now "
81  << queue_.size() << " queries in the queue.";
82  // allow other threads to pick up tasks
83  lock.unlock();
84  CHECK(task);
85  (*task)(worker_idx);
86 
87  // wait for signal
88  lock.lock();
89  }
90  }
91  }
92 
93  std::mutex queue_mutex_;
94  std::condition_variable cv_;
95 
96  bool threads_should_exit_{false};
97  std::queue<std::shared_ptr<Task>> queue_;
98  std::vector<std::thread> workers_;
99 };
std::condition_variable cv_
#define LOG(tag)
Definition: Logger.h:188
std::packaged_task< void(size_t)> Task
void worker(const size_t worker_idx)
std::queue< std::shared_ptr< Task > > queue_
std::vector< std::thread > workers_
QueryDispatchQueue(const size_t parallel_executors_max)
void submit(std::shared_ptr< Task > task)
#define CHECK(condition)
Definition: Logger.h:197