OmniSciDB  8fa3bf436f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ForeignTableRefreshConcurrencyTest.java
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.tests;
17 
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20 
21 import java.nio.file.Files;
22 import java.nio.file.Path;
23 import java.nio.file.Paths;
24 import java.util.ArrayList;
25 
27  final static Logger logger =
28  LoggerFactory.getLogger(ForeignTableRefreshConcurrencyTest.class);
29 
30  public static void main(String[] args) throws Exception {
32  test.testConcurrency();
33  }
34 
35  private Path getAbsolutePath(String path) {
36  Path path_obj = Paths.get(path).toAbsolutePath();
37  assert Files.exists(path_obj);
38  return path_obj;
39  }
40 
41  // This test creates mulitple competing threads, some of which run selects
42  // with joins on a table while others refresh a foreign table. The test
43  // should resolve without exception as the cache being used during the join
44  // exectution should be safely invalidated during the foreign table refreshes.
45  private void runTest(String db,
46  String userName,
47  String userPassword,
48  int num_foreign_refresh_threads,
49  int num_table_join_threads,
50  int num_runs) throws Exception {
51  ArrayList<Exception> exceptions = new ArrayList<Exception>();
52 
53  Thread[] foreign_table_refresh_threads = new Thread[num_foreign_refresh_threads];
54  Thread[] table_join_threads = new Thread[num_table_join_threads];
55 
56  for (int i = 0; i < num_foreign_refresh_threads; ++i) {
57  final int tid = i;
58  foreign_table_refresh_threads[tid] = new Thread(new Runnable() {
59  @Override
60  public void run() {
61  final String thread_name = "F[" + tid + "]";
62  try {
63  logger.info("Starting foreign table refresh thread " + thread_name);
64  MapdTestClient user = MapdTestClient.getClient(
65  "localhost", 6274, db, userName, userPassword);
66  for (int irun = 0; irun < num_runs; ++irun) {
68  "SELECT * FROM test_foreign_table_" + tid + ";", user, thread_name);
69  runSqlAsUser("REFRESH FOREIGN TABLES test_foreign_table_" + tid + ";",
70  user,
71  thread_name);
72  }
73  logger.info("Finished foreign table refresh " + thread_name);
74  } catch (Exception e) {
75  logger.error("Foreign table refresh " + thread_name
76  + " Caught Exception: " + e.getMessage(),
77  e);
78  exceptions.add(e);
79  }
80  }
81  });
82  foreign_table_refresh_threads[tid].start();
83  }
84 
85  for (int i = 0; i < num_table_join_threads; ++i) {
86  final int tid = i;
87  table_join_threads[tid] = new Thread(new Runnable() {
88  @Override
89  public void run() {
90  final String thread_name = "T[" + tid + "]";
91  try {
92  logger.info("Starting table join " + thread_name);
93  MapdTestClient user = MapdTestClient.getClient(
94  "localhost", 6274, db, userName, userPassword);
95  for (int irun = 0; irun < num_runs; ++irun) {
96  runSqlAsUser("SELECT * FROM test_table_" + tid
97  + "_left AS l JOIN test_table_" + tid
98  + "_right AS r ON l.id = r.id;",
99  user,
100  thread_name);
101  Thread.sleep(200); // sleep 200 milliseconds between queries to allow more
102  // contention between foreign table refreshes
103  }
104  logger.info("Finished table join thread[0]");
105  } catch (Exception e) {
106  logger.error(
107  "Table join " + thread_name + " Caught Exception: " + e.getMessage(),
108  e);
109  exceptions.add(e);
110  }
111  }
112  });
113  table_join_threads[tid].start();
114  }
115 
116  for (Thread t : foreign_table_refresh_threads) {
117  t.join();
118  }
119  for (Thread t : table_join_threads) {
120  t.join();
121  }
122 
123  for (Exception e : exceptions) {
124  if (null != e) {
125  logger.error("Exception: " + e.getMessage(), e);
126  throw e;
127  }
128  }
129  }
130 
131  public void runSqlAsUser(String sql, MapdTestClient user, String logPrefix)
132  throws Exception {
133  logger.info(logPrefix + " " + sql);
134  user.runSql(sql);
135  }
136 
137  private void createForeignTestTable(MapdTestClient dba, String foreign_table_name)
138  throws Exception {
139  dba.runSql("CREATE FOREIGN TABLE " + foreign_table_name + " "
140  + "(b BOOLEAN, t TINYINT, s SMALLINT, i INTEGER, bi BIGINT, f FLOAT, "
141  + "dc DECIMAL(10, 5), tm TIME, tp TIMESTAMP, d DATE, txt TEXT, "
142  + "txt_2 TEXT ENCODING NONE) "
143  + "SERVER test_server WITH "
144  + "(file_path = 'scalar_types.csv', "
145  + "FRAGMENT_SIZE = 2);");
146  }
147 
148  private void createTestTable(MapdTestClient dba, String table_name, Path copy_from_path)
149  throws Exception {
150  dba.runSql("CREATE TABLE " + table_name
151  + " (id INTEGER, str TEXT ENCODING DICT(32), x DOUBLE, y BIGINT) WITH (FRAGMENT_SIZE=1)");
152  dba.runSql("COPY " + table_name + " FROM '" + copy_from_path.toString()
153  + "' WITH (header='false');");
154  }
155 
156  private void createTestTables(
157  MapdTestClient dba, String table_name, Path copy_from_path) throws Exception {
158  createTestTable(dba, table_name + "_left", copy_from_path);
159  createTestTable(dba, table_name + "_right", copy_from_path);
160  }
161 
162  public void testConcurrency() throws Exception {
163  logger.info("ForeignTableRefreshConcurrencyTest()");
164 
165  MapdTestClient su = MapdTestClient.getClient(
166  "localhost", 6274, "omnisci", "admin", "HyperInteractive");
167 
168  // initialize
169  su.runSql("DROP DATABASE IF EXISTS db1;");
170  su.runSql("CREATE DATABASE db1;");
171 
172  // create tables for use
173  final int num_foreign_refresh_threads = 2;
174  final int num_table_join_threads = 2;
175  Path table_import_path = getAbsolutePath(
176  "../java/utility/src/main/java/com/mapd/tests/data/simple_test.csv");
177  Path foreign_server_path = getAbsolutePath("../Tests/FsiDataFiles/");
178  MapdTestClient dba = MapdTestClient.getClient(
179  "localhost", 6274, "db1", "admin", "HyperInteractive");
180  dba.runSql("CREATE SERVER test_server "
181  + "FOREIGN DATA WRAPPER omnisci_csv WITH (storage_type = 'LOCAL_FILE', "
182  + "base_path = '" + foreign_server_path.toString() + "');");
183  for (int i = 0; i < num_foreign_refresh_threads; ++i) {
184  createForeignTestTable(dba, "test_foreign_table_" + i);
185  }
186  for (int i = 0; i < num_table_join_threads; ++i) {
187  createTestTables(dba, "test_table_" + i, table_import_path);
188  }
189 
190  runTest("db1",
191  "admin",
192  "HyperInteractive",
193  num_foreign_refresh_threads,
194  num_table_join_threads,
195  25);
196 
197  // cleanup
198  for (int i = 0; i < num_foreign_refresh_threads; ++i) {
199  dba.runSql("DROP FOREIGN TABLE test_foreign_table_" + i + ";");
200  }
201  for (int i = 0; i < num_table_join_threads; ++i) {
202  dba.runSql("DROP TABLE test_table_" + i + "_left ;");
203  dba.runSql("DROP TABLE test_table_" + i + "_right ;");
204  }
205  dba.runSql("DROP SERVER test_server;");
206  su.runSql("DROP DATABASE db1;");
207 
208  logger.info("ForeignTableRefreshConcurrencyTest() done");
209  }
210 }
void createTestTable(MapdTestClient dba, String table_name, Path copy_from_path)
void createForeignTestTable(MapdTestClient dba, String foreign_table_name)
void runTest(String db, String userName, String userPassword, int num_foreign_refresh_threads, int num_table_join_threads, int num_runs)
char * t
static bool run
void runSqlAsUser(String sql, MapdTestClient user, String logPrefix)
void createTestTables(MapdTestClient dba, String table_name, Path copy_from_path)