OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
OutstandingQueueRequests.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <shared_mutex>
20 #include <unordered_map>
21 #include <vector>
22 
24 #include "Logger/Logger.h"
25 #include "Shared/BinarySemaphore.h"
26 
27 namespace ExecutorResourceMgr_Namespace {
28 
37  std::unordered_map<RequestId, SemaphoreShim_Namespace::BinarySemaphore>;
38 
39  public:
48  auto& wait_semaphore = get_semaphore_for_request(request_id);
49  wait_semaphore.try_acquire();
50  delete_semaphore_for_request(request_id);
51  }
52 
62  const size_t max_wait_in_ms) {
63  CHECK_GT(max_wait_in_ms, size_t(0));
64  auto& wait_semaphore = get_semaphore_for_request(request_id);
65  // Binary semaphore returns false if it was not unblocked before the specified timeout
66  const bool did_timeout = !(wait_semaphore.try_acquire_for(max_wait_in_ms));
67  delete_semaphore_for_request(request_id);
68  if (did_timeout) {
69  throw QueryTimedOutWaitingInQueue(max_wait_in_ms);
70  }
71  }
72 
79  std::vector<RequestId> get_outstanding_request_ids() {
80  std::vector<RequestId> outstanding_request_ids;
81  std::shared_lock<std::shared_mutex> requests_read_lock(requests_map_mutex_);
82  outstanding_request_ids.reserve(outstanding_requests_map_.size());
83  for (const auto& request_entry : outstanding_requests_map_) {
84  outstanding_request_ids.emplace_back(request_entry.first);
85  }
86  return outstanding_request_ids;
87  }
88 
96  std::unique_lock<std::shared_mutex> requests_write_lock(requests_map_mutex_);
97  const auto request_itr = outstanding_requests_map_.find(request_id);
98  if (request_itr == outstanding_requests_map_.end()) {
100  } else {
101  request_itr->second.release();
102  }
103  }
104 
105  private:
117  const RequestId request_id) {
118  std::unique_lock<std::shared_mutex> requests_write_lock(requests_map_mutex_);
120  }
121 
131  std::unique_lock<std::shared_mutex> requests_write_lock(requests_map_mutex_);
132  CHECK_EQ(outstanding_requests_map_.erase(request_id),
133  size_t(1)); // Ensure the erase call returns 1, meaning there was actually
134  // an entry in the map matching this request_id to delete
135  }
136 
141 
146 };
147 
148 } // namespace ExecutorResourceMgr_Namespace
Stores and allows access to a binary semaphore per RequestId (using an std::unordered_map), as well as accessing all outstanding RequestIds for waiting requests.
#define CHECK_EQ(x, y)
Definition: Logger.h:301
SemaphoreShim_Namespace::BinarySemaphore & get_semaphore_for_request(const RequestId request_id)
Creates a new entry in outstanding_requests_map_, assigning a BinarySemaphore for the given requestin...
void queue_request_and_wait(const RequestId request_id)
Submits a request with id request_id into the queue, waiting on a BinarySemaphore until ExecutorResou...
void delete_semaphore_for_request(const RequestId request_id)
Internal method: removes a RequestId-BinarySemaphore entry from outstanding_requests_map_. Invoked after a request thread is awoken (including on timeout).
#define CHECK_GT(x, y)
Definition: Logger.h:305
OutstandingQueueRequestsMap outstanding_requests_map_
Stores a map of RequestId to BinarySemaphore
std::vector< RequestId > get_outstanding_request_ids()
Get the RequestIds of all outstanding requests in the queue.
void wake_request_by_id(const RequestId request_id)
Wakes a waiting thread in the queue. Invoked by ExecutorResourceMgr::process_queue_loop() ...
RequestId request_id()
Definition: Logger.cpp:874
Utility type that implements behavior of a blocking binary semaphore, with an optional timeout...
void queue_request_and_wait_with_timeout(const RequestId request_id, const size_t max_wait_in_ms)
Submits a request with id request_id into the queue, waiting on a BinarySemaphore until ExecutorResou...
std::unordered_map< RequestId, SemaphoreShim_Namespace::BinarySemaphore > OutstandingQueueRequestsMap
std::shared_mutex requests_map_mutex_
Read-write lock protecting the outstanding_requests_map_
std::shared_timed_mutex shared_mutex