OmniSciDB  2e3a973ef4
QueryDispatchQueue Class Reference

#include <QueryDispatchQueue.h>

Public Types

using Task = std::packaged_task< void(size_t)>
 

Public Member Functions

 QueryDispatchQueue (const size_t parallel_executors_max)
 
void submit (std::shared_ptr< Task > task, const bool is_update_delete)
 
 ~QueryDispatchQueue ()
 

Private Member Functions

void worker (const size_t worker_idx)
 

Private Attributes

std::mutex queue_mutex_
 
std::condition_variable cv_
 
std::mutex update_delete_mutex_
 
bool threads_should_exit_ {false}
 
std::queue< std::shared_ptr< Task > > queue_
 
std::vector< std::thread > workers_
 

Detailed Description

QueryDispatchQueue maintains a list of pending queries and dispatches those queries as Executors become available

Definition at line 29 of file QueryDispatchQueue.h.

Member Typedef Documentation

◆ Task

using QueryDispatchQueue::Task = std::packaged_task<void(size_t)>

Definition at line 31 of file QueryDispatchQueue.h.

Constructor & Destructor Documentation

◆ QueryDispatchQueue()

QueryDispatchQueue::QueryDispatchQueue ( const size_t  parallel_executors_max)
inline

Definition at line 33 of file QueryDispatchQueue.h.

References worker(), and workers_.

33  {
34  workers_.resize(parallel_executors_max);
35  for (size_t i = 0; i < workers_.size(); i++) {
36  // worker IDs are 1-indexed, leaving Executor 0 for non-dispatch queue worker tasks
37  workers_[i] = std::thread(&QueryDispatchQueue::worker, this, i + 1);
38  }
39  }
void worker(const size_t worker_idx)
std::vector< std::thread > workers_
+ Here is the call graph for this function:

◆ ~QueryDispatchQueue()

QueryDispatchQueue::~QueryDispatchQueue ( )
inline

Definition at line 67 of file QueryDispatchQueue.h.

References cv_, queue_mutex_, threads_should_exit_, worker(), and workers_.

67  {
68  {
69  std::lock_guard<decltype(queue_mutex_)> lock(queue_mutex_);
70  threads_should_exit_ = true;
71  }
72  cv_.notify_all();
73  for (auto& worker : workers_) {
74  worker.join();
75  }
76  }
std::condition_variable cv_
void worker(const size_t worker_idx)
std::vector< std::thread > workers_
+ Here is the call graph for this function:

Member Function Documentation

◆ submit()

void QueryDispatchQueue::submit ( std::shared_ptr< Task task,
const bool  is_update_delete 
)
inline

Submit a new task to the queue. Blocks until the task begins execution. The caller is expected to maintain a copy of the shared_ptr which will be used to access results once the task runs.

Definition at line 46 of file QueryDispatchQueue.h.

References CHECK, cv_, logger::INFO, LOG, queue_, queue_mutex_, update_delete_mutex_, and workers_.

46  {
47  if (workers_.size() == 1 && is_update_delete) {
48  std::lock_guard<decltype(update_delete_mutex_)> update_delete_lock(
50  CHECK(task);
51  // We only have 1 worker. Run this task on the calling thread on a special, second
52  // worker. The task is under the update delete lock, so we don't have to worry about
53  // contention on the special worker. This protects against deadlocks should the
54  // query running (or any pending queries) hold a read lock on something that
55  // requires a write lock during update/delete.
56  (*task)(2);
57  return;
58  }
59  std::unique_lock<decltype(queue_mutex_)> lock(queue_mutex_);
60 
61  LOG(INFO) << "Dispatching query with " << queue_.size() << " queries in the queue.";
62  queue_.push(task);
63  lock.unlock();
64  cv_.notify_all();
65  }
std::condition_variable cv_
std::mutex update_delete_mutex_
#define LOG(tag)
Definition: Logger.h:188
std::queue< std::shared_ptr< Task > > queue_
std::vector< std::thread > workers_
#define CHECK(condition)
Definition: Logger.h:197

◆ worker()

void QueryDispatchQueue::worker ( const size_t  worker_idx)
inlineprivate

Definition at line 79 of file QueryDispatchQueue.h.

References CHECK, cv_, logger::INFO, LOG, queue_, queue_mutex_, and threads_should_exit_.

Referenced by QueryDispatchQueue(), and ~QueryDispatchQueue().

79  {
80  std::unique_lock<std::mutex> lock(queue_mutex_);
81  while (true) {
82  cv_.wait(lock, [this] { return !queue_.empty() || threads_should_exit_; });
83 
85  return;
86  }
87 
88  if (!queue_.empty()) {
89  auto task = queue_.front();
90  queue_.pop();
91 
92  LOG(INFO) << "Worker " << worker_idx
93  << " running query and returning control. There are now "
94  << queue_.size() << " queries in the queue.";
95  // allow other threads to pick up tasks
96  lock.unlock();
97  CHECK(task);
98  (*task)(worker_idx);
99 
100  // wait for signal
101  lock.lock();
102  }
103  }
104  }
std::condition_variable cv_
#define LOG(tag)
Definition: Logger.h:188
std::queue< std::shared_ptr< Task > > queue_
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the caller graph for this function:

Member Data Documentation

◆ cv_

std::condition_variable QueryDispatchQueue::cv_
private

Definition at line 107 of file QueryDispatchQueue.h.

Referenced by submit(), worker(), and ~QueryDispatchQueue().

◆ queue_

std::queue<std::shared_ptr<Task> > QueryDispatchQueue::queue_
private

Definition at line 112 of file QueryDispatchQueue.h.

Referenced by submit(), and worker().

◆ queue_mutex_

std::mutex QueryDispatchQueue::queue_mutex_
private

Definition at line 106 of file QueryDispatchQueue.h.

Referenced by submit(), worker(), and ~QueryDispatchQueue().

◆ threads_should_exit_

bool QueryDispatchQueue::threads_should_exit_ {false}
private

Definition at line 111 of file QueryDispatchQueue.h.

Referenced by worker(), and ~QueryDispatchQueue().

◆ update_delete_mutex_

std::mutex QueryDispatchQueue::update_delete_mutex_
private

Definition at line 109 of file QueryDispatchQueue.h.

Referenced by submit().

◆ workers_

std::vector<std::thread> QueryDispatchQueue::workers_
private

Definition at line 113 of file QueryDispatchQueue.h.

Referenced by QueryDispatchQueue(), submit(), and ~QueryDispatchQueue().


The documentation for this class was generated from the following file: