OmniSciDB  d2f719934e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignTableRefreshScheduler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
20 #include "LockMgr/LockMgr.h"
22 
23 namespace foreign_storage {
24 
26  auto execute_write_lock = mapd_unique_lock<mapd_shared_mutex>(
30 }
31 
32 void ForeignTableRefreshScheduler::start(std::atomic<bool>& is_program_running) {
33  if (is_program_running && !is_scheduler_running_) {
34  is_scheduler_running_ = true;
35  scheduler_thread_ = std::thread([&is_program_running]() {
36  while (is_program_running && is_scheduler_running_) {
37  auto& sys_catalog = Catalog_Namespace::SysCatalog::instance();
38  // Exit if scheduler has been stopped asynchronously
39  if (!is_program_running || !is_scheduler_running_) {
40  return;
41  }
42  bool at_least_one_table_refreshed = false;
43  for (const auto& catalog : sys_catalog.getCatalogsForAllDbs()) {
44  // Exit if scheduler has been stopped asynchronously
45  if (!is_program_running || !is_scheduler_running_) {
46  return;
47  }
48  auto tables = catalog->getAllForeignTablesForRefresh();
49  for (auto table : tables) {
50  // Exit if scheduler has been stopped asynchronously
51  if (!is_program_running || !is_scheduler_running_) {
52  return;
53  }
54  try {
55  refresh_foreign_table(*catalog, table->tableName, false);
56  } catch (std::runtime_error& e) {
57  LOG(ERROR) << "Scheduled refresh for table \"" << table->tableName
58  << "\" resulted in an error. " << e.what();
59  }
60  has_refreshed_table_ = true;
61  at_least_one_table_refreshed = true;
62  }
63  }
64  if (at_least_one_table_refreshed) {
66  }
67  // Exit if scheduler has been stopped asynchronously
68  if (!is_program_running || !is_scheduler_running_) {
69  return;
70  }
71 
72  // A condition variable is used here (instead of a sleep call)
73  // in order to allow for thread wake-up, even in the middle
74  // of a wait interval.
75  std::unique_lock<std::mutex> wait_lock(wait_mutex_);
76  wait_condition_.wait_for(wait_lock, thread_wait_duration_);
77  }
78  });
79  }
80 }
81 
84  is_scheduler_running_ = false;
85  wait_condition_.notify_one();
86  scheduler_thread_.join();
87  }
88 }
89 
90 void ForeignTableRefreshScheduler::setWaitDuration(int64_t duration_in_seconds) {
91  thread_wait_duration_ = std::chrono::seconds{duration_in_seconds};
92 }
93 
95  return is_scheduler_running_;
96 }
97 
99  return has_refreshed_table_;
100 }
101 
103  has_refreshed_table_ = false;
104 }
105 
111 std::condition_variable ForeignTableRefreshScheduler::wait_condition_;
112 } // namespace foreign_storage
#define LOG(tag)
Definition: Logger.h:205
void refresh_foreign_table(Catalog_Namespace::Catalog &catalog, const std::string &table_name, const bool evict_cached_entries)
static void invalidateCaches()
static void setWaitDuration(int64_t duration_in_seconds)
static SysCatalog & instance()
Definition: SysCatalog.h:326
static void start(std::atomic< bool > &is_program_running)
static std::shared_ptr< MutexType > getMutex(const LockType lockType, const KeyType &key)
Definition: LegacyLockMgr.h:51