OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignTableRefreshScheduler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
20 #include "LockMgr/LockMgr.h"
22 
23 namespace foreign_storage {
24 
26  auto execute_write_lock =
30  // todo (yoonmin): support per-table invalidation
33 }
34 
// Starts the background refresh thread.
//
// Does nothing unless is_program_running is true and the scheduler is not
// already marked as running. Otherwise marks the scheduler as running and
// stores a new std::thread in scheduler_thread_.
//
// is_program_running is captured BY REFERENCE by the thread's lambda, so the
// referenced atomic must remain valid for as long as the thread runs.
//
// Each pass of the thread loop walks every catalog returned by
// SysCatalog::getCatalogsForAllDbs(), refreshes every table returned by
// getAllForeignTablesForRefresh(), and then waits on wait_condition_ for up to
// thread_wait_duration_ before starting the next pass. The running flags are
// re-checked before each catalog, before each table, and before waiting, and
// the thread returns as soon as either flag is cleared.
35 void ForeignTableRefreshScheduler::start(std::atomic<bool>& is_program_running) {
36  if (is_program_running && !is_scheduler_running_) {
    // The flag is set before the thread is created, so the thread's first
    // loop-condition check observes it as true.
37  is_scheduler_running_ = true;
38  scheduler_thread_ = std::thread([&is_program_running]() {
39  while (is_program_running && is_scheduler_running_) {
40  auto& sys_catalog = Catalog_Namespace::SysCatalog::instance();
41  // Exit if scheduler has been stopped asynchronously
42  if (!is_program_running || !is_scheduler_running_) {
43  return;
44  }
    // Tracks whether this pass attempted a refresh on any table.
45  bool at_least_one_table_refreshed = false;
46  for (const auto& catalog : sys_catalog.getCatalogsForAllDbs()) {
47  // Exit if scheduler has been stopped asynchronously
48  if (!is_program_running || !is_scheduler_running_) {
49  return;
50  }
51  auto tables = catalog->getAllForeignTablesForRefresh();
52  for (auto table : tables) {
53  // Exit if scheduler has been stopped asynchronously
54  if (!is_program_running || !is_scheduler_running_) {
55  return;
56  }
    // A std::runtime_error from one table is logged and the loop moves on to
    // the next table. The third argument is passed as false.
    // NOTE(review): only std::runtime_error is caught here; any other exception
    // type would leave the thread function - confirm that cannot happen.
57  try {
58  refresh_foreign_table(*catalog, table->tableName, false);
59  } catch (std::runtime_error& e) {
60  LOG(ERROR) << "Scheduled refresh for table \"" << table->tableName
61  << "\" resulted in an error. " << e.what();
62  }
    // Both flags are set after the try/catch, so they become true even when
    // the refresh above threw and was logged.
63  has_refreshed_table_ = true;
64  at_least_one_table_refreshed = true;
65  }
66  }
    // NOTE(review): the body of this branch is not visible in this listing
    // (its numbering jumps from 67 to 69) - check the upstream source for the
    // statement that runs once per pass when any table was refreshed.
67  if (at_least_one_table_refreshed) {
69  }
70  // Exit if scheduler has been stopped asynchronously
71  if (!is_program_running || !is_scheduler_running_) {
72  return;
73  }
74 
75  // A condition variable is used here (instead of a sleep call)
76  // in order to allow for thread wake-up, even in the middle
77  // of a wait interval.
    // wait_for is called without a predicate, so it returns on timeout or on
    // any wake-up; the while condition above then decides whether to go on.
78  std::unique_lock<std::mutex> wait_lock(wait_mutex_);
79  wait_condition_.wait_for(wait_lock, thread_wait_duration_);
80  }
81  });
82  }
83 }
84 
87  is_scheduler_running_ = false;
88  wait_condition_.notify_one();
89  scheduler_thread_.join();
90  }
91 }
92 
93 void ForeignTableRefreshScheduler::setWaitDuration(int64_t duration_in_seconds) {
94  thread_wait_duration_ = std::chrono::seconds{duration_in_seconds};
95 }
96 
98  return is_scheduler_running_;
99 }
100 
102  return has_refreshed_table_;
103 }
104 
106  has_refreshed_table_ = false;
107 }
108 
// Out-of-class definition of the scheduler's condition variable. The scheduler
// thread blocks on it with wait_for() between refresh passes, and
// notify_one() is called on it elsewhere in this file to wake that thread
// before the wait interval elapses.
114 std::condition_variable ForeignTableRefreshScheduler::wait_condition_;
115 } // namespace foreign_storage
static std::shared_ptr< WrapperType< MutexType > > getMutex(const LockType lockType, const KeyType &key)
#define LOG(tag)
Definition: Logger.h:216
void refresh_foreign_table(Catalog_Namespace::Catalog &catalog, const std::string &table_name, const bool evict_cached_entries)
static void invalidateCaches()
static void setWaitDuration(int64_t duration_in_seconds)
static SysCatalog & instance()
Definition: SysCatalog.h:341
std::unique_lock< T > unique_lock
static void start(std::atomic< bool > &is_program_running)