OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
LockMgrImpl.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <atomic>
20 #include <map>
21 #include <memory>
22 #include <string>
23 #include <type_traits>
24 
25 #include "Catalog/Catalog.h"
30 #include "Shared/types.h"
31 
32 namespace lockmgr {
33 
34 class TableSchemaLockMgr;
35 class TableDataLockMgr;
36 class InsertDataLockMgr;
37 
39 
41  public:
42  MutexTracker(std::unique_ptr<heavyai::DistributedSharedMutex> dmutex)
43  : ref_count_(0u), dmutex_(std::move(dmutex)) {}
44 
45  virtual void lock() {
46  ref_count_.fetch_add(1u);
47  if (!g_multi_instance) {
48  mutex_.lock();
49  } else {
50  dmutex_->lock();
51  }
52  }
53  virtual bool try_lock() {
54  bool gotlock{false};
55  if (!g_multi_instance) {
56  gotlock = mutex_.try_lock();
57  } else {
58  gotlock = dmutex_->try_lock();
59  }
60  if (gotlock) {
61  ref_count_.fetch_add(1u);
62  }
63  return gotlock;
64  }
65  virtual void unlock() {
66  if (!g_multi_instance) {
67  mutex_.unlock();
68  } else {
69  dmutex_->unlock();
70  }
71  ref_count_.fetch_sub(1u);
72  }
73 
74  virtual void lock_shared() {
75  ref_count_.fetch_add(1u);
76  if (!g_multi_instance) {
77  mutex_.lock_shared();
78  } else {
79  dmutex_->lock_shared();
80  }
81  }
82  virtual bool try_lock_shared() {
83  bool gotlock{false};
84  if (!g_multi_instance) {
85  gotlock = mutex_.try_lock_shared();
86  } else {
87  gotlock = dmutex_->try_lock_shared();
88  }
89  if (gotlock) {
90  ref_count_.fetch_add(1u);
91  }
92  return gotlock;
93  }
94  virtual void unlock_shared() {
95  if (!g_multi_instance) {
96  mutex_.unlock_shared();
97  } else {
98  dmutex_->unlock_shared();
99  }
100  ref_count_.fetch_sub(1u);
101  }
102 
103  virtual bool isAcquired() const { return ref_count_.load() > 0; }
104 
105  private:
106  std::atomic<size_t> ref_count_;
108  std::unique_ptr<heavyai::DistributedSharedMutex> dmutex_;
109 };
110 
113 
114 template <typename LOCK>
116  static_assert(std::is_same_v<LOCK, ReadLockBase> ||
117  std::is_same_v<LOCK, WriteLockBase>);
118 
119  public:
121 
123  : mutex_(other.mutex_), lock_(std::move(other.lock_)) {
124  other.mutex_ = nullptr;
125  }
126 
127  TrackedRefLock(const TrackedRefLock&) = delete;
128  TrackedRefLock& operator=(const TrackedRefLock&) = delete;
129 
130  private:
132  LOCK lock_;
133 
135  CHECK(m);
136  return m;
137  }
138 };
139 
142 
// Type-erased "value plus the lock that protects it": invoking the container
// yields the wrapped value while the lock member stays held for the
// container's lifetime.
template <typename T>
class AbstractLockContainer {
 public:
  virtual T operator()() const = 0;

  virtual ~AbstractLockContainer() {}
};

template <typename T, typename LOCK>
class LockContainerImpl : public AbstractLockContainer<T> {
 public:
  // Returns a copy of the guarded value; the lock remains held.
  T operator()() const final { return obj_; }

 protected:
  // Takes ownership of `lock`; constructible only by derived classes.
  LockContainerImpl(T obj, LOCK&& lock) : obj_(obj), lock_(std::move(lock)) {}

