OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignTableRefreshConcurrencyTest.java
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.tests;
17 
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20 
21 import java.nio.file.Files;
22 import java.nio.file.Path;
23 import java.nio.file.Paths;
24 import java.text.SimpleDateFormat;
25 import java.util.ArrayList;
26 import java.util.Calendar;
27 import java.util.Date;
28 import java.util.TimeZone;
29 
31  final static Logger logger =
32  LoggerFactory.getLogger(ForeignTableRefreshConcurrencyTest.class);
33 
34  public static String getTimeStamp(int sec_from_now) {
35  Calendar cal = Calendar.getInstance();
36  cal.setTime(new Date());
37  cal.add(Calendar.SECOND, sec_from_now);
38  SimpleDateFormat date_format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
39  date_format.setTimeZone(TimeZone.getTimeZone("GMT"));
40  return date_format.format(cal.getTime());
41  }
42 
43  public static void main(String[] args) throws Exception {
45  test.testConcurrency();
46  }
47 
48  private Path getAbsolutePath(String path) {
49  Path path_obj = Paths.get(path).toAbsolutePath();
50  assert Files.exists(path_obj);
51  return path_obj;
52  }
53 
54  // This test creates mulitple competing threads, some of which run selects
55  // with joins on a table while others refresh a foreign table. The test
56  // should resolve without exception as the cache being used during the join
57  // exectution should be safely invalidated during the foreign table
58  // refreshes. Both manual and scheduled foreign table refreshes are tested
59  // as the code-paths used to invalidate the QueryEngine cache differ.
60  private void runTest(String db,
61  String userName,
62  String userPassword,
63  int num_foreign_manual_refresh_threads,
64  int num_foreign_scheduled_refresh_threads,
65  int num_table_join_threads,
66  int num_runs) throws Exception {
67  ArrayList<Exception> exceptions = new ArrayList<Exception>();
68 
69  Thread[] foreign_table_manual_refresh_threads =
70  new Thread[num_foreign_manual_refresh_threads];
71  Thread[] foreign_table_scheduled_refresh_threads =
72  new Thread[num_foreign_scheduled_refresh_threads];
73  Thread[] table_join_threads = new Thread[num_table_join_threads];
74 
75  for (int i = 0; i < num_foreign_manual_refresh_threads; ++i) {
76  final int tid = i;
77  foreign_table_manual_refresh_threads[tid] = new Thread(new Runnable() {
78  @Override
79  public void run() {
80  final String thread_name = "F[" + tid + "]";
81  try {
82  logger.info("Starting manual foreign table refresh thread " + thread_name);
83  HeavyDBTestClient user = HeavyDBTestClient.getClient(
84  "localhost", 6274, db, userName, userPassword);
85  for (int irun = 0; irun < num_runs; ++irun) {
86  runSqlAsUser("SELECT * FROM test_foreign_table_manual_refresh_" + tid + ";",
87  user,
88  thread_name);
89  runSqlAsUser("REFRESH FOREIGN TABLES test_foreign_table_manual_refresh_"
90  + tid + ";",
91  user,
92  thread_name);
93  }
94  logger.info("Finished foreign table manual refresh " + thread_name);
95  } catch (Exception e) {
96  logger.error("Foreign table refresh " + thread_name
97  + " Caught Exception: " + e.getMessage(),
98  e);
99  exceptions.add(e);
100  }
101  }
102  });
103  foreign_table_manual_refresh_threads[tid].start();
104  }
105 
106  for (int i = 0; i < num_foreign_scheduled_refresh_threads; ++i) {
107  final int tid = i;
108  foreign_table_scheduled_refresh_threads[tid] = new Thread(new Runnable() {
109  @Override
110  public void run() {
111  final String thread_name = "S[" + tid + "]";
112  try {
113  logger.info("Starting scheduled foreign table refresh thread " + thread_name);
114  HeavyDBTestClient user = HeavyDBTestClient.getClient(
115  "localhost", 6274, db, userName, userPassword);
116  for (int irun = 0; irun < num_runs; ++irun) {
117  runSqlAsUser(
118  "SELECT * FROM test_foreign_table_scheduled_refresh_" + tid + ";",
119  user,
120  thread_name);
121  Thread.sleep(1000); // sleep 1s between queries to allow
122  // refreshes to occur, which occur at 1s intervals
123  }
124  logger.info("Finished foreign table scheduled refresh " + thread_name);
125  } catch (Exception e) {
126  logger.error("Foreign table scheduled refresh " + thread_name
127  + " Caught Exception: " + e.getMessage(),
128  e);
129  exceptions.add(e);
130  }
131  }
132  });
133  foreign_table_scheduled_refresh_threads[tid].start();
134  }
135 
136  for (int i = 0; i < num_table_join_threads; ++i) {
137  final int tid = i;
138  table_join_threads[tid] = new Thread(new Runnable() {
139  @Override
140  public void run() {
141  final String thread_name = "T[" + tid + "]";
142  try {
143  logger.info("Starting table join " + thread_name);
144  HeavyDBTestClient user = HeavyDBTestClient.getClient(
145  "localhost", 6274, db, userName, userPassword);
146  for (int irun = 0; irun < num_runs; ++irun) {
147  runSqlAsUser("SELECT * FROM test_table_" + tid
148  + "_left AS l JOIN test_table_" + tid
149  + "_right AS r ON l.id = r.id;",
150  user,
151  thread_name);
152  Thread.sleep(1000); // sleep 1s between queries to allow more
153  // contention between foreign table refreshes;
154  // especially in the case of scheduled refreshes
155  }
156  logger.info("Finished table join thread T[" + tid + "]");
157  } catch (Exception e) {
158  e.printStackTrace();
159  logger.error(
160  "Table join " + thread_name + " Caught Exception: " + e.getMessage(),
161  e);
162  exceptions.add(e);
163  }
164  }
165  });
166  table_join_threads[tid].start();
167  }
168 
169  for (Thread t : foreign_table_manual_refresh_threads) {
170  t.join();
171  }
172  for (Thread t : foreign_table_scheduled_refresh_threads) {
173  t.join();
174  }
175  for (Thread t : table_join_threads) {
176  t.join();
177  }
178 
179  for (Exception e : exceptions) {
180  if (null != e) {
181  logger.error("Exception: " + e.getMessage(), e);
182  throw e;
183  }
184  }
185  }
186 
187  public void runSqlAsUser(String sql, HeavyDBTestClient user, String logPrefix)
188  throws Exception {
189  logger.info(logPrefix + " " + sql);
190  user.runSql(sql);
191  }
192 
194  HeavyDBTestClient dba, String foreign_table_name) throws Exception {
195  dba.runSql("CREATE FOREIGN TABLE " + foreign_table_name + " "
196  + "(b BOOLEAN, t TINYINT, s SMALLINT, i INTEGER, bi BIGINT, f FLOAT, "
197  + "dc DECIMAL(10, 5), tm TIME, tp TIMESTAMP, d DATE, txt TEXT, "
198  + "txt_2 TEXT ENCODING NONE) "
199  + "SERVER test_server WITH "
200  + "(file_path = 'scalar_types.csv', "
201  + "refresh_timing_type = 'scheduled', "
202  + "refresh_start_date_time = '" + getTimeStamp(1) + "', "
203  + "refresh_interval = '1S', "
204  + "fragment_size = 2);");
205  }
206 
208  HeavyDBTestClient dba, String foreign_table_name) throws Exception {
209  dba.runSql("CREATE FOREIGN TABLE " + foreign_table_name + " "
210  + "(b BOOLEAN, t TINYINT, s SMALLINT, i INTEGER, bi BIGINT, f FLOAT, "
211  + "dc DECIMAL(10, 5), tm TIME, tp TIMESTAMP, d DATE, txt TEXT, "
212  + "txt_2 TEXT ENCODING NONE) "
213  + "SERVER test_server WITH "
214  + "(file_path = 'scalar_types.csv', "
215  + "fragment_size = 2);");
216  }
217 
218  private void createTestTable(
219  HeavyDBTestClient dba, String table_name, Path copy_from_path)
220  throws Exception {
221  dba.runSql("CREATE TABLE " + table_name
222  + " (id INTEGER, str TEXT ENCODING DICT(32), x DOUBLE, y BIGINT) WITH (FRAGMENT_SIZE=1, partitions='replicated')");
223  dba.runSql("COPY " + table_name + " FROM '" + copy_from_path.toString()
224  + "' WITH (header='false');");
225  }
226 
227  private void createTestTables(
228  HeavyDBTestClient dba, String table_name, Path copy_from_path)
229  throws Exception {
230  createTestTable(dba, table_name + "_left", copy_from_path);
231  createTestTable(dba, table_name + "_right", copy_from_path);
232  }
233 
234  public void testConcurrency() throws Exception {
235  logger.info("ForeignTableRefreshConcurrencyTest()");
236 
237  HeavyDBTestClient su = HeavyDBTestClient.getClient(
238  "localhost", 6274, "heavyai", "admin", "HyperInteractive");
239 
240  // initialize
241  su.runSql("DROP DATABASE IF EXISTS db1;");
242  su.runSql("CREATE DATABASE db1;");
243 
244  // create tables for use
245  final int num_foreign_manual_refresh_threads = 2;
246  final int num_foreign_scheduled_refresh_threads = 2;
247  final int num_table_join_threads = 2;
248  Path table_import_path = getAbsolutePath(
249  "../java/utility/src/main/java/com/mapd/tests/data/simple_test.csv");
250  Path foreign_server_path = getAbsolutePath("../Tests/FsiDataFiles/");
251  HeavyDBTestClient dba = HeavyDBTestClient.getClient(
252  "localhost", 6274, "db1", "admin", "HyperInteractive");
253  dba.runSql("CREATE SERVER test_server "
254  + "FOREIGN DATA WRAPPER delimited_file WITH (storage_type = 'LOCAL_FILE', "
255  + "base_path = '" + foreign_server_path.toString() + "');");
256  for (int i = 0; i < num_foreign_manual_refresh_threads; ++i) {
257  createForeignTestTableManualRefresh(dba, "test_foreign_table_manual_refresh_" + i);
258  }
259  for (int i = 0; i < num_foreign_scheduled_refresh_threads; ++i) {
261  dba, "test_foreign_table_scheduled_refresh_" + i);
262  }
263  for (int i = 0; i < num_table_join_threads; ++i) {
264  createTestTables(dba, "test_table_" + i, table_import_path);
265  }
266 
267  runTest("db1",
268  "admin",
269  "HyperInteractive",
270  num_foreign_manual_refresh_threads,
271  num_foreign_scheduled_refresh_threads,
272  num_table_join_threads,
273  25);
274 
275  // cleanup
276  for (int i = 0; i < num_foreign_manual_refresh_threads; ++i) {
277  dba.runSql("DROP FOREIGN TABLE test_foreign_table_manual_refresh_" + i + ";");
278  }
279  for (int i = 0; i < num_foreign_manual_refresh_threads; ++i) {
280  dba.runSql("DROP FOREIGN TABLE test_foreign_table_scheduled_refresh_" + i + ";");
281  }
282  for (int i = 0; i < num_table_join_threads; ++i) {
283  dba.runSql("DROP TABLE test_table_" + i + "_left ;");
284  dba.runSql("DROP TABLE test_table_" + i + "_right ;");
285  }
286  dba.runSql("DROP SERVER test_server;");
287  su.runSql("DROP DATABASE db1;");
288 
289  logger.info("ForeignTableRefreshConcurrencyTest() done");
290  }
291 }
void createTestTable(HeavyDBTestClient dba, String table_name, Path copy_from_path)
void runSqlAsUser(String sql, HeavyDBTestClient user, String logPrefix)
void createForeignTestTableManualRefresh(HeavyDBTestClient dba, String foreign_table_name)
void createTestTables(HeavyDBTestClient dba, String table_name, Path copy_from_path)
void runTest(String db, String userName, String userPassword, int num_foreign_manual_refresh_threads, int num_foreign_scheduled_refresh_threads, int num_table_join_threads, int num_runs)
static bool run
void createForeignTestTableScheduledRefresh(HeavyDBTestClient dba, String foreign_table_name)