OmniSciDB  c07336695a
run-benchmark-import.py
Go to the documentation of this file.
1 import os
2 import timeit
3 import logging
4 import uuid
5 import datetime
6 import json
7 import pymapd
8 import pandas
9 import re
10 from argparse import ArgumentParser
11 
12 # For usage info, run: `./<script_name>.py --help`
13 
14 
def get_connection(**kwargs):
    """
    Connects to the db using pymapd
    https://pymapd.readthedocs.io/en/latest/usage.html#connecting

    Kwargs:
      db_user(str): DB username
      db_passwd(str): DB password
      db_server(str): DB host
      db_port(int): DB port
      db_name(str): DB name

    Returns:
      con(class): Connection class
      False(bool): The connection failed. Exception should be logged.
    """
    try:
        logging.debug("Connecting to mapd db...")
        con = pymapd.connect(
            user=kwargs["db_user"],
            password=kwargs["db_passwd"],
            host=kwargs["db_server"],
            port=kwargs["db_port"],
            dbname=kwargs["db_name"],
        )
        # Fixed typo in log message: "Succesfully" -> "Successfully"
        logging.info("Successfully connected to mapd db")
        return con
    except (pymapd.exceptions.OperationalError, pymapd.exceptions.Error):
        # Caller is expected to treat a falsy return as "connection failed";
        # the exception detail has already been logged here.
        logging.exception("Error connecting to database.")
        return False
45 
46 
def json_format_handler(x):
    """Serialization fallback passed to json.dumps(default=...).

    Converts values json cannot natively encode. Only datetime.datetime is
    handled here (ISO-8601 string); anything else raises TypeError so bad
    data fails loudly instead of being silently stringified.

    NOTE(review): the `def` line was missing from the extracted source and
    was reconstructed from the call site `json.dumps(..., default=json_format_handler)`.
    The comment below also mentions numpy ints, but no numpy branch exists
    in the visible code.
    """
    # Function to allow json to deal with datetime and numpy int
    if isinstance(x, datetime.datetime):
        return x.isoformat()
    raise TypeError("Unknown type")
