OmniSciDB  c1a53651b2
ImportAlterValidateSelectConcurrencyTest.java
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.mapd.tests;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CyclicBarrier;

import ai.heavy.thrift.server.TColumnType;
import ai.heavy.thrift.server.TCopyParams;
import ai.heavy.thrift.server.TCreateParams;
import ai.heavy.thrift.server.TDBException;
import ai.heavy.thrift.server.TImportHeaderRow;
import ai.heavy.thrift.server.TSourceType;
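
// Stress test that runs imports, ALTER TABLE statements, validation calls, and
// SELECT queries against the same two tables from four concurrent threads.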
public class ImportAlterValidateSelectConcurrencyTest {
  final static String csvTableName = "import_test_mixed_varlen";
  final static String geoTableName = "geospatial";

  String csv_file_path;
  String geo_file_path;

  final static Logger logger =
          LoggerFactory.getLogger(ImportAlterValidateSelectConcurrencyTest.class);

  public ImportAlterValidateSelectConcurrencyTest(
          String csv_file_path, String geo_file_path) {
    this.geo_file_path = geo_file_path;
    this.csv_file_path = csv_file_path;
  }

  public static void main(String[] args) throws Exception {
    // Command Line Args:
    //   0: CSV file to import (absolute path accessible by server)
    //   1: Geo file to import (absolute path accessible by server)
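    //
    // Example invocation (jar name and file paths are illustrative
    // placeholders, not taken from the original source):
    //   java -cp utility-jar-with-deps.jar \
    //       com.mapd.tests.ImportAlterValidateSelectConcurrencyTest \
    //       /data/mixed_varlen.csv /data/geospatial.geojson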
    assert args.length == 2;
    ImportAlterValidateSelectConcurrencyTest test =
            new ImportAlterValidateSelectConcurrencyTest(args[0], args[1]);
    test.testConcurrency();
  }

  private void runTest(
          String db, String dbaUser, String dbaPassword, String dbUser, String dbPassword)
          throws Exception {
    int num_threads = 4;
    final int runs = 25;
    final int fragment_size = 10;
    Exception[] exceptions = new Exception[num_threads];
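
    // The barrier action runs exactly once, on the last thread to call
    // await(): it creates both test tables so that no worker thread starts
    // issuing queries before the tables exist.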
    final CyclicBarrier barrier = new CyclicBarrier(num_threads, new Runnable() {
      public void run() {
        try {
          HeavyDBTestClient dba = HeavyDBTestClient.getClient(
                  "localhost", 6274, db, dbaUser, dbaPassword);
          dba.runSql("CREATE TABLE " + csvTableName
                  + "(pt GEOMETRY(POINT), ls GEOMETRY(LINESTRING), faii INTEGER[2], fadc DECIMAL(5, 2)[2], fatx TEXT[] ENCODING DICT(32), fatx2 TEXT[2] ENCODING DICT(32)) WITH(FRAGMENT_SIZE = "
                  + fragment_size + ")");

          dba.runSql("CREATE TABLE " + geoTableName
                  + "( trip INT, mpoly MULTIPOLYGON ) WITH(FRAGMENT_SIZE = "
                  + fragment_size + ")");

        } catch (Exception e) {
          logger.error("[" + Thread.currentThread().getId() + "]"
                          + " Caught Exception: " + e.getMessage(),
                  e);
          exceptions[0] = e;
        }
      }
    });

    ArrayList<Thread> threads = new ArrayList<>();
    for (int i = 0; i < num_threads; i++) {
      logger.info("Starting " + i);
      final int threadId = i;

      Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
          long tid = Thread.currentThread().getId();
          String logPrefix = "[" + tid + "]";
          String sql = "";
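
          // Import parameters for the delimited CSV source; threads = 0 leaves
          // the worker-thread count to the server default.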
          TCopyParams copy_params = new TCopyParams();
          copy_params.has_header = TImportHeaderRow.NO_HEADER;
          copy_params.delimiter = ",";
          copy_params.null_str = "\\N";
          copy_params.quoted = true;
          copy_params.quote = "\"";
          copy_params.escape = "\"";
          copy_params.line_delim = "\n";
          copy_params.array_delim = ",";
          copy_params.array_begin = "{";
          copy_params.array_end = "}";
          copy_params.threads = 0;
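
          // Same parsing parameters for the geo import, except the source is
          // treated as a geo file (TSourceType.GEO_FILE) rather than
          // delimited text.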
          TCopyParams geo_copy_params = new TCopyParams();
          geo_copy_params.delimiter = ",";
          geo_copy_params.null_str = "\\N";
          geo_copy_params.quoted = true;
          geo_copy_params.quote = "\"";
          geo_copy_params.escape = "\"";
          geo_copy_params.line_delim = "\n";
          geo_copy_params.array_delim = ",";
          geo_copy_params.array_begin = "{";
          geo_copy_params.array_end = "}";
          geo_copy_params.threads = 0;
          geo_copy_params.source_type = TSourceType.GEO_FILE;
          try {
            barrier.await();

            HeavyDBTestClient user = HeavyDBTestClient.getClient(
                    "localhost", 6274, db, dbUser, dbPassword);
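
            // Thread roles: even threads import the CSV table (thread 0 via
            // load_table, thread 2 via load_table_binary_columnar plus COPY
            // FROM); thread 1 drops a column mid-run; thread 3 imports the
            // geo table; thread 0 eventually drops the geo table.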
            if (threadId % 2 == 0) {
              logger.info(logPrefix + " IMPORT TABLE");
              user.import_table(csvTableName, csv_file_path, copy_params);
              if (threadId == 0) {
                loadTable(user, logPrefix);
              } else {
                loadTableBinaryColumnar(user, logPrefix);
                sql = "COPY " + csvTableName + " FROM '" + csv_file_path
                        + "' WITH (header = 'false');";
                logAndRunSql(sql, user, logPrefix);
              }
            }

            sql = "DELETE FROM " + csvTableName + " WHERE fatx2 IS NULL;";
            logAndRunSql(sql, user, logPrefix);

            sql = "SELECT COUNT(*) FROM " + csvTableName + ";";
            logAndRunSql(sql, user, logPrefix);

            if (threadId == 1) {
              Thread.sleep(5000); // Ensure import is launched
              sql = "ALTER TABLE " + csvTableName + " DROP COLUMN faii;";
              logAndRunSql(sql, user, logPrefix);
            }

            if (threadId % 2 == 1) {
              getTableDetails(user, logPrefix);
            } else {
              getTablesMetadata(user, logPrefix);
            }

            sql = "SELECT * FROM " + geoTableName + ";";
            logger.info(logPrefix + " VALIDATE " + sql);
            final String validateSql = sql;
            // Concurrent request to drop table may have occurred when this query is
            // executed. Ignore the error response in this case.
            ignoreMissingTable(() -> user.sqlValidate(validateSql), geoTableName);

            final String alterSql = "ALTER TABLE " + geoTableName + " SET max_rows = 10;";
            // Concurrent request to drop table may have occurred when this query is
            // executed. Ignore the error response in this case.
            ignoreMissingTable(() -> logAndRunSql(alterSql, user, logPrefix), geoTableName);

            if (threadId == 3) {
              logger.info(logPrefix + " IMPORT GEO TABLE");
              // Concurrent request to drop table may have occurred when this query is
              // executed. Ignore the error response in this case.
              ignoreMissingTable(
                      () -> user.import_geo_table(geoTableName,
                              geo_file_path,
                              geo_copy_params,
                              new java.util.ArrayList<TColumnType>(),
                              new TCreateParams()),
                      geoTableName);
              loadTableBinaryColumnarPolys(user, logPrefix);
            }

            final String selectSql = "SELECT * FROM " + geoTableName + " LIMIT 2;";
            // Concurrent request to drop table may have occurred when this query is
            // executed. Ignore the error response in this case.
            ignoreMissingTable(() -> logAndRunSql(selectSql, user, logPrefix), geoTableName);

            sql = "SELECT * FROM " + csvTableName + ";";
            logger.info(logPrefix + " VALIDATE " + sql);
            user.sqlValidate(sql);

            sql = "ALTER TABLE " + csvTableName + " SET max_rollback_epochs = 0;";
            logAndRunSql(sql, user, logPrefix);

            sql = "COPY (SELECT * FROM " + csvTableName + ") TO 'test_export.csv';";
            logAndRunSql(sql, user, logPrefix);

            for (int i = 0; i < 5; i++) {
              final String insertSql = "INSERT INTO " + geoTableName + " VALUES (" + i
                      + ", 'MULTIPOLYGON(((0 0, 1 1, 2 2)))');";
              // Concurrent request to drop table may have occurred when this query is
              // executed. Ignore the error response in this case.
              ignoreMissingTable(() -> logAndRunSql(insertSql, user, logPrefix), geoTableName);
            }

            sql = "COPY (SELECT * FROM " + csvTableName + ") TO 'test_export.csv';";
            logAndRunSql(sql, user, logPrefix);

            sql = "TRUNCATE TABLE " + csvTableName + ";";
            logAndRunSql(sql, user, logPrefix);

            sql = "SELECT COUNT(*) FROM " + csvTableName + ";";
            logger.info(logPrefix + " VALIDATE " + sql);
            user.sqlValidate(sql);

            if (threadId == 0) {
              Thread.sleep(5000); // Ensure import is launched
              sql = "DROP TABLE " + geoTableName + ";";
              logAndRunSql(sql, user, logPrefix);
            }
          } catch (Exception e) {
            logger.error(logPrefix + " Caught Exception: " + e.getMessage(), e);
            exceptions[threadId] = e;
          }
        }
      });
      t.start();
      threads.add(t);
    }

    for (Thread t : threads) {
      t.join();
    }
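
    // Drop the CSV table (the geo table is dropped by thread 0 during the
    // run) and rethrow the first recorded worker exception, if any, to fail
    // the test.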
    HeavyDBTestClient dba =
            HeavyDBTestClient.getClient("localhost", 6274, db, dbaUser, dbaPassword);
    dba.runSql("DROP TABLE " + csvTableName + ";");

    for (Exception e : exceptions) {
      if (null != e) {
        logger.error("Exception: " + e.getMessage(), e);
        throw e;
      }
    }
  }
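
  // Creates the helper users and the db1 database, runs the worker threads as
  // the admin user, and drops everything in a finally block so that reruns
  // start from a clean state.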
  public void testConcurrency() throws Exception {
    logger.info("ImportAlterValidateSelectConcurrencyTest()");
    HeavyDBTestClient su = HeavyDBTestClient.getClient(
            "localhost", 6274, "heavyai", "admin", "HyperInteractive");
    try {
      su.runSql("CREATE USER dba (password = 'password', is_super = 'true');");
      su.runSql("CREATE USER bob (password = 'password', is_super = 'false');");

      su.runSql("GRANT CREATE on DATABASE heavyai TO bob;");

      su.runSql("CREATE DATABASE db1;");
      su.runSql("GRANT CREATE on DATABASE db1 TO bob;");
      su.runSql("GRANT CREATE VIEW on DATABASE db1 TO bob;");
      su.runSql("GRANT DROP on DATABASE db1 TO bob;");
      su.runSql("GRANT DROP VIEW on DATABASE db1 TO bob;");

      runTest("db1", "admin", "HyperInteractive", "admin", "HyperInteractive");
      // TODO: run some tests as bob
    } finally {
      su.runSql("DROP DATABASE IF EXISTS db1;");
      su.runSql("DROP USER IF EXISTS bob;");
      su.runSql("DROP USER IF EXISTS dba;");
    }

    logger.info("ImportAlterValidateSelectConcurrencyTest() done");
  }

  @FunctionalInterface
  private interface VoidFunction {
    void call() throws Exception;
  }
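
  // Runs the given action, swallowing only the TDBException produced when the
  // named table has already been dropped by a concurrent thread; any other
  // error propagates and fails the test.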
  private void ignoreMissingTable(final VoidFunction function, final String tableName)
          throws Exception {
    try {
      function.call();
    } catch (TDBException e) {
      if (e.error_msg.matches("(Table/View\\s+" + tableName
                  + ".+does not exist|.+Object\\s+'" + tableName + "'\\s+not found)")) {
        logger.info("Ignoring missing table error: " + e.error_msg);
      } else {
        throw e;
      }
    }
  }

  private void logAndRunSql(String sql, HeavyDBTestClient user, String logPrefix)
          throws Exception {
    logger.info(logPrefix + " " + sql);
    user.runSql(sql);
  }
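
  // Loads five identical rows through the row-based load_table API; each value
  // is the string form expected for the CSV table's geo and array columns.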
  private void loadTable(HeavyDBTestClient user, String logPrefix) throws Exception {
    logger.info(logPrefix + " Calling load_table API");
    List<List<String>> rows = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
      rows.add(Arrays.asList("point(0 0)",
              "linestring(0 0,1 1)",
              "{1,1}",
              "{1.11,1.11}",
              "{\"1\",\"1\"}",
              "{\"1\",\"1\"}"));
    }
    user.load_table(csvTableName, rows, new ArrayList<>());
  }
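
  // Loads five rows through the columnar load API, populating only the faii,
  // fatx, and fatx2 columns; this call can race with thread 1's
  // ALTER TABLE ... DROP COLUMN faii, which is part of what the test exercises.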
  private void loadTableBinaryColumnar(HeavyDBTestClient user, String logPrefix)
          throws Exception {
    logger.info(logPrefix + " Calling load_table_binary_columnar API");
    List<List<Object>> columns = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      columns.add(new ArrayList<>());
    }
    for (int i = 0; i < 5; i++) {
      columns.get(0).add(Arrays.asList(Long.valueOf(1), Long.valueOf(1)));
      columns.get(1).add(Arrays.asList("1", "1"));
      columns.get(2).add(Arrays.asList("1", "1"));
    }
    user.load_table_binary_columnar(
            csvTableName, columns, Arrays.asList("faii", "fatx", "fatx2"));
  }

  private void loadTableBinaryColumnarPolys(HeavyDBTestClient user, String logPrefix)
          throws Exception {
    logger.info(logPrefix + " Calling load_table_binary_columnar_polys API");
    List<List<Object>> columns = new ArrayList<>();
    for (int i = 0; i < 2; i++) {
      columns.add(new ArrayList<>());
    }
    for (int i = 0; i < 5; i++) {
      columns.get(0).add(Long.valueOf(i));
      columns.get(1).add("MULTIPOLYGON(((0 0,0 9,9 9,9 0),(2 2,1 1,3 3)))");
    }
    user.load_table_binary_columnar_polys(geoTableName, columns, new ArrayList<>());
  }

  private void getTableDetails(HeavyDBTestClient user, String logPrefix)
          throws Exception {
    logger.info(logPrefix + " Calling get_table_details API");
    user.get_table_details(csvTableName);
    logger.info(logPrefix + " Calling get_table_details_for_database API");
    // Concurrent request to drop table may have occurred when this query is
    // executed. Ignore the error response in this case.
    ignoreMissingTable(
            () -> user.get_table_details_for_database(geoTableName, "heavyai"),
            geoTableName);
  }

  private void getTablesMetadata(HeavyDBTestClient user, String logPrefix)
          throws Exception {
    logger.info(logPrefix + " Calling get_tables_meta API");
    user.get_tables_meta();
  }
}