OmniSciDB  04ee39c94c
run-benchmark.py
import os
import timeit
import logging
import uuid
import datetime
import json
import numpy
import pymapd
import re
from pandas import DataFrame
from argparse import ArgumentParser

# For usage info, run: `./<script_name>.py --help`
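# Example invocation (hypothetical table name, label, and output path):
#
#   ./run-benchmark.py -u mapd -p HyperInteractive -s localhost -n mapd \
#       -t flights -l nightly-run -i 10 -e file_json -j /tmp/results.json
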
def get_connection(**kwargs):
    """
    Connects to the db using pymapd
    https://pymapd.readthedocs.io/en/latest/usage.html#connecting

    Kwargs:
        db_user(str): DB username
        db_passwd(str): DB password
        db_server(str): DB host
        db_port(int): DB port
        db_name(str): DB name

    Returns:
        con(class): Connection class
        False(bool): The connection failed. Exception should be logged.
    """
    try:
        logging.debug("Connecting to mapd db...")
        con = pymapd.connect(
            user=kwargs["db_user"],
            password=kwargs["db_passwd"],
            host=kwargs["db_server"],
            port=kwargs["db_port"],
            dbname=kwargs["db_name"],
        )
        logging.info("Successfully connected to mapd db")
        return con
    except (pymapd.exceptions.OperationalError, pymapd.exceptions.Error):
        logging.exception("Error connecting to database.")
        return False

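# A minimal usage sketch for get_connection (hypothetical values; assumes a
# reachable server with default credentials):
#
#   con = get_connection(
#       db_user="mapd",
#       db_passwd="HyperInteractive",
#       db_server="localhost",
#       db_port=6274,
#       db_name="mapd",
#   )
#   if not con:
#       exit(1)
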
def validate_query_file(**kwargs):
    """
    Validates query file. Currently only checks the query file name

    Kwargs:
        query_filename(str): Name of query file

    Returns:
        True(bool): Query successfully validated
        False(bool): Query failed validation
    """
    if not kwargs["query_filename"].endswith(".sql"):
        logging.warning(
            "Query filename "
            + kwargs["query_filename"]
            + ' is invalid - does not end in ".sql". Skipping'
        )
        return False
    else:
        return True

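# Quick sanity check of the validator (hypothetical filenames):
#
#   validate_query_file(query_filename="query_1.sql")  # True
#   validate_query_file(query_filename="notes.txt")    # False, logs a warning
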
def execute_query(**kwargs):
    """
    Executes a query against the connected db using pymapd
    https://pymapd.readthedocs.io/en/latest/usage.html#querying

    Kwargs:
        query_name(str): Name of query
        query_mapdql(str): Query to run
        iteration(int): Iteration number

    Returns:
        query_execution(dict):::
            result_count(int): Number of results returned
            execution_time(float): Time (in ms) that pymapd reports the
                                   backend spent on the query.
            connect_time(float): Time (in ms) for overhead of the query,
                                 calculated by subtracting backend execution
                                 time from time spent on the execution
                                 function.
            results_iter_time(float): Time (in ms) it took for
                                      pymapd.fetchone() to iterate through
                                      all of the results.
            total_time(float): Time (in ms) from adding all above times.
        False(bool): The query failed. Exception should be logged.
    """
    # Note: uses the module-level connection `con` established below
    start_time = timeit.default_timer()
    try:
        # Run the query
        query_result = con.execute(kwargs["query_mapdql"])
        logging.debug(
            "Completed iteration "
            + str(kwargs["iteration"])
            + " of query "
            + kwargs["query_name"]
        )
    except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
        logging.exception(
            "Error running query "
            + kwargs["query_name"]
            + " during iteration "
            + str(kwargs["iteration"])
        )
        return False

    # Calculate times
    query_elapsed_time = (timeit.default_timer() - start_time) * 1000
    execution_time = query_result._result.execution_time_ms
    connect_time = round((query_elapsed_time - execution_time), 1)

    # Iterate through each result from the query
    logging.debug(
        "Counting results from query "
        + kwargs["query_name"]
        + " iteration "
        + str(kwargs["iteration"])
    )
    result_count = 0
    start_time = timeit.default_timer()
    while query_result.fetchone():
        result_count += 1
    results_iter_time = round(
        ((timeit.default_timer() - start_time) * 1000), 1
    )

    query_execution = {
        "result_count": result_count,
        "execution_time": execution_time,
        "connect_time": connect_time,
        "results_iter_time": results_iter_time,
        "total_time": execution_time + connect_time + results_iter_time,
    }
    logging.debug(
        "Execution results for query "
        + kwargs["query_name"]
        + " iteration "
        + str(kwargs["iteration"])
        + ": "
        + str(query_execution)
    )
    return query_execution

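# A successful call returns a timing dict shaped like the following
# (illustrative values, hypothetical table):
#
#   execute_query(query_name="query_1.sql",
#                 query_mapdql="SELECT COUNT(*) FROM flights;",
#                 iteration=1)
#   # => {"result_count": 1, "execution_time": 12, "connect_time": 1.3,
#   #     "results_iter_time": 0.2, "total_time": 13.5}
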
def calculate_query_times(**kwargs):
    """
    Calculates aggregate query times from all iteration times

    Kwargs:
        total_times(list): List of total time calculations
        execution_times(list): List of execution_time calculations
        results_iter_times(list): List of results_iter_time calculations
        connect_times(list): List of connect_time calculations

    Returns:
        query_execution(dict): Aggregate query times (avg, min, max,
                               percentiles, and std dev where applicable)
    """
    return {
        "total_time_avg": round(numpy.mean(kwargs["total_times"]), 1),
        "total_time_min": round(numpy.min(kwargs["total_times"]), 1),
        "total_time_max": round(numpy.max(kwargs["total_times"]), 1),
        "total_time_85": round(numpy.percentile(kwargs["total_times"], 85), 1),
        "execution_time_avg": round(numpy.mean(kwargs["execution_times"]), 1),
        "execution_time_min": round(numpy.min(kwargs["execution_times"]), 1),
        "execution_time_max": round(numpy.max(kwargs["execution_times"]), 1),
        "execution_time_85": round(
            numpy.percentile(kwargs["execution_times"], 85), 1
        ),
        "execution_time_25": round(
            numpy.percentile(kwargs["execution_times"], 25), 1
        ),
        "execution_time_std": round(numpy.std(kwargs["execution_times"]), 1),
        "connect_time_avg": round(numpy.mean(kwargs["connect_times"]), 1),
        "connect_time_min": round(numpy.min(kwargs["connect_times"]), 1),
        "connect_time_max": round(numpy.max(kwargs["connect_times"]), 1),
        "connect_time_85": round(
            numpy.percentile(kwargs["connect_times"], 85), 1
        ),
        "results_iter_time_avg": round(
            numpy.mean(kwargs["results_iter_times"]), 1
        ),
        "results_iter_time_min": round(
            numpy.min(kwargs["results_iter_times"]), 1
        ),
        "results_iter_time_max": round(
            numpy.max(kwargs["results_iter_times"]), 1
        ),
        "results_iter_time_85": round(
            numpy.percentile(kwargs["results_iter_times"], 85), 1
        ),
    }

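# Aggregation sketch with made-up timings:
#
#   calculate_query_times(
#       total_times=[10.0, 12.0, 11.0],
#       execution_times=[8.0, 9.0, 8.5],
#       results_iter_times=[1.0, 2.0, 1.5],
#       connect_times=[1.0, 1.0, 1.0],
#   )["execution_time_avg"]  # => 8.5
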
def get_mem_usage(**kwargs):
    """
    Calculates memory statistics from mapd_server _client.get_memory call

    Kwargs:
        con(class 'pymapd.connection.Connection'): Mapd connection
        mem_type(str): [gpu, cpu] Type of memory to gather metrics for

    Returns:
        ramusage(dict):::
            usedram(float): Amount of memory (in MB) used
            freeram(float): Amount of memory (in MB) free
            totalallocated(float): Total amount of memory (in MB) allocated
            errormessage(str): Error if returned by get_memory call
    """
    # Initialize up front so the except block can always report the error
    ramusage = {}
    try:
        con_mem_data_list = kwargs["con"]._client.get_memory(
            session=kwargs["con"]._session, memory_level=kwargs["mem_type"]
        )
        usedram = 0
        freeram = 0
        for con_mem_data in con_mem_data_list:
            page_size = con_mem_data.page_size
            node_memory_data_list = con_mem_data.node_memory_data
            for node_memory_data in node_memory_data_list:
                ram = node_memory_data.num_pages * page_size
                is_free = node_memory_data.is_free
                if is_free:
                    freeram += ram
                else:
                    usedram += ram
        totalallocated = usedram + freeram
        if totalallocated > 0:
            totalallocated = round(totalallocated / 1024 / 1024, 1)
            usedram = round(usedram / 1024 / 1024, 1)
            freeram = round(freeram / 1024 / 1024, 1)
        ramusage["usedram"] = usedram
        ramusage["freeram"] = freeram
        ramusage["totalallocated"] = totalallocated
        ramusage["errormessage"] = ""
    except Exception as e:
        errormessage = "Get memory failed with error: " + str(e)
        logging.error(errormessage)
        ramusage["errormessage"] = errormessage
    return ramusage

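# Usage sketch (requires a live connection; the numbers are illustrative):
#
#   gpu_mem = get_mem_usage(con=con, mem_type="gpu")
#   gpu_mem["usedram"], gpu_mem["freeram"], gpu_mem["totalallocated"]
#   # => (512.0, 7679.5, 8191.5), all in MB
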
def json_format_handler(x):
    # Function to allow json to deal with datetime and numpy int
    if isinstance(x, datetime.datetime):
        return x.isoformat()
    if isinstance(x, numpy.int64):
        return int(x)
    raise TypeError("Unknown type")

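# Handler sketch: passed to json.dumps via `default` so run timestamps and
# numpy ints serialize cleanly:
#
#   json.dumps(
#       {"ts": datetime.datetime.now(), "count": numpy.int64(3)},
#       default=json_format_handler,
#   )
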
# Parse input parameters
parser = ArgumentParser()
optional = parser._action_groups.pop()
required = parser.add_argument_group("required arguments")
parser._action_groups.append(optional)
optional.add_argument(
    "-v", "--verbose", action="store_true", help="Turn on debug logging"
)
optional.add_argument(
    "-q",
    "--quiet",
    action="store_true",
    help="Suppress script output " + "(except warnings and errors)",
)
required.add_argument(
    "-u", "--user", dest="user", default="mapd", help="Source database user"
)
required.add_argument(
    "-p",
    "--passwd",
    dest="passwd",
    default="HyperInteractive",
    help="Source database password",
)
required.add_argument(
    "-s",
    "--server",
    dest="server",
    default="localhost",
    help="Source database server hostname",
)
optional.add_argument(
    "-o",
    "--port",
    dest="port",
    type=int,
    default=6274,
    help="Source database server port",
)
required.add_argument(
    "-n", "--name", dest="name", default="mapd", help="Source database name"
)
required.add_argument(
    "-t", "--table", dest="table", required=True, help="Source db table name"
)
required.add_argument(
    "-l", "--label", dest="label", required=True, help="Benchmark run label"
)
required.add_argument(
    "-d",
    "--queries-dir",
    dest="queries_dir",
    help='Absolute path to dir with query files. \
        [Default: "queries" dir in same location as script]',
)
required.add_argument(
    "-i",
    "--iterations",
    dest="iterations",
    type=int,
    required=True,
    help="Number of iterations per query. Must be > 1",
)
optional.add_argument(
    "-g",
    "--gpu-count",
    dest="gpu_count",
    type=int,
    default=None,
    help="Number of GPUs. Not required when gathering local gpu info",
)
optional.add_argument(
    "-G",
    "--gpu-name",
    dest="gpu_name",
    type=str,
    default="",
    help="Name of GPU(s). Not required when gathering local gpu info",
)
optional.add_argument(
    "--no-gather-conn-gpu-info",
    dest="no_gather_conn_gpu_info",
    action="store_true",
    help="Do not gather source database GPU info fields "
    + "[run_gpu_count, run_gpu_mem_mb] "
    + "using pymapd connection info. "
    + "Use when testing a CPU-only server.",
)
optional.add_argument(
    "--no-gather-nvml-gpu-info",
    dest="no_gather_nvml_gpu_info",
    action="store_true",
    help="Do not gather source database GPU info fields "
    + "[gpu_driver_ver, run_gpu_name] "
    + "from local GPU using pynvml. "
    + 'Defaults to True when source server is not "localhost". '
    + "Use when testing a CPU-only server.",
)
optional.add_argument(
    "--gather-nvml-gpu-info",
    dest="gather_nvml_gpu_info",
    action="store_true",
    help="Gather source database GPU info fields "
    + "[gpu_driver_ver, run_gpu_name] "
    + "from local GPU using pynvml. "
    + 'Defaults to True when source server is "localhost". '
    + "Only use when benchmarking against same machine that this script is "
    + "run from.",
)
optional.add_argument(
    "-m", "--machine-name", dest="machine_name", help="Name of source machine"
)
optional.add_argument(
    "-a",
    "--machine-uname",
    dest="machine_uname",
    help="Uname info from " + "source machine",
)
optional.add_argument(
    "-e",
    "--destination",
    dest="destination",
    default="mapd_db",
    help="Destination type: [mapd_db, file_json, output, jenkins_bench] "
    + "Multiple values can be input separated by commas, "
    + 'ex: "mapd_db,file_json"',
)
optional.add_argument(
    "-U",
    "--dest-user",
    dest="dest_user",
    default="mapd",
    help="Destination mapd_db database user",
)
optional.add_argument(
    "-P",
    "--dest-passwd",
    dest="dest_passwd",
    default="HyperInteractive",
    help="Destination mapd_db database password",
)
optional.add_argument(
    "-S",
    "--dest-server",
    dest="dest_server",
    help="Destination mapd_db database server hostname"
    + ' (required if destination = "mapd_db")',
)
optional.add_argument(
    "-O",
    "--dest-port",
    dest="dest_port",
    type=int,
    default=6274,
    help="Destination mapd_db database server port",
)
optional.add_argument(
    "-N",
    "--dest-name",
    dest="dest_name",
    default="mapd",
    help="Destination mapd_db database name",
)
optional.add_argument(
    "-T",
    "--dest-table",
    dest="dest_table",
    default="results",
    help="Destination mapd_db table name",
)
optional.add_argument(
    "-C",
    "--dest-table-schema-file",
    dest="dest_table_schema_file",
    default="results_table_schemas/query-results.sql",
    help="Destination table schema file. This must be an executable CREATE "
    + "TABLE statement that matches the output of this script. It is "
    + "required when creating the results table. Default location is in "
    + '"./results_table_schemas/query-results.sql"',
)
optional.add_argument(
    "-j",
    "--output-file-json",
    dest="output_file_json",
    help="Absolute path of .json output file "
    + '(required if destination = "file_json")',
)
optional.add_argument(
    "-J",
    "--output-file-jenkins",
    dest="output_file_jenkins",
    help="Absolute path of jenkins benchmark .json output file "
    + '(required if destination = "jenkins_bench")',
)
optional.add_argument(
    "-E",
    "--output-tag-jenkins",
    dest="output_tag_jenkins",
    default="",
    help="Jenkins benchmark result tag. "
    + 'Optional, appended to table name in "group" field',
)
args = parser.parse_args()
if args.verbose:
    logging.basicConfig(level=logging.DEBUG)
elif args.quiet:
    logging.basicConfig(level=logging.WARNING)
else:
    logging.basicConfig(level=logging.INFO)
source_db_user = args.user
source_db_passwd = args.passwd
source_db_server = args.server
source_db_port = args.port
source_db_name = args.name
source_table = args.table
label = args.label
if args.queries_dir:
    queries_dir = args.queries_dir
else:
    queries_dir = os.path.join(os.path.dirname(__file__), "queries")
iterations = int(args.iterations)
if iterations <= 1:
    # Need > 1 iteration as first iteration is dropped from calculations
    logging.error("Iterations must be greater than 1")
    exit(1)
gpu_count = args.gpu_count
gpu_name = args.gpu_name
no_gather_conn_gpu_info = args.no_gather_conn_gpu_info
gather_nvml_gpu_info = args.gather_nvml_gpu_info
no_gather_nvml_gpu_info = args.no_gather_nvml_gpu_info
machine_name = args.machine_name
machine_uname = args.machine_uname
destinations = args.destination.split(",")
# Initialize so an unrecognized destination list fails the check below
valid_destination_set = False
if "mapd_db" in destinations:
    valid_destination_set = True
    dest_db_user = args.dest_user
    dest_db_passwd = args.dest_passwd
    if args.dest_server is None:
        # If dest_server is not set for mapd_db, then exit
        logging.error('"dest_server" is required when destination = "mapd_db"')
        exit(1)
    else:
        dest_db_server = args.dest_server
    dest_db_port = args.dest_port
    dest_db_name = args.dest_name
    dest_table = args.dest_table
    dest_table_schema_file = args.dest_table_schema_file
if "file_json" in destinations:
    valid_destination_set = True
    if args.output_file_json is None:
        # If output_file_json is not set for file_json, then exit
        logging.error(
            '"output_file_json" is required when destination = "file_json"'
        )
        exit(1)
    else:
        output_file_json = args.output_file_json
if "output" in destinations:
    valid_destination_set = True
if "jenkins_bench" in destinations:
    valid_destination_set = True
    if args.output_file_jenkins is None:
        # If output_file_jenkins is not set for jenkins_bench, then exit
        logging.error(
            '"output_file_jenkins" is required '
            + 'when destination = "jenkins_bench"'
        )
        exit(1)
    else:
        output_file_jenkins = args.output_file_jenkins
output_tag_jenkins = args.output_tag_jenkins
if not valid_destination_set:
    logging.error("No valid destination(s) have been set. Exiting.")
    exit(1)

# Establish connection to mapd db
con = get_connection(
    db_user=source_db_user,
    db_passwd=source_db_passwd,
    db_server=source_db_server,
    db_port=source_db_port,
    db_name=source_db_name,
)
if not con:
    exit(1)  # Exit if cannot connect to db

# Set run vars
run_guid = str(uuid.uuid4())
logging.debug("Run guid: " + run_guid)
run_timestamp = datetime.datetime.now()
run_connection = str(con)
logging.debug("Connection string: " + run_connection)
run_driver = ""  # TODO
run_version = con._client.get_version()
if "-" in run_version:
    run_version_short = run_version.split("-")[0]
else:
    run_version_short = run_version
conn_machine_name = re.search(r"@(.*?):", run_connection).group(1)
# Set GPU info fields
conn_gpu_count = None
source_db_gpu_count = None
source_db_gpu_mem = None
source_db_gpu_driver_ver = ""
source_db_gpu_name = ""
if no_gather_conn_gpu_info:
    logging.debug(
        "--no-gather-conn-gpu-info passed, "
        + "using blank values for source database GPU info fields "
        + "[run_gpu_count, run_gpu_mem_mb] "
    )
else:
    logging.debug(
        "Gathering source database GPU info fields "
        + "[run_gpu_count, run_gpu_mem_mb] "
        + "using pymapd connection info. "
    )
    conn_hardware_info = con._client.get_hardware_info(con._session)
    conn_gpu_count = conn_hardware_info.hardware_info[0].num_gpu_allocated
if conn_gpu_count == 0 or conn_gpu_count is None:
    no_gather_nvml_gpu_info = True
    if conn_gpu_count == 0:
        logging.warning(
            "0 GPUs detected from connection info, "
            + "using blank values for source database GPU info fields "
            + "If running against cpu-only server, make sure to set "
            + "--no-gather-nvml-gpu-info and --no-gather-conn-gpu-info."
        )
else:
    source_db_gpu_count = conn_gpu_count
    try:
        source_db_gpu_mem = int(
            conn_hardware_info.hardware_info[0].gpu_info[0].memory / 1000000
        )
    except IndexError:
        logging.error("GPU memory info not available from connection.")
if no_gather_nvml_gpu_info:
    logging.debug(
        "--no-gather-nvml-gpu-info passed, "
        + "using blank values for source database GPU info fields "
        + "[gpu_driver_ver, run_gpu_name] "
    )
elif conn_machine_name == "localhost" or gather_nvml_gpu_info:
    logging.debug(
        "Gathering source database GPU info fields "
        + "[gpu_driver_ver, run_gpu_name] "
        + "from local GPU using pynvml. "
    )
    import pynvml

    pynvml.nvmlInit()
    source_db_gpu_driver_ver = pynvml.nvmlSystemGetDriverVersion().decode()
    for i in range(source_db_gpu_count):
        handle = pynvml.nvmlDeviceGetHandleByIndex(i)
        # Assume all cards are the same, overwrite name value
        source_db_gpu_name = pynvml.nvmlDeviceGetName(handle).decode()
    pynvml.nvmlShutdown()
# If gpu_count argument passed in, override gathered value
if gpu_count:
    source_db_gpu_count = gpu_count
# Set machine names, using local info if connected to localhost
if conn_machine_name == "localhost":
    local_uname = os.uname()
if machine_name:
    run_machine_name = machine_name
else:
    if conn_machine_name == "localhost":
        run_machine_name = local_uname.nodename.split(".")[0]
    else:
        run_machine_name = conn_machine_name
if machine_uname:
    run_machine_uname = machine_uname
else:
    if conn_machine_name == "localhost":
        run_machine_uname = " ".join(local_uname)
    else:
        run_machine_uname = ""

# Read query files contents and write to query_list
query_list = []
logging.debug("Queries dir: " + queries_dir)
try:
    for query_filename in os.listdir(queries_dir):
        logging.debug("Validating query filename: " + query_filename)
        if validate_query_file(query_filename=query_filename):
            with open(
                queries_dir + "/" + query_filename, "r"
            ) as query_filepath:
                logging.debug("Reading query with filename: " + query_filename)
                query_mapdql = query_filepath.read().replace("\n", " ")
                query_mapdql = query_mapdql.replace("##TAB##", source_table)
                query_list.append(
                    {"name": query_filename, "mapdql": query_mapdql}
                )
    logging.info("Read all query files")
except FileNotFoundError:
    logging.exception("Could not find queries directory.")
    exit(1)  # Exit if cannot get queries dir

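# Query files live in queries_dir, one .sql file per query; the literal token
# ##TAB## is replaced with the source table name. A hypothetical
# queries/query_1.sql:
#
#   SELECT carrier_name, COUNT(*) FROM ##TAB## GROUP BY carrier_name;
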
# Run queries
for query in query_list:
    # Set additional query vars
    # Query ID = filename without extension
    query_id = query["name"].rsplit(".")[0]

    # Run iterations of query
    query_results = []
    logging.info(
        "Running query: " + query["name"] + " iterations: " + str(iterations)
    )
    query_total_start_time = timeit.default_timer()
    for iteration in range(iterations):
        # Gather memory before running query iteration
        logging.debug("Getting pre-query memory usage on CPU")
        pre_query_cpu_mem_usage = get_mem_usage(con=con, mem_type="cpu")
        logging.debug("Getting pre-query memory usage on GPU")
        pre_query_gpu_mem_usage = get_mem_usage(con=con, mem_type="gpu")
        # Run query iteration
        logging.debug(
            "Running iteration "
            + str(iteration)
            + " of query "
            + query["name"]
        )
        query_result = execute_query(
            query_name=query["name"],
            query_mapdql=query["mapdql"],
            iteration=iteration,
        )
        # Gather memory after running query iteration
        logging.debug("Getting post-query memory usage on CPU")
        post_query_cpu_mem_usage = get_mem_usage(con=con, mem_type="cpu")
        logging.debug("Getting post-query memory usage on GPU")
        post_query_gpu_mem_usage = get_mem_usage(con=con, mem_type="gpu")
        # Calculate total (post minus pre) memory usage after query iteration
        query_cpu_mem_usage = round(
            post_query_cpu_mem_usage["usedram"]
            - pre_query_cpu_mem_usage["usedram"],
            1,
        )
        query_gpu_mem_usage = round(
            post_query_gpu_mem_usage["usedram"]
            - pre_query_gpu_mem_usage["usedram"],
            1,
        )
        if query_result:
            query.update({"succeeded": True})
            query_error_info = ""  # TODO - interpret query error info
            # Assign first query iteration times
            if iteration == 0:
                first_execution_time = round(query_result["execution_time"], 1)
                first_connect_time = round(query_result["connect_time"], 1)
                first_results_iter_time = round(
                    query_result["results_iter_time"], 1
                )
                first_total_time = (
                    first_execution_time
                    + first_connect_time
                    + first_results_iter_time
                )
                first_cpu_mem_usage = query_cpu_mem_usage
                first_gpu_mem_usage = query_gpu_mem_usage
            else:
                # Put noninitial iterations into query_result list
                query_results.append(query_result)
                # Verify no change in memory for noninitial iterations
                if query_cpu_mem_usage != 0.0:
                    logging.error(
                        (
                            "Noninitial iteration ({0}) of query ({1}) "
                            + "shows non-zero CPU memory usage: {2}"
                        ).format(iteration, query["name"], query_cpu_mem_usage)
                    )
                if query_gpu_mem_usage != 0.0:
                    logging.error(
                        (
                            "Noninitial iteration ({0}) of query ({1}) "
                            + "shows non-zero GPU memory usage: {2}"
                        ).format(iteration, query["name"], query_gpu_mem_usage)
                    )
        else:
            query.update({"succeeded": False})
            logging.warning(
                "Error detected during execution of query: "
                + query["name"]
                + ". This query will be skipped and "
                + "times will not be reported"
            )
        if query["succeeded"] is False:
            # Do not run any more iterations of the failed query
            break
    if query["succeeded"] is False:
        # Do not calculate results for the failed query, move on to the next
        continue

    # Calculate time for all iterations to run
    query_total_elapsed_time = round(
        ((timeit.default_timer() - query_total_start_time) * 1000), 1
    )
    logging.info("Completed all iterations of query " + query["name"])

    # Aggregate iteration values
    execution_times, connect_times, results_iter_times, total_times = (
        [],
        [],
        [],
        [],
    )
    for query_result in query_results:
        execution_times.append(query_result["execution_time"])
        connect_times.append(query_result["connect_time"])
        results_iter_times.append(query_result["results_iter_time"])
        total_times.append(query_result["total_time"])
        # Overwrite result count, since it should be the same for each
        # iteration
        result_count = query_result["result_count"]

    # Calculate query times
    logging.debug("Calculating times from query " + query["name"])
    query_times = calculate_query_times(
        total_times=total_times,
        execution_times=execution_times,
        connect_times=connect_times,
        results_iter_times=results_iter_times,
    )

    # Update query dict entry with all values
    query.update(
        {
            "results": {
                "run_guid": run_guid,
                "run_timestamp": run_timestamp,
                "run_connection": run_connection,
                "run_machine_name": run_machine_name,
                "run_machine_uname": run_machine_uname,
                "run_driver": run_driver,
                "run_version": run_version,
                "run_version_short": run_version_short,
                "run_label": label,
                "run_gpu_count": source_db_gpu_count,
                "run_gpu_driver_ver": source_db_gpu_driver_ver,
                "run_gpu_name": source_db_gpu_name,
                "run_gpu_mem_mb": source_db_gpu_mem,
                "run_table": source_table,
                "query_id": query_id,
                "query_result_set_count": result_count,
                "query_error_info": query_error_info,
                "query_conn_first": first_connect_time,
                "query_conn_avg": query_times["connect_time_avg"],
                "query_conn_min": query_times["connect_time_min"],
                "query_conn_max": query_times["connect_time_max"],
                "query_conn_85": query_times["connect_time_85"],
                "query_exec_first": first_execution_time,
                "query_exec_avg": query_times["execution_time_avg"],
                "query_exec_min": query_times["execution_time_min"],
                "query_exec_max": query_times["execution_time_max"],
                "query_exec_85": query_times["execution_time_85"],
                "query_exec_25": query_times["execution_time_25"],
                "query_exec_stdd": query_times["execution_time_std"],
                # Render queries not supported yet
                "query_render_first": None,
                "query_render_avg": None,
                "query_render_min": None,
                "query_render_max": None,
                "query_render_85": None,
                "query_render_25": None,
                "query_render_stdd": None,
                "query_total_first": first_total_time,
                "query_total_avg": query_times["total_time_avg"],
                "query_total_min": query_times["total_time_min"],
                "query_total_max": query_times["total_time_max"],
                "query_total_85": query_times["total_time_85"],
                "query_total_all": query_total_elapsed_time,
                "results_iter_count": iterations,
                "results_iter_first": first_results_iter_time,
                "results_iter_avg": query_times["results_iter_time_avg"],
                "results_iter_min": query_times["results_iter_time_min"],
                "results_iter_max": query_times["results_iter_time_max"],
                "results_iter_85": query_times["results_iter_time_85"],
                "cpu_mem_usage_mb": first_cpu_mem_usage,
                "gpu_mem_usage_mb": first_gpu_mem_usage,
            }
        }
    )
    logging.debug(
        "All values set for query " + query["name"] + ": " + str(query)
    )
logging.debug("Closing source db connection.")
con.close()
logging.info("Completed all queries.")

# Create list of successful queries
logging.debug(
    "Removing failed queries from results going to destination db(s)"
)
# Filter in place (deleting from a list while iterating over it skips
# elements) so failed queries are dropped from all outputs
query_list[:] = [query for query in query_list if query["succeeded"]]
successful_query_list = query_list
# Create successful query results list for upload to destination(s)
query_results = []
for query in successful_query_list:
    query_results.append(query["results"])
# Convert query list to json for outputs
query_list_json = json.dumps(query_list, default=json_format_handler, indent=2)

# Send results
if "mapd_db" in destinations:
    # Create dataframe from list of query results
    logging.debug("Converting results list to pandas dataframe")
    results_df = DataFrame(query_results)
    # Establish connection to destination mapd db
    logging.debug("Connecting to destination mapd db")
    dest_con = get_connection(
        db_user=dest_db_user,
        db_passwd=dest_db_passwd,
        db_server=dest_db_server,
        db_port=dest_db_port,
        db_name=dest_db_name,
    )
    if not dest_con:
        exit(1)  # Exit if cannot connect to destination db
    # Load results into db, creating table if it does not exist
    tables = dest_con.get_tables()
    if dest_table not in tables:
        logging.info("Destination table does not exist. Creating.")
        try:
            with open(dest_table_schema_file, "r") as table_schema:
                logging.debug(
                    "Reading table_schema_file: " + dest_table_schema_file
                )
                create_table_sql = table_schema.read().replace("\n", " ")
                create_table_sql = create_table_sql.replace(
                    "##TAB##", dest_table
                )
        except FileNotFoundError:
            logging.exception("Could not find table_schema_file.")
            exit(1)
        try:
            logging.debug("Executing create destination table query")
            res = dest_con.execute(create_table_sql)
            logging.debug("Destination table created.")
        except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
            logging.exception("Error running table creation")
            exit(1)
    logging.info("Loading results into destination db")
    dest_con.load_table_columnar(
        dest_table,
        results_df,
        preserve_index=False,
        chunk_size_bytes=0,
        col_names_from_schema=True,
    )
    dest_con.close()
if "file_json" in destinations:
    # Write to json file (using `with` so the file is closed on completion)
    logging.debug("Opening json output file for writing")
    with open(output_file_json, "w") as file_json_open:
        logging.info("Writing to output json file: " + output_file_json)
        file_json_open.write(query_list_json)
if "jenkins_bench" in destinations:
    # Write output to file formatted for jenkins benchmark plugin
    # https://github.com/jenkinsci/benchmark-plugin
    jenkins_bench_results = []
    for query_result in query_results:
        logging.debug("Constructing output for jenkins benchmark plugin")
        jenkins_bench_results.append(
            {
                "name": query_result["query_id"],
                "description": "",
                "parameters": [],
                "results": [
                    {
                        "name": query_result["query_id"] + " average",
                        "description": "",
                        "unit": "ms",
                        "dblValue": query_result["query_exec_avg"],
                    }
                ],
            }
        )
    jenkins_bench_json = json.dumps(
        {
            "groups": [
                {
                    "name": source_table + output_tag_jenkins,
                    "description": "Source table: " + source_table,
                    "tests": jenkins_bench_results,
                }
            ]
        }
    )
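    # The serialized document is shaped like (illustrative values):
    #
    #   {"groups": [{"name": "flights_run1",
    #                "description": "Source table: flights",
    #                "tests": [{"name": "query_1", "description": "",
    #                           "parameters": [],
    #                           "results": [{"name": "query_1 average",
    #                                        "description": "", "unit": "ms",
    #                                        "dblValue": 12.3}]}]}]}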
    # Write to json file (using `with` so the file is closed on completion)
    logging.debug("Opening jenkins_bench json output file for writing")
    with open(output_file_jenkins, "w") as file_jenkins_open:
        logging.info(
            "Writing to jenkins_bench json file: " + output_file_jenkins
        )
        file_jenkins_open.write(jenkins_bench_json)
if "output" in destinations:
    logging.info("Printing query results to output")
    print(query_list_json)

logging.info("Successfully loaded query results info into destination(s)")