23 #include <type_traits>
34 class TableSchemaLockMgr;
35 class TableDataLockMgr;
36 class InsertDataLockMgr;
42 MutexTracker(std::unique_ptr<heavyai::DistributedSharedMutex> dmutex)
56 gotlock =
mutex_.try_lock();
85 gotlock =
mutex_.try_lock_shared();
87 gotlock =
dmutex_->try_lock_shared();
108 std::unique_ptr<heavyai::DistributedSharedMutex>
dmutex_;
114 template <
typename LOCK>
116 static_assert(std::is_same_v<LOCK, ReadLockBase> ||
117 std::is_same_v<LOCK, WriteLockBase>);
124 other.mutex_ =
nullptr;
143 template <
typename T>
151 template <
typename T,
typename LOCK>
165 const std::string& tableName);
167 template <
typename LOCK_TYPE,
typename LOCK_MGR_TYPE>
169 auto& table_lock_mgr = LOCK_MGR_TYPE::instance();
170 return LOCK_TYPE(table_lock_mgr.getTableMutex(chunk_key));
173 template <
typename LOCK_TYPE,
typename LOCK_MGR_TYPE>
175 const std::string& table_name) {
178 auto& table_lock_mgr = LOCK_MGR_TYPE::instance();
179 return LOCK_TYPE(table_lock_mgr.getTableMutex(chunk_key));
185 class TableLockMgrImpl {
186 static_assert(std::is_same_v<T, TableSchemaLockMgr> ||
187 std::is_same_v<T, TableDataLockMgr> ||
188 std::is_same_v<T, InsertDataLockMgr>);
198 std::lock_guard<std::mutex> access_map_lock(
map_mutex_);
201 return mutex_it->second.get();
205 std::unique_ptr<heavyai::DistributedSharedMutex> dmutex =
209 .emplace(table_key, std::make_unique<MutexTracker>(std::move(dmutex)))
210 .first->second.get();
214 std::set<ChunkKey> ret;
215 std::lock_guard<std::mutex> access_map_lock(
map_mutex_);
217 if (kv.second->isAcquired()) {
218 ret.insert(kv.first);
226 const std::string& table_name) {
234 auto& table_lock_mgr = T::instance();
235 return WriteLock(table_lock_mgr.getTableMutex(table_key));
239 const std::string& table_name) {
247 auto& table_lock_mgr = T::instance();
248 return ReadLock(table_lock_mgr.getTableMutex(table_key));
256 std::unique_ptr<heavyai::DistributedSharedMutex> table_mutex;
258 std::string table_key_as_text;
259 for (
auto n : table_key) {
260 table_key_as_text += (!table_key_as_text.empty() ?
"_" :
"") +
std::to_string(
n);
267 auto cb_reload_catalog_metadata = [table_key](
bool write) {
268 if constexpr (T::kind ==
"insert") {
275 *
cat->dcatalogMutex_);
278 if constexpr (T::kind ==
"schema") {
280 auto cb_reload_table_metadata = [table_key, table_key_as_text](
size_t version) {
281 VLOG(2) <<
"reloading table metadata for: table_" << table_key_as_text;
287 *cat->dcatalogMutex_);
293 cb_reload_catalog_metadata,
294 cb_reload_table_metadata};
295 auto schema_lockfile{
298 (
"table_" + table_key_as_text +
"." + T::kind.data() +
".lockfile")};
299 table_mutex = std::make_unique<heavyai::DistributedSharedMutex>(
300 schema_lockfile.string(), cbs);
301 }
else if constexpr (T::kind ==
"data" || T::kind ==
"insert") {
303 auto cb_reload_table_data = [table_key, table_key_as_text](
size_t version) {
304 VLOG(2) <<
"invalidating table caches for new version " <<
version
305 <<
" of: table_" << table_key_as_text;
314 auto rows_lockfile{std::filesystem::path(
g_base_path) /
316 (
"table_" + table_key_as_text +
".rows.lockfile")};
317 std::shared_ptr<heavyai::DistributedSharedMutex> rows_mutex =
318 std::make_shared<heavyai::DistributedSharedMutex>(
319 rows_lockfile.string(),
320 cb_reload_table_data);
323 auto cb_reload_row_data = [table_key, rows_mutex](
bool ) {
328 auto cb_notify_about_row_data = [table_key, rows_mutex](
bool write) {
337 cb_reload_catalog_metadata,
340 cb_notify_about_row_data};
344 (
"table_" + table_key_as_text +
"." + T::kind.data() +
".lockfile")};
346 std::make_unique<heavyai::DistributedSharedMutex>(table_lockfile.string(), cbs);
348 UNREACHABLE() <<
"unexpected lockmgr kind: " << T::kind;
359 const std::string& table_name) {
362 auto& table_lock_mgr = T::instance();
363 MutexTracker* tracker = table_lock_mgr.getTableMutex(chunk_key);
369 const std::string& table_name) {
374 const std::string& table_name) {
375 auto table_id = catalog.
getTableId(table_name);
376 if (!table_id.has_value()) {
377 throw std::runtime_error(
"Table/View " + table_name +
" for catalog " +
378 catalog.
name() +
" does not exist");
380 return table_id.value();
384 template <
typename T>
385 std::ostream& operator<<(std::ostream& os, const TableLockMgrImpl<T>& lock_mgr) {
386 for (
const auto& table_key : lock_mgr.getLockedTables()) {
387 for (
const auto& k : table_key) {
virtual bool try_lock_shared()
std::vector< int > ChunkKey
const std::string kDataDirectoryName
static int32_t validateAndGetExistingTableId(const Catalog_Namespace::Catalog &catalog, const std::string &table_name)
class for a per-database catalog. also includes metadata for the current database and the current use...
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
heavyai::unique_lock< MutexTracker > WriteLockBase
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
static MutexTracker * getMutexTracker(const Catalog_Namespace::Catalog &catalog, const std::string &table_name)
virtual ~AbstractLockContainer()
std::set< ChunkKey > getLockedTables() const
static WriteLock getWriteLockForTable(const ChunkKey table_key)
std::optional< int32_t > getTableId(const std::string &table_name) const
LOCK_TYPE getLockForKeyImpl(const ChunkKey &chunk_key)
virtual std::unique_ptr< heavyai::DistributedSharedMutex > getClusterTableMutex(const ChunkKey table_key)
T operator()() const final
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
heavyai::shared_mutex MutexTypeBase
std::shared_lock< T > shared_lock
virtual MutexTracker * getTableMutex(const ChunkKey table_key)
This file contains the class specification and related data structures for Catalog.
TrackedRefLock< WriteLockBase > WriteLock
TrackedRefLock< ReadLockBase > ReadLock
static SysCatalog & instance()
virtual T operator()() const =0
std::map< ChunkKey, std::unique_ptr< MutexTracker > > table_mutex_map_
heavyai::shared_lock< MutexTracker > ReadLockBase
std::unique_lock< T > unique_lock
#define CHUNK_KEY_TABLE_IDX
int getDatabaseId() const
static ReadLock getReadLockForTable(const ChunkKey table_key)
virtual void lock_shared()
MutexTracker(std::unique_ptr< heavyai::DistributedSharedMutex > dmutex)
ChunkKey chunk_key_for_table(const Catalog_Namespace::Catalog &cat, const std::string &tableName)
TrackedRefLock(MutexTracker *m)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
LOCK_TYPE getLockForTableImpl(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
std::unique_ptr< heavyai::DistributedSharedMutex > dmutex_
static ReadLock getReadLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
const std::string kCatalogDirectoryName
virtual void unlock_shared()
const std::string kLockfilesDirectoryName
TrackedRefLock(TrackedRefLock &&other)
static void validateExistingTable(const Catalog_Namespace::Catalog &catalog, const std::string &table_name)
std::atomic< size_t > ref_count_
std::shared_timed_mutex shared_mutex
TrackedRefLock & operator=(const TrackedRefLock &)=delete
static MutexTracker * checkPointer(MutexTracker *m)
virtual bool isAcquired() const
LockContainerImpl(T obj, LOCK &&lock)
virtual ~TableLockMgrImpl()=default