OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
17 #pragma once
19 #include <atomic>
20 #include <chrono>
21 #include <condition_variable>
22 #include <mutex>
23 #include <set>
24 #include <shared_mutex>
27 #include "ExecutorResourcePool.h"
30 namespace ExecutorResourceMgr_Namespace {
// Aggregate statistics describing ExecutorResourceMgr state: request counts,
// queue lengths and accumulated times. Every field is a size_t that is
// zero-initialized in-class.
// NOTE(review): the *_ms suffix suggests millisecond units — confirm against the
// code that updates these fields (not visible in this listing).
42 struct ExecutorStats {
43  size_t requests{0};
44  size_t cpu_requests{0};
45  size_t gpu_requests{0};
46  size_t queue_length{0};
47  size_t cpu_queue_length{0};
48  size_t gpu_queue_length{0};
58  size_t requests_executing{0};
61  size_t requests_executed{0};
67  size_t total_time_ms{0};
68  size_t total_cpu_time_ms{0};
69  size_t total_gpu_time_ms{0};
71  size_t requests_timed_out{0};
72 };
// Per-request record: stores information pertaining to a single request made to
// ExecutorResourceMgr (timestamps, progress flags, measured durations, timeout
// and an optional error message).
// NOTE(review): this listing skips several numbered lines (e.g. 84-88, 104-105,
// 107-108, 111), so the declarations of request_id, request_info,
// min_resource_grant, max_resource_grant, queue_length_at_entry and
// device_type_queue_length_at_entry, as well as the constructor's opening line,
// are not shown even though the initializer list below refers to them.
83 struct RequestStats {
89  std::chrono::steady_clock::time_point enqueue_time; // steady_clock time point, not a millisecond count
90  std::chrono::steady_clock::time_point deque_time; // steady_clock time point, not a millisecond count
91  std::chrono::steady_clock::time_point execution_finished_time; // steady_clock time point, not a millisecond count
94  bool finished_queueing{false};
95  bool finished_executing{false};
// NOTE(review): the three *_time_ms fields are presumably durations in
// milliseconds derived from the time points above — confirm in the .cpp.
96  size_t queue_time_ms{0};
97  size_t execution_time_ms{0};
98  size_t total_time_ms{0};
99  size_t timeout_in_ms{0};
100  bool timed_out{false};
101  // Filled with a corresponding message when an error occurs
102  // while ERM processes the resource allocation request.
103  std::optional<std::string> error;
// Constructor parameters (partial view). Each argument is copied into the member
// of the same name; all other members keep their in-class defaults.
106  const RequestInfo& request_info,
109  const std::chrono::steady_clock::time_point& enqueue_time,
110  const size_t queue_length_at_entry,
112  const size_t timeout_in_ms)
113  : request_id(request_id)
114  , request_info(request_info)
115  , min_resource_grant(min_resource_grant)
116  , max_resource_grant(max_resource_grant)
117  , enqueue_time(enqueue_time)
118  , queue_length_at_entry(queue_length_at_entry)
119  , device_type_queue_length_at_entry(device_type_queue_length_at_entry)
120  , timeout_in_ms(timeout_in_ms) {}
121 };
125 class ExecutorResourceHandle; // forward declaration; the class itself is defined later in this header
// Central manager for the resources available to all executors in the system.
// Derives from std::enable_shared_from_this<ExecutorResourceMgr>.
// NOTE(review): this listing omits many numbered lines, so several declarations
// below appear without their first line (or without their body).
137 class ExecutorResourceMgr : public std::enable_shared_from_this<ExecutorResourceMgr> {
138  public:
// Constructor parameter list (the line naming the constructor is not shown).
145  const std::vector<std::pair<ResourceType, size_t>>& total_resources,
146  const std::vector<ConcurrentResourceGrantPolicy>&
147  concurrent_resource_grant_policies,
148  const std::vector<ResourceGrantPolicy>& max_per_request_resource_grant_policies,
149  const double max_available_resource_use_ratio);
// Requests resources with a timeout given by timeout_in_ms; returns a handle
// owning the grant.
169  std::unique_ptr<ExecutorResourceHandle> request_resources_with_timeout(
170  const RequestInfo& request_info,
171  const size_t timeout_in_ms);
// Requests resources with no timeout; returns a handle owning the grant.
185  std::unique_ptr<ExecutorResourceHandle> request_resources(
186  const RequestInfo& request_info);
// Hands the given grant for request_id back to the manager. Called from
// ExecutorResourceHandle.
201  void release_resources(const RequestId request_id, const ResourceGrant& resource_grant);
// Prints the ExecutorStats struct; intended for debugging.
212  void print_executor_stats() const;
// Forwards to executor_resource_pool_.get_resource_info(resource_type) and
// returns its pair unchanged.
224  std::pair<size_t, size_t> get_resource_info(const ResourceType resource_type) const {
225  return executor_resource_pool_.get_resource_info(resource_type);
226  }
// Closing brace of a member function whose signature and body are not shown.
239  }
// Changes the total amount of the specified resource.
251  void set_resource(const ResourceType resource_type, const size_t resoure_quantity);
// Tail of a declaration whose first line is not shown.
// NOTE(review): presumably get_concurrent_resource_grant_policy — confirm.
264  const ResourceType resource_type) const;
// Tail of a declaration whose first line is not shown.
// NOTE(review): presumably set_concurrent_resource_grant_policy — confirm.
274  const ConcurrentResourceGrantPolicy& concurrent_resource_grant_policy);
// Pause / resume processing of the request queue.
286  void pause_process_queue();
292  void resume_process_queue();
294  private:
// NOTE(review): presumably the body run by the queue-processing thread — confirm
// in the .cpp.
301  void process_queue_loop();
// Returns the RequestStats for the request identified by request_id (by value).
316  RequestStats get_request_for_id(const RequestId request_id) const;
318  void mark_request_error(const RequestId request_id, std::string error_msg);
// Registers a new request and returns the RequestId assigned to it.
353  RequestId enqueue_request(const RequestInfo& request_info,
354  const size_t timeout_in_ms,
355  const ResourceGrant& min_resource_grant,
356  const ResourceGrant& max_resource_grant);
// Bookkeeping hooks for the request lifecycle: dequeued, timed out, finished.
368  void mark_request_dequed(const RequestId request_id);
380  void mark_request_timed_out(const RequestId request_id);
389  void mark_request_finished(const RequestId request_id);
// Body of a member function whose signature line is not shown: takes
// processor_queue_mutex_ and sets should_process_queue_ to true under the lock.
402  std::unique_lock<std::mutex> queue_lock(processor_queue_mutex_);
403  should_process_queue_ = true;
404  }
// Returns the request ids currently in the given stage.
423  std::vector<RequestId> get_requests_for_stage(
424  const ExecutionRequestStage request_status) const;
// Adds / removes a request id to / from the given stage.
434  void add_request_to_stage(const RequestId request_id,
435  const ExecutionRequestStage request_status);
445  void remove_request_from_stage(const RequestId request_id,
446  const ExecutionRequestStage request_status);
// Atomic request counter, starts at zero.
// NOTE(review): presumably the source of new RequestIds — confirm in the .cpp.
468  std::atomic<size_t> requests_count_{0};
// NOTE(review): presumably a millisecond threshold for treating a request as
// having actually waited in the queue — confirm against the .cpp.
477  const size_t ACTUALLY_QUEUED_MIN_MS{2};
// Mutexes are mutable so that const member functions can lock them.
493  mutable std::mutex processor_queue_mutex_;
495  mutable std::mutex pause_processor_queue_mutex_;
496  mutable std::mutex print_mutex_;
514  std::condition_variable processor_queue_condition_;
515  std::condition_variable pause_processor_queue_condition_;
519  bool pause_process_queue_{false};
// Ids of requests currently queued and currently executing, respectively.
534  std::set<RequestId> queued_requests_;
540  std::set<RequestId> executing_requests_;
// One RequestStats entry per request seen by the manager.
555  std::vector<RequestStats> requests_stats_;
// Both printing switches are const and initialized to false here.
557  const bool enable_stats_printing_{false};
558  const bool enable_debug_printing_{false};
// Sentinel id: the largest value representable by size_t.
560  const RequestId INVALID_REQUEST_ID{std::numeric_limits<size_t>::max()};
563 };
// Convenience factory-style function: builds an ExecutorResourceMgr from scalar
// settings (slot counts, memory sizes, per-query ratios and concurrency /
// oversubscription switches) and returns it as a std::shared_ptr.
// NOTE(review): how each argument maps onto resource totals and grant policies
// is defined in the .cpp, which is not visible here.
570 std::shared_ptr<ExecutorResourceMgr> generate_executor_resource_mgr(
571  const size_t num_cpu_slots,
572  const size_t num_gpu_slots,
573  const size_t cpu_result_mem,
574  const size_t cpu_buffer_pool_mem,
575  const size_t gpu_buffer_pool_mem,
576  const double per_query_max_cpu_slots_ratio,
577  const double per_query_max_cpu_result_mem_ratio,
578  const double per_query_max_pinned_cpu_buffer_pool_mem_ratio,
579  const double per_query_max_pageable_cpu_buffer_pool_mem_ratio,
580  const bool allow_cpu_kernel_concurrency,
581  const bool allow_cpu_gpu_kernel_concurrency,
582  const bool allow_cpu_slot_oversubscription_concurrency,
583  const bool allow_gpu_slot_oversubscription,
584  const bool allow_cpu_result_mem_oversubscription_concurrency,
585  const double max_available_resource_use_ratio);
// ExecutorResourceHandle: wrapper that pairs a request id and its ResourceGrant
// with a shared_ptr to the ExecutorResourceMgr it refers to.
// NOTE(review): the class header line, the destructor's signature line and some
// member declarations (request_id_, resource_grant_) are not shown in this listing.
595  public:
// Stores copies of the manager pointer, the request id and the grant.
596  ExecutorResourceHandle(std::shared_ptr<ExecutorResourceMgr> resource_mgr,
597  const RequestId request_id,
598  const ResourceGrant& resource_grant)
599  : resource_mgr_(resource_mgr)
600  , request_id_(request_id)
601  , resource_grant_(resource_grant) {}
// Hands the stored grant back to the manager via release_resources().
// NOTE(review): presumably this is the destructor body — confirm; its signature
// line is missing from the listing.
604  resource_mgr_->release_resources(request_id_, resource_grant_);
605  }
// Returns the request id this handle was constructed with.
607  inline RequestId get_request_id() const { return request_id_; }
610  private:
// The handle shares ownership of the manager through this pointer.
611  std::shared_ptr<ExecutorResourceMgr> resource_mgr_;
614 };
616 } // namespace ExecutorResourceMgr_Namespace
A container for various stats about the current state of the ExecutorResourcePool. Note that ExecutorResourcePool does not persist a struct of this type, but rather builds one on the fly when ExecutorResourcePool::get_resource_info() is called.
void release_resources(const RequestId request_id, const ResourceGrant &resource_grant)
Instructs ExecutorResourceMgr that the resources held by the requestor with the given request_id can ...
A container to store requested and minimum necessary resource requests across all resource types cur...
Stores and allows access to a binary semaphore per RequestId (using a std::unordered_map), and provides access to all outstanding RequestIds for waiting requests.
std::shared_ptr< ExecutorResourceMgr > generate_executor_resource_mgr(const size_t num_cpu_slots, const size_t num_gpu_slots, const size_t cpu_result_mem, const size_t cpu_buffer_pool_mem, const size_t gpu_buffer_pool_mem, const double per_query_max_cpu_slots_ratio, const double per_query_max_cpu_result_mem_ratio, const double per_query_max_pinned_cpu_buffer_pool_mem_ratio, const double per_query_max_pageable_cpu_buffer_pool_mem_ratio, const bool allow_cpu_kernel_concurrency, const bool allow_cpu_gpu_kernel_concurrency, const bool allow_cpu_slot_oversubscription_concurrency, const bool allow_gpu_slot_oversubscription, const bool allow_cpu_result_mem_oversubscription_concurrency, const double max_available_resource_use_ratio)
Convenience factory-esque method that allows us to use the same logic to generate an ExecutorResource...
ChunkRequestInfo get_chunk_request_info(const RequestId request_id)
Get the DataMgr chunk ids and associated sizes pertaining to the input data needed by a request...
void remove_request_from_stage(const RequestId request_id, const ExecutionRequestStage request_status)
Internal method: Removes the request specified by the provided request_id from the specified stage...
std::unique_ptr< ExecutorResourceHandle > request_resources(const RequestInfo &request_info)
Requests resources from ExecutorResourceMgr, with no timeout (unlike request_resources_with_timeout) ...
void add_request_to_stage(const RequestId request_id, const ExecutionRequestStage request_status)
Internal method: Adds the request specified by the provided request_id to the specified stage...
ExecutorStats executor_stats_
Holds a single ExecutorStats struct that pertains to cumulative stats for ExecutorResourceMgr, i.e. number of requests, queue length, total execution time, etc.
Stores the resource type for a ExecutorResourcePool request.
std::shared_mutex queue_stats_mutex_
RW mutex that protects access to executor_stats_ and request_stats_
void process_queue_loop()
Internal method: A thread is assigned to run this function in the constructor of ExecutorResourceMgr...
Specifies the policies for resource grants in the presence of other requests, both under situations o...
ExecutorResourceMgr(const std::vector< std::pair< ResourceType, size_t >> &total_resources, const std::vector< ConcurrentResourceGrantPolicy > &concurrent_resource_grant_policies, const std::vector< ResourceGrantPolicy > &max_per_request_resource_grant_policies, const double max_available_resource_use_ratio)
The constructor instantiates an ExecutorResourcePool with the provided parameters, and starts the process queue by launching a thread to invoke process_queue_loop.
void stop_process_queue_thread()
Internal method: Invoked from ExecutorResourceMgr destructor, sets stop_process_queue_thread_ to true...
The destructor ensures that the process queue thread (process_queue_thread_) is stopped and that any t...
void set_concurrent_resource_grant_policy(const ConcurrentResourceGrantPolicy &concurrent_resource_grant_policy)
Set the concurrent resource grant policy for a given resource type (stored in ConcurrentResourceGrant...
void mark_request_error(const RequestId request_id, std::string error_msg)
std::chrono::steady_clock::time_point deque_time
std::vector< RequestId > get_requests_for_stage(const ExecutionRequestStage request_status) const
Internal method: Get the request ids for a given stage (QUEUED or EXECUTING)
Specifies the resources of each type for a given resource grant.
std::mutex processor_queue_mutex_
Mutex that protects access to stop_process_queue_thread_ and pause_processor_queue_ ...
ExecutorResourcePool executor_resource_pool_
Keeps track of available resources for execution.
ResourcePoolInfo get_resource_info() const
Returns a struct containing the total and allocated amounts of all resources tracked by ExecutorResou...
std::set< RequestId > executing_requests_
Set of all request ids that are currently executing (i.e. post-granting of resources). Protected by executing_set_mutex_.
std::atomic< size_t > requests_count_
An atomic that is incremented with each incoming request, and used to assign RequestIds to incoming r...
ConcurrentResourceGrantPolicy get_concurrent_resource_grant_policy(const ResourceType resource_type) const
Get the concurrent resource grant policy for a given resource type.
void mark_request_dequed(const RequestId request_id)
Internal method: Moves the request from the QUEUED stage to EXECUTING stage and performs other bookke...
RequestId choose_next_request()
Internal method: Invoked from process_queue_loop, chooses the next resource request to grant...
ExecutorResourceHandle(std::shared_ptr< ExecutorResourceMgr > resource_mgr, const RequestId request_id, const ResourceGrant &resource_grant)
std::pair< size_t, size_t > get_resource_info(const ResourceType resource_type) const
Returns the allocated and total available amount of the resource specified.
std::shared_mutex queued_set_mutex_
RW mutex that protects access to queued_requests_
ExecutorResourcePool keeps track of available compute and memory resources and can be queried to get ...
OutstandingQueueRequests outstanding_queue_requests_
Stores and manages a map of request ids to BinarySemaphore objects to allow threads waiting for resou...
Stores current key statistics relating to ExecutorResourceMgr state, particularly around the number o...
A wrapper returned by ExecutorResourceMgr to the requester, containing the ResourceGrant that was gra...
void pause_process_queue()
Pauses the process queue in a thread-safe manner, waiting for all queries in the executing stage to f...
RequestId request_id()
Definition: Logger.cpp:874
ExecutorStats get_executor_stats() const
Returns a copy of the ExecutorStats struct held by ExecutorResourceMgr. Used for testing currently...
void mark_request_finished(const RequestId request_id)
Internal method: Invoked on successful completion of a query step from release_resources method...
std::pair< size_t, size_t > get_resource_info(const ResourceType resource_type) const
Returns the allocated and total available amount of the resource specified.
std::unique_ptr< ExecutorResourceHandle > request_resources_with_timeout(const RequestInfo &request_info, const size_t timeout_in_ms)
Requests resources from ExecutorResourceMgr, will throw if request takes longer than time specified b...
std::shared_mutex executing_set_mutex_
RW mutex that protects access to executing_requests_
std::chrono::steady_clock::time_point enqueue_time
void set_resource(const ResourceType resource_type, const size_t resoure_quantity)
Used to change the total amount available of a specified resource after construction of ExecutorResou...
std::chrono::steady_clock::time_point execution_finished_time
void mark_request_timed_out(const RequestId request_id)
Internal method: Called if the request times out (i.e. request was made via request_resources_with_ti...
void resume_process_queue()
Resumes the process queue in a thread-safe manner. If the process queue is already paused...
ExecutorResourceMgr is the central manager for resources available to all executors in the system...
std::thread process_queue_thread_
The thread started in the ExecutorResourceMgr constructor that continuously loops inside of process_q...
std::shared_timed_mutex shared_mutex
Specifies all DataMgr chunks needed for a query step/request, along with their sizes in bytes...
void set_process_queue_flag()
Internal method: Set the should_process_queue_ flag to true, signifying that the queue should be proc...
void print_executor_stats() const
Prints the ExecutorStats struct. Use for debugging.
std::set< RequestId > queued_requests_
Set of all request ids that are currently queued. Protected by queued_set_mutex_. ...
RequestStats(const RequestId request_id, const RequestInfo &request_info, const ResourceGrant &min_resource_grant, const ResourceGrant &max_resource_grant, const std::chrono::steady_clock::time_point &enqueue_time, const size_t queue_length_at_entry, const size_t device_type_queue_length_at_entry, const size_t timeout_in_ms)
std::vector< RequestStats > requests_stats_
Stores a vector of all requests that have been seen by ExecutorResourceMgr, with each incoming reques...
Stores info pertaining to a single request made to ExecutorResourceMgr, including its request_id...
std::shared_ptr< ExecutorResourceMgr > resource_mgr_
RequestId enqueue_request(const RequestInfo &request_info, const size_t timeout_in_ms, const ResourceGrant &min_resource_grant, const ResourceGrant &max_resource_grant)
Internal method: Invoked from request_resources/request_resources_with_timeout, places request in the r...
RequestStats get_request_for_id(const RequestId request_id) const
Internal method: Returns the RequestStats for a request specified by request_id.