OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
QueryDispatchQueue.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <condition_variable>
20 #include <future>
21 #include <mutex>
22 #include <queue>
23 #include <thread>
24 
30  public:
// Unit of work handled by the dispatch queue: a packaged task that is invoked
// with a single size_t argument (the ID of the worker running it) and yields
// no value.
using Task = std::packaged_task<void(size_t)>;
32 
33  QueryDispatchQueue(const size_t parallel_executors_max) {
34  workers_.resize(parallel_executors_max);
35  for (size_t i = 0; i < workers_.size(); i++) {
36  // worker IDs are 1-indexed, leaving Executor 0 for non-dispatch queue worker tasks
37  workers_[i] = std::thread(&QueryDispatchQueue::worker, this, i + 1);
38  }
40  num_workers_ = parallel_executors_max;
41  }
42 
48  void submit(std::shared_ptr<Task> task, const bool is_update_delete) {
49  if (workers_.size() == 1 && is_update_delete) {
50  std::lock_guard<decltype(update_delete_mutex_)> update_delete_lock(
52  CHECK(task);
53  // We only have 1 worker. Run this task on the calling thread on a special, second
54  // worker. The task is under the update delete lock, so we don't have to worry about
55  // contention on the special worker. This protects against deadlocks should the
56  // query running (or any pending queries) hold a read lock on something that
57  // requires a write lock during update/delete.
58  (*task)(2);
59  return;
60  }
61  std::unique_lock<decltype(queue_mutex_)> lock(queue_mutex_);
62 
63  LOG(INFO) << "Dispatching query with " << queue_.size() << " queries in the queue.";
64  queue_.push(task);
65  lock.unlock();
66  cv_.notify_all();
67  }
68 
69  bool hasIdleWorker() {
70  std::lock_guard<decltype(queue_mutex_)> lock(queue_mutex_);
72  }
73 
75  {
76  std::lock_guard<decltype(queue_mutex_)> lock(queue_mutex_);
77  threads_should_exit_ = true;
78  }
79  cv_.notify_all();
80  for (auto& worker : workers_) {
81  worker.join();
82  }
83  }
84 
85  private:
86  void worker(const size_t worker_idx) {
87  std::unique_lock<std::mutex> lock(queue_mutex_);
88  while (true) {
89  cv_.wait(lock, [this] { return !queue_.empty() || threads_should_exit_; });
90 
92  return;
93  }
94 
95  if (!queue_.empty()) {
96  auto task = queue_.front();
97  queue_.pop();
99 
100  LOG(INFO) << "Worker " << worker_idx
101  << " running query and returning control. There are now "
102  << num_running_workers_ << " workers are running and " << queue_.size()
103  << " queries in the queue.";
104  // allow other threads to pick up tasks
105  lock.unlock();
106  CHECK(task);
107  (*task)(worker_idx);
108  // wait for signal
109  lock.lock();
111  }
112  }
113  }
114 
115  std::mutex queue_mutex_;
116  std::condition_variable cv_;
117 
119 
120  bool threads_should_exit_{false};
121  std::queue<std::shared_ptr<Task>> queue_;
122  std::vector<std::thread> workers_;
123  int num_running_workers_; // manipulate this under queue_lock
125 };
std::condition_variable cv_
std::mutex update_delete_mutex_
#define LOG(tag)
Definition: Logger.h:285
std::packaged_task< void(size_t)> Task
void submit(std::shared_ptr< Task > task, const bool is_update_delete)
void worker(const size_t worker_idx)
std::queue< std::shared_ptr< Task > > queue_
std::vector< std::thread > workers_
QueryDispatchQueue(const size_t parallel_executors_max)
#define CHECK(condition)
Definition: Logger.h:291