OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ImportAlterValidateSelectConcurrencyTest.java
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.tests;
17 
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20 
21 import java.util.ArrayList;
22 import java.util.Arrays;
23 import java.util.List;
24 import java.util.concurrent.CyclicBarrier;
25 import java.util.regex.Matcher;
26 import java.util.regex.Pattern;
27 
28 import ai.heavy.thrift.server.TColumnType;
29 import ai.heavy.thrift.server.TCopyParams;
30 import ai.heavy.thrift.server.TCreateParams;
31 import ai.heavy.thrift.server.TDBException;
32 import ai.heavy.thrift.server.TImportHeaderRow;
33 import ai.heavy.thrift.server.TSourceType;
34 
36  final static String csvTableName = "import_test_mixed_varlen";
37  final static String geoTableName = "geospatial";
38 
39  String csv_file_path;
40  String geo_file_path;
41 
42  final static Logger logger =
43  LoggerFactory.getLogger(ImportAlterValidateSelectConcurrencyTest.class);
44 
46  String csv_file_path, String geo_file_path) {
47  this.geo_file_path = geo_file_path;
48  this.csv_file_path = csv_file_path;
49  }
50 
51  public static void main(String[] args) throws Exception {
52  // Command Line Args:
53  // 0: CSV file to import (absolute path accessible by server)
54  // 1: Geo file to import (absolute path accessible by server)
55  assert args.length == 2;
58  test.testConcurrency();
59  }
60 
61  private void runTest(
62  String db, String dbaUser, String dbaPassword, String dbUser, String dbPassword)
63  throws Exception {
64  int num_threads = 4;
65  final int runs = 25;
66  final int fragment_size = 10;
67  Exception exceptions[] = new Exception[num_threads];
68 
69  final CyclicBarrier barrier = new CyclicBarrier(num_threads, new Runnable() {
70  public void run() {
71  try {
72  HeavyDBTestClient dba = HeavyDBTestClient.getClient(
73  "localhost", 6274, db, dbaUser, dbaPassword);
74  dba.runSql("CREATE TABLE " + csvTableName
75  + "(pt GEOMETRY(POINT), ls GEOMETRY(LINESTRING), faii INTEGER[2], fadc DECIMAL(5, 2)[2], fatx TEXT[] ENCODING DICT(32), fatx2 TEXT[2] ENCODING DICT(32)) WITH(FRAGMENT_SIZE = "
76  + fragment_size + ")");
77 
78  dba.runSql("CREATE TABLE " + geoTableName
79  + "( trip INT, mpoly MULTIPOLYGON ) WITH(FRAGMENT_SIZE = "
80  + fragment_size + ")");
81 
82  } catch (Exception e) {
83  logger.error("[" + Thread.currentThread().getId() + "]"
84  + " Caught Exception: " + e.getMessage(),
85  e);
86  exceptions[0] = e;
87  }
88  }
89  });
90 
91  ArrayList<Thread> threads = new ArrayList<>();
92  for (int i = 0; i < num_threads; i++) {
93  logger.info("Starting " + i);
94  final int threadId = i;
95 
96  Thread t = new Thread(new Runnable() {
97  @Override
98  public void run() {
99  long tid = Thread.currentThread().getId();
100  String logPrefix = "[" + tid + "]";
101  String sql = "";
102 
103  TCopyParams copy_params = new TCopyParams();
104  copy_params.has_header = TImportHeaderRow.NO_HEADER;
105  copy_params.delimiter = ",";
106  copy_params.null_str = "\\N";
107  copy_params.quoted = true;
108  copy_params.quote = "\"";
109  copy_params.escape = "\"";
110  copy_params.line_delim = "\n";
111  copy_params.array_delim = ",";
112  copy_params.array_begin = "{";
113  copy_params.array_end = "}";
114  copy_params.threads = 0;
115 
116  TCopyParams geo_copy_params = new TCopyParams();
117  geo_copy_params.delimiter = ",";
118  geo_copy_params.null_str = "\\N";
119  geo_copy_params.quoted = true;
120  geo_copy_params.quote = "\"";
121  geo_copy_params.escape = "\"";
122  geo_copy_params.line_delim = "\n";
123  geo_copy_params.array_delim = ",";
124  geo_copy_params.array_begin = "{";
125  geo_copy_params.array_end = "}";
126  geo_copy_params.threads = 0;
127  geo_copy_params.source_type = TSourceType.GEO_FILE;
128 
129  try {
130  barrier.await();
131 
132  HeavyDBTestClient user = HeavyDBTestClient.getClient(
133  "localhost", 6274, db, dbUser, dbPassword);
134 
135  if (threadId % 2 == 0) {
136  logger.info(logPrefix + " IMPORT TABLE");
137  user.import_table(csvTableName, csv_file_path, copy_params);
138  if (threadId == 0) {
139  loadTable(user, logPrefix);
140  } else {
141  loadTableBinaryColumnar(user, logPrefix);
142  sql = "COPY " + csvTableName + " FROM '" + csv_file_path
143  + "' WITH (header = 'false');";
144  logAndRunSql(sql, user, logPrefix);
145  }
146  }
147 
148  sql = "DELETE FROM " + csvTableName + " WHERE fatx2 IS NULL;";
149  logAndRunSql(sql, user, logPrefix);
150 
151  sql = "SELECT COUNT(*) FROM " + csvTableName + ";";
152  logAndRunSql(sql, user, logPrefix);
153 
154  if (threadId == 1) {
155  Thread.sleep(5000); // Ensure import is launched
156  sql = "ALTER TABLE " + csvTableName + " DROP COLUMN faii;";
157  logAndRunSql(sql, user, logPrefix);
158  }
159 
160  if (threadId % 2 == 1) {
161  getTableDetails(user, logPrefix);
162  } else {
163  getTablesMetadata(user, logPrefix);
164  }
165 
166  sql = "SELECT * FROM " + geoTableName + ";";
167  logger.info(logPrefix + " VALIDATE " + sql);
168  final String validateSql = sql;
169  // Concurrent request to drop table may have occurred when this query is
170  // executed. Ignore the error response in this case.
171  ignoreMissingTable(() -> user.sqlValidate(validateSql), geoTableName);
172 
173  final String alterSql = "ALTER TABLE " + geoTableName + " SET max_rows = 10;";
174  // Concurrent request to drop table may have occurred when this query is
175  // executed. Ignore the error response in this case.
177  () -> logAndRunSql(alterSql, user, logPrefix), geoTableName);
178 
179  if (threadId == 3) {
180  logger.info(logPrefix + " IMPORT GEO TABLE");
181  // Concurrent request to drop table may have occurred when this query is
182  // executed. Ignore the error response in this case.
184  -> user.import_geo_table(geoTableName,
186  geo_copy_params,
187  new java.util.ArrayList<TColumnType>(),
188  new TCreateParams()),
189  geoTableName);
190  loadTableBinaryColumnar(user, logPrefix);
191  }
192 
193  final String selectSql = "SELECT * FROM " + geoTableName + " LIMIT 2;";
194  // Concurrent request to drop table may have occurred when this query is
195  // executed. Ignore the error response in this case.
197  () -> logAndRunSql(selectSql, user, logPrefix), geoTableName);
198 
199  sql = "SELECT * FROM " + csvTableName + ";";
200  logger.info(logPrefix + " VALIDATE " + sql);
201  user.sqlValidate(sql);
202 
203  sql = "ALTER TABLE " + csvTableName + " SET max_rollback_epochs = 0;";
204  logAndRunSql(sql, user, logPrefix);
205 
206  sql = "COPY (SELECT * FROM " + csvTableName + ") TO 'test_export.csv';";
207  logAndRunSql(sql, user, logPrefix);
208 
209  for (int i = 0; i < 5; i++) {
210  final String insertSql = "INSERT INTO " + geoTableName + " VALUES (" + i
211  + ", 'MULTIPOLYGON(((0 0, 1 1, 2 2)))');";
212  // Concurrent request to drop table may have occurred when this query is
213  // executed. Ignore the error response in this case.
215  () -> logAndRunSql(insertSql, user, logPrefix), geoTableName);
216  }
217 
218  sql = "COPY (SELECT * FROM " + csvTableName + ") TO 'test_export.csv';";
219  logAndRunSql(sql, user, logPrefix);
220 
221  sql = "TRUNCATE TABLE " + csvTableName + ";";
222  logAndRunSql(sql, user, logPrefix);
223 
224  sql = "SELECT COUNT(*) FROM " + csvTableName + ";";
225  logger.info(logPrefix + " VALIDATE " + sql);
226  user.sqlValidate(sql);
227 
228  if (threadId == 0) {
229  Thread.sleep(5000); // Ensure import is launched
230  sql = "DROP TABLE " + geoTableName + ";";
231  logAndRunSql(sql, user, logPrefix);
232  }
233  } catch (Exception e) {
234  logger.error(logPrefix + " Caught Exception: " + e.getMessage(), e);
235  exceptions[threadId] = e;
236  }
237  }
238  });
239  t.start();
240  threads.add(t);
241  }
242 
243  for (Thread t : threads) {
244  t.join();
245  }
246 
247  HeavyDBTestClient dba =
248  HeavyDBTestClient.getClient("localhost", 6274, db, dbaUser, dbaPassword);
249  dba.runSql("DROP TABLE " + csvTableName + ";");
250 
251  for (Exception e : exceptions) {
252  if (null != e) {
253  logger.error("Exception: " + e.getMessage(), e);
254  throw e;
255  }
256  }
257  }
258 
259  public void testConcurrency() throws Exception {
260  logger.info("ImportAlterValidateSelectConcurrencyTest()");
261  HeavyDBTestClient su = HeavyDBTestClient.getClient(
262  "localhost", 6274, "heavyai", "admin", "HyperInteractive");
263  try {
264  su.runSql("CREATE USER dba (password = 'password', is_super = 'true');");
265  su.runSql("CREATE USER bob (password = 'password', is_super = 'false');");
266 
267  su.runSql("GRANT CREATE on DATABASE heavyai TO bob;");
268 
269  su.runSql("CREATE DATABASE db1;");
270  su.runSql("GRANT CREATE on DATABASE db1 TO bob;");
271  su.runSql("GRANT CREATE VIEW on DATABASE db1 TO bob;");
272  su.runSql("GRANT DROP on DATABASE db1 TO bob;");
273  su.runSql("GRANT DROP VIEW on DATABASE db1 TO bob;");
274 
275  runTest("db1", "admin", "HyperInteractive", "admin", "HyperInteractive");
276  // TODO: run some tests as bob
277  } finally {
278  su.runSql("DROP DATABASE IF EXISTS db1;");
279  su.runSql("DROP USER IF EXISTS bob;");
280  su.runSql("DROP USER IF EXISTS dba;");
281  }
282 
283  logger.info("ImportAlterValidateSelectConcurrencyTest() done");
284  }
285 
286  @FunctionalInterface
287  private interface VoidFunction {
288  void call() throws Exception;
289  }
290 
291  private void ignoreMissingTable(final VoidFunction function, final String tableName)
292  throws Exception {
293  try {
294  function.call();
295  } catch (TDBException e) {
296  Pattern pattern = Pattern.compile("(Table/View\\s+" + tableName
297  + ".+does not exist|.+Object\\s+'" + tableName + "'\\s+not found)");
298  Matcher matcher = pattern.matcher(e.error_msg);
299  if (matcher.find()) {
300  logger.info("Ignoring missing table error: " + e.error_msg);
301  } else {
302  throw e;
303  }
304  }
305  }
306 
307  private void logAndRunSql(String sql, HeavyDBTestClient user, String logPrefix)
308  throws Exception {
309  logger.info(logPrefix + " " + sql);
310  user.runSql(sql);
311  }
312 
313  private void loadTable(HeavyDBTestClient user, String logPrefix) throws Exception {
314  logger.info(logPrefix + " Calling load_table API");
315  List<List<String>> rows = new ArrayList<>();
316  for (int i = 0; i < 5; i++) {
317  rows.add(Arrays.asList("point(0 0)",
318  "linestring(0 0,1 1)",
319  "{1,1}",
320  "{1.11,1.11}",
321  "{\"1\",\"1\"}",
322  "{\"1\",\"1\"}"));
323  }
324  user.load_table(csvTableName, rows, new ArrayList<>());
325  }
326 
327  private void loadTableBinaryColumnar(HeavyDBTestClient user, String logPrefix)
328  throws Exception {
329  logger.info(logPrefix + " Calling load_table_binary_columnar API");
330  List<List<Object>> columns = new ArrayList<>();
331  for (int i = 0; i < 3; i++) {
332  columns.add(new ArrayList<>());
333  }
334  for (int i = 0; i < 5; i++) {
335  columns.get(0).add(Arrays.asList(Long.valueOf(1), Long.valueOf(1)));
336  columns.get(1).add(Arrays.asList("1", "1"));
337  columns.get(2).add(Arrays.asList("1", "1"));
338  }
339  user.load_table_binary_columnar(
340  csvTableName, columns, Arrays.asList("faii", "fatx", "fatx2"));
341  }
342 
343  private void getTableDetails(HeavyDBTestClient user, String logPrefix)
344  throws Exception {
345  logger.info(logPrefix + " Calling get_table_details API");
346  user.get_table_details(csvTableName);
347  logger.info(logPrefix + " Calling get_table_details_for_database API");
348  // Concurrent request to drop table may have occurred when this query is
349  // executed. Ignore the error response in this case.
351  ()
352  -> user.get_table_details_for_database(geoTableName, "heavyai"),
353  geoTableName);
354  }
355 
356  private void getTablesMetadata(HeavyDBTestClient user, String logPrefix)
357  throws Exception {
358  logger.info(logPrefix + " Calling get_tables_meta API");
359  user.get_tables_meta();
360  }
361 }
void logAndRunSql(String sql, HeavyDBTestClient user, String logPrefix)
ImportAlterValidateSelectConcurrencyTest(String csv_file_path, String geo_file_path)
void runTest(String db, String dbaUser, String dbaPassword, String dbUser, String dbPassword)
tuple rows
Definition: report.py:114
static bool run
void ignoreMissingTable(final VoidFunction function, final String tableName)