OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignStorageConcurrencyTest.java
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.tests;
17 
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20 
21 import java.util.ArrayList;
22 import java.util.Random;
23 import java.util.concurrent.CyclicBarrier;
24 
26  final static Logger logger =
27  LoggerFactory.getLogger(ForeignStorageConcurrencyTest.class);
28 
29  public static void main(String[] args) throws Exception {
31  test.testConcurrency();
32  }
33 
34  // This test creates 4 threads that all perform a series of common projections on a
35  // table while attempting to modify different columns at different points
36  // simultaneously. The test should resolve without exception as each thread waits on
37  // another to finish the alteration until it can complete it's own function.
38  private void runTest(String db,
39  String adminName,
40  String adminPassword,
41  String userName,
42  String userPassword) throws Exception {
43  final int num_threads = 4;
44  ArrayList<Exception> exceptions = new ArrayList<Exception>();
45 
46  // We use a barrier here to synchronize the start of each competing thread and make
47  // sure there is a race condition. The barrier ensures no thread will run until they
48  // are all created and waiting for the barrier to complete.
49  final CyclicBarrier barrier = new CyclicBarrier(num_threads, new Runnable() {
50  public void run() {
51  try {
52  logger.info("Barrier acquired.");
53  HeavyDBTestClient dba = HeavyDBTestClient.getClient(
54  "localhost", 6274, db, adminName, adminPassword);
55 
56  dba.runSql("CREATE SERVER test_server "
57  + "FOREIGN DATA WRAPPER delimited_file WITH (storage_type = 'LOCAL_FILE', "
58  + "base_path = '" + System.getProperty("user.dir") + "');");
59 
60  dba.runSql("CREATE FOREIGN TABLE test_table "
61  + "(b BOOLEAN, t TINYINT, s SMALLINT, i INTEGER, bi BIGINT, f FLOAT, "
62  + "dc DECIMAL(10, 5), tm TIME, tp TIMESTAMP, d DATE, txt TEXT, "
63  + "txt_2 TEXT ENCODING NONE) "
64  + "SERVER test_server WITH "
65  + "(file_path = '../Tests/FsiDataFiles/scalar_types.csv', "
66  + "FRAGMENT_SIZE = 10);");
67  logger.info("Barrier released.");
68  } catch (Exception e) {
69  logger.error("Barrier Caught Exception: " + e.getMessage(), e);
70  exceptions.add(e);
71  }
72  }
73  });
74 
75  // Each thread performs the same projections with an alter command targeting a
76  // different column injected at a different point.
77  Thread[] threads = new Thread[num_threads];
78 
79  threads[0] = new Thread(new Runnable() {
80  @Override
81  public void run() {
82  try {
83  logger.info("Starting thread[0]");
84  HeavyDBTestClient user = HeavyDBTestClient.getClient(
85  "localhost", 6274, db, userName, userPassword);
86  barrier.await();
88  "ALTER FOREIGN TABLE test_table RENAME COLUMN t TO tint", user, "0");
89  runSqlAsUser("SELECT * FROM test_table LIMIT 2", user, "0");
90  runSqlAsUser("SELECT * FROM test_table WHERE txt = 'quoted text'", user, "0");
91  runSqlValidateAsUser("SELECT COUNT(*) FROM test_table LIMIT 2", user, "0");
92  logger.info("Finished thread[0]");
93  } catch (Exception e) {
94  logger.error("Thread[0] Caught Exception: " + e.getMessage(), e);
95  exceptions.add(e);
96  }
97  }
98  });
99  threads[0].start();
100 
101  threads[1] = new Thread(new Runnable() {
102  @Override
103  public void run() {
104  try {
105  logger.info("Starting thread[1]");
106  HeavyDBTestClient user = HeavyDBTestClient.getClient(
107  "localhost", 6274, db, userName, userPassword);
108  barrier.await();
109  runSqlAsUser("SELECT * FROM test_table LIMIT 2", user, "1");
110  runSqlAsUser(
111  "ALTER FOREIGN TABLE test_table RENAME COLUMN s TO sint", user, "1");
112  runSqlAsUser("SELECT * FROM test_table WHERE txt = 'quoted text'", user, "1");
113  runSqlValidateAsUser("SELECT COUNT(*) FROM test_table LIMIT 2", user, "1");
114  logger.info("Finished thread[1]");
115  } catch (Exception e) {
116  logger.error("Thread[1] Caught Exception: " + e.getMessage(), e);
117  exceptions.add(e);
118  }
119  }
120  });
121  threads[1].start();
122 
123  threads[2] = new Thread(new Runnable() {
124  @Override
125  public void run() {
126  try {
127  logger.info("Starting thread[2]");
128  HeavyDBTestClient user = HeavyDBTestClient.getClient(
129  "localhost", 6274, db, userName, userPassword);
130  barrier.await();
131  runSqlAsUser("SELECT * FROM test_table LIMIT 2", user, "2");
132  runSqlAsUser("SELECT * FROM test_table WHERE txt = 'quoted text'", user, "2");
133  runSqlAsUser(
134  "ALTER FOREIGN TABLE test_table RENAME COLUMN i TO iint", user, "2");
135  runSqlValidateAsUser("SELECT COUNT(*) FROM test_table LIMIT 2", user, "2");
136  logger.info("Finished thread[2]");
137  } catch (Exception e) {
138  logger.error("Thread[0] Caught Exception: " + e.getMessage(), e);
139  exceptions.add(e);
140  }
141  }
142  });
143  threads[2].start();
144 
145  threads[3] = new Thread(new Runnable() {
146  @Override
147  public void run() {
148  try {
149  logger.info("Starting thread[3]");
150  HeavyDBTestClient user = HeavyDBTestClient.getClient(
151  "localhost", 6274, db, userName, userPassword);
152  barrier.await();
153  runSqlAsUser("SELECT * FROM test_table LIMIT 2", user, "3");
154  runSqlAsUser("SELECT * FROM test_table WHERE txt = 'quoted text'", user, "3");
155  runSqlValidateAsUser("SELECT COUNT(*) FROM test_table LIMIT 2", user, "3");
156  runSqlAsUser(
157  "ALTER FOREIGN TABLE test_table RENAME COLUMN b TO bint", user, "3");
158  logger.info("Finished thread[3]");
159  } catch (Exception e) {
160  logger.error("Thread[0] Caught Exception: " + e.getMessage(), e);
161  exceptions.add(e);
162  }
163  }
164  });
165  threads[3].start();
166 
167  for (Thread t : threads) {
168  t.join();
169  }
170 
171  HeavyDBTestClient dba =
172  HeavyDBTestClient.getClient("localhost", 6274, db, adminName, adminPassword);
173  dba.runSql("DROP FOREIGN TABLE test_table;");
174  dba.runSql("DROP SERVER test_server;");
175 
176  for (Exception e : exceptions) {
177  if (null != e) {
178  logger.error("Exception: " + e.getMessage(), e);
179  throw e;
180  }
181  }
182  }
183 
184  public void runSqlAsUser(String sql, HeavyDBTestClient user, String logPrefix)
185  throws Exception {
186  logger.info(logPrefix + " " + sql);
187  user.runSql(sql);
188  }
189 
190  public void runSqlValidateAsUser(String sql, HeavyDBTestClient user, String logPrefix)
191  throws Exception {
192  logger.info(logPrefix + " " + sql);
193  user.sqlValidate(sql);
194  }
195 
196  public void testConcurrency() throws Exception {
197  logger.info("ForeignStorageConcurrencyTest()");
198 
199  HeavyDBTestClient su = HeavyDBTestClient.getClient(
200  "localhost", 6274, "heavyai", "admin", "HyperInteractive");
201 
202  // Initialize.
203  su.runSql("DROP DATABASE IF EXISTS db1;");
204  su.runSql("CREATE DATABASE db1;");
205 
206  runTest("db1", "admin", "HyperInteractive", "admin", "HyperInteractive");
207 
208  // Cleanup.
209  su.runSql("DROP DATABASE db1;");
210 
211  logger.info("ForeignStorageConcurrencyTest() done");
212  }
213 }
void runTest(String db, String adminName, String adminPassword, String userName, String userPassword)
void runSqlValidateAsUser(String sql, HeavyDBTestClient user, String logPrefix)
void runSqlAsUser(String sql, HeavyDBTestClient user, String logPrefix)
static bool run