OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
UpdateDeleteInsertConcurrencyTest.java
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.tests;
17 
18 import org.apache.commons.cli.*;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21 
22 import java.util.ArrayList;
23 import java.util.Random;
24 import java.util.concurrent.CyclicBarrier;
25 
27  final static Logger logger =
28  LoggerFactory.getLogger(UpdateDeleteInsertConcurrencyTest.class);
29 
30  final static String[] text_values = {"foo",
31  "bar",
32  "hello",
33  "world",
34  "a",
35  "b",
36  "c",
37  "d",
38  "e",
39  "f",
40  "g",
41  "h",
42  "i",
43  "j",
44  "k",
45  "l",
46  "m",
47  "n",
48  "o",
49  "p"};
50 
51  public static void main(String[] args) throws Exception {
52  Options options = new Options();
53  options.addOption(Option.builder("t")
54  .longOpt("temptables")
55  .desc("Use temporary tables for test")
56  .build());
57  CommandLineParser clp = new DefaultParser();
58  final CommandLine commandLine = clp.parse(options, args);
59 
60  Boolean useTemporaryTables = commandLine.hasOption("temptables");
62  test.testUpdateDeleteInsertConcurrency(useTemporaryTables);
63  }
64 
65  private Boolean useTemporaryTables_ = false;
66 
67  private void runTest(String db,
68  String dbaUser,
69  String dbaPassword,
70  String dbUser,
71  String dbPassword,
72  Boolean concurrentInserts) throws Exception {
73  int num_threads = 5;
74  final int runs = 25;
75  final int num_rows = 1000;
76  final int fragment_size = 10;
77  final String tableName = "test";
78  Exception exceptions[] = new Exception[num_threads];
79 
80  final CyclicBarrier barrier = new CyclicBarrier(num_threads, new Runnable() {
81  public void run() {
82  try {
83  HeavyDBTestClient dba = HeavyDBTestClient.getClient(
84  "localhost", 6274, db, dbaUser, dbaPassword);
85  final String createPrefix =
86  useTemporaryTables_ ? "CREATE TEMPORARY TABLE " : "CREATE TABLE ";
87  dba.runSql(createPrefix + tableName
88  + "(x BIGINT, y INTEGER, z SMALLINT, a TINYINT, f FLOAT, d DOUBLE, deci DECIMAL(18,6), str TEXT ENCODING NONE) WITH (FRAGMENT_SIZE = "
89  + fragment_size + ")");
90  if (!concurrentInserts) {
91  for (int i = 0; i < num_rows; i++) {
92  final String integer_val = Integer.toString(i);
93  final String small_val = Integer.toString(i % 128);
94  final String fp_val = Double.toString(i * 1.1);
95  final String deci_val = Double.toString(i + 0.01);
96  final String str_val = "'" + text_values[i % text_values.length] + "'";
97  final String values_string = String.join(" , ",
98  integer_val,
99  integer_val,
100  small_val,
101  small_val,
102  fp_val,
103  fp_val,
104  deci_val,
105  str_val);
106  dba.runSql("INSERT INTO " + tableName + " VALUES "
107  + "(" + values_string + ")");
108  }
109  }
110  } catch (Exception e) {
111  logger.error("[" + Thread.currentThread().getId() + "]"
112  + " Caught Exception: " + e.getMessage(),
113  e);
114  exceptions[0] = e;
115  }
116  }
117  });
118 
119  ArrayList<Thread> threads = new ArrayList<>();
120  for (int i = 0; i < num_threads; i++) {
121  logger.info("Starting " + i);
122  final int threadId = i;
123 
124  Thread t = new Thread(new Runnable() {
125  @Override
126  public void run() {
127  long tid = Thread.currentThread().getId();
128  String logPrefix = "[" + tid + "]";
129  String sql = "";
130 
131  try {
132  barrier.await();
133 
134  HeavyDBTestClient user = HeavyDBTestClient.getClient(
135  "localhost", 6274, db, dbUser, dbPassword);
136 
137  if (concurrentInserts) {
138  for (int i = 0; i < num_rows / num_threads; i++) {
139  final String integer_val = Integer.toString(i);
140  final String small_val = Integer.toString(i % 128);
141  final String fp_val = Double.toString(i * 1.1);
142  final String deci_val = Double.toString(i + 0.01);
143  final String str_val = "'" + text_values[i % text_values.length] + "'";
144  final String values_string = String.join(" , ",
145  integer_val,
146  integer_val,
147  small_val,
148  small_val,
149  fp_val,
150  fp_val,
151  deci_val,
152  str_val);
153  user.runSql("INSERT INTO " + tableName + " VALUES "
154  + "(" + values_string + ")");
155  }
156  }
157 
158  Random rand = new Random(tid);
159 
160  sql = "DELETE FROM " + tableName + " WHERE x = " + (tid * 2) + ";";
161  logger.info(logPrefix + " " + sql);
162  user.runSql(sql);
163 
164  sql = "DELETE FROM " + tableName + " WHERE y = " + rand.nextInt(num_rows)
165  + ";";
166  logger.info(logPrefix + " " + sql);
167  user.runSql(sql);
168 
169  sql = "SELECT COUNT(*) FROM " + tableName + " WHERE x > " + (tid * 2) + ";";
170  logger.info(logPrefix + " " + sql);
171  user.runSql(sql);
172 
173  sql = "DELETE FROM " + tableName + " WHERE str = '"
174  + text_values[rand.nextInt(text_values.length)] + "';";
175  logger.info(logPrefix + " " + sql);
176  user.runSql(sql);
177 
178  sql = "SELECT * FROM " + tableName + " WHERE str = '"
179  + text_values[rand.nextInt(text_values.length)] + "';";
180  logger.info(logPrefix + " " + sql);
181  user.runSql(sql);
182 
183  sql = "DELETE FROM " + tableName + " WHERE d < " + rand.nextInt(num_rows / 4)
184  + ";";
185  logger.info(logPrefix + " " + sql);
186  user.runSql(sql);
187 
188  sql = "INSERT INTO " + tableName + " VALUES "
189  + "(" + tid + "," + tid + "," + tid + "," + tid + "," + tid + ","
190  + tid + "," + tid + "," + (tid % 2 == 0 ? "'value_1'" : "'value_2'")
191  + ");";
192  logger.info(logPrefix + " " + sql);
193  user.runSql(sql);
194 
195  sql = "DELETE FROM " + tableName + " WHERE z = " + tid + ";";
196  logger.info(logPrefix + " " + sql);
197  user.runSql(sql);
198 
199  } catch (Exception e) {
200  logger.error(logPrefix + " Caught Exception: " + e.getMessage(), e);
201  exceptions[threadId] = e;
202  }
203  }
204  });
205  t.start();
206  threads.add(t);
207  }
208 
209  for (Thread t : threads) {
210  t.join();
211  }
212 
213  HeavyDBTestClient dba =
214  HeavyDBTestClient.getClient("localhost", 6274, db, dbaUser, dbaPassword);
215  dba.runSql("DROP TABLE " + tableName + ";");
216 
217  for (Exception e : exceptions) {
218  if (null != e) {
219  logger.error("Exception: " + e.getMessage(), e);
220  throw e;
221  }
222  }
223  }
224 
225  public void testUpdateDeleteInsertConcurrency(Boolean useTemporaryTables)
226  throws Exception {
227  logger.info("testUpdateDeleteInsertConcurrency()");
228 
229  useTemporaryTables_ = useTemporaryTables;
230  if (useTemporaryTables_) {
231  logger.info("Using temporary tables");
232  }
233  HeavyDBTestClient su = HeavyDBTestClient.getClient(
234  "localhost", 6274, "heavyai", "admin", "HyperInteractive");
235  su.runSql("CREATE USER dba (password = 'password', is_super = 'true');");
236  su.runSql("CREATE USER bob (password = 'password', is_super = 'false');");
237 
238  su.runSql("GRANT CREATE on DATABASE heavyai TO bob;");
239 
240  su.runSql("CREATE DATABASE db1;");
241  su.runSql("GRANT CREATE on DATABASE db1 TO bob;");
242  su.runSql("GRANT CREATE VIEW on DATABASE db1 TO bob;");
243  su.runSql("GRANT DROP on DATABASE db1 TO bob;");
244  su.runSql("GRANT DROP VIEW on DATABASE db1 TO bob;");
245 
246  runTest("db1",
247  "admin",
248  "HyperInteractive",
249  "admin",
250  "HyperInteractive",
251  /* concurrentInserts= */ false);
252  runTest("db1",
253  "admin",
254  "HyperInteractive",
255  "admin",
256  "HyperInteractive",
257  /* concurrentInserts= */ true);
258  // TODO: run some tests as bob
259 
260  su.runSql("DROP DATABASE db1;");
261  su.runSql("DROP USER bob;");
262  su.runSql("DROP USER dba;");
263 
264  logger.info("testUpdateDeleteInsertConcurrency() done");
265  }
266 }
void runTest(String db, String dbaUser, String dbaPassword, String dbUser, String dbPassword, Boolean concurrentInserts)
static bool run