19 #include <condition_variable>
31 using Task = std::packaged_task<void(size_t)>;
34 workers_.resize(parallel_executors_max);
35 for (
size_t i = 0; i <
workers_.size(); i++) {
48 void submit(std::shared_ptr<Task> task,
const bool is_update_delete) {
49 if (
workers_.size() == 1 && is_update_delete) {
50 std::lock_guard<decltype(update_delete_mutex_)> update_delete_lock(
61 std::unique_lock<decltype(queue_mutex_)> lock(
queue_mutex_);
63 LOG(
INFO) <<
"Dispatching query with " <<
queue_.size() <<
" queries in the queue.";
70 std::lock_guard<decltype(queue_mutex_)> lock(
queue_mutex_);
76 std::lock_guard<decltype(queue_mutex_)> lock(
queue_mutex_);
86 void worker(
const size_t worker_idx) {
96 auto task =
queue_.front();
100 LOG(
INFO) <<
"Worker " << worker_idx
101 <<
" running query and returning control. There are now "
103 <<
" queries in the queue.";
116 std::condition_variable
cv_;
121 std::queue<std::shared_ptr<Task>>
queue_;
std::condition_variable cv_
bool threads_should_exit_
std::mutex update_delete_mutex_
std::packaged_task< void(size_t)> Task
void submit(std::shared_ptr< Task > task, const bool is_update_delete)
void worker(const size_t worker_idx)
std::queue< std::shared_ptr< Task > > queue_
std::vector< std::thread > workers_
QueryDispatchQueue(const size_t parallel_executors_max)