OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
SystemTableConcurrencyTest.java
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.tests;
17 
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20 
21 import java.util.ArrayList;
22 import java.util.Arrays;
23 import java.util.List;
24 import java.util.concurrent.CyclicBarrier;
25 
27  final static Logger logger = LoggerFactory.getLogger(SystemTableConcurrencyTest.class);
28 
29  public static void main(String[] args) throws Exception {
31  test.testConcurrency();
32  }
33 
34  public void testConcurrency() throws Exception {
35  logger.info("SystemTableConcurrencyTest()");
36  runTest();
37  logger.info("SystemTableConcurrencyTest() done");
38  }
39 
40  private void runTest() throws Exception {
41  List<ThreadDbQueries> queriesPerThread = new ArrayList<ThreadDbQueries>(Arrays.asList(
42  new ThreadDbQueries("heavyai",
43  Arrays.asList("CREATE USER user_1 (password = 'HyperInteractive');",
44  "ALTER USER user_1 (password = 'HyperInteractive2');",
45  "DROP USER user_1;",
46  "CREATE USER user_2 (password = 'HyperInteractive');",
47  "ALTER USER user_2 (password = 'HyperInteractive2');",
48  "DROP USER user_2;")),
49  new ThreadDbQueries("heavyai",
50  Arrays.asList("CREATE USER user_3 (password = 'HyperInteractive');",
51  "GRANT SELECT ON DATABASE heavyai TO user_3;",
52  "REVOKE SELECT ON DATABASE heavyai FROM user_3;",
53  "GRANT CREATE ON DATABASE heavyai TO user_3;",
54  "REVOKE CREATE ON DATABASE heavyai FROM user_3;",
55  "DROP USER user_3;")),
56  new ThreadDbQueries("heavyai",
57  Arrays.asList("CREATE DATABASE db_1;",
58  "CREATE DATABASE db_2;",
59  "DROP DATABASE db_1;",
60  "DROP DATABASE db_2;")),
61  new ThreadDbQueries("heavyai",
62  Arrays.asList("CREATE ROLE role_1;",
63  "CREATE ROLE role_2;",
64  "DROP ROLE role_1;",
65  "DROP ROLE role_2;")),
66  new ThreadDbQueries("heavyai",
67  Arrays.asList("CREATE TABLE table_1 (i INTEGER, t TEXT);",
68  "INSERT INTO table_1 VALUES (1, 'abc');",
69  "SELECT AVG(i) FROM table_1;",
70  "CREATE VIEW view_1 AS SELECT * FROM table_1;",
71  "SELECT * FROM view_1;",
72  "DROP VIEW view_1;",
73  "DROP TABLE table_1;")),
74  new ThreadDbQueries("heavyai",
75  Arrays.asList("CREATE USER user_4 (password = 'HyperInteractive');",
76  "CREATE USER user_5 (password = 'HyperInteractive');",
77  "CREATE ROLE role_3;",
78  "CREATE ROLE role_4;",
79  "GRANT role_3, role_4 TO user_4, user_5;",
80  "REVOKE role_3 FROM user_5;",
81  "REVOKE role_4 FROM user_4, user_5;",
82  "REVOKE role_3 FROM user_4;",
83  "DROP USER user_4;",
84  "DROP USER user_5;",
85  "DROP ROLE role_3;",
86  "DROP ROLE role_4;"))));
87  final List<String> systemTableQueries = Arrays.asList("SELECT * FROM users;",
88  "SELECT * FROM permissions;",
89  "SELECT * FROM databases;",
90  "SELECT * FROM roles;",
91  "SELECT * FROM tables;",
92  "SELECT * FROM role_assignments;",
93  "SELECT * FROM dashboards;",
94  "SELECT * FROM memory_summary;",
95  "SELECT * FROM memory_details;",
96  "SELECT * FROM storage_details;");
97 
98  for (int i = 0; i < systemTableQueries.size(); i++) {
99  // Run queries for the same system table in parallel.
100  final int parallelQueryCount = 5;
101  for (int j = 0; j < parallelQueryCount; j++) {
102  queriesPerThread.add(new ThreadDbQueries(
103  "information_schema", Arrays.asList(systemTableQueries.get(i))));
104  }
105  }
106 
107  final int num_threads = queriesPerThread.size()
108  + 1; // +1 for dashboard creation/update thread, which is created separately.
109  Exception[] exceptions = new Exception[num_threads];
110  Thread[] threads = new Thread[num_threads];
111 
112  // Use a barrier here to synchronize the start of each competing thread and make
113  // sure there is a race condition. The barrier ensures no thread will run until they
114  // are all created and waiting for the barrier to complete.
115  final CyclicBarrier barrier = new CyclicBarrier(
116  num_threads, () -> { logger.info("Barrier acquired. Starting queries..."); });
117 
118  threads[0] = new Thread(() -> {
119  try {
120  logger.info("Starting thread[0]");
121  HeavyDBTestClient user = HeavyDBTestClient.getClient(
122  "localhost", 6274, "heavyai", "admin", "HyperInteractive");
123  barrier.await();
124  logger.info("0 create dashboard \"dashboard_1\"");
125  int dashboardId = user.create_dashboard("dashboard_1");
126  logger.info("0 get dashboard " + dashboardId);
127  user.get_dashboard(dashboardId);
128  logger.info("0 replace dashboard " + dashboardId);
129  user.replace_dashboard(dashboardId, "dashboard_2", "admin");
130  logger.info("0 delete dashboard " + dashboardId);
131  user.delete_dashboard(dashboardId);
132  logger.info("Finished thread[0]");
133  } catch (Exception e) {
134  logger.error("Thread[0] Caught Exception: " + e.getMessage(), e);
135  exceptions[0] = e;
136  }
137  });
138  threads[0].start();
139 
140  for (int i = 0; i < queriesPerThread.size(); i++) {
141  final ThreadDbQueries threadQueries = queriesPerThread.get(i);
142  final int threadId = i + 1;
143  threads[threadId] = new Thread(() -> {
144  try {
145  logger.info("Starting thread[" + threadId + "]");
146  HeavyDBTestClient user = HeavyDBTestClient.getClient(
147  "localhost", 6274, threadQueries.database, "admin", "HyperInteractive");
148  barrier.await();
149  for (final String query : threadQueries.queries) {
150  runSqlAsUser(query, user, threadId);
151  }
152  logger.info("Finished thread[" + threadId + "]");
153  } catch (Exception e) {
154  logger.error("Thread[" + threadId + "] Caught Exception: " + e.getMessage(), e);
155  exceptions[threadId] = e;
156  }
157  });
158  threads[threadId].start();
159  }
160 
161  for (Thread t : threads) {
162  t.join();
163  }
164 
165  for (Exception e : exceptions) {
166  if (e != null) {
167  logger.error("Exception: " + e.getMessage(), e);
168  throw e;
169  }
170  }
171  }
172 
173  private void runSqlAsUser(String sql, HeavyDBTestClient user, int threadId)
174  throws Exception {
175  logger.info(threadId + " " + sql);
176  user.runSql(sql);
177  }
178 
179  private class ThreadDbQueries {
180  public ThreadDbQueries(final String database, final List<String> queries) {
181  this.database = database;
182  this.queries = queries;
183  }
184 
185  public final String database;
186  public final List<String> queries;
187  }
188 }
ThreadDbQueries(final String database, final List< String > queries)
void runSqlAsUser(String sql, HeavyDBTestClient user, int threadId)