52 
53 
# Parse input parameters.
# The default "optional arguments" group is popped and re-appended after a
# custom "required arguments" group so --help lists required options first.
parser = ArgumentParser()
optional = parser._action_groups.pop()
required = parser.add_argument_group("required arguments")
parser._action_groups.append(optional)
optional.add_argument(
    "-v", "--verbose", action="store_true", help="Turn on debug logging"
)
optional.add_argument(
    "-q",
    "--quiet",
    action="store_true",
    # Fixed typo: "outuput" -> "output"
    help="Suppress script output (except warnings and errors)",
)
required.add_argument(
    "-u", "--user", dest="user", default="mapd", help="Source database user"
)
required.add_argument(
    "-p",
    "--passwd",
    dest="passwd",
    default="HyperInteractive",
    help="Source database password",
)
required.add_argument(
    "-s",
    "--server",
    dest="server",
    default="localhost",
    help="Source database server hostname",
)
optional.add_argument(
    "-o",
    "--port",
    dest="port",
    type=int,
    default=6274,
    help="Source database server port",
)
required.add_argument(
    "-n", "--name", dest="name", default="mapd", help="Source database name"
)
required.add_argument(
    "-l", "--label", dest="label", required=True, help="Benchmark run label"
)
required.add_argument(
    "-f",
    "--import-file",
    dest="import_file",
    required=True,
    help="Absolute path to file on omnisci_server machine with data for "
    "import test",
)
required.add_argument(
    "-c",
    "--table-schema-file",
    dest="table_schema_file",
    required=True,
    help="Path to local file with CREATE TABLE sql statement for the "
    "import table",
)
optional.add_argument(
    "-t",
    "--import-table-name",
    dest="import_table_name",
    default="import_benchmark_test",
    help="Name of table to import data to. NOTE: This table will be dropped "
    "before and after the import test, unless "
    "--no-drop-table-[before/after] is specified.",
)
optional.add_argument(
    "-F",
    "--import-query-template-file",
    dest="import_query_template_file",
    help="Path to file containing template for import query. "
    'The script will replace "##TAB##" with the value of import_table_name '
    'and "##FILE##" with the value of table_schema_file. By default, the '
    # Fixed duplicated word: "default default delimiter" -> "default delimiter"
    "script will use the COPY FROM command with the default "
    "delimiter (,).",
)
optional.add_argument(
    "--no-drop-table-before",
    dest="no_drop_table_before",
    action="store_true",
    help="Do not drop the import table and recreate it before import "
    "NOTE: Make sure existing table schema matches import .csv file schema",
)
optional.add_argument(
    "--no-drop-table-after",
    dest="no_drop_table_after",
    action="store_true",
    help="Do not drop the import table after import",
)
optional.add_argument(
    "-A",
    "--import-test-name",
    dest="import_test_name",
    help='Name of import test (ex: "ips"). Required when using '
    "jenkins_bench_json as output.",
)
optional.add_argument(
    "-m", "--machine-name", dest="machine_name", help="Name of source machine"
)
optional.add_argument(
    "-a",
    "--machine-uname",
    dest="machine_uname",
    help="Uname info from source machine",
)
optional.add_argument(
    "-e",
    "--destination",
    dest="destination",
    default="mapd_db",
    help="Destination type: [mapd_db, file_json, output, jenkins_bench] "
    # Fixed typo: "seperated" -> "separated"
    "Multiple values can be input separated by commas, "
    'ex: "mapd_db,file_json"',
)
optional.add_argument(
    "-U",
    "--dest-user",
    dest="dest_user",
    default="mapd",
    help="Destination mapd_db database user",
)
optional.add_argument(
    "-P",
    "--dest-passwd",
    dest="dest_passwd",
    default="HyperInteractive",
    help="Destination mapd_db database password",
)
optional.add_argument(
    "-S",
    "--dest-server",
    dest="dest_server",
    help="Destination mapd_db database server hostname"
    ' (required if destination = "mapd_db")',
)
optional.add_argument(
    "-O",
    "--dest-port",
    dest="dest_port",
    type=int,
    default=6274,
    help="Destination mapd_db database server port",
)
optional.add_argument(
    "-N",
    "--dest-name",
    dest="dest_name",
    default="mapd",
    help="Destination mapd_db database name",
)
optional.add_argument(
    "-T",
    "--dest-table",
    dest="dest_table",
    default="import_results",
    help="Destination mapd_db table name",
)
optional.add_argument(
    "-C",
    "--dest-table-schema-file",
    dest="dest_table_schema_file",
    default="results_table_schemas/import-results.sql",
    help="Destination table schema file. This must be an executable CREATE "
    "TABLE statement that matches the output of this script. It is "
    "required when creating the results table. Default location is in "
    '"./results_table_schemas/query-results.sql"',
)
optional.add_argument(
    "-j",
    "--output-file-json",
    dest="output_file_json",
    help="Absolute path of .json output file "
    '(required if destination = "file_json")',
)
optional.add_argument(
    "-J",
    "--output-file-jenkins",
    dest="output_file_jenkins",
    help="Absolute path of jenkins benchmark .json output file "
    '(required if destination = "jenkins_bench")',
)
args = parser.parse_args()
# Logging verbosity: -v wins over -q; default is INFO.
if args.verbose:
    logging.basicConfig(level=logging.DEBUG)
elif args.quiet:
    logging.basicConfig(level=logging.WARNING)
else:
    logging.basicConfig(level=logging.INFO)
# Unpack parsed args into the module-level names the rest of the script uses.
source_db_user = args.user
source_db_passwd = args.passwd
source_db_server = args.server
source_db_port = args.port
source_db_name = args.name
label = args.label
import_file = args.import_file
table_schema_file = args.table_schema_file
import_table_name = args.import_table_name
import_query_template_file = args.import_query_template_file
no_drop_table_before = args.no_drop_table_before
no_drop_table_after = args.no_drop_table_after
import_test_name = args.import_test_name
machine_name = args.machine_name
machine_uname = args.machine_uname
destinations = args.destination.split(",")
# Bug fix: valid_destination_set was never initialized, so a destination
# string containing no recognized value raised NameError at the final check
# below instead of logging the intended "No valid destination(s)" error.
valid_destination_set = False
if "mapd_db" in destinations:
    valid_destination_set = True
    dest_db_user = args.dest_user
    dest_db_passwd = args.dest_passwd
    if args.dest_server is None:
        # If dest_server is not set for mapd_db, then exit
        logging.error('"dest_server" is required when destination = "mapd_db"')
        exit(1)
    else:
        dest_db_server = args.dest_server
    dest_db_port = args.dest_port
    dest_db_name = args.dest_name
    dest_table = args.dest_table
    dest_table_schema_file = args.dest_table_schema_file
if "file_json" in destinations:
    valid_destination_set = True
    if args.output_file_json is None:
        # If output_file_json is not set for file_json, then exit
        logging.error(
            '"output_file_json" is required when destination = "file_json"'
        )
        exit(1)
    else:
        output_file_json = args.output_file_json
if "output" in destinations:
    valid_destination_set = True
