OmniSciDB  cde582ebc3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
LockMgrImpl.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <atomic>
20 #include <map>
21 #include <memory>
22 #include <string>
23 #include <type_traits>
24 
25 #include "Catalog/Catalog.h"
30 #include "Shared/types.h"
31 
32 namespace lockmgr {
33 
34 class TableSchemaLockMgr;
35 class TableDataLockMgr;
36 class InsertDataLockMgr;
37 
39 
41  public:
  // Wraps an optional distributed (cross-instance) mutex alongside the local
  // in-process mutex_; the acquisition ref count starts at zero.
  // NOTE(review): dmutex is presumably null/unused unless g_multi_instance is
  // set — the lock methods below only dereference it on that path; confirm
  // callers never pass null when running multi-instance.
  MutexTracker(std::unique_ptr<heavyai::DistributedSharedMutex> dmutex)
      : ref_count_(0u), dmutex_(std::move(dmutex)) {}
44 
45  virtual void lock() {
46  ref_count_.fetch_add(1u);
47  if (!g_multi_instance) {
48  mutex_.lock();
49  } else {
50  dmutex_->lock();
51  }
52  }
53  virtual bool try_lock() {
54  bool gotlock{false};
55  if (!g_multi_instance) {
56  gotlock = mutex_.try_lock();
57  } else {
58  gotlock = dmutex_->try_lock();
59  }
60  if (gotlock) {
61  ref_count_.fetch_add(1u);
62  }
63  return gotlock;
64  }
65  virtual void unlock() {
66  if (!g_multi_instance) {
67  mutex_.unlock();
68  } else {
69  dmutex_->unlock();
70  }
71  ref_count_.fetch_sub(1u);
72  }
73 
74  virtual void lock_shared() {
75  ref_count_.fetch_add(1u);
76  if (!g_multi_instance) {
77  mutex_.lock_shared();
78  } else {
79  dmutex_->lock_shared();
80  }
81  }
82  virtual bool try_lock_shared() {
83  bool gotlock{false};
84  if (!g_multi_instance) {
85  gotlock = mutex_.try_lock_shared();
86  } else {
87  gotlock = dmutex_->try_lock_shared();
88  }
89  if (gotlock) {
90  ref_count_.fetch_add(1u);
91  }
92  return gotlock;
93  }
94  virtual void unlock_shared() {
95  if (!g_multi_instance) {
96  mutex_.unlock_shared();
97  } else {
98  dmutex_->unlock_shared();
99  }
100  ref_count_.fetch_sub(1u);
101  }
102 
103  virtual bool isAcquired() const { return ref_count_.load() > 0; }
104 
105  private:
106  std::atomic<size_t> ref_count_;
108  std::unique_ptr<heavyai::DistributedSharedMutex> dmutex_;
109 };
110 
113 
114 template <typename LOCK>
116  static_assert(std::is_same_v<LOCK, ReadLockBase> ||
117  std::is_same_v<LOCK, WriteLockBase>);
118 
119  public:
121 
123  : mutex_(other.mutex_), lock_(std::move(other.lock_)) {
124  other.mutex_ = nullptr;
125  }
126 
127  TrackedRefLock(const TrackedRefLock&) = delete;
128  TrackedRefLock& operator=(const TrackedRefLock&) = delete;
129 
130  private:
132  LOCK lock_;
133 
135  CHECK(m);
136  return m;
137  }
138 };
139 
142 
143 template <typename T>
145  public:
146  virtual T operator()() const = 0;
147 
149 };
150 
151 template <typename T, typename LOCK>
153  public:
154  T operator()() const final { return obj_; }
155 
156  protected:
157  LockContainerImpl(T obj, LOCK&& lock) : obj_(obj), lock_(std::move(lock)) {}
158 
160  LOCK lock_;
161 };
162 
163 namespace helpers {
165  const std::string& tableName);
166 
167 template <typename LOCK_TYPE, typename LOCK_MGR_TYPE>
168 LOCK_TYPE getLockForKeyImpl(const ChunkKey& chunk_key) {
169  auto& table_lock_mgr = LOCK_MGR_TYPE::instance();
170  return LOCK_TYPE(table_lock_mgr.getTableMutex(chunk_key));
171 }
172 
173 template <typename LOCK_TYPE, typename LOCK_MGR_TYPE>
175  const std::string& table_name) {
176  const auto chunk_key = chunk_key_for_table(cat, table_name);
177 
178  auto& table_lock_mgr = LOCK_MGR_TYPE::instance();
179  return LOCK_TYPE(table_lock_mgr.getTableMutex(chunk_key));
180 }
181 
182 } // namespace helpers
183 
184 template <class T>
185 class TableLockMgrImpl {
186  static_assert(std::is_same_v<T, TableSchemaLockMgr> ||
187  std::is_same_v<T, TableDataLockMgr> ||
188  std::is_same_v<T, InsertDataLockMgr>);
189 
190  public:
  // Meyers singleton: one lock manager per concrete subclass T, constructed
  // thread-safely on first use (guaranteed by C++11 static-local init).
  static T& instance() {
    static T mgr;
    return mgr;
  }
  virtual ~TableLockMgrImpl() = default;
196 
197  virtual MutexTracker* getTableMutex(const ChunkKey table_key) {
198  std::lock_guard<std::mutex> access_map_lock(map_mutex_);
199  auto mutex_it = table_mutex_map_.find(table_key);
200  if (mutex_it != table_mutex_map_.end()) {
201  return mutex_it->second.get();
202  }
203 
204  // NOTE(sy): Only used by --multi-instance clusters.
205  std::unique_ptr<heavyai::DistributedSharedMutex> dmutex =
206  getClusterTableMutex(table_key);
207 
208  return table_mutex_map_
209  .emplace(table_key, std::make_unique<MutexTracker>(std::move(dmutex)))
210  .first->second.get();
211  }
212 
213  std::set<ChunkKey> getLockedTables() const {
214  std::set<ChunkKey> ret;
215  std::lock_guard<std::mutex> access_map_lock(map_mutex_);
216  for (const auto& kv : table_mutex_map_) {
217  if (kv.second->isAcquired()) {
218  ret.insert(kv.first);
219  }
220  }
221 
222  return ret;
223  }
224 
226  const std::string& table_name) {
227  if (const auto tdp = cat.getMetadataForTable(table_name, false)) {
228  ChunkKey chunk_key{cat.getCurrentDB().dbId, tdp->tableId};
229  auto& table_lock_mgr = T::instance();
230  MutexTracker* tracker = table_lock_mgr.getTableMutex(chunk_key);
231  CHECK(tracker);
232  return WriteLock(tracker);
233  } else {
234  throw std::runtime_error("Table/View " + table_name + " for catalog " +
235  cat.getCurrentDB().dbName + " does not exist");
236  }
237  }
238  static WriteLock getWriteLockForTable(const ChunkKey table_key) {
239  auto& table_lock_mgr = T::instance();
240  return WriteLock(table_lock_mgr.getTableMutex(table_key));
241  }
242 
244  const std::string& table_name) {
245  if (const auto tdp = cat.getMetadataForTable(table_name, false)) {
246  ChunkKey chunk_key{cat.getCurrentDB().dbId, tdp->tableId};
247  auto& table_lock_mgr = T::instance();
248  MutexTracker* tracker = table_lock_mgr.getTableMutex(chunk_key);
249  CHECK(tracker);
250  return ReadLock(tracker);
251  } else {
252  throw std::runtime_error("Table/View " + table_name + " for catalog " +
253  cat.getCurrentDB().dbName + " does not exist");
254  }
255  }
256  static ReadLock getReadLockForTable(const ChunkKey table_key) {
257  auto& table_lock_mgr = T::instance();
258  return ReadLock(table_lock_mgr.getTableMutex(table_key));
259  }
260 
261  protected:
263 
264  virtual std::unique_ptr<heavyai::DistributedSharedMutex> getClusterTableMutex(
265  const ChunkKey table_key) {
266  std::unique_ptr<heavyai::DistributedSharedMutex> table_mutex;
267 
268  std::string table_key_as_text;
269  for (auto n : table_key) {
270  table_key_as_text += (!table_key_as_text.empty() ? "_" : "") + std::to_string(n);
271  }
272 
273  // A callback used for syncing with most of the changed Catalog metadata, in-general,
274  // such as the list of tables that exist, dashboards, etc. This is accomplished by
275  // read locking, and immediately unlocking, dcatalogMutex_, so
276  // cat->reloadCatalogMetadataUnlocked() will be called.
277  auto cb_reload_catalog_metadata = [table_key](bool write) {
278  if constexpr (T::kind == "insert") {
279  CHECK(write); // The insert lock is for writing, never for reading.
280  }
282  table_key[CHUNK_KEY_DB_IDX]);
283  CHECK(cat);
285  *cat->dcatalogMutex_);
286  };
287 
288  if constexpr (T::kind == "schema") {
289  // A callback used for reloading the Catalog schema for the one table being locked.
290  auto cb_reload_table_metadata = [table_key, table_key_as_text](size_t version) {
291  VLOG(2) << "reloading table metadata for: table_" << table_key_as_text;
292  CHECK_EQ(table_key.size(), 2U);
294  table_key[CHUNK_KEY_DB_IDX]);
295  CHECK(cat);
296  std::unique_lock<heavyai::DistributedSharedMutex> dwrite_lock(
297  *cat->dcatalogMutex_);
298  cat->reloadTableMetadataUnlocked(table_key[CHUNK_KEY_TABLE_IDX]);
299  };
300 
301  // Create the table mutex.
303  /*pre_lock_callback=*/cb_reload_catalog_metadata,
304  /*reload_cache_callback=*/cb_reload_table_metadata};
305  auto schema_lockfile{
306  std::filesystem::path(g_base_path) / shared::kLockfilesDirectoryName /
308  ("table_" + table_key_as_text + "." + T::kind.data() + ".lockfile")};
309  table_mutex = std::make_unique<heavyai::DistributedSharedMutex>(
310  schema_lockfile.string(), cbs);
311  } else if constexpr (T::kind == "data" || T::kind == "insert") {
312  // A callback used for reloading the DataMgr data for the one table being locked.
313  auto cb_reload_table_data = [table_key, table_key_as_text](size_t version) {
314  VLOG(2) << "invalidating table caches for new version " << version
315  << " of: table_" << table_key_as_text;
316  CHECK_EQ(table_key.size(), 2U);
318  table_key[CHUNK_KEY_DB_IDX]);
319  CHECK(cat);
320  cat->invalidateCachesForTable(table_key[CHUNK_KEY_TABLE_IDX]);
321  };
322 
323  // Create the rows mutex.
324  auto rows_lockfile{std::filesystem::path(g_base_path) /
326  ("table_" + table_key_as_text + ".rows.lockfile")};
327  std::shared_ptr<heavyai::DistributedSharedMutex> rows_mutex =
328  std::make_shared<heavyai::DistributedSharedMutex>(
329  rows_lockfile.string(),
330  /*reload_cache_callback=*/cb_reload_table_data);
331 
332  // A callback used for syncing with outside changes to this table's row data.
333  auto cb_reload_row_data = [table_key, rows_mutex](bool /*write*/) {
334  std::shared_lock rows_read_lock(*rows_mutex);
335  };
336 
337  // A callback to notify other nodes about our changes to this table's row data.
338  auto cb_notify_about_row_data = [table_key, rows_mutex](bool write) {
339  if (write) {
340  std::unique_lock rows_write_lock(*rows_mutex);
341  }
342  };
343 
344  // Create the table mutex.
346  /*pre_lock_callback=*/cb_reload_catalog_metadata,
347  {},
348  /*post_lock_callback=*/cb_reload_row_data,
349  /*pre_unlock_callback=*/cb_notify_about_row_data};
350  auto table_lockfile{
351  std::filesystem::path(g_base_path) / shared::kLockfilesDirectoryName /
353  ("table_" + table_key_as_text + "." + T::kind.data() + ".lockfile")};
354  table_mutex =
355  std::make_unique<heavyai::DistributedSharedMutex>(table_lockfile.string(), cbs);
356  } else {
357  UNREACHABLE() << "unexpected lockmgr kind: " << T::kind;
358  }
359 
360  return table_mutex;
361  }
362 
363  mutable std::mutex map_mutex_;
364  std::map<ChunkKey, std::unique_ptr<MutexTracker>> table_mutex_map_;
365 };
366 
367 template <typename T>
368 std::ostream& operator<<(std::ostream& os, const TableLockMgrImpl<T>& lock_mgr) {
369  for (const auto& table_key : lock_mgr.getLockedTables()) {
370  for (const auto& k : table_key) {
371  os << k << " ";
372  }
373  os << "\n";
374  }
375  return os;
376 }
377 
378 } // namespace lockmgr
MutexTypeBase mutex_
Definition: LockMgrImpl.h:107
virtual bool try_lock_shared()
Definition: LockMgrImpl.h:82
virtual bool try_lock()
Definition: LockMgrImpl.h:53
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::vector< int > ChunkKey
Definition: types.h:36
const std::string kDataDirectoryName
static WriteLock getWriteLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:225
bool g_multi_instance
Definition: heavyai_locks.h:21
std::string cat(Ts &&...args)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:132
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
heavyai::unique_lock< MutexTracker > WriteLockBase
Definition: LockMgrImpl.h:111
#define CHUNK_KEY_DB_IDX
Definition: types.h:38
#define UNREACHABLE()
Definition: Logger.h:266
std::set< ChunkKey > getLockedTables() const
Definition: LockMgrImpl.h:213
static WriteLock getWriteLockForTable(const ChunkKey table_key)
Definition: LockMgrImpl.h:238
LOCK_TYPE getLockForKeyImpl(const ChunkKey &chunk_key)
Definition: LockMgrImpl.h:168
virtual std::unique_ptr< heavyai::DistributedSharedMutex > getClusterTableMutex(const ChunkKey table_key)
Definition: LockMgrImpl.h:264
std::string to_string(char const *&&v)
T operator()() const final
Definition: LockMgrImpl.h:154
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:150
heavyai::shared_mutex MutexTypeBase
Definition: LockMgrImpl.h:38
std::shared_lock< T > shared_lock
virtual MutexTracker * getTableMutex(const ChunkKey table_key)
Definition: LockMgrImpl.h:197
This file contains the class specification and related data structures for Catalog.
TrackedRefLock< WriteLockBase > WriteLock
Definition: LockMgrImpl.h:140
TrackedRefLock< ReadLockBase > ReadLock
Definition: LockMgrImpl.h:141
static SysCatalog & instance()
Definition: SysCatalog.h:337
virtual void lock()
Definition: LockMgrImpl.h:45
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:242
std::string g_base_path
Definition: SysCatalog.cpp:62
virtual T operator()() const =0
string version
Definition: setup.in.py:73
std::map< ChunkKey, std::unique_ptr< MutexTracker > > table_mutex_map_
Definition: LockMgrImpl.h:364
heavyai::shared_lock< MutexTracker > ReadLockBase
Definition: LockMgrImpl.h:112
std::unique_lock< T > unique_lock
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:39
static ReadLock getReadLockForTable(const ChunkKey table_key)
Definition: LockMgrImpl.h:256
virtual void lock_shared()
Definition: LockMgrImpl.h:74
MutexTracker(std::unique_ptr< heavyai::DistributedSharedMutex > dmutex)
Definition: LockMgrImpl.h:42
ChunkKey chunk_key_for_table(const Catalog_Namespace::Catalog &cat, const std::string &tableName)
Definition: LockMgr.cpp:32
TrackedRefLock(MutexTracker *m)
Definition: LockMgrImpl.h:120
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
LOCK_TYPE getLockForTableImpl(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:174
std::unique_ptr< heavyai::DistributedSharedMutex > dmutex_
Definition: LockMgrImpl.h:108
static ReadLock getReadLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgrImpl.h:243
const std::string kCatalogDirectoryName
#define CHECK(condition)
Definition: Logger.h:222
virtual void unlock()
Definition: LockMgrImpl.h:65
virtual void unlock_shared()
Definition: LockMgrImpl.h:94
MutexTracker * mutex_
Definition: LockMgrImpl.h:131
const std::string kLockfilesDirectoryName
TrackedRefLock(TrackedRefLock &&other)
Definition: LockMgrImpl.h:122
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
std::atomic< size_t > ref_count_
Definition: LockMgrImpl.h:106
constexpr double n
Definition: Utm.h:38
std::shared_timed_mutex shared_mutex
TrackedRefLock & operator=(const TrackedRefLock &)=delete
static MutexTracker * checkPointer(MutexTracker *m)
Definition: LockMgrImpl.h:134
virtual bool isAcquired() const
Definition: LockMgrImpl.h:103
#define VLOG(n)
Definition: Logger.h:316
LockContainerImpl(T obj, LOCK &&lock)
Definition: LockMgrImpl.h:157
virtual ~TableLockMgrImpl()=default