OmniSciDB  72c90bc290
run_benchmark_import.py
import os
import timeit
import logging
import uuid
import datetime
import json
import pymapd
import pandas
import re
from argparse import ArgumentParser

# For usage info, run: `./<script_name>.py --help`

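# Example invocation (illustrative; the label and file paths are placeholders,
# "-e output" prints results instead of writing to a destination db):
#   python3 run_benchmark_import.py -l nightly_test \
#       -f /data/import_test.csv -c table_schemas/import_test.sql -e output
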
def get_connection(**kwargs):
    """
      Connects to the db using pymapd
      https://pymapd.readthedocs.io/en/latest/usage.html#connecting

      Kwargs:
        db_user(str): DB username
        db_passwd(str): DB password
        db_server(str): DB host
        db_port(int): DB port
        db_name(str): DB name

      Returns:
        con(class): Connection class
        False(bool): The connection failed. Exception should be logged.
    """
    try:
        logging.debug("Connecting to mapd db...")
        con = pymapd.connect(
            user=kwargs["db_user"],
            password=kwargs["db_passwd"],
            host=kwargs["db_server"],
            port=kwargs["db_port"],
            dbname=kwargs["db_name"],
        )
        logging.info("Successfully connected to mapd db")
        return con
    except (pymapd.exceptions.OperationalError, pymapd.exceptions.Error):
        logging.exception("Error connecting to database.")
        return False
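
# Example usage of get_connection (illustrative), mirroring the default
# source-db arguments defined below:
#   con = get_connection(db_user="mapd", db_passwd="HyperInteractive",
#                        db_server="localhost", db_port=6274, db_name="mapd")
#   if not con:
#       exit(1)  # connection failed; the exception was already logged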


def json_format_handler(x):
    # Function to allow json to deal with datetime and numpy int
    if isinstance(x, datetime.datetime):
        return x.isoformat()
    raise TypeError("Unknown type")

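# Example (illustrative): json.dumps cannot serialize datetime objects by
# itself, so it is given json_format_handler via its "default" hook:
#   json.dumps({"ts": datetime.datetime(2020, 1, 1)}, default=json_format_handler)
#   -> '{"ts": "2020-01-01T00:00:00"}'
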
# Parse input parameters
parser = ArgumentParser()
optional = parser._action_groups.pop()
required = parser.add_argument_group("required arguments")
parser._action_groups.append(optional)
optional.add_argument(
    "-v", "--verbose", action="store_true", help="Turn on debug logging"
)
optional.add_argument(
    "-q",
    "--quiet",
    action="store_true",
    help="Suppress script output " + "(except warnings and errors)",
)
required.add_argument(
    "-u", "--user", dest="user", default="mapd", help="Source database user"
)
required.add_argument(
    "-p",
    "--passwd",
    dest="passwd",
    default="HyperInteractive",
    help="Source database password",
)
required.add_argument(
    "-s",
    "--server",
    dest="server",
    default="localhost",
    help="Source database server hostname",
)
optional.add_argument(
    "-o",
    "--port",
    dest="port",
    type=int,
    default=6274,
    help="Source database server port",
)
required.add_argument(
    "-n", "--name", dest="name", default="mapd", help="Source database name"
)
required.add_argument(
    "-l", "--label", dest="label", required=True, help="Benchmark run label"
)
required.add_argument(
    "-f",
    "--import-file",
    dest="import_file",
    required=True,
    help="Absolute path to file on heavydb machine with data for "
    + "import test",
)
required.add_argument(
    "-c",
    "--table-schema-file",
    dest="table_schema_file",
    required=True,
    help="Path to local file with CREATE TABLE sql statement for the "
    + "import table",
)
optional.add_argument(
    "-t",
    "--import-table-name",
    dest="import_table_name",
    default="import_benchmark_test",
    help="Name of table to import data to. NOTE: This table will be dropped "
    + "before and after the import test, unless "
    + "--no-drop-table-[before/after] is specified.",
)
optional.add_argument(
    "-F",
    "--import-query-template-file",
    dest="import_query_template_file",
    help="Path to file containing template for import query. "
    + 'The script will replace "##TAB##" with the value of import_table_name '
    + 'and "##FILE##" with the value of import_file. By default, the '
    + "script will use the COPY FROM command with the default "
    + "delimiter (,).",
)
optional.add_argument(
    "--no-drop-table-before",
    dest="no_drop_table_before",
    action="store_true",
    help="Do not drop the import table and recreate it before import. "
    + "NOTE: Make sure existing table schema matches import .csv file schema",
)
optional.add_argument(
    "--no-drop-table-after",
    dest="no_drop_table_after",
    action="store_true",
    help="Do not drop the import table after import",
)
optional.add_argument(
    "-A",
    "--import-test-name",
    dest="import_test_name",
    help='Name of import test (ex: "ips"). Required when using '
    + "jenkins_bench_json as output.",
)
optional.add_argument(
    "-m", "--machine-name", dest="machine_name", help="Name of source machine"
)
optional.add_argument(
    "-a",
    "--machine-uname",
    dest="machine_uname",
    help="Uname info from " + "source machine",
)
optional.add_argument(
    "-e",
    "--destination",
    dest="destination",
    default="mapd_db",
    help="Destination type: [mapd_db, file_json, output, jenkins_bench] "
    + "Multiple values can be input separated by commas, "
    + 'ex: "mapd_db,file_json"',
)
optional.add_argument(
    "-U",
    "--dest-user",
    dest="dest_user",
    default="mapd",
    help="Destination mapd_db database user",
)
optional.add_argument(
    "-P",
    "--dest-passwd",
    dest="dest_passwd",
    default="HyperInteractive",
    help="Destination mapd_db database password",
)
optional.add_argument(
    "-S",
    "--dest-server",
    dest="dest_server",
    help="Destination mapd_db database server hostname"
    + ' (required if destination = "mapd_db")',
)
optional.add_argument(
    "-O",
    "--dest-port",
    dest="dest_port",
    type=int,
    default=6274,
    help="Destination mapd_db database server port",
)
optional.add_argument(
    "-N",
    "--dest-name",
    dest="dest_name",
    default="mapd",
    help="Destination mapd_db database name",
)
optional.add_argument(
    "-T",
    "--dest-table",
    dest="dest_table",
    default="import_results",
    help="Destination mapd_db table name",
)
optional.add_argument(
    "-C",
    "--dest-table-schema-file",
    dest="dest_table_schema_file",
    default="results_table_schemas/import-results.sql",
    help="Destination table schema file. This must be an executable CREATE "
    + "TABLE statement that matches the output of this script. It is "
    + "required when creating the results table. Default location is in "
    + '"./results_table_schemas/import-results.sql"',
)
optional.add_argument(
    "-j",
    "--output-file-json",
    dest="output_file_json",
    help="Absolute path of .json output file "
    + '(required if destination = "file_json")',
)
optional.add_argument(
    "-J",
    "--output-file-jenkins",
    dest="output_file_jenkins",
    help="Absolute path of jenkins benchmark .json output file "
    + '(required if destination = "jenkins_bench")',
)
optional.add_argument(
    "--fragment-size",
    dest="fragment_size",
    default="32000000",
    help="Fragment size to be used to create table. File specified in -c"
    + " is modified to replace ##FRAGMENT_SIZE## with value specified for"
    + " this parameter.",
)
args = parser.parse_args()
if args.verbose:
    logging.basicConfig(level=logging.DEBUG)
elif args.quiet:
    logging.basicConfig(level=logging.WARNING)
else:
    logging.basicConfig(level=logging.INFO)
source_db_user = args.user
source_db_passwd = args.passwd
source_db_server = args.server
source_db_port = args.port
source_db_name = args.name
label = args.label
import_file = args.import_file
table_schema_file = args.table_schema_file
import_table_name = args.import_table_name
import_query_template_file = args.import_query_template_file
no_drop_table_before = args.no_drop_table_before
no_drop_table_after = args.no_drop_table_after
import_test_name = args.import_test_name
machine_name = args.machine_name
machine_uname = args.machine_uname
destinations = args.destination.split(",")
# Initialize here so the validity check below cannot hit a NameError when
# no recognized destination was given
valid_destination_set = False
if "mapd_db" in destinations:
    valid_destination_set = True
    dest_db_user = args.dest_user
    dest_db_passwd = args.dest_passwd
    if args.dest_server is None:
        # If dest_server is not set for mapd_db, then exit
        logging.error('"dest_server" is required when destination = "mapd_db"')
        exit(1)
    else:
        dest_db_server = args.dest_server
    dest_db_port = args.dest_port
    dest_db_name = args.dest_name
    dest_table = args.dest_table
    dest_table_schema_file = args.dest_table_schema_file
if "file_json" in destinations:
    valid_destination_set = True
    if args.output_file_json is None:
        # If output_file_json is not set for file_json, then exit
        logging.error(
            '"output_file_json" is required when destination = "file_json"'
        )
        exit(1)
    else:
        output_file_json = args.output_file_json
if "output" in destinations:
    valid_destination_set = True
if "jenkins_bench" in destinations:
    valid_destination_set = True
    if args.output_file_jenkins is None:
        # If output_file_jenkins is not set for jenkins_bench, then exit
        logging.error(
            '"output_file_jenkins" is required '
            + 'when destination = "jenkins_bench"'
        )
        exit(1)
    elif args.import_test_name is None:
        # If import_test_name is not set for jenkins_bench, then exit
        logging.error(
            '"import_test_name" is required '
            + 'when destination = "jenkins_bench"'
        )
        exit(1)
    else:
        output_file_jenkins = args.output_file_jenkins
if not valid_destination_set:
    logging.error("No valid destination(s) have been set. Exiting.")
    exit(1)

# Establish connection to mapd db
con = get_connection(
    db_user=source_db_user,
    db_passwd=source_db_passwd,
    db_server=source_db_server,
    db_port=source_db_port,
    db_name=source_db_name,
)
if not con:
    exit(1)  # Exit if cannot connect to db

# Set run vars
run_guid = str(uuid.uuid4())
logging.debug("Run guid: " + run_guid)
run_timestamp = datetime.datetime.now()
run_connection = str(con)
logging.debug("Connection string: " + run_connection)
run_driver = ""  # TODO
run_version = con._client.get_version()
if "-" in run_version:
    run_version_short = run_version.split("-")[0]
else:
    run_version_short = run_version
conn_machine_name = re.search(r"@(.*?):", run_connection).group(1)
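# The regex above extracts the hostname from pymapd's connection string,
# which looks roughly like "mapd://mapd:***@localhost:6274/mapd"
# (illustrative); for that string, conn_machine_name is "localhost"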
# Set machine names, using local info if connected to localhost
if conn_machine_name == "localhost":
    local_uname = os.uname()
if machine_name:
    run_machine_name = machine_name
else:
    if conn_machine_name == "localhost":
        run_machine_name = local_uname.nodename
    else:
        run_machine_name = conn_machine_name
if machine_uname:
    run_machine_uname = machine_uname
else:
    if conn_machine_name == "localhost":
        run_machine_uname = " ".join(local_uname)
    else:
        run_machine_uname = ""

# See if import table exists, delete and/or create
if not no_drop_table_before:
    logging.info("Dropping import table if exists")
    con.execute("drop table if exists " + import_table_name)
    logging.debug("Creating import table.")
    try:
        with open(table_schema_file, "r") as table_schema:
            logging.debug("Reading table_schema_file: " + table_schema_file)
            create_table_sql = table_schema.read().replace("\n", " ")
            create_table_sql = create_table_sql.replace(
                "##TAB##", import_table_name
            )
            create_table_sql = create_table_sql.replace(
                "##FRAGMENT_SIZE##", args.fragment_size
            )
    except FileNotFoundError:
        logging.exception("Could not find table_schema_file.")
        exit(1)
    try:
        logging.debug("Creating import table...")
        res = con.execute(create_table_sql)
        logging.debug("Import table created.")
    except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
        logging.exception("Error running table creation")
        exit(1)

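# Example table_schema_file contents (illustrative); ##TAB## and
# ##FRAGMENT_SIZE## are replaced above before the DDL is executed:
#   CREATE TABLE ##TAB## (
#     a INTEGER,
#     b TEXT ENCODING DICT(32)
#   ) WITH (FRAGMENT_SIZE = ##FRAGMENT_SIZE##);
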
# Run the import query
if import_query_template_file:
    try:
        with open(import_query_template_file, "r") as import_query_template:
            logging.debug(
                "Reading import_query_template_file: "
                + import_query_template_file
            )
            import_query = import_query_template.read().replace("\n", " ")
            import_query = import_query.replace(
                "##TAB##", import_table_name
            )
            import_query = import_query.replace(
                "##FILE##", import_file
            )
    except FileNotFoundError:
        logging.exception("Could not find import_query_template_file.")
        exit(1)
else:
    import_query = "COPY %s FROM '%s';" % (import_table_name, import_file)
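# Example import_query_template_file contents (illustrative), e.g. for a
# pipe-delimited source file:
#   COPY ##TAB## FROM '##FILE##' WITH (delimiter = '|');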
logging.debug("Import query: " + import_query)
logging.info("Starting import...")
start_time = timeit.default_timer()
try:
    res = con.execute(import_query)
    end_time = timeit.default_timer()
    logging.info("Completed import.")
except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
    logging.exception("Error running import query")
    if no_drop_table_before:
        logging.error(
            'Import failed and "--no-drop-table-before" was '
            + "passed in. Make sure existing table schema matches "
            + "import .csv file schema."
        )
    exit(1)

# Calculate times and results values
query_elapsed_time = round(((end_time - start_time) * 1000), 1)
execution_time = res._result.execution_time_ms
connect_time = round((query_elapsed_time - execution_time), 2)
res_output = str(res.fetchall()[0])
logging.debug("Query result output: " + res_output)
rows_loaded = re.search(r"Loaded: (.*?) recs, R", res_output).group(1)
rows_rejected = re.search(r"Rejected: (.*?) recs i", res_output).group(1)
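# The regexes above parse the COPY FROM result message, which looks like
# (illustrative): ('Loaded: 100 recs, Rejected: 0 recs in 1.500000 secs',)
# and would yield rows_loaded = "100" and rows_rejected = "0"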

# Done with import. Dropping table and closing connection
if not no_drop_table_after:
    logging.debug("Dropping import table")
    con.execute("drop table " + import_table_name)
logging.debug("Closing source db connection.")
con.close()

# Update query dict entry with all values
result = {
    "run_guid": run_guid,
    "run_timestamp": run_timestamp,
    "run_connection": run_connection,
    "run_machine_name": run_machine_name,
    "run_machine_uname": run_machine_uname,
    "run_driver": run_driver,
    "run_version": run_version,
    "run_label": label,
    "import_test_name": import_test_name,
    "import_elapsed_time_ms": query_elapsed_time,
    "import_execute_time_ms": execution_time,
    "import_conn_time_ms": connect_time,
    "rows_loaded": rows_loaded,
    "rows_rejected": rows_rejected,
}

# Convert query list to json for outputs
result_json = json.dumps(result, default=json_format_handler, indent=2)

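# result_json is now plain JSON (illustrative excerpt); run_timestamp is
# serialized to an ISO-8601 string by json_format_handler:
#   {
#     "run_timestamp": "2020-01-01T00:00:00.000000",
#     "rows_loaded": "100",
#     ...
#   }
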
# Send results
if "mapd_db" in destinations:
    # Create dataframe from list of query results
    logging.debug("Converting results list to pandas dataframe")
    results_df = pandas.DataFrame(result, index=[0])
    # Establish connection to destination mapd db
    logging.debug("Connecting to destination mapd db")
    dest_con = get_connection(
        db_user=dest_db_user,
        db_passwd=dest_db_passwd,
        db_server=dest_db_server,
        db_port=dest_db_port,
        db_name=dest_db_name,
    )
    if not dest_con:
        exit(1)  # Exit if cannot connect to destination db
    # Load results into db, creating table if it does not exist
    tables = dest_con.get_tables()
    if dest_table not in tables:
        logging.info("Destination table does not exist. Creating.")
        try:
            with open(dest_table_schema_file, "r") as table_schema:
                logging.debug(
                    "Reading table_schema_file: " + dest_table_schema_file
                )
                create_table_sql = table_schema.read().replace("\n", " ")
                create_table_sql = create_table_sql.replace(
                    "##TAB##", dest_table
                )
        except FileNotFoundError:
            logging.exception("Could not find table_schema_file.")
            exit(1)
        try:
            logging.debug("Executing create destination table query")
            res = dest_con.execute(create_table_sql)
            logging.debug("Destination table created.")
        except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
            logging.exception("Error running table creation")
            exit(1)
    logging.info("Loading results into destination db")
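    # pymapd's load_table with method="columnar" bulk-loads the DataFrame
    # column-by-column rather than row-by-row; create=False because the
    # results table is guaranteed to exist at this point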
    dest_con.load_table(
        dest_table, results_df, method="columnar", create=False
    )
    dest_con.close()
if "file_json" in destinations:
    # Write to json file; a context manager ensures the file is flushed
    # and closed
    logging.debug("Opening json output file for writing")
    with open(output_file_json, "w") as file_json_open:
        logging.info("Writing to output json file: " + output_file_json)
        file_json_open.write(result_json)
if "jenkins_bench" in destinations:
    # Write output to file formatted for jenkins benchmark plugin
    # https://github.com/jenkinsci/benchmark-plugin
    logging.debug("Constructing output for jenkins benchmark plugin")
    jenkins_bench_json = json.dumps(
        {
            "groups": [
                {
                    "name": import_test_name,
                    "description": "Import: " + import_test_name,
                    "tests": [
                        {
                            "name": "import",
                            "description": "",
                            "parameters": [],
                            "results": [
                                {
                                    "name": import_test_name + " average",
                                    "description": "",
                                    "unit": "ms",
                                    "dblValue": execution_time,
                                }
                            ],
                        }
                    ],
                }
            ]
        }
    )
    # Write to json file; a context manager ensures the file is closed
    logging.debug("Opening jenkins_bench json output file for writing")
    with open(output_file_jenkins, "w") as file_jenkins_open:
        logging.info(
            "Writing to jenkins_bench json file: " + output_file_jenkins
        )
        file_jenkins_open.write(jenkins_bench_json)
if "output" in destinations:
    logging.info("Printing query results to output")
    print(result_json)