  T obj_;
  LOCK lock_;  // held until the container is destroyed
};
162 
163 namespace helpers {
165  const std::string& tableName);
166 
167 template <typename LOCK_TYPE, typename LOCK_MGR_TYPE>
168 LOCK_TYPE getLockForKeyImpl(const ChunkKey& chunk_key) {
169  auto& table_lock_mgr = LOCK_MGR_TYPE::instance();
170  return LOCK_TYPE(table_lock_mgr.getTableMutex(chunk_key));
171 }
172 
173 template <typename LOCK_TYPE, typename LOCK_MGR_TYPE>
175  const std::string& table_name) {
176  const auto chunk_key = chunk_key_for_table(cat, table_name);
177 
178  auto& table_lock_mgr = LOCK_MGR_TYPE::instance();
179  return LOCK_TYPE(table_lock_mgr.getTableMutex(chunk_key));
180 }
181 
182 } // namespace helpers
183 
184 template <class T>
185 class TableLockMgrImpl {
186  static_assert(std::is_same_v<T, TableSchemaLockMgr> ||
187  std::is_same_v<T, TableDataLockMgr> ||
188  std::is_same_v<T, InsertDataLockMgr>);
189 
190  public:
191  static T& instance() {
192  static T mgr;
193  return mgr;
194  }
195  virtual ~TableLockMgrImpl() = default;
196 
197  virtual MutexTracker* getTableMutex(const ChunkKey table_key) {
198  std::lock_guard<std::mutex> access_map_lock(map_mutex_);
199  auto mutex_it = table_mutex_map_.find(table_key);
200  if (mutex_it != table_mutex_map_.end()) {
201  return mutex_it->second.get();
202  }
203 
204  // NOTE(sy): Only used by --multi-instance clusters.
205  std::unique_ptr<heavyai::DistributedSharedMutex> dmutex =
206  getClusterTableMutex(table_key);
207 
208  return table_mutex_map_
209  .emplace(table_key, std::make_unique<MutexTracker>(std::move(dmutex)))
210  .first->second.get();
211  }
212 
213  std::set<ChunkKey> getLockedTables() const {
214  std::set<ChunkKey> ret;
215  std::lock_guard<std::mutex> access_map_lock(map_mutex_);
216  for (const auto& kv : table_mutex_map_) {
217  if (kv.second->isAcquired()) {
218  ret.insert(kv.first);
219  }
220  }
221 
222  return ret;
223  }
224 
226  const std::string& table_name) {
227  auto lock = WriteLock(getMutexTracker(cat, table_name));
228  // Ensure table still exists after lock is acquired.
229  validateExistingTable(cat, table_name);
230  return lock;
231  }
232 
233  static WriteLock getWriteLockForTable(const ChunkKey table_key) {
234  auto& table_lock_mgr = T::instance();
235  return WriteLock(table_lock_mgr.getTableMutex(table_key));
236  }
237 
239  const std::string& table_name) {
240  auto lock = ReadLock(getMutexTracker(cat, table_name));
241  // Ensure table still exists after lock is acquired.
242  validateAndGetExistingTableId(cat, table_name);
243  return lock;
244  }
245 
246  static ReadLock getReadLockForTable(const ChunkKey table_key) {
247  auto& table_lock_mgr = T::instance();
248  return ReadLock(table_lock_mgr.getTableMutex(table_key));
249  }
250 
251  protected:
253 
254  virtual std::unique_ptr<heavyai::DistributedSharedMutex> getClusterTableMutex(
255  const ChunkKey table_key) {
256  std::unique_ptr<heavyai::DistributedSharedMutex> table_mutex;
257 
258  std::string table_key_as_text;
259  for (auto n : table_key) {
260  table_key_as_text += (!table_key_as_text.empty() ? "_" : "") + std::to_string(n);
261  }
262 
263  // A callback used for syncing with most of the changed Catalog metadata, in-general,
264  // such as the list of tables that exist, dashboards, etc. This is accomplished by
265  // read locking, and immediately unlocking, dcatalogMutex_, so
266  // cat->reloadCatalogMetadataUnlocked() will be called.
267  auto cb_reload_catalog_metadata = [table_key](bool write) {
268  if constexpr (T::kind == "insert") {
269  CHECK(write); // The insert lock is for writing, never for reading.
270  }
272  table_key[CHUNK_KEY_DB_IDX]);
273  CHECK(cat);
275  *cat->dcatalogMutex_);
276  };
277 
278  if constexpr (T::kind == "schema") {
279  // A callback used for reloading the Catalog schema for the one table being locked.
280  auto cb_reload_table_metadata = [table_key, table_key_as_text](size_t version) {
281  VLOG(2) << "reloading table metadata for: table_" << table_key_as_text;
282  CHECK_EQ(table_key.size(), 2U);
284  table_key[CHUNK_KEY_DB_IDX]);
285  CHECK(cat);
287  *cat->dcatalogMutex_);
288  cat->reloadTableMetadataUnlocked(table_key[CHUNK_KEY_TABLE_IDX]);
289  };
290 
291  // Create the table mutex.
293  /*pre_lock_callback=*/cb_reload_catalog_metadata,
294  /*reload_cache_callback=*/cb_reload_table_metadata};
295  auto schema_lockfile{
296  std::filesystem::path(g_base_path) / shared::kLockfilesDirectoryName /
298  ("table_" + table_key_as_text + "." + T::kind.data() + ".lockfile")};
299  table_mutex = std::make_unique<heavyai::DistributedSharedMutex>(
300  schema_lockfile.string(), cbs);
301  } else if constexpr (T::kind == "data" || T::kind == "insert") {
302  // A callback used for reloading the DataMgr data for the one table being locked.
303  auto cb_reload_table_data = [table_key, table_key_as_text](size_t version) {
304  VLOG(2) << "invalidating table caches for new version " << version
305  << " of: table_" << table_key_as_text;
306  CHECK_EQ(table_key.size(), 2U);
308  table_key[CHUNK_KEY_DB_IDX]);
309  CHECK(cat);
310  cat->invalidateCachesForTable(table_key[CHUNK_KEY_TABLE_IDX]);
311  };
312 
313  // Create the rows mutex.
314  auto rows_lockfile{std::filesystem::path(g_base_path) /
316  ("table_" + table_key_as_text + ".rows.lockfile")};
317  std::shared_ptr<heavyai::DistributedSharedMutex> rows_mutex =
318  std::make_shared<heavyai::DistributedSharedMutex>(
319  rows_lockfile.string(),
320  /*reload_cache_callback=*/cb_reload_table_data);
321 
322  // A callback used for syncing with outside changes to this table's row data.
323  auto cb_reload_row_data = [table_key, rows_mutex](bool /*write*/) {
324  heavyai::shared_lock<heavyai::DistributedSharedMutex> rows_read_lock(*rows_mutex);
325  };
326 
327  // A callback to notify other nodes about our changes to this table's row data.
328  auto cb_notify_about_row_data = [table_key, rows_mutex](bool write) {
329  if (write) {
331  *rows_mutex);
332  }
333  };
334 
335  // Create the table mutex.
337  /*pre_lock_callback=*/cb_reload_catalog_metadata,
338  {},
339  /*post_lock_callback=*/cb_reload_row_data,
340  /*pre_unlock_callback=*/cb_notify_about_row_data};
341  auto table_lockfile{
342  std::filesystem::path(g_base_path) / shared::kLockfilesDirectoryName /
344  ("table_" + table_key_as_text + "." + T::kind.data() + ".lockfile")};
345  table_mutex =
346  std::make_unique<heavyai::DistributedSharedMutex>(table_lockfile.string(), cbs);
347  } else {
348  UNREACHABLE() << "unexpected lockmgr kind: " << T::kind;
349  }
350 
351  return table_mutex;
352  }
353 
354  mutable std::mutex map_mutex_;
355  std::map<ChunkKey, std::unique_ptr<MutexTracker>> table_mutex_map_;
356 
357  private:
359  const std::string& table_name) {
360  ChunkKey chunk_key{catalog.getDatabaseId(),
361  validateAndGetExistingTableId(catalog, table_name)};
362  auto& table_lock_mgr = T::instance();
363  MutexTracker* tracker = table_lock_mgr.getTableMutex(chunk_key);
364  CHECK(tracker);
365  return tracker;
366  }
367 
369  const std::string& table_name) {
370  validateAndGetExistingTableId(catalog, table_name);
371  }
372 
374  const std::string& table_name) {
375  auto table_id = catalog.getTableId(table_name);
376  if (!table_id.has_value()) {
377  throw std::runtime_error("Table/View " + table_name + " for catalog " +
378  catalog.name() + " does not exist");
379  }
380  return table_id.value();
381  }
382 };
383 
384 template <typename T>
385 std::ostream& operator<<(std::ostream& os, const TableLockMgrImpl<T>& lock_mgr) {
386  for (const auto& table_key : lock_mgr.getLockedTables()) {
387  for (const auto& k : table_key) {
388  os << k << " ";
389  }
390  os << "\n";
391  }
392  return os;
393 }
394 
395 } // namespace lockmgr
MutexTypeBase mutex_
Definition: LockMgrImpl.h:107
virtual bool try_lock_shared()
Definition: LockMgrImpl.h:82
virtual bool try_lock()
Definition: LockMgrImpl.h:53
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::vector< int > ChunkKey
Definition: types.h:36
const std::string kDataDirectoryName
static WriteLock getWriteLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:225
bool g_multi_instance
Definition: heavyai_locks.h:21
std::string cat(Ts &&...args)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:132
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
static void validateExistingTable(Catalog_Namespace::Catalog &catalog, const std::string &table_name)
Definition: LockMgrImpl.h:368
heavyai::unique_lock< MutexTracker > WriteLockBase
Definition: LockMgrImpl.h:111
#define CHUNK_KEY_DB_IDX
Definition: types.h:38
#define UNREACHABLE()
Definition: Logger.h:266
std::set< ChunkKey > getLockedTables() const
Definition: LockMgrImpl.h:213
static WriteLock getWriteLockForTable(const ChunkKey table_key)
Definition: LockMgrImpl.h:233
std::optional< int32_t > getTableId(const std::string &table_name) const
Definition: Catalog.cpp:1934
LOCK_TYPE getLockForKeyImpl(const ChunkKey &chunk_key)
Definition: LockMgrImpl.h:168
virtual std::unique_ptr< heavyai::DistributedSharedMutex > getClusterTableMutex(const ChunkKey table_key)
Definition: LockMgrImpl.h:254
std::string to_string(char const *&&v)
T operator()() const final
Definition: LockMgrImpl.h:154
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:150
heavyai::shared_mutex MutexTypeBase
Definition: LockMgrImpl.h:38
std::shared_lock< T > shared_lock
virtual MutexTracker * getTableMutex(const ChunkKey table_key)
Definition: LockMgrImpl.h:197
std::string name() const
Definition: Catalog.h:315
This file contains the class specification and related data structures for Catalog.
TrackedRefLock< WriteLockBase > WriteLock
Definition: LockMgrImpl.h:140
TrackedRefLock< ReadLockBase > ReadLock
Definition: LockMgrImpl.h:141
static SysCatalog & instance()
Definition: SysCatalog.h:341
virtual void lock()
Definition: LockMgrImpl.h:45
std::string g_base_path
Definition: SysCatalog.cpp:62
virtual T operator()() const =0
string version
Definition: setup.in.py:73
std::map< ChunkKey, std::unique_ptr< MutexTracker > > table_mutex_map_
Definition: LockMgrImpl.h:355
heavyai::shared_lock< MutexTracker > ReadLockBase
Definition: LockMgrImpl.h:112
std::unique_lock< T > unique_lock
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:39
int getDatabaseId() const
Definition: Catalog.h:298
static ReadLock getReadLockForTable(const ChunkKey table_key)
Definition: LockMgrImpl.h:246
virtual void lock_shared()
Definition: LockMgrImpl.h:74
MutexTracker(std::unique_ptr< heavyai::DistributedSharedMutex > dmutex)
Definition: LockMgrImpl.h:42
static MutexTracker * getMutexTracker(Catalog_Namespace::Catalog &catalog, const std::string &table_name)
Definition: LockMgrImpl.h:358
ChunkKey chunk_key_for_table(const Catalog_Namespace::Catalog &cat, const std::string &tableName)
Definition: LockMgr.cpp:32
TrackedRefLock(MutexTracker *m)
Definition: LockMgrImpl.h:120
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
LOCK_TYPE getLockForTableImpl(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:174
std::unique_ptr< heavyai::DistributedSharedMutex > dmutex_
Definition: LockMgrImpl.h:108
static ReadLock getReadLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:238
const std::string kCatalogDirectoryName
static int32_t validateAndGetExistingTableId(Catalog_Namespace::Catalog &catalog, const std::string &table_name)
Definition: LockMgrImpl.h:373
#define CHECK(condition)
Definition: Logger.h:222
virtual void unlock()
Definition: LockMgrImpl.h:65
virtual void unlock_shared()
Definition: LockMgrImpl.h:94
MutexTracker * mutex_
Definition: LockMgrImpl.h:131
const std::string kLockfilesDirectoryName
TrackedRefLock(TrackedRefLock &&other)
Definition: LockMgrImpl.h:122
std::atomic< size_t > ref_count_
Definition: LockMgrImpl.h:106
constexpr double n
Definition: Utm.h:38
std::shared_timed_mutex shared_mutex
TrackedRefLock & operator=(const TrackedRefLock &)=delete
static MutexTracker * checkPointer(MutexTracker *m)
Definition: LockMgrImpl.h:134
virtual bool isAcquired() const
Definition: LockMgrImpl.h:103
#define VLOG(n)
Definition: Logger.h:316
LockContainerImpl(T obj, LOCK &&lock)
Definition: LockMgrImpl.h:157
virtual ~TableLockMgrImpl()=default