OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
com.mapd.tests.RuntimeInterruptConcurrencyTest Class Reference
+ Collaboration diagram for com.mapd.tests.RuntimeInterruptConcurrencyTest:

Public Member Functions

void testConcurrency () throws Exception
 

Static Public Member Functions

static void main (String[] args) throws Exception
 

Static Package Attributes

static final Logger logger
 

Private Member Functions

Path getAbsolutePath (String path)
 
HeavyDBTestClient getClient (String db, String username)
 
void cleanupUserAndDB (HeavyDBTestClient su)
 
void runTest (String db, String dbaUser, String dbaPassword, String dbUser, String dbPassword) throws Exception
 

Detailed Description

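Concurrency test that connects to a server on localhost:6274, creates database db1 with users u0–u4 and a superuser "interrupter", loads three test tables, and then runs SELECT and geo-import queries from several threads while the interrupter thread issues KILL QUERY against running import queries and the loop-join SELECT; each query thread asserts that its query was reported as interrupted.

Definition at line 36 of file RuntimeInterruptConcurrencyTest.java.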

Member Function Documentation

void com.mapd.tests.RuntimeInterruptConcurrencyTest.cleanupUserAndDB ( HeavyDBTestClient  su)
inline private

Definition at line 60 of file RuntimeInterruptConcurrencyTest.java.

Referenced by com.mapd.tests.RuntimeInterruptConcurrencyTest.testConcurrency().

60  {
61  try {
62  su.runSql("DROP DATABASE db1;");
63  su.runSql("DROP USER u0;");
64  su.runSql("DROP USER u1;");
65  su.runSql("DROP USER u2;");
66  su.runSql("DROP USER u3;");
67  su.runSql("DROP USER u4;");
68  su.runSql("DROP USER interrupter;");
69  su.runSql("DROP TABLE IF EXISTS test_large;");
70  su.runSql("DROP TABLE IF EXISTS test_small;");
71  su.runSql("DROP TABLE IF EXISTS test_geo;");
72  } catch (Exception e) {
73  logger.error(
74  "Get exception while cleanup db, tables and users: " + e.getMessage(), e);
75  }
76  }

+ Here is the caller graph for this function:

Path com.mapd.tests.RuntimeInterruptConcurrencyTest.getAbsolutePath ( String  path)
inline private

Definition at line 45 of file RuntimeInterruptConcurrencyTest.java.

Referenced by com.mapd.tests.RuntimeInterruptConcurrencyTest.runTest().

45  {
46  Path path_obj = Paths.get(path).toAbsolutePath();
47  assert Files.exists(path_obj);
48  return path_obj;
49  }

+ Here is the caller graph for this function:

HeavyDBTestClient com.mapd.tests.RuntimeInterruptConcurrencyTest.getClient (String db, String username)
inline private

Definition at line 51 of file RuntimeInterruptConcurrencyTest.java.

Referenced by com.mapd.tests.RuntimeInterruptConcurrencyTest.runTest().

51  {
52  try {
53  return HeavyDBTestClient.getClient("localhost", 6274, db, username, "password");
54  } catch (Exception e) {
55  e.printStackTrace();
56  }
57  return null;
58  }

+ Here is the caller graph for this function:

static void com.mapd.tests.RuntimeInterruptConcurrencyTest.main ( String[]  args) throws Exception
inline static

Definition at line 40 of file RuntimeInterruptConcurrencyTest.java.

40  {
41  RuntimeInterruptConcurrencyTest test = new RuntimeInterruptConcurrencyTest();
42  test.testConcurrency();
43  }
void com.mapd.tests.RuntimeInterruptConcurrencyTest.runTest (String db, String dbaUser, String dbaPassword, String dbUser, String dbPassword) throws Exception
inline private

Definition at line 78 of file RuntimeInterruptConcurrencyTest.java.

References com.mapd.tests.RuntimeInterruptConcurrencyTest.getAbsolutePath(), com.mapd.tests.RuntimeInterruptConcurrencyTest.getClient(), run_benchmark_import.import_query, and run.

Referenced by com.mapd.tests.RuntimeInterruptConcurrencyTest.testConcurrency().

80  {
81  int num_threads = 5;
82  int INTERRUPTER_TID = num_threads - 1;
83  int num_runs = 5;
84  final String large_table = "test_large";
85  final String small_table = "test_small";
86  final String geo_table = "test_geo";
87  String loop_join_query =
88  "SELECT /*+ cpu_mode */ COUNT(1) FROM test_large T1, test_large T2;";
89  String hash_join_query =
90  "SELECT /*+ cpu_mode */ COUNT(1) FROM test_large T1, test_small T2 WHERE T1.x = T2.x;";
91  String gby_query =
92  "SELECT /*+ cpu_mode */ x, count(1) FROM test_large T1 GROUP BY x;";
93  Path large_table_path =
94  getAbsolutePath("../java/utility/src/main/java/com/mapd/tests/data/1M.csv");
95  Path small_table_path =
96  getAbsolutePath("../java/utility/src/main/java/com/mapd/tests/data/1K.csv");
97  Path geojson_table_path = getAbsolutePath(
98  "../java/utility/src/main/java/com/mapd/tests/data/geogdal.geojson");
99  try {
100  HeavyDBTestClient dba =
101  HeavyDBTestClient.getClient("localhost", 6274, db, dbaUser, dbaPassword);
102  dba.runSql("CREATE TABLE " + large_table + "(x int not null);");
103  dba.runSql("CREATE TABLE " + small_table + "(x int not null);");
104  dba.runSql("CREATE TABLE " + geo_table
105  + "(trip DOUBLE, pt GEOMETRY(POINT, 4326) ENCODING NONE);");
106 
107  File large_data = new File(large_table_path.toString());
108  try (BufferedWriter writer = new BufferedWriter(new FileWriter(large_data))) {
109  for (int i = 0; i < 1000000; i++) {
110  writer.write(i + "\n");
111  }
112  } catch (IOException e) {
113  e.printStackTrace();
114  }
115 
116  File small_data = new File(small_table_path.toString());
117  try (BufferedWriter writer = new BufferedWriter(new FileWriter(small_data))) {
118  for (int i = 0; i < 1000; i++) {
119  writer.write(i + "\n");
120  }
121  } catch (IOException e) {
122  e.printStackTrace();
123  }
124 
125  File geojson_data = new File(geojson_table_path.toString());
126  ArrayList<String> geojson_header = new ArrayList<>();
127  ArrayList<String> geojson_footer = new ArrayList<>();
128  ArrayList<String> geojson_feature = new ArrayList<>();
129  geojson_header.add("{");
130  geojson_header.add("\"type\": \"FeatureCollection\",");
131  geojson_header.add("\"name\": \"geospatial_point\",");
132  geojson_header.add(
133  "\"crs\": { \"type\": \"name\", \"properties\": { \"name\": \"urn:ogc:def:crs:OGC:1.3:CRS84\" } },");
134  geojson_header.add("\"features\": [");
135  geojson_footer.add(
136  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 10.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 10.0, 9.0 ] } }");
137  geojson_footer.add("]");
138  geojson_footer.add("}");
139  geojson_feature.add(
140  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 0.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 0.0, 1.0 ] } },");
141  geojson_feature.add(
142  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 1.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 1.0, 2.0 ] } },");
143  geojson_feature.add(
144  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 2.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 2.0, 3.0 ] } },");
145  geojson_feature.add(
146  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 3.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 3.0, 4.0 ] } },");
147  geojson_feature.add(
148  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 4.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 4.0, 5.0 ] } },");
149  geojson_feature.add(
150  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 5.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 5.0, 6.0 ] } },");
151  geojson_feature.add(
152  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 6.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 6.0, 7.0 ] } },");
153  geojson_feature.add(
154  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 7.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 7.0, 8.0 ] } },");
155  geojson_feature.add(
156  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 8.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 8.0, 9.0 ] } },");
157  geojson_feature.add(
158  "{ \"type\": \"Feature\", \"properties\": { \"trip\": 9.0 }, \"geometry\": { \"type\": \"Point\", \"coordinates\": [ 9.0, 0.0 ] } },");
159  try (BufferedWriter writer = new BufferedWriter(new FileWriter(geojson_data))) {
160  for (String str : geojson_header) {
161  writer.write(str + "\n");
162  }
163  for (int i = 0; i < 1000; i++) {
164  for (String str : geojson_feature) {
165  writer.write(str + "\n");
166  }
167  }
168  for (String str : geojson_footer) {
169  writer.write(str + "\n");
170  }
171  } catch (IOException e) {
172  e.printStackTrace();
173  }
174 
175  dba.runSql("COPY " + large_table + " FROM '" + large_table_path.toString()
176  + "' WITH (header='false');");
177  dba.runSql("COPY " + small_table + " FROM '" + small_table_path.toString()
178  + "' WITH (header='false');");
179  dba.runSql("COPY " + geo_table + " FROM '" + geojson_table_path.toString()
180  + "' WITH (header='false', geo='true');");
181  } catch (Exception e) {
182  logger.error("[" + Thread.currentThread().getId() + "]"
183  + " Caught Exception: " + e.getMessage(),
184  e);
185  }
186 
187  ArrayList<Thread> queryThreads = new ArrayList<>();
188  ArrayList<Thread> interrupterThreads = new ArrayList<>();
189  Thread query_interrupter = new Thread(new Runnable() {
190  @Override
191  public void run() {
192  // try to interrupt
193  int tid = INTERRUPTER_TID;
194  String logPrefix = "[" + tid + "]";
195  HeavyDBTestClient interrupter = getClient(db, "interrupter");
196  int check_empty_session_queue = 0;
197  while (true) {
198  try {
199  List<TQueryInfo> queryInfos = interrupter.get_queries_info();
200  boolean found_target_query = false;
201  for (TQueryInfo queryInfo : queryInfos) {
202  String session_id = queryInfo.query_public_session_id;
203  boolean select_query =
204  queryInfo.current_status.equals("RUNNING_QUERY_KERNEL");
205  boolean import_query = queryInfo.current_status.equals("RUNNING_IMPORTER");
206  boolean can_interrupt = false;
207  if (import_query
208  || (select_query
209  && queryInfo.query_str.compareTo(loop_join_query) == 0)) {
210  can_interrupt = true;
211  }
212  if (can_interrupt) {
213  interrupter.runSql("KILL QUERY '" + session_id + "';");
214  check_empty_session_queue = 0;
215  found_target_query = true;
216  }
217  }
218  if (!found_target_query || queryInfos.isEmpty()) {
219  ++check_empty_session_queue;
220  }
221  if (check_empty_session_queue > 20) {
222  break;
223  }
224  Thread.sleep(1000);
225  } catch (Exception e) {
226  logger.error(logPrefix + " Caught Exception: " + e.getMessage(), e);
227  }
228  }
229  }
230  });
231  query_interrupter.start();
232  interrupterThreads.add(query_interrupter);
233 
234  for (int i = 0; i < num_runs; i++) {
235  logger.info("Starting run-" + i);
236  for (int r = 0; r < num_threads; r++) {
237  final int tid = r;
238  final String logPrefix = "[" + tid + "]";
239  final String user_name = "u".concat(Integer.toString(tid));
240  if (r < num_threads - 2) {
241  String[] queries = {hash_join_query, gby_query, loop_join_query};
242  Thread select_query_runner = new Thread(new Runnable() {
243  @Override
244  public void run() {
245  logger.info("Starting thread-" + tid);
246  final HeavyDBTestClient user = getClient(db, user_name);
247  for (int k = 0; k < 5; k++) {
248  boolean interrupted = false;
249  for (int q = 0; q < 3; q++) {
250  try {
251  logger.info(logPrefix + " Run SELECT query: " + queries[q]);
252  user.runSql(queries[q]);
253  } catch (Exception e2) {
254  if (e2 instanceof TDBException) {
255  TDBException ee = (TDBException) e2;
256  if (q == 2 && ee.error_msg.contains("ERR_INTERRUPTED")) {
257  interrupted = true;
258  logger.info(
259  logPrefix + " Select query issued has been interrupted");
260  }
261  } else {
262  logger.error(
263  logPrefix + " Caught Exception: " + e2.getMessage(), e2);
264  }
265  }
266  }
267  assert interrupted;
268  }
269  }
270  });
271  select_query_runner.start();
272  queryThreads.add(select_query_runner);
273  } else {
274  Thread import_query_runner = new Thread(new Runnable() {
275  @Override
276  public void run() {
277  logger.info("Starting thread-" + tid);
278  final HeavyDBTestClient user = getClient(db, user_name);
279  for (int k = 0; k < 2; k++) {
280  boolean interrupted = false;
281  try {
282  Path geo_table_path = getAbsolutePath(
283  "../Tests/Import/datafiles/interrupt_table_gdal.geojson");
284  user.runSql("COPY " + geo_table + " FROM '" + geo_table_path.toString()
285  + "' WITH (geo='true');");
286  logger.info(logPrefix + " Run Import query");
287  } catch (Exception e2) {
288  if (e2 instanceof TDBException) {
289  TDBException ee = (TDBException) e2;
290  if (ee.error_msg.contains("error code 10")) {
291  interrupted = true;
292  logger.info(logPrefix + " Import query has been interrupted");
293  }
294  } else {
295  logger.error(logPrefix + " Caught Exception: " + e2.getMessage(), e2);
296  }
297  }
298  assert interrupted;
299  }
300  }
301  });
302  import_query_runner.start();
303  queryThreads.add(import_query_runner);
304  }
305  }
306  }
307 
308  for (Thread t : queryThreads) {
309  t.join();
310  }
311  for (Thread t : interrupterThreads) {
312  t.join();
313  }
314 
315  HeavyDBTestClient dba =
316  HeavyDBTestClient.getClient("localhost", 6274, db, dbaUser, dbaPassword);
317  dba.runSql("DROP TABLE " + large_table + ";");
318  dba.runSql("DROP TABLE " + small_table + ";");
319  dba.runSql("DROP TABLE " + geo_table + ";");
320  File large_data = new File(large_table_path.toString());
321  File small_data = new File(small_table_path.toString());
322  File geojson_data = new File(geojson_table_path.toString());
323  if (large_data.exists()) {
324  large_data.delete();
325  }
326  if (small_data.exists()) {
327  small_data.delete();
328  }
329  if (geojson_data.exists()) {
330  geojson_data.delete();
331  }
332  }
HeavyDBTestClient getClient(String db, String username)
static bool run

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void com.mapd.tests.RuntimeInterruptConcurrencyTest.testConcurrency ( ) throws Exception
inline

Definition at line 334 of file RuntimeInterruptConcurrencyTest.java.

References com.mapd.tests.RuntimeInterruptConcurrencyTest.cleanupUserAndDB(), and com.mapd.tests.RuntimeInterruptConcurrencyTest.runTest().

334  {
335  logger.info("RuntimeInterruptConcurrencyTest()");
336 
337  HeavyDBTestClient su = HeavyDBTestClient.getClient(
338  "localhost", 6274, "heavyai", "admin", "HyperInteractive");
339  cleanupUserAndDB(su);
340  su.runSql("CREATE DATABASE db1;");
341  su.runSql("CREATE USER u0 (password = 'password', is_super = 'false');");
342  su.runSql("CREATE USER u1 (password = 'password', is_super = 'false');");
343  su.runSql("CREATE USER u2 (password = 'password', is_super = 'false');");
344  su.runSql("CREATE USER u3 (password = 'password', is_super = 'false');");
345  su.runSql("CREATE USER u4 (password = 'password', is_super = 'false');");
346  su.runSql("CREATE USER interrupter (password = 'password', is_super = 'true');");
347  su.runSql("GRANT ALL on DATABASE db1 TO u0;");
348  su.runSql("GRANT ALL on DATABASE db1 TO u1;");
349  su.runSql("GRANT ALL on DATABASE db1 TO u2;");
350  su.runSql("GRANT ALL on DATABASE db1 TO u3;");
351  su.runSql("GRANT ALL on DATABASE db1 TO u4;");
352  su.runSql("GRANT ALL on DATABASE db1 TO interrupter;");
353  runTest("db1", "admin", "HyperInteractive", "admin", "HyperInteractive");
354  cleanupUserAndDB(su);
355  logger.info("RuntimeInterruptConcurrencyTest() done");
356  }
void runTest(String db, String dbaUser, String dbaPassword, String dbUser, String dbPassword)

+ Here is the call graph for this function:

Member Data Documentation

final Logger com.mapd.tests.RuntimeInterruptConcurrencyTest.logger
static package
Initial value:
=
LoggerFactory.getLogger(RuntimeInterruptConcurrencyTest.class)

Definition at line 37 of file RuntimeInterruptConcurrencyTest.java.


The documentation for this class was generated from the following file: