OmniSciDB  2e3a973ef4
CtasItasSelectUpdelConcurrencyTest.java
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.tests;
17 
18 import com.omnisci.thrift.server.TOmniSciException;
19 
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22 
23 import java.util.ArrayList;
24 import java.util.Random;
25 import java.util.concurrent.CyclicBarrier;
26 
28  final static Logger logger =
29  LoggerFactory.getLogger(CtasItasSelectUpdelConcurrencyTest.class);
30 
31  final static String[] text_values = {"foo",
32  "bar",
33  "hello",
34  "world",
35  "a",
36  "b",
37  "c",
38  "d",
39  "e",
40  "f",
41  "g",
42  "h",
43  "i",
44  "j",
45  "k",
46  "l",
47  "m",
48  "n",
49  "o",
50  "p"};
51 
52  public static void main(String[] args) throws Exception {
54  test.testConcurrency();
55  }
56 
57  private void runTest(
58  String db, String dbaUser, String dbaPassword, String dbUser, String dbPassword)
59  throws Exception {
60  int num_threads = 5;
61  final int runs = 25;
62  final int num_rows = 1000;
63  final int fragment_size = 10;
64  final String tableName = "test";
65  Exception exceptions[] = new Exception[num_threads];
66 
67  final CyclicBarrier barrier = new CyclicBarrier(num_threads, new Runnable() {
68  public void run() {
69  try {
70  MapdTestClient dba =
71  MapdTestClient.getClient("localhost", 6274, db, dbaUser, dbaPassword);
72  dba.runSql("CREATE TABLE " + tableName
73  + "(x BIGINT, y INTEGER, z SMALLINT, a TINYINT, f FLOAT, d DOUBLE, deci DECIMAL(18,6), str TEXT ENCODING NONE) WITH (FRAGMENT_SIZE = "
74  + fragment_size + ")");
75 
76  for (int i = 0; i < num_rows; i++) {
77  final String integer_val = Integer.toString(i);
78  final String small_val = Integer.toString(i % 128);
79  final String fp_val = Double.toString(i * 1.1);
80  final String deci_val = Double.toString(i + 0.01);
81  final String str_val = "'" + text_values[i % text_values.length] + "'";
82  final String values_string = String.join(" , ",
83  integer_val,
84  integer_val,
85  small_val,
86  small_val,
87  fp_val,
88  fp_val,
89  deci_val,
90  str_val);
91  dba.runSql("INSERT INTO " + tableName + " VALUES "
92  + "(" + values_string + ")");
93  }
94 
95  } catch (Exception e) {
96  logger.error("[" + Thread.currentThread().getId() + "]"
97  + " Caught Exception: " + e.getMessage(),
98  e);
99  exceptions[0] = e;
100  }
101  }
102  });
103 
104  ArrayList<Thread> threads = new ArrayList<>();
105  for (int i = 0; i < num_threads; i++) {
106  logger.info("Starting " + i);
107  final int threadId = i;
108 
109  Thread t = new Thread(new Runnable() {
110  @Override
111  public void run() {
112  long tid = Thread.currentThread().getId();
113  String logPrefix = "[" + tid + "]";
114  String sql = "";
115 
116  final String ctasTableName = "ctas_test";
117 
118  try {
119  barrier.await();
120 
121  MapdTestClient user =
122  MapdTestClient.getClient("localhost", 6274, db, dbUser, dbPassword);
123 
124  Random rand = new Random(tid);
125 
126  if (threadId == 2) {
127  sql = "CREATE TABLE " + ctasTableName + " AS (SELECT x, y, deci, str FROM "
128  + tableName + ");";
129  logger.info(logPrefix + " " + sql);
130  user.runSql(sql);
131  }
132 
133  sql = "SELECT * FROM " + tableName + " LIMIT 2;";
134  logger.info(logPrefix + " " + sql);
135  user.runSql(sql);
136 
137  boolean ctas_table_created = false;
138  while (!ctas_table_created) {
139  ctas_table_created = true;
140  try {
141  sql = "SELECT COUNT(*) FROM " + ctasTableName + ";";
142  logger.info(logPrefix + " VALIDATE " + sql);
143  user.sqlValidate(sql);
144  } catch (TOmniSciException e) {
145  if (e.getError_msg().indexOf("not found") != -1) {
146  Thread.sleep(1000);
147  ctas_table_created = false;
148  } else {
149  throw e;
150  }
151  }
152  }
153 
154  sql = "DELETE FROM " + ctasTableName + " WHERE y = " + rand.nextInt(num_rows)
155  + ";";
156  logger.info(logPrefix + " " + sql);
157  user.runSql(sql);
158 
159  if (threadId == 0) {
160  sql = "ALTER TABLE " + tableName + " ADD COLUMN n TEXT ENCODING DICT(8);";
161  logger.info(logPrefix + " VALIDATE " + sql);
162  user.runSql(sql);
163 
164  sql = "INSERT INTO " + tableName + " VALUES "
165  + "(" + tid + "," + tid + "," + tid + "," + tid + "," + tid + ","
166  + tid + "," + tid + "," + (tid % 2 == 0 ? "'mapd'" : "'omnisci'")
167  + ", 'z');";
168  logger.info(logPrefix + " " + sql);
169  user.runSql(sql);
170  }
171 
172  sql = "INSERT INTO " + ctasTableName + " (SELECT x, y, deci, str FROM "
173  + tableName + " WHERE str = '"
174  + text_values[rand.nextInt(text_values.length)] + "');";
175  logger.info(logPrefix + " " + sql);
176  user.runSql(sql);
177 
178  sql = "TRUNCATE TABLE " + tableName + ";";
179  logger.info(logPrefix + " " + sql);
180  user.runSql(sql);
181 
182  } catch (Exception e) {
183  logger.error(logPrefix + " Caught Exception: " + e.getMessage(), e);
184  exceptions[threadId] = e;
185  }
186  }
187  });
188  t.start();
189  threads.add(t);
190  }
191 
192  for (Thread t : threads) {
193  t.join();
194  }
195 
196  MapdTestClient dba =
197  MapdTestClient.getClient("localhost", 6274, db, dbaUser, dbaPassword);
198  dba.runSql("DROP TABLE " + tableName + ";");
199 
200  for (Exception e : exceptions) {
201  if (null != e) {
202  logger.error("Exception: " + e.getMessage(), e);
203  throw e;
204  }
205  }
206  }
207 
208  public void testConcurrency() throws Exception {
209  logger.info("CtasItasSelectUpdelConcurrencyTest()");
210 
212  "localhost", 6274, "omnisci", "admin", "HyperInteractive");
213  su.runSql("CREATE USER dba (password = 'password', is_super = 'true');");
214  su.runSql("CREATE USER bob (password = 'password', is_super = 'false');");
215 
216  su.runSql("GRANT CREATE on DATABASE omnisci TO bob;");
217 
218  su.runSql("CREATE DATABASE db1;");
219  su.runSql("GRANT CREATE on DATABASE db1 TO bob;");
220  su.runSql("GRANT CREATE VIEW on DATABASE db1 TO bob;");
221  su.runSql("GRANT DROP on DATABASE db1 TO bob;");
222  su.runSql("GRANT DROP VIEW on DATABASE db1 TO bob;");
223 
224  runTest("db1", "admin", "HyperInteractive", "admin", "HyperInteractive");
225  // TODO: run some tests as bob
226 
227  su.runSql("DROP DATABASE db1;");
228  su.runSql("DROP USER bob;");
229  su.runSql("DROP USER dba;");
230 
231  logger.info("CtasItasSelectUpdelConcurrencyTest() done");
232  }
233 }
const int8_t const int64_t * num_rows
static MapdTestClient getClient(String host, int port, String db, String user, String password)
List< TColumnType > sqlValidate(String sql)
TQueryResult runSql(String sql)
void runTest(String db, String dbaUser, String dbaPassword, String dbUser, String dbPassword)
static bool run