OmniSciDB  340b00dbf6
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
QueryDispatchQueue.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <condition_variable>
20 #include <future>
21 #include <mutex>
22 #include <queue>
23 #include <thread>
24 
30  public:
31  using Task = std::packaged_task<void(size_t)>;
32 
33  QueryDispatchQueue(const size_t parallel_executors_max) {
34  workers_.resize(parallel_executors_max);
35  for (size_t i = 0; i < workers_.size(); i++) {
36  // worker IDs are 1-indexed, leaving Executor 0 for non-dispatch queue worker tasks
37  workers_[i] = std::thread(&QueryDispatchQueue::worker, this, i + 1);
38  }
39  }
40 
46  void submit(std::shared_ptr<Task> task, const bool is_update_delete) {
47  if (workers_.size() == 1 && is_update_delete) {
48  std::lock_guard<decltype(update_delete_mutex_)> update_delete_lock(
50  CHECK(task);
51  // We only have 1 worker. Run this task on the calling thread on a special, second
52  // worker. The task is under the update delete lock, so we don't have to worry about
53  // contention on the special worker. This protects against deadlocks should the
54  // query running (or any pending queries) hold a read lock on something that
55  // requires a write lock during update/delete.
56  (*task)(2);
57  return;
58  }
59  std::unique_lock<decltype(queue_mutex_)> lock(queue_mutex_);
60 
61  LOG(INFO) << "Dispatching query with " << queue_.size() << " queries in the queue.";
62  queue_.push(task);
63  lock.unlock();
64  cv_.notify_all();
65  }
66 
68  {
69  std::lock_guard<decltype(queue_mutex_)> lock(queue_mutex_);
70  threads_should_exit_ = true;
71  }
72  cv_.notify_all();
73  for (auto& worker : workers_) {
74  worker.join();
75  }
76  }
77 
78  private:
79  void worker(const size_t worker_idx) {
80  std::unique_lock<std::mutex> lock(queue_mutex_);
81  while (true) {
82  cv_.wait(lock, [this] { return !queue_.empty() || threads_should_exit_; });
83 
84  if (threads_should_exit_) {
85  return;
86  }
87 
88  if (!queue_.empty()) {
89  auto task = queue_.front();
90  queue_.pop();
91 
92  LOG(INFO) << "Worker " << worker_idx
93  << " running query and returning control. There are now "
94  << queue_.size() << " queries in the queue.";
95  // allow other threads to pick up tasks
96  lock.unlock();
97  CHECK(task);
98  (*task)(worker_idx);
99 
100  // wait for signal
101  lock.lock();
102  }
103  }
104  }
105 
106  std::mutex queue_mutex_;
107  std::condition_variable cv_;
108 
110 
111  bool threads_should_exit_{false};
112  std::queue<std::shared_ptr<Task>> queue_;
113  std::vector<std::thread> workers_;
114 };
std::condition_variable cv_
std::mutex update_delete_mutex_
#define LOG(tag)
Definition: Logger.h:188
std::packaged_task< void(size_t)> Task
void submit(std::shared_ptr< Task > task, const bool is_update_delete)
void worker(const size_t worker_idx)
std::queue< std::shared_ptr< Task > > queue_
std::vector< std::thread > workers_
QueryDispatchQueue(const size_t parallel_executors_max)
#define CHECK(condition)
Definition: Logger.h:197