OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
CtasItasSelectUpdelConcurrencyTest.java
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.tests;
17 
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20 
21 import java.util.ArrayList;
22 import java.util.Random;
23 import java.util.concurrent.CyclicBarrier;
24 
25 import ai.heavy.thrift.server.TDBException;
26 
28  final static Logger logger =
29  LoggerFactory.getLogger(CtasItasSelectUpdelConcurrencyTest.class);
30 
31  final static String[] text_values = {"foo",
32  "bar",
33  "hello",
34  "world",
35  "a",
36  "b",
37  "c",
38  "d",
39  "e",
40  "f",
41  "g",
42  "h",
43  "i",
44  "j",
45  "k",
46  "l",
47  "m",
48  "n",
49  "o",
50  "p"};
51 
52  public static void main(String[] args) throws Exception {
54  test.testConcurrency();
55  }
56 
57  private void runTest(
58  String db, String dbaUser, String dbaPassword, String dbUser, String dbPassword)
59  throws Exception {
60  int num_threads = 5;
61  final int runs = 25;
62  final int num_rows = 1000;
63  final int fragment_size = 10;
64  final String tableName = "test";
65  Exception exceptions[] = new Exception[num_threads];
66 
67  final CyclicBarrier barrier = new CyclicBarrier(num_threads, new Runnable() {
68  public void run() {
69  try {
70  HeavyDBTestClient dba = HeavyDBTestClient.getClient(
71  "localhost", 6274, db, dbaUser, dbaPassword);
72  dba.runSql("CREATE TABLE " + tableName
73  + "(x BIGINT, y INTEGER, z SMALLINT, a TINYINT, f FLOAT, d DOUBLE, deci DECIMAL(18,6), str TEXT ENCODING NONE) WITH (FRAGMENT_SIZE = "
74  + fragment_size + ")");
75 
76  for (int i = 0; i < num_rows; i++) {
77  final String integer_val = Integer.toString(i);
78  final String small_val = Integer.toString(i % 128);
79  final String fp_val = Double.toString(i * 1.1);
80  final String deci_val = Double.toString(i + 0.01);
81  final String str_val = "'" + text_values[i % text_values.length] + "'";
82  final String values_string = String.join(" , ",
83  integer_val,
84  integer_val,
85  small_val,
86  small_val,
87  fp_val,
88  fp_val,
89  deci_val,
90  str_val);
91  dba.runSql("INSERT INTO " + tableName + " VALUES "
92  + "(" + values_string + ")");
93  }
94 
95  } catch (Exception e) {
96  logger.error("[" + Thread.currentThread().getId() + "]"
97  + " Caught Exception: " + e.getMessage(),
98  e);
99  exceptions[0] = e;
100  }
101  }
102  });
103 
104  ArrayList<Thread> threads = new ArrayList<>();
105  for (int i = 0; i < num_threads; i++) {
106  logger.info("Starting " + i);
107  final int threadId = i;
108 
109  Thread t = new Thread(new Runnable() {
110  @Override
111  public void run() {
112  long tid = Thread.currentThread().getId();
113  String logPrefix = "[" + tid + "]";
114  String sql = "";
115 
116  final String ctasTableName = "ctas_test";
117 
118  try {
119  barrier.await();
120 
121  HeavyDBTestClient user = HeavyDBTestClient.getClient(
122  "localhost", 6274, db, dbUser, dbPassword);
123 
124  Random rand = new Random(tid);
125 
126  if (threadId == 2) {
127  sql = "CREATE TABLE " + ctasTableName + " AS (SELECT x, y, deci, str FROM "
128  + tableName + ");";
129  logger.info(logPrefix + " " + sql);
130  user.runSql(sql);
131  }
132 
133  sql = "SELECT * FROM " + tableName + " LIMIT 2;";
134  logger.info(logPrefix + " " + sql);
135  user.runSql(sql);
136 
137  boolean ctas_table_created = false;
138  while (!ctas_table_created) {
139  ctas_table_created = true;
140  try {
141  sql = "SELECT COUNT(*) FROM " + ctasTableName + ";";
142  logger.info(logPrefix + " VALIDATE " + sql);
143  user.sqlValidate(sql);
144  } catch (TDBException e) {
145  if (e.getError_msg().indexOf("not found") != -1) {
146  Thread.sleep(1000);
147  ctas_table_created = false;
148  } else {
149  throw e;
150  }
151  }
152  }
153 
154  sql = "DELETE FROM " + ctasTableName + " WHERE y = " + rand.nextInt(num_rows)
155  + ";";
156  logger.info(logPrefix + " " + sql);
157  user.runSql(sql);
158 
159  if (threadId == 0) {
160  sql = "ALTER TABLE " + tableName + " ADD COLUMN n TEXT ENCODING DICT(8);";
161  logger.info(logPrefix + " VALIDATE " + sql);
162  user.runSql(sql);
163 
164  sql = "INSERT INTO " + tableName + " VALUES "
165  + "(" + tid + "," + tid + "," + tid + "," + tid + "," + tid + ","
166  + tid + "," + tid + "," + (tid % 2 == 0 ? "'value_1'" : "'value_2'")
167  + ", 'z');";
168  logger.info(logPrefix + " " + sql);
169  user.runSql(sql);
170  }
171 
172  sql = "INSERT INTO " + ctasTableName + " (SELECT x, y, deci, str FROM "
173  + tableName + " WHERE str = '"
174  + text_values[rand.nextInt(text_values.length)] + "');";
175  logger.info(logPrefix + " " + sql);
176  user.runSql(sql);
177 
178  sql = "UPDATE " + tableName + " SET str = (SELECT count(*) > 0 FROM "
179  + tableName + ");";
180  logger.info(logPrefix + " " + sql);
181  user.runSql(sql);
182 
183  sql = "TRUNCATE TABLE " + tableName + ";";
184  logger.info(logPrefix + " " + sql);
185  user.runSql(sql);
186 
187  } catch (Exception e) {
188  logger.error(logPrefix + " Caught Exception: " + e.getMessage(), e);
189  exceptions[threadId] = e;
190  }
191  }
192  });
193  t.start();
194  threads.add(t);
195  }
196 
197  for (Thread t : threads) {
198  t.join();
199  }
200 
201  HeavyDBTestClient dba =
202  HeavyDBTestClient.getClient("localhost", 6274, db, dbaUser, dbaPassword);
203  dba.runSql("DROP TABLE " + tableName + ";");
204 
205  for (Exception e : exceptions) {
206  if (null != e) {
207  logger.error("Exception: " + e.getMessage(), e);
208  throw e;
209  }
210  }
211  }
212 
213  public void testConcurrency() throws Exception {
214  logger.info("CtasItasSelectUpdelConcurrencyTest()");
215 
216  HeavyDBTestClient su = HeavyDBTestClient.getClient(
217  "localhost", 6274, "heavyai", "admin", "HyperInteractive");
218  su.runSql("CREATE USER dba (password = 'password', is_super = 'true');");
219  su.runSql("CREATE USER bob (password = 'password', is_super = 'false');");
220 
221  su.runSql("GRANT CREATE on DATABASE heavyai TO bob;");
222 
223  su.runSql("CREATE DATABASE db1;");
224  su.runSql("GRANT CREATE on DATABASE db1 TO bob;");
225  su.runSql("GRANT CREATE VIEW on DATABASE db1 TO bob;");
226  su.runSql("GRANT DROP on DATABASE db1 TO bob;");
227  su.runSql("GRANT DROP VIEW on DATABASE db1 TO bob;");
228 
229  runTest("db1", "admin", "HyperInteractive", "admin", "HyperInteractive");
230  // TODO: run some tests as bob
231 
232  su.runSql("DROP DATABASE db1;");
233  su.runSql("DROP USER bob;");
234  su.runSql("DROP USER dba;");
235 
236  logger.info("CtasItasSelectUpdelConcurrencyTest() done");
237  }
238 }
void runTest(String db, String dbaUser, String dbaPassword, String dbUser, String dbPassword)
static bool run