if "jenkins_bench" in destinations:
    valid_destination_set = True
    if args.output_file_jenkins is None:
        # If output_file_jenkins is not set for jenkins_bench, then exit
        logging.error(
            '"output_file_jenkins" is required '
            + 'when destination = "jenkins_bench"'
        )
        exit(1)
    elif args.import_test_name is None:
        # If import_test_name is not set for jenkins_bench, then exit
        logging.error(
            '"import_test_name" is required '
            + 'when destination = "jenkins_bench"'
        )
        exit(1)
    else:
        output_file_jenkins = args.output_file_jenkins
if not valid_destination_set:
    logging.error("No valid destination(s) have been set. Exiting.")
    exit(1)
309 
310 
# Establish connection to mapd db.
# NOTE(review): the `con = get_connection(` call line was dropped during
# extraction; reconstructed from the keyword arguments that follow and the
# use of `con` immediately below.
con = get_connection(
    db_user=source_db_user,
    db_passwd=source_db_passwd,
    db_server=source_db_server,
    db_port=source_db_port,
    db_name=source_db_name,
)
if not con:
    exit(1)  # Exit if cannot connect to db

# Set run vars: identifiers and environment info recorded with the results.
run_guid = str(uuid.uuid4())
logging.debug("Run guid: " + run_guid)
run_timestamp = datetime.datetime.now()
run_connection = str(con)
logging.debug("Connection string: " + run_connection)
run_driver = ""  # TODO
run_version = con._client.get_version()
if "-" in run_version:
    # Strip any build suffix after the first "-" to get the short version.
    run_version_short = run_version.split("-")[0]
else:
    run_version_short = run_version
# Server hostname parsed from the "...@host:port..." connection string.
conn_machine_name = re.search(r"@(.*?):", run_connection).group(1)
# Set machine names, using local info if connected to localhost
if conn_machine_name == "localhost":
    local_uname = os.uname()
if machine_name:
    run_machine_name = machine_name
else:
    if conn_machine_name == "localhost":
        run_machine_name = local_uname.nodename
    else:
        run_machine_name = conn_machine_name
if machine_uname:
    run_machine_uname = machine_uname
else:
    if conn_machine_name == "localhost":
        run_machine_uname = " ".join(local_uname)
    else:
        run_machine_uname = ""
352 
# Prepare the import target: unless the user opted out, drop any existing
# table and recreate it from the local schema file.
if not no_drop_table_before:
    logging.info("Dropping import table if exists")
    con.execute("drop table if exists " + import_table_name)
    logging.debug("Creating import table.")
    try:
        with open(table_schema_file, "r") as schema_fd:
            logging.debug("Reading table_schema_file: " + table_schema_file)
            raw_schema = schema_fd.read()
        # Flatten to one line and substitute the table-name placeholder.
        create_table_sql = raw_schema.replace("\n", " ").replace(
            "##TAB##", import_table_name
        )
    except FileNotFoundError:
        logging.exception("Could not find table_schema_file.")
        exit(1)
    try:
        logging.debug("Creating import table...")
        res = con.execute(create_table_sql)
        logging.debug("Import table created.")
    except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
        logging.exception("Error running table creation")
        exit(1)
375 
376 
# Run the import query.
# Either render the user-supplied template (##TAB##/##FILE## placeholders)
# or fall back to a plain COPY FROM with default options.
if import_query_template_file:
    try:
        with open(import_query_template_file, "r") as import_query_template:
            logging.debug(
                "Reading import_query_template_file: " + import_query_template_file
            )
            import_query = import_query_template.read().replace("\n", " ")
            import_query = import_query.replace(
                "##TAB##", import_table_name
            )
            import_query = import_query.replace(
                "##FILE##", import_file
            )
    except FileNotFoundError:
        logging.exception("Could not find import_query_template_file.")
        exit(1)
else:
    import_query = "COPY %s FROM '%s';" % (import_table_name, import_file)
logging.debug("Import query: " + import_query)
logging.info("Starting import...")
# Wall-clock timing brackets the execute() call; end_time is only set on
# success, and the except path always exits, so it is defined below.
start_time = timeit.default_timer()
try:
    res = con.execute(import_query)
    end_time = timeit.default_timer()
    logging.info("Completed import.")
except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
    logging.exception("Error running import query")
    if no_drop_table_before:
        logging.error(
            'Import failed and "--no-drop-table-before" was '
            + "passed in. Make sure existing table schema matches "
            + "import .csv file schema."
        )
    exit(1)

# Calculate times and results values.
# elapsed is wall-clock ms; execution_time is the server-reported ms, so
# connect_time approximates transport/connection overhead.
query_elapsed_time = round(((end_time - start_time) * 1000), 1)
execution_time = res._result.execution_time_ms
connect_time = round((query_elapsed_time - execution_time), 2)
# COPY FROM returns one summary row like
# "Loaded: N recs, Rejected: M recs in ..."; scrape the counts out of it.
res_output = str(res.fetchall()[0])
logging.debug("Query result output: " + res_output)
rows_loaded = re.search(r"Loaded: (.*?) recs, R", res_output).group(1)
rows_rejected = re.search(r"Rejected: (.*?) recs i", res_output).group(1)
421 
# Import finished: optionally drop the scratch table, then disconnect from
# the source database.
if not no_drop_table_after:
    logging.debug("Dropping import table")
    con.execute("drop table %s" % import_table_name)
