OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DumpRestoreConcurrencyTest.java
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.tests;
17 
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20 
21 import java.io.File;
22 import java.nio.file.Files;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.concurrent.CyclicBarrier;
27 
29  final static Logger logger = LoggerFactory.getLogger(DumpRestoreConcurrencyTest.class);
30 
31  public static void main(String[] args) throws Exception {
33  test.testConcurrency();
34  }
35 
36  public void testConcurrency() throws Exception {
37  logger.info("DumpRestoreConcurrencyTest()");
38  runTest();
39  logger.info("DumpRestoreConcurrencyTest() done");
40  }
41 
42  private void deleteFileIfExist(String file) {
43  try {
44  Files.deleteIfExists((new File(file)).toPath());
45  } catch (Exception e) {
46  logger.error("While Deleting Archives Caught Exception: " + e.getMessage(), e);
47  }
48  }
49 
// Prefix of the archive (tar ball) file paths. This is the file path utilized by
// `DumpRestoreTest`, used here for consistency. getDumpRestoreQueries() appends
// "_" plus the table identifier to this prefix to form each archive path.
static String tar_ball_path_prefix = "/tmp/_Orz__";
52 
53  List<String> getDumpRestoreQueries(String table_identifier) {
54  return Arrays.asList("DROP TABLE IF EXISTS " + table_identifier + ";",
55  "DROP TABLE IF EXISTS restored_" + table_identifier + ";",
56  "CREATE TABLE " + table_identifier + " (v INT);",
57  "INSERT INTO " + table_identifier + " VALUES (1),(2);",
58  "DUMP TABLE " + table_identifier + " TO '" + tar_ball_path_prefix + "_"
59  + table_identifier + "';",
60  "RESTORE TABLE restored_" + table_identifier + " FROM '"
61  + tar_ball_path_prefix + "_" + table_identifier + "';");
62  }
63 
/**
 * Runs each entry of queriesPerThread on its own thread, releasing all threads
 * at the same moment through a barrier, then waits for them to finish, deletes
 * the archive files and rethrows the first recorded failure, if any.
 *
 * @throws Exception the first non-null exception captured by a worker thread
 *         (in thread-index order), or an InterruptedException from join()
 */
private void runTest() throws Exception {
  List<ThreadDbDumpRestoreQueries> queriesPerThread =
          new ArrayList<ThreadDbDumpRestoreQueries>(Arrays.asList(
          // NOTE(review): the elements passed to Arrays.asList(...) and the end of
          // this statement are not visible in this listing.

  final int num_threads = queriesPerThread.size();
  // One slot per thread; a slot stays null when that thread finished cleanly.
  Exception[] exceptions = new Exception[num_threads];
  Thread[] threads = new Thread[num_threads];

  // Use a barrier here to synchronize the start of each competing thread and make
  // sure there is a race condition. The barrier ensures no thread will run until they
  // are all created and waiting for the barrier to complete.
  // NOTE(review): await() below has no timeout, so a thread that fails before
  // reaching the barrier would seem to leave the others waiting -- confirm.
  final CyclicBarrier barrier = new CyclicBarrier(
          num_threads, () -> { logger.info("Barrier acquired. Starting queries..."); });

  for (int i = 0; i < queriesPerThread.size(); i++) {
    final ThreadDbDumpRestoreQueries threadQueries = queriesPerThread.get(i);
    // Effectively-final copy of the loop index so the lambda can capture it.
    final int threadId = i;
    threads[threadId] = new Thread(() -> {
      try {
        logger.info("Starting thread[" + threadId + "]");
        HeavyDBTestClient user = HeavyDBTestClient.getClient(
                "localhost", 6274, "heavyai", "admin", "HyperInteractive");
        // Remove any archive file already present at this path before the queries run.
        deleteFileIfExist(threadQueries.archive);
        // Block until every thread has reached this point.
        barrier.await();
        for (final String query : threadQueries.queries) {
          runSqlAsUser(query, user, threadId);
        }
        logger.info("Finished thread[" + threadId + "]");
      } catch (Exception e) {
        // Record the failure; it is rethrown by the caller after all threads join.
        logger.error("Thread[" + threadId + "] Caught Exception: " + e.getMessage(), e);
        exceptions[threadId] = e;
      }
    });
    threads[threadId].start();
  }

  // Wait for every worker to finish before cleaning up and reporting.
  for (Thread t : threads) {
    t.join();
  }

  // clean up archive files
  for (int i = 0; i < queriesPerThread.size(); i++) {
    final ThreadDbDumpRestoreQueries threadQueries = queriesPerThread.get(i);
    deleteFileIfExist(threadQueries.archive);
  }

  // Fail the test with the first captured exception; later ones are only in the log.
  for (Exception e : exceptions) {
    if (e != null) {
      logger.error("Exception: " + e.getMessage(), e);
      throw e;
    }
  }
}
127 
128  private void runSqlAsUser(String sql, HeavyDBTestClient user, int threadId)
129  throws Exception {
130  logger.info(threadId + " " + sql);
131  user.runSql(sql);
132  }
133 
136  final String archive_file, final List<String> queries) {
137  this.archive = archive_file;
138  this.queries = queries;
139  }
140 
141  public final String archive;
142  public final List<String> queries;
143  }
144 }
List< String > getDumpRestoreQueries(String table_identifier)
ThreadDbDumpRestoreQueries(final String archive_file, final List< String > queries)
void runSqlAsUser(String sql, HeavyDBTestClient user, int threadId)