OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignTableRefresh.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ForeignTableRefresh.h"
18 #include "Catalog/Catalog.h"
20 #include "LockMgr/LockMgr.h"
21 #include "Shared/distributed.h"
22 #include "Shared/misc.h"
23 
24 #include "QueryEngine/Execute.h"
25 
26 namespace foreign_storage {
27 namespace {
// File-local helper: calls DataMgr::deleteChunksWithPrefix() for `key_prefix` twice,
// once with MemoryLevel::CPU_LEVEL and once with MemoryLevel::GPU_LEVEL.
//
// NOTE(review): source line 28 (the opening of this signature) is not present in this
// listing. The symbol index at the end of the page gives the full signature as
//   void clear_cpu_and_gpu_cache(Data_Namespace::DataMgr& data_mgr,
//                                const ChunkKey& key_prefix)
29  const ChunkKey& key_prefix) {
30  data_mgr.deleteChunksWithPrefix(key_prefix, MemoryLevel::CPU_LEVEL);
31  data_mgr.deleteChunksWithPrefix(key_prefix, MemoryLevel::GPU_LEVEL);
32 }
33 } // namespace
34 
// Refreshes the foreign table `td`:
//   1. clears the executor's external caches for the table,
//   2. returns early if a caching foreign storage manager has no stored data wrapper
//      for the table,
//   3. clears cached CPU/GPU chunks (all of them, or only the last fragment's in
//      append mode without eviction),
//   4. asks the foreign storage manager to refresh the table, and
//   5. in the append/no-evict case, clears cached chunks whose element count changed.
//
// `evict_cached_entries` is forwarded to refreshTable() and also selects the
// "clear everything" path in step 3.
//
// No lock is acquired in the visible code; presumably the caller is expected to hold
// the table lock already (the "_unlocked" suffix) -- TODO confirm.
//
// NOTE(review): source line 35 (the opening of this signature) is not present in this
// listing. The symbol index at the end of the page gives the full signature as
//   void refresh_foreign_table_unlocked(Catalog_Namespace::Catalog& catalog,
//                                       const ForeignTable& td,
//                                       const bool evict_cached_entries)
36  const ForeignTable& td,
37  const bool evict_cached_entries) {
38  LOG(INFO) << "Starting refresh for table: " << td.tableName;
39  auto& data_mgr = catalog.getDataMgr();
// Two-element chunk-key prefix {db id, table id} identifying this table's chunks.
40  ChunkKey table_key{catalog.getCurrentDB().dbId, td.tableId};
41  Executor::clearExternalCaches(true, &td, catalog.getDatabaseId());
// NOTE(review): source line 42 is missing from this listing; the index mentions
// Catalog::removeFragmenterForTable(), which may be what is called here -- confirm
// against the real file.
43 
44  auto fsm = data_mgr.getPersistentStorageMgr()->getForeignStorageMgr();
45  CHECK(fsm);
// The early-out below only applies when the storage manager is a
// CachingForeignStorageMgr; for any other manager type the refresh always proceeds.
46  if (auto cfm = dynamic_cast<CachingForeignStorageMgr*>(fsm)) {
47  if (!cfm->hasStoredDataWrapper(table_key[CHUNK_KEY_DB_IDX],
48  table_key[CHUNK_KEY_TABLE_IDX])) {
49  // If there is no wrapper stored on disk, then we have not populated the metadata
50  // for this table and we are free to skip the refresh.
// NOTE(review): source line 51 is missing from this listing.
52  return;
53  }
54  }
55 
// Snapshot of the pre-refresh chunk metadata, keyed by chunk key. It is only filled in
// the append-mode/no-evict branch, so the post-refresh comparison at the bottom is
// skipped in every other case.
56  std::map<ChunkKey, std::shared_ptr<ChunkMetadata>> old_chunk_metadata_by_chunk_key;
57  if (td.isAppendMode() && !evict_cached_entries) {
58  ChunkMetadataVector metadata_vec;
59  data_mgr.getChunkMetadataVecForKeyPrefix(metadata_vec, table_key);
// First pass: record every chunk's metadata and find the largest fragment id
// (starting from 0).
60  int last_fragment_id = 0;
61  for (const auto& [key, metadata] : metadata_vec) {
62  if (key[CHUNK_KEY_FRAGMENT_IDX] > last_fragment_id) {
63  last_fragment_id = key[CHUNK_KEY_FRAGMENT_IDX];
64  }
65  old_chunk_metadata_by_chunk_key[key] = metadata;
66  }
// Second pass: clear only the chunks of that last fragment from CPU and GPU memory.
// Presumably this is because an append can add rows to the last fragment -- confirm.
67  for (const auto& [key, metadata] : metadata_vec) {
68  if (key[CHUNK_KEY_FRAGMENT_IDX] == last_fragment_id) {
69  clear_cpu_and_gpu_cache(data_mgr, key);
70  }
71  }
72  } else {
// Not append mode, or eviction requested: clear every cached chunk of the table.
73  clear_cpu_and_gpu_cache(data_mgr, table_key);
74  }
75 
// On any failure the table's cached CPU/GPU chunks are cleared before the exception
// propagates.
76  try {
77  fsm->refreshTable(table_key, evict_cached_entries);
// NOTE(review): source line 78 is missing from this listing; the index mentions
// Catalog::updateForeignTableRefreshTimes(), which may be what is called here --
// confirm against the real file.
79  } catch (PostEvictionRefreshException& e) {
// NOTE(review): source line 80 is missing from this listing.
81  clear_cpu_and_gpu_cache(data_mgr, table_key);
// Unwrap: callers see the original exception rather than the wrapper. Per the index,
// getOriginalException() returns a std::runtime_error by value.
82  throw e.getOriginalException();
83  } catch (...) {
84  clear_cpu_and_gpu_cache(data_mgr, table_key);
85  throw;
86  }
87 
88  // Delete cached rolled off/updated chunks.
// Only keys present both before and after the refresh are compared; a chunk is cleared
// when its numElements differs from the pre-refresh snapshot.
89  if (!old_chunk_metadata_by_chunk_key.empty()) {
90  ChunkMetadataVector new_metadata_vec;
91  data_mgr.getChunkMetadataVecForKeyPrefix(new_metadata_vec, table_key);
92  for (const auto& [key, metadata] : new_metadata_vec) {
93  auto it = old_chunk_metadata_by_chunk_key.find(key);
94  if (it != old_chunk_metadata_by_chunk_key.end() &&
95  it->second->numElements != metadata->numElements) {
96  clear_cpu_and_gpu_cache(data_mgr, key);
97  }
98  }
99  }
100  LOG(INFO) << "Completed refresh for table: " << td.tableName;
101 }
102 
// Refreshes the foreign table named `table_name`: takes a
// TableSchemaLockContainer<WriteLock> on the table, checks that it is a foreign table,
// and delegates to refresh_foreign_table_unlocked() with the same
// `evict_cached_entries` flag.
//
// Throws std::runtime_error if the table's storage type is not
// StorageType::FOREIGN_TABLE.
//
// NOTE(review): source line 103 (the opening of this signature) is not present in this
// listing. The symbol index at the end of the page gives the full signature as
//   void refresh_foreign_table(Catalog_Namespace::Catalog& catalog,
//                              const std::string& table_name,
//                              const bool evict_cached_entries)
104  const std::string& table_name,
105  const bool evict_cached_entries) {
// NOTE(review): source line 107 (the second half of this condition) is missing from
// this listing. Given the comment below and the index entries for shared `contains`
// and kAggregatorOnlySystemTables, it presumably tests whether `table_name` is one of
// the aggregator-only system tables -- confirm against the real file.
106  if (dist::is_leaf_node() &&
108  // Skip aggregator only system tables on leaf nodes.
109  return;
110  }
// The lock container is a local unique_ptr, so it is destroyed when this function
// returns or throws.
// NOTE(review): source line 113 (the call that produces the container from `catalog`,
// `table_name` and `false`) is missing from this listing.
111  auto table_lock =
112  std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::WriteLock>>(
114  catalog, table_name, false));
115 
// Invoking the lock container yields the table's descriptor.
116  const TableDescriptor* td = (*table_lock)();
117  if (td->storageType != StorageType::FOREIGN_TABLE) {
118  throw std::runtime_error{
119  table_name +
120  " is not a foreign table. Refreshes are applicable to only foreign tables."};
121  }
122 
// The storage type was verified above, so the downcast is expected to succeed; CHECK
// guards against a descriptor that is not actually a ForeignTable.
123  auto foreign_table = dynamic_cast<const ForeignTable*>(td);
124  CHECK(foreign_table);
125  refresh_foreign_table_unlocked(catalog, *foreign_table, evict_cached_entries);
126 }
127 } // namespace foreign_storage
bool contains(const T &container, const U &element)
Definition: misc.h:195
std::vector< int > ChunkKey
Definition: types.h:36
Class for a per-database catalog. Also includes metadata for the current database and the current user.
Definition: Catalog.h:143
std::string tableName
Data_Namespace::DataMgr & getDataMgr() const
Definition: Catalog.h:266
#define LOG(tag)
Definition: Logger.h:285
#define CHUNK_KEY_DB_IDX
Definition: types.h:38
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
void refresh_foreign_table(Catalog_Namespace::Catalog &catalog, const std::string &table_name, const bool evict_cached_entries)
std::runtime_error getOriginalException()
This file contains the class specification and related data structures for Catalog.
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:265
bool is_leaf_node()
Definition: distributed.cpp:29
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:39
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
int getDatabaseId() const
Definition: Catalog.h:326
bool isAppendMode() const
Checks if the table is in append mode.
void deleteChunksWithPrefix(const ChunkKey &keyPrefix)
Definition: DataMgr.cpp:522
void removeFragmenterForTable(const int table_id) const
Definition: Catalog.cpp:4260
void clear_cpu_and_gpu_cache(Data_Namespace::DataMgr &data_mgr, const ChunkKey &key_prefix)
#define CHECK(condition)
Definition: Logger.h:291
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Definition: Execute.h:438
static const std::array< std::string, 4 > kAggregatorOnlySystemTables
Definition: Catalog.h:131
void refresh_foreign_table_unlocked(Catalog_Namespace::Catalog &catalog, const ForeignTable &td, const bool evict_cached_entries)
static constexpr char const * FOREIGN_TABLE
void updateForeignTableRefreshTimes(const int32_t table_id)
Definition: Catalog.cpp:5805