OmniSciDB  85c2d10cdc
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ForeignStorageConcurrencyTest.java
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.tests;
17 
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20 
21 import java.util.ArrayList;
22 import java.util.Random;
23 import java.util.concurrent.CyclicBarrier;
24 
26  final static Logger logger =
27  LoggerFactory.getLogger(ForeignStorageConcurrencyTest.class);
28 
29  public static void main(String[] args) throws Exception {
31  test.testConcurrency();
32  }
33 
34  // This test creates 4 threads that all perform a series of common projections on a
35  // table while attempting to modify different columns at different points
36  // simultaneously. The test should resolve without exception as each thread waits on
37  // another to finish the alteration until it can complete it's own function.
38  private void runTest(String db,
39  String adminName,
40  String adminPassword,
41  String userName,
42  String userPassword) throws Exception {
43  final int num_threads = 4;
44  ArrayList<Exception> exceptions = new ArrayList<Exception>();
45 
46  // We use a barrier here to synchronize the start of each competing thread and make
47  // sure there is a race condition. The barrier ensures no thread will run until they
48  // are all created and waiting for the barrier to complete.
49  final CyclicBarrier barrier = new CyclicBarrier(num_threads, new Runnable() {
50  public void run() {
51  try {
52  logger.info("Barrier acquired.");
53  MapdTestClient dba = MapdTestClient.getClient(
54  "localhost", 6274, db, adminName, adminPassword);
55 
56  dba.runSql("CREATE SERVER test_server "
57  + "FOREIGN DATA WRAPPER omnisci_csv WITH (storage_type = 'LOCAL_FILE', "
58  + "base_path = '" + System.getProperty("user.dir") + "');");
59 
60  dba.runSql("CREATE FOREIGN TABLE test_table "
61  + "(b BOOLEAN, t TINYINT, s SMALLINT, i INTEGER, bi BIGINT, f FLOAT, "
62  + "dc DECIMAL(10, 5), tm TIME, tp TIMESTAMP, d DATE, txt TEXT, "
63  + "txt_2 TEXT ENCODING NONE) "
64  + "SERVER test_server WITH "
65  + "(file_path = '../Tests/FsiDataFiles/scalar_types.csv', "
66  + "FRAGMENT_SIZE = 10);");
67  logger.info("Barrier released.");
68  } catch (Exception e) {
69  logger.error("Barrier Caught Exception: " + e.getMessage(), e);
70  exceptions.add(e);
71  }
72  }
73  });
74 
75  // Each thread performs the same projections with an alter command targeting a
76  // different column injected at a different point.
77  Thread[] threads = new Thread[num_threads];
78 
79  threads[0] = new Thread(new Runnable() {
80  @Override
81  public void run() {
82  try {
83  logger.info("Starting thread[0]");
84  MapdTestClient user =
85  MapdTestClient.getClient("localhost", 6274, db, userName, userPassword);
86  barrier.await();
88  "ALTER FOREIGN TABLE test_table RENAME COLUMN t TO tint", user, "0");
89  runSqlAsUser("SELECT * FROM test_table LIMIT 2", user, "0");
90  runSqlAsUser("SELECT * FROM test_table WHERE txt = 'quoted text'", user, "0");
91  runSqlValidateAsUser("SELECT COUNT(*) FROM test_table LIMIT 2", user, "0");
92  logger.info("Finished thread[0]");
93  } catch (Exception e) {
94  logger.error("Thread[0] Caught Exception: " + e.getMessage(), e);
95  exceptions.add(e);
96  }
97  }
98  });
99  threads[0].start();
100 
101  threads[1] = new Thread(new Runnable() {
102  @Override
103  public void run() {
104  try {
105  logger.info("Starting thread[1]");
106  MapdTestClient user =
107  MapdTestClient.getClient("localhost", 6274, db, userName, userPassword);
108  barrier.await();
109  runSqlAsUser("SELECT * FROM test_table LIMIT 2", user, "1");
110  runSqlAsUser(
111  "ALTER FOREIGN TABLE test_table RENAME COLUMN s TO sint", user, "1");
112  runSqlAsUser("SELECT * FROM test_table WHERE txt = 'quoted text'", user, "1");
113  runSqlValidateAsUser("SELECT COUNT(*) FROM test_table LIMIT 2", user, "1");
114  logger.info("Finished thread[1]");
115  } catch (Exception e) {
116  logger.error("Thread[1] Caught Exception: " + e.getMessage(), e);
117  exceptions.add(e);
118  }
119  }
120  });
121  threads[1].start();
122 
123  threads[2] = new Thread(new Runnable() {
124  @Override
125  public void run() {
126  try {
127  logger.info("Starting thread[2]");
128  MapdTestClient user =
129  MapdTestClient.getClient("localhost", 6274, db, userName, userPassword);
130  barrier.await();
131  runSqlAsUser("SELECT * FROM test_table LIMIT 2", user, "2");
132  runSqlAsUser("SELECT * FROM test_table WHERE txt = 'quoted text'", user, "2");
133  runSqlAsUser(
134  "ALTER FOREIGN TABLE test_table RENAME COLUMN i TO iint", user, "2");
135  runSqlValidateAsUser("SELECT COUNT(*) FROM test_table LIMIT 2", user, "2");
136  logger.info("Finished thread[2]");
137  } catch (Exception e) {
138  logger.error("Thread[0] Caught Exception: " + e.getMessage(), e);
139  exceptions.add(e);
140  }
141  }
142  });
143  threads[2].start();
144 
145  threads[3] = new Thread(new Runnable() {
146  @Override
147  public void run() {
148  try {
149  logger.info("Starting thread[3]");
150  MapdTestClient user =
151  MapdTestClient.getClient("localhost", 6274, db, userName, userPassword);
152  barrier.await();
153  runSqlAsUser("SELECT * FROM test_table LIMIT 2", user, "3");
154  runSqlAsUser("SELECT * FROM test_table WHERE txt = 'quoted text'", user, "3");
155  runSqlValidateAsUser("SELECT COUNT(*) FROM test_table LIMIT 2", user, "3");
156  runSqlAsUser(
157  "ALTER FOREIGN TABLE test_table RENAME COLUMN b TO bint", user, "3");
158  logger.info("Finished thread[3]");
159  } catch (Exception e) {
160  logger.error("Thread[0] Caught Exception: " + e.getMessage(), e);
161  exceptions.add(e);
162  }
163  }
164  });
165  threads[3].start();
166 
167  for (Thread t : threads) {
168  t.join();
169  }
170 
171  MapdTestClient dba =
172  MapdTestClient.getClient("localhost", 6274, db, adminName, adminPassword);
173  dba.runSql("DROP FOREIGN TABLE test_table;");
174  dba.runSql("DROP SERVER test_server;");
175 
176  for (Exception e : exceptions) {
177  if (null != e) {
178  logger.error("Exception: " + e.getMessage(), e);
179  throw e;
180  }
181  }
182  }
183 
184  public void runSqlAsUser(String sql, MapdTestClient user, String logPrefix)
185  throws Exception {
186  logger.info(logPrefix + " " + sql);
187  user.runSql(sql);
188  }
189 
190  public void runSqlValidateAsUser(String sql, MapdTestClient user, String logPrefix)
191  throws Exception {
192  logger.info(logPrefix + " " + sql);
193  user.sqlValidate(sql);
194  }
195 
196  public void testConcurrency() throws Exception {
197  logger.info("ForeignStorageConcurrencyTest()");
198 
199  MapdTestClient su = MapdTestClient.getClient(
200  "localhost", 6274, "omnisci", "admin", "HyperInteractive");
201 
202  // Initialize.
203  su.runSql("DROP DATABASE IF EXISTS db1;");
204  su.runSql("CREATE DATABASE db1;");
205 
206  runTest("db1", "admin", "HyperInteractive", "admin", "HyperInteractive");
207 
208  // Cleanup.
209  su.runSql("DROP DATABASE db1;");
210 
211  logger.info("ForeignStorageConcurrencyTest() done");
212  }
213 }
void runTest(String db, String adminName, String adminPassword, String userName, String userPassword)
void runSqlAsUser(String sql, MapdTestClient user, String logPrefix)
void runSqlValidateAsUser(String sql, MapdTestClient user, String logPrefix)
char * t
static bool run