OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RuntimeInterruptConcurrencyTest.java
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.tests;
17 
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21 
22 import java.io.BufferedWriter;
23 import java.io.File;
24 import java.io.FileWriter;
25 import java.io.IOException;
26 import java.nio.file.Files;
27 import java.nio.file.Path;
28 import java.nio.file.Paths;
29 import java.util.ArrayList;
30 import java.util.HashSet;
31 import java.util.List;
32 
33 import ai.heavy.thrift.server.TDBException;
34 import ai.heavy.thrift.server.TQueryInfo;
35 
37  final static Logger logger =
38  LoggerFactory.getLogger(RuntimeInterruptConcurrencyTest.class);
39 
40  public static void main(String[] args) throws Exception {
42  test.testConcurrency();
43  }
44 
45  private Path getAbsolutePath(String path) {
46  Path path_obj = Paths.get(path).toAbsolutePath();
47  assert Files.exists(path_obj);
48  return path_obj;
49  }
50 
51  private HeavyDBTestClient getClient(String db, String username) {
52  try {
53  return HeavyDBTestClient.getClient("localhost", 6274, db, username, "password");
54  } catch (Exception e) {
55  e.printStackTrace();
56  }
57  return null;
58  }
59 
61  try {
62  su.runSql("DROP DATABASE db1;");
63  su.runSql("DROP USER u0;");
64  su.runSql("DROP USER u1;");
65  su.runSql("DROP USER u2;");
66  su.runSql("DROP USER u3;");
67  su.runSql("DROP USER u4;");
68  su.runSql("DROP USER interrupter;");
69  su.runSql("DROP TABLE IF EXISTS test_large;");
70  su.runSql("DROP TABLE IF EXISTS test_small;");
71  su.runSql("DROP TABLE IF EXISTS test_geo;");
72  } catch (Exception e) {
73  logger.error(
74  "Get exception while cleanup db, tables and users: " + e.getMessage(), e);
75  }
76  }
77 
78  private void runTest(
79  String db, String dbaUser, String dbaPassword, String dbUser, String dbPassword)
80  throws Exception {
81  int num_threads = 5;
82  int INTERRUPTER_TID = num_threads - 1;
83  int num_runs = 5;
84  final String large_table = "test_large";
85  final String small_table = "test_small";
86  final String geo_table = "test_geo";
87  String loop_join_query =
88  "SELECT /*+ cpu_mode */ COUNT(1) FROM test_large T1, test_large T2;";
89  String hash_join_query =
90  "SELECT /*+ cpu_mode */ COUNT(1) FROM test_large T1, test_small T2 WHERE T1.x = T2.x;";
91  String gby_query =
92  "SELECT /*+ cpu_mode */ x, count(1) FROM test_large T1 GROUP BY x;";
93  Path large_table_path =
94  getAbsolutePath("../java/utility/src/main/java/com/mapd/tests/data/1M.csv");
95  Path small_table_path =
96  getAbsolutePath("../java/utility/src/main/java/com/mapd/tests/data/1K.csv");
97  Path geojson_table_path = getAbsolutePath(
98  "../java/utility/src/main/java/com/mapd/tests/data/geogdal.geojson");
99  try {
100  HeavyDBTestClient dba =
101  HeavyDBTestClient.getClient("localhost", 6274, db, dbaUser, dbaPassword);
102  dba.runSql("CREATE TABLE " + large_table + "(x int not null);");
103  dba.runSql("CREATE TABLE " + small_table + "(x int not null);");
104  dba.runSql("CREATE TABLE " + geo_table
105  + "(trip DOUBLE, pt GEOMETRY(POINT, 4326) ENCODING NONE);");
106 
107  File large_data = new File(large_table_path.toString());
108  try (BufferedWriter writer = new BufferedWriter(new FileWriter(large_data))) {
109  for (int i = 0; i < 1000000; i++) {
110  writer.write(i + "\n");
111  }
112  } catch (IOException e) {
113  e.printStackTrace();
114  }
115 
116  File small_data = new File(small_table_path.toString());
117  try (BufferedWriter writer = new BufferedWriter(new FileWriter(small_data))) {
118  for (int i = 0; i < 1000; i++) {
119  writer.write(i + "\n");
120  }
121  } catch (IOException e) {
122  e.printStackTrace();
123  }
124 
125  File geojson_data = new File(geojson_table_path.toString());
126  ArrayList<String> geojson_header = new ArrayList<>();
127  ArrayList<String> geojson_footer = new ArrayList<>();
128  ArrayList<String> geojson_feature = new ArrayList<>();
129  geojson_header.add("{");
130  geojson_header.add("\"type\": \"FeatureCollection\",");
131  geojson_header.add("\"name\": \"geospatial_point\",");
132  geojson_header.add(
133  "\"crs\": { \"type\": \"name\", \"properties\": { \"name\": \"urn:ogc:def:crs:OGC:1.3:CRS84\" } },");
134  geojson_header.add("\"features\": [");
135  geojson_footer.add(
136  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 10.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 10.0, 9.0 ] } }");
137  geojson_footer.add("]");
138  geojson_footer.add("}");
139  geojson_feature.add(
140  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 0.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 0.0, 1.0 ] } },");
141  geojson_feature.add(
142  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 1.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 1.0, 2.0 ] } },");
143  geojson_feature.add(
144  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 2.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 2.0, 3.0 ] } },");
145  geojson_feature.add(
146  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 3.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 3.0, 4.0 ] } },");
147  geojson_feature.add(
148  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 4.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 4.0, 5.0 ] } },");
149  geojson_feature.add(
150  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 5.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 5.0, 6.0 ] } },");
151  geojson_feature.add(
152  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 6.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 6.0, 7.0 ] } },");
153  geojson_feature.add(
154  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 7.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 7.0, 8.0 ] } },");
155  geojson_feature.add(
156  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 8.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 8.0, 9.0 ] } },");
157  geojson_feature.add(
158  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 9.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 9.0, 0.0 ] } },");
159  try (BufferedWriter writer = new BufferedWriter(new FileWriter(geojson_data))) {
160  for (String str : geojson_header) {
161  writer.write(str + "\n");
162  }
163  for (int i = 0; i < 1000; i++) {
164  for (String str : geojson_feature) {
165  writer.write(str + "\n");
166  }
167  }
168  for (String str : geojson_footer) {
169  writer.write(str + "\n");
170  }
171  } catch (IOException e) {
172  e.printStackTrace();
173  }
174 
175  dba.runSql("COPY " + large_table + " FROM '" + large_table_path.toString()
176  + "' WITH (header='false');");
177  dba.runSql("COPY " + small_table + " FROM '" + small_table_path.toString()
178  + "' WITH (header='false');");
179  dba.runSql("COPY " + geo_table + " FROM '" + geojson_table_path.toString()
180  + "' WITH (header='false', geo='true');");
181  } catch (Exception e) {
182  logger.error("[" + Thread.currentThread().getId() + "]"
183  + " Caught Exception: " + e.getMessage(),
184  e);
185  }
186 
187  ArrayList<Thread> queryThreads = new ArrayList<>();
188  ArrayList<Thread> interrupterThreads = new ArrayList<>();
189  Thread query_interrupter = new Thread(new Runnable() {
190  @Override
191  public void run() {
192  // try to interrupt
193  int tid = INTERRUPTER_TID;
194  String logPrefix = "[" + tid + "]";
195  HeavyDBTestClient interrupter = getClient(db, "interrupter");
196  int check_empty_session_queue = 0;
197  while (true) {
198  try {
199  List<TQueryInfo> queryInfos = interrupter.get_queries_info();
200  boolean found_target_query = false;
201  for (TQueryInfo queryInfo : queryInfos) {
202  String session_id = queryInfo.query_public_session_id;
203  boolean select_query =
204  queryInfo.current_status.equals("RUNNING_QUERY_KERNEL");
205  boolean import_query = queryInfo.current_status.equals("RUNNING_IMPORTER");
206  boolean can_interrupt = false;
207  if (import_query
208  || (select_query
209  && queryInfo.query_str.compareTo(loop_join_query) == 0)) {
210  can_interrupt = true;
211  }
212  if (can_interrupt) {
213  interrupter.runSql("KILL QUERY '" + session_id + "';");
214  check_empty_session_queue = 0;
215  found_target_query = true;
216  }
217  }
218  if (!found_target_query || queryInfos.isEmpty()) {
219  ++check_empty_session_queue;
220  }
221  if (check_empty_session_queue > 20) {
222  break;
223  }
224  Thread.sleep(1000);
225  } catch (Exception e) {
226  logger.error(logPrefix + " Caught Exception: " + e.getMessage(), e);
227  }
228  }
229  }
230  });
231  query_interrupter.start();
232  interrupterThreads.add(query_interrupter);
233 
234  for (int i = 0; i < num_runs; i++) {
235  logger.info("Starting run-" + i);
236  for (int r = 0; r < num_threads; r++) {
237  final int tid = r;
238  final String logPrefix = "[" + tid + "]";
239  final String user_name = "u".concat(Integer.toString(tid));
240  if (r < num_threads - 2) {
241  String[] queries = {hash_join_query, gby_query, loop_join_query};
242  Thread select_query_runner = new Thread(new Runnable() {
243  @Override
244  public void run() {
245  logger.info("Starting thread-" + tid);
246  final HeavyDBTestClient user = getClient(db, user_name);
247  for (int k = 0; k < 5; k++) {
248  boolean interrupted = false;
249  for (int q = 0; q < 3; q++) {
250  try {
251  logger.info(logPrefix + " Run SELECT query: " + queries[q]);
252  user.runSql(queries[q]);
253  } catch (Exception e2) {
254  if (e2 instanceof TDBException) {
255  TDBException ee = (TDBException) e2;
256  if (q == 2 && ee.error_msg.contains("ERR_INTERRUPTED")) {
257  interrupted = true;
258  logger.info(
259  logPrefix + " Select query issued has been interrupted");
260  }
261  } else {
262  logger.error(
263  logPrefix + " Caught Exception: " + e2.getMessage(), e2);
264  }
265  }
266  }
267  assert interrupted;
268  }
269  }
270  });
271  select_query_runner.start();
272  queryThreads.add(select_query_runner);
273  } else {
274  Thread import_query_runner = new Thread(new Runnable() {
275  @Override
276  public void run() {
277  logger.info("Starting thread-" + tid);
278  final HeavyDBTestClient user = getClient(db, user_name);
279  for (int k = 0; k < 2; k++) {
280  boolean interrupted = false;
281  try {
282  Path geo_table_path = getAbsolutePath(
283  "../Tests/Import/datafiles/interrupt_table_gdal.geojson");
284  user.runSql("COPY " + geo_table + " FROM '" + geo_table_path.toString()
285  + "' WITH (geo='true');");
286  logger.info(logPrefix + " Run Import query");
287  } catch (Exception e2) {
288  if (e2 instanceof TDBException) {
289  TDBException ee = (TDBException) e2;
290  if (ee.error_msg.contains("error code 10")) {
291  interrupted = true;
292  logger.info(logPrefix + " Import query has been interrupted");
293  }
294  } else {
295  logger.error(logPrefix + " Caught Exception: " + e2.getMessage(), e2);
296  }
297  }
298  assert interrupted;
299  }
300  }
301  });
302  import_query_runner.start();
303  queryThreads.add(import_query_runner);
304  }
305  }
306  }
307 
308  for (Thread t : queryThreads) {
309  t.join();
310  }
311  for (Thread t : interrupterThreads) {
312  t.join();
313  }
314 
315  HeavyDBTestClient dba =
316  HeavyDBTestClient.getClient("localhost", 6274, db, dbaUser, dbaPassword);
317  dba.runSql("DROP TABLE " + large_table + ";");
318  dba.runSql("DROP TABLE " + small_table + ";");
319  dba.runSql("DROP TABLE " + geo_table + ";");
320  File large_data = new File(large_table_path.toString());
321  File small_data = new File(small_table_path.toString());
322  File geojson_data = new File(geojson_table_path.toString());
323  if (large_data.exists()) {
324  large_data.delete();
325  }
326  if (small_data.exists()) {
327  small_data.delete();
328  }
329  if (geojson_data.exists()) {
330  geojson_data.delete();
331  }
332  }
333 
334  public void testConcurrency() throws Exception {
335  logger.info("RuntimeInterruptConcurrencyTest()");
336 
337  HeavyDBTestClient su = HeavyDBTestClient.getClient(
338  "localhost", 6274, "heavyai", "admin", "HyperInteractive");
339  cleanupUserAndDB(su);
340  su.runSql("CREATE DATABASE db1;");
341  su.runSql("CREATE USER u0 (password = 'password', is_super = 'false');");
342  su.runSql("CREATE USER u1 (password = 'password', is_super = 'false');");
343  su.runSql("CREATE USER u2 (password = 'password', is_super = 'false');");
344  su.runSql("CREATE USER u3 (password = 'password', is_super = 'false');");
345  su.runSql("CREATE USER u4 (password = 'password', is_super = 'false');");
346  su.runSql("CREATE USER interrupter (password = 'password', is_super = 'true');");
347  su.runSql("GRANT ALL on DATABASE db1 TO u0;");
348  su.runSql("GRANT ALL on DATABASE db1 TO u1;");
349  su.runSql("GRANT ALL on DATABASE db1 TO u2;");
350  su.runSql("GRANT ALL on DATABASE db1 TO u3;");
351  su.runSql("GRANT ALL on DATABASE db1 TO u4;");
352  su.runSql("GRANT ALL on DATABASE db1 TO interrupter;");
353  runTest("db1", "admin", "HyperInteractive", "admin", "HyperInteractive");
354  cleanupUserAndDB(su);
355  logger.info("RuntimeInterruptConcurrencyTest() done");
356  }
357 }
constexpr double A
Definition: Utm.h:45
void runTest(String db, String dbaUser, String dbaPassword, String dbUser, String dbPassword)
HeavyDBTestClient getClient(String db, String username)
static bool run