logging.debug("Closing source db connection.")
con.close()
428 
# Assemble the single results record for this run. Keyword order is kept
# identical to the original literal so the serialized json field order (and
# hence the destination table column mapping) is unchanged.
result = dict(
    run_guid=run_guid,
    run_timestamp=run_timestamp,
    run_connection=run_connection,
    run_machine_name=run_machine_name,
    run_machine_uname=run_machine_uname,
    run_driver=run_driver,
    run_version=run_version,
    run_label=label,
    import_test_name=import_test_name,
    import_elapsed_time_ms=query_elapsed_time,
    import_execute_time_ms=execution_time,
    import_conn_time_ms=connect_time,
    rows_loaded=rows_loaded,
    rows_rejected=rows_rejected,
)

# Serialize once for the file/stdout destinations; json_format_handler
# covers the datetime value.
result_json = json.dumps(result, default=json_format_handler, indent=2)
450 
# Send results
if "mapd_db" in destinations:
    # Create dataframe from list of query results (single-row frame; pymapd
    # load_table consumes a dataframe).
    logging.debug("Converting results list to pandas dataframe")
    results_df = pandas.DataFrame(result, index=[0])
    # Establish connection to destination mapd db. dest_* names are defined
    # earlier only when "mapd_db" is in destinations, same condition as here.
    logging.debug("Connecting to destination mapd db")
    dest_con = get_connection(
        db_user=dest_db_user,
        db_passwd=dest_db_passwd,
        db_server=dest_db_server,
        db_port=dest_db_port,
        db_name=dest_db_name,
    )
    if not dest_con:
        exit(1)  # Exit if cannot connect to destination db
    # Load results into db, creating table if it does not exist
    tables = dest_con.get_tables()
    if dest_table not in tables:
        logging.info("Destination table does not exist. Creating.")
        try:
            with open(dest_table_schema_file, "r") as table_schema:
                logging.debug(
                    "Reading table_schema_file: " + dest_table_schema_file
                )
                # Flatten schema to one line and substitute the table name.
                create_table_sql = table_schema.read().replace("\n", " ")
                create_table_sql = create_table_sql.replace(
                    "##TAB##", dest_table
                )
        except FileNotFoundError:
            logging.exception("Could not find table_schema_file.")
            exit(1)
        try:
            logging.debug("Executing create destination table query")
            res = dest_con.execute(create_table_sql)
            logging.debug("Destination table created.")
        except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
            logging.exception("Error running table creation")
            exit(1)
    # create=False: table existence was ensured above.
    logging.info("Loading results into destination db")
    dest_con.load_table(
        dest_table, results_df, method="columnar", create=False
    )
    dest_con.close()
if "file_json" in destinations:
    # Write to json file. Bug fix: the original opened the file and never
    # closed it; a context manager guarantees the handle is flushed and
    # closed even if the write raises.
    logging.debug("Opening json output file for writing")
    with open(output_file_json, "w") as file_json_open:
        logging.info("Writing to output json file: " + output_file_json)
        file_json_open.write(result_json)
if "jenkins_bench" in destinations:
    # Write output to file formatted for jenkins benchmark plugin
    # https://github.com/jenkinsci/benchmark-plugin
    logging.debug("Constructing output for jenkins benchmark plugin")
    # One group / one test / one result: the server-side execution time (ms).
    jenkins_bench_json = json.dumps(
        {
            "groups": [
                {
                    "name": import_test_name,
                    "description": "Import: " + import_test_name,
                    "tests": [
                        {
                            "name": "import",
                            "description": "",
                            "parameters": [],
                            "results": [
                                {
                                    "name": import_test_name + " average",
                                    "description": "",
                                    "unit": "ms",
                                    "dblValue": execution_time,
                                }
                            ],
                        }
                    ],
                }
            ]
        }
    )
    # Write to json file. Bug fix: the original opened the file and never
    # closed it; a context manager guarantees flush/close.
    logging.debug("Opening jenkins_bench json output file for writing")
    with open(output_file_jenkins, "w") as file_jenkins_open:
        logging.info(
            "Writing to jenkins_bench json file: " + output_file_jenkins
        )
        file_jenkins_open.write(jenkins_bench_json)
if "output" in destinations:
    # stdout destination: dump the full json results record as-is.
    logging.info("Printing query results to output")
    print(result_json)
std::string join(T const &container, std::string const &delim)
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:83