OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignTableRefreshScheduler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
20 #include "LockMgr/LockMgr.h"
22 
24 
25 namespace foreign_storage {
26 
27 void ForeignTableRefreshScheduler::start(std::atomic<bool>& is_program_running) {
28  if (is_program_running && !is_scheduler_running_) {
29  is_scheduler_running_ = true;
30  scheduler_thread_ = std::thread([&is_program_running]() {
31  while (is_program_running && is_scheduler_running_) {
32  auto& sys_catalog = Catalog_Namespace::SysCatalog::instance();
33  // Exit if scheduler has been stopped asynchronously
34  if (!is_program_running || !is_scheduler_running_) {
35  return;
36  }
37 
38  for (const auto& catalog : sys_catalog.getCatalogsForAllDbs()) {
39  // Exit if scheduler has been stopped asynchronously
40  if (!is_program_running || !is_scheduler_running_) {
41  return;
42  }
43  auto tables = catalog->getAllForeignTablesForRefresh();
44  for (auto table : tables) {
45  // Exit if scheduler has been stopped asynchronously
46  if (!is_program_running || !is_scheduler_running_) {
47  return;
48  }
49  try {
50  refresh_foreign_table(*catalog, table->tableName, false);
51  } catch (std::runtime_error& e) {
52  LOG(ERROR) << "Scheduled refresh for table \"" << table->tableName
53  << "\" resulted in an error. " << e.what();
54  }
55  has_refreshed_table_ = true;
56  }
57  }
58 
59  // Exit if scheduler has been stopped asynchronously
60  if (!is_program_running || !is_scheduler_running_) {
61  return;
62  }
63 
64  // A condition variable is used here (instead of a sleep call)
65  // in order to allow for thread wake-up, even in the middle
66  // of a wait interval.
67  std::unique_lock<std::mutex> wait_lock(wait_mutex_);
68  wait_condition_.wait_for(wait_lock, thread_wait_duration_);
69  }
70  });
71  }
72 }
73 
76  is_scheduler_running_ = false;
77  wait_condition_.notify_one();
78  scheduler_thread_.join();
79  }
80 }
81 
82 void ForeignTableRefreshScheduler::setWaitDuration(int64_t duration_in_seconds) {
83  thread_wait_duration_ = std::chrono::seconds{duration_in_seconds};
84 }
85 
87  return is_scheduler_running_;
88 }
89 
91  return has_refreshed_table_;
92 }
93 
95  has_refreshed_table_ = false;
96 }
97 
103 std::condition_variable ForeignTableRefreshScheduler::wait_condition_;
104 } // namespace foreign_storage
#define LOG(tag)
Definition: Logger.h:285
void refresh_foreign_table(Catalog_Namespace::Catalog &catalog, const std::string &table_name, const bool evict_cached_entries)
static void setWaitDuration(int64_t duration_in_seconds)
static SysCatalog & instance()
Definition: SysCatalog.h:343
static void start(std::atomic< bool > &is_program_running)
bool g_enable_foreign_table_scheduled_refresh