OmniSciDB  06b3bd477c
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
run_benchmark.py
Go to the documentation of this file.
1 import os
2 import timeit
3 import logging
4 import uuid
5 import datetime
6 import json
7 import numpy
8 import pymapd
9 import re
10 import sys
11 from pandas import DataFrame
12 from argparse import ArgumentParser
13 
14 # For usage info, run: `./<script_name>.py --help`
15 
16 
def verify_destinations(**kwargs):
    """
    Verify script output destination(s)

    Kwargs:
        destinations (list): List of destinations
        dest_db_server (str): DB output destination server
        output_file_json (str): Location of .json file output
        output_file_jenkins (str): Location of .json jenkins file output

    Returns:
        True(bool): Destination(s) is/are valid
        False(bool): Destination(s) is/are not valid
    """
    # Start as False so that an empty or unrecognized destination list is
    # reported invalid (previously this variable was only assigned inside
    # the recognized branches, raising NameError for unknown destinations).
    valid_destination_set = False
    if "mapd_db" in kwargs["destinations"]:
        valid_destination_set = True
        if kwargs["dest_db_server"] is None:
            # dest_server is required for the mapd_db destination
            logging.error(
                '"dest_server" is required when destination = "mapd_db"'
            )
            return False
    if "file_json" in kwargs["destinations"]:
        valid_destination_set = True
        if kwargs["output_file_json"] is None:
            # output_file_json is required for the file_json destination
            logging.error(
                '"output_file_json" is required when destination = "file_json"'
            )
            return False
    if "output" in kwargs["destinations"]:
        valid_destination_set = True
    if "jenkins_bench" in kwargs["destinations"]:
        valid_destination_set = True
        if kwargs["output_file_jenkins"] is None:
            # output_file_jenkins is required for the jenkins_bench destination
            logging.error(
                '"output_file_jenkins" is required '
                + 'when destination = "jenkins_bench"'
            )
            return False
    return valid_destination_set
59 
60 
def get_connection(**kwargs):
    """
    Connects to the db using pymapd
    https://pymapd.readthedocs.io/en/latest/usage.html#connecting

    Kwargs:
        db_user(str): DB username
        db_passwd(str): DB password
        db_server(str): DB host
        db_port(int): DB port
        db_name(str): DB name

    Returns:
        con(class): Connection class
        False(bool): The connection failed. Exception should be logged.
    """
    logging.debug("Connecting to mapd db...")
    try:
        connection = pymapd.connect(
            user=kwargs["db_user"],
            password=kwargs["db_passwd"],
            host=kwargs["db_server"],
            port=kwargs["db_port"],
            dbname=kwargs["db_name"],
        )
    except (pymapd.exceptions.OperationalError, pymapd.exceptions.Error):
        # Connection failure is non-fatal here; the caller checks for False.
        logging.exception("Error connecting to database.")
        return False
    logging.info("Succesfully connected to mapd db")
    return connection
91 
92 
def get_run_vars(**kwargs):
    """
    Gets/sets run-specific vars such as time, uid, etc.

    Kwargs:
        con(class 'pymapd.connection.Connection'): Mapd connection

    Returns:
        run_vars(dict):::
            run_guid(str): Run GUID
            run_timestamp(datetime): Run timestamp
            run_connection(str): Connection string
            run_driver(str): Run driver
            run_version(str): Version of DB
            run_version_short(str): Shortened version of DB
            conn_machine_name(str): Name of run machine
    """
    guid = str(uuid.uuid4())
    logging.debug("Run guid: " + guid)
    timestamp = datetime.datetime.now()
    connection_str = str(kwargs["con"])
    logging.debug("Connection string: " + connection_str)
    version = kwargs["con"]._client.get_version()
    # Strip any "-<suffix>" from the reported version for the short form
    short_version = version.split("-")[0] if "-" in version else version
    # Host name is the "@host:" portion of the pymapd connection string
    machine_name = re.search(r"@(.*?):", connection_str).group(1)
    return {
        "run_guid": guid,
        "run_timestamp": timestamp,
        "run_connection": connection_str,
        "run_driver": "",  # TODO
        "run_version": version,
        "run_version_short": short_version,
        "conn_machine_name": machine_name,
    }
132 
133 
def get_gpu_info(**kwargs):
    """
    Gets run machine GPU info

    Kwargs:
        gpu_name(str): GPU name from input param
        no_gather_conn_gpu_info(bool): Gather GPU info fields
        con(class 'pymapd.connection.Connection'): Mapd connection
        conn_machine_name(str): Name of run machine
        no_gather_nvml_gpu_info(bool): Do not gather GPU info using nvml
        gather_nvml_gpu_info(bool): Gather GPU info using nvml
        gpu_count(int): Number of GPUs on run machine

    Returns:
        gpu_info(dict):::
            conn_gpu_count(int): Number of GPUs gathered from pymapd con
            source_db_gpu_count(int): Number of GPUs on run machine
            source_db_gpu_mem(str): Amount of GPU mem on run machine
            source_db_gpu_driver_ver(str): GPU driver version
            source_db_gpu_name(str): GPU name
    """
    # Set GPU info fields
    conn_gpu_count = None
    source_db_gpu_count = None
    source_db_gpu_mem = None
    source_db_gpu_driver_ver = ""
    source_db_gpu_name = ""
    if kwargs["no_gather_conn_gpu_info"]:
        logging.debug(
            "--no-gather-conn-gpu-info passed, "
            + "using blank values for source database GPU info fields "
            + "[run_gpu_count, run_gpu_mem_mb] "
        )
    else:
        logging.debug(
            "Gathering source database GPU info fields "
            + "[run_gpu_count, run_gpu_mem_mb] "
            + "using pymapd connection info. "
        )
        conn_hardware_info = kwargs["con"]._client.get_hardware_info(
            kwargs["con"]._session
        )
        conn_gpu_count = conn_hardware_info.hardware_info[0].num_gpu_allocated
    # conn_gpu_count stays None when gathering was skipped above; both the
    # None and 0 cases force the nvml gathering below to be skipped too.
    if conn_gpu_count == 0 or conn_gpu_count is None:
        no_gather_nvml_gpu_info = True
        if conn_gpu_count == 0:
            logging.warning(
                "0 GPUs detected from connection info, "
                + "using blank values for source database GPU info fields "
                + "If running against cpu-only server, make sure to set "
                + "--no-gather-nvml-gpu-info and --no-gather-conn-gpu-info."
            )
    else:
        no_gather_nvml_gpu_info = kwargs["no_gather_nvml_gpu_info"]
        source_db_gpu_count = conn_gpu_count
        try:
            # Memory reported in bytes; convert to MB (decimal)
            source_db_gpu_mem = int(
                conn_hardware_info.hardware_info[0].gpu_info[0].memory
                / 1000000
            )
        except IndexError:
            logging.error("GPU memory info not available from connection.")
    if no_gather_nvml_gpu_info:
        logging.debug(
            "--no-gather-nvml-gpu-info passed, "
            + "using blank values for source database GPU info fields "
            + "[gpu_driver_ver, run_gpu_name] "
        )
    elif (
        kwargs["conn_machine_name"] == "localhost"
        or kwargs["gather_nvml_gpu_info"]
    ):
        logging.debug(
            "Gathering source database GPU info fields "
            + "[gpu_driver_ver, run_gpu_name] "
            + "from local GPU using pynvml. "
        )
        # Imported lazily so the script runs without pynvml installed when
        # nvml gathering is disabled.
        import pynvml

        pynvml.nvmlInit()
        source_db_gpu_driver_ver = pynvml.nvmlSystemGetDriverVersion().decode()
        # NOTE(review): this path assumes source_db_gpu_count was set from
        # connection info above (i.e. it is not None here) — confirm when
        # --gather-nvml-gpu-info is combined with --no-gather-conn-gpu-info.
        for i in range(source_db_gpu_count):
            handle = pynvml.nvmlDeviceGetHandleByIndex(i)
            # Assume all cards are the same, overwrite name value
            source_db_gpu_name = pynvml.nvmlDeviceGetName(handle).decode()
        pynvml.nvmlShutdown()
    # If gpu_count argument passed in, override gathered value
    if kwargs["gpu_count"]:
        source_db_gpu_count = kwargs["gpu_count"]
    if kwargs["gpu_name"]:
        source_db_gpu_name = kwargs["gpu_name"]
    gpu_info = {
        "conn_gpu_count": conn_gpu_count,
        "source_db_gpu_count": source_db_gpu_count,
        "source_db_gpu_mem": source_db_gpu_mem,
        "source_db_gpu_driver_ver": source_db_gpu_driver_ver,
        "source_db_gpu_name": source_db_gpu_name,
    }
    return gpu_info
233 
234 
def get_machine_info(**kwargs):
    """
    Gets run machine GPU info

    Kwargs:
        conn_machine_name(str): Name of machine from pymapd con
        machine_name(str): Name of machine if passed in
        machine_uname(str): Uname of machine if passed in

    Returns:
        machine_info(dict):::
            run_machine_name(str): Run machine name
            run_machine_uname(str): Run machine uname
    """
    # Use local info only when the connection points at this machine
    is_localhost = kwargs["conn_machine_name"] == "localhost"
    local_uname = os.uname() if is_localhost else None
    # --machine-name, when passed in, overrides the pymapd con value
    if kwargs["machine_name"]:
        run_machine_name = kwargs["machine_name"]
    elif is_localhost:
        run_machine_name = local_uname.nodename.split(".")[0]
    else:
        run_machine_name = kwargs["conn_machine_name"]
    # --machine-uname, when passed in, overrides the pymapd con value
    if kwargs["machine_uname"]:
        run_machine_uname = kwargs["machine_uname"]
    elif is_localhost:
        run_machine_uname = " ".join(local_uname)
    else:
        run_machine_uname = ""
    return {
        "run_machine_name": run_machine_name,
        "run_machine_uname": run_machine_uname,
    }
273 
274 
def read_query_files(**kwargs):
    """
    Gets run machine GPU info

    Kwargs:
        queries_dir(str): Directory with query files
        source_table(str): Table to run query against

    Returns:
        query_list(dict):::
            query_group(str): Query group, usually matches table name
            queries(list)
            query(dict):::
                name(str): Name of query
                mapdql(str): Query syntax to run
        False(bool): Unable to find queries dir
    """
    # Query group defaults to the last path component of the queries dir
    query_list = {
        "query_group": kwargs["queries_dir"].split("/")[-1],
        "queries": [],
    }
    logging.debug("Queries dir: " + kwargs["queries_dir"])
    try:
        for query_filename in sorted(os.listdir(kwargs["queries_dir"])):
            logging.debug("Validating query filename: " + query_filename)
            if not validate_query_file(query_filename=query_filename):
                continue
            query_path = kwargs["queries_dir"] + "/" + query_filename
            with open(query_path, "r") as query_file:
                logging.debug(
                    "Reading query with filename: " + query_filename
                )
                # Flatten to one line and substitute the target table name
                mapdql = query_file.read().replace("\n", " ")
                mapdql = mapdql.replace("##TAB##", kwargs["source_table"])
            query_list["queries"].append(
                {"name": query_filename, "mapdql": mapdql}
            )
        logging.info("Read all query files")
        return query_list
    except FileNotFoundError:
        logging.exception("Could not find queries directory.")
        return False
319 
320 
def read_setup_teardown_queries(**kwargs):
    """
    Get queries to run for setup and teardown from directory

    Kwargs:
        queries_dir(str): Directory with query files
        source_table(str): Table to run query against
        foreign_table_filename(str): File to create foreign table from

    Returns:
        setup_queries(query_list): List of setup queries
        teardown_queries(query_list): List of teardown queries
        False(bool): Unable to find queries dir

    query_list is described by:
    query_list(dict):::
        query_group(str): Query group, usually matches table name
        queries(list)
        query(dict):::
            name(str): Name of query
            mapdql(str): Query syntax to run
    """
    # NOTE(review): the `def` line and the two filter lambdas were lost in
    # extraction; reconstructed from the visible call pattern — confirm
    # against upstream.
    setup_teardown_queries_dir = kwargs['queries_dir']
    source_table = kwargs['source_table']
    # Read setup/tear-down queries if they exist
    setup_teardown_query_list = None
    if setup_teardown_queries_dir is not None:
        setup_teardown_query_list = read_query_files(
            queries_dir=setup_teardown_queries_dir,
            source_table=source_table
        )
        if kwargs["foreign_table_filename"] is not None:
            # Substitute the foreign-table source file into each query
            for query in setup_teardown_query_list['queries']:
                query['mapdql'] = query['mapdql'].replace(
                    "##FILE##", kwargs["foreign_table_filename"])
    # Filter setup queries
    setup_query_list = None
    if setup_teardown_query_list is not None:
        setup_query_list = filter(
            lambda x: validate_setup_teardown_query_file(
                query_filename=x['name'], check_which='setup', quiet=True),
            setup_teardown_query_list['queries'])
        setup_query_list = list(setup_query_list)
    # Filter teardown queries
    teardown_query_list = None
    if setup_teardown_query_list is not None:
        teardown_query_list = filter(
            lambda x: validate_setup_teardown_query_file(
                query_filename=x['name'], check_which='teardown', quiet=True),
            setup_teardown_query_list['queries'])
        teardown_query_list = list(teardown_query_list)
    return setup_query_list, teardown_query_list
373 
374 
def validate_setup_teardown_query_file(**kwargs):
    """
    Validates query file. Currently only checks the query file name, and
    checks for setup or teardown in basename

    Kwargs:
        query_filename(str): Name of query file
        check_which(bool): either 'setup' or 'teardown', decide which to
                           check
        quiet(bool): optional, if True, no warning is logged

    Returns:
        True(bool): Query succesfully validated
        False(bool): Query failed validation
    """
    # NOTE(review): the `def` line was lost in extraction; name
    # reconstructed from the docstring and call sites — confirm upstream.
    qfilename = kwargs["query_filename"]
    basename = os.path.basename(qfilename)
    check_str = False
    if kwargs["check_which"] == 'setup':
        check_str = basename.lower().find('setup') > -1
    elif kwargs["check_which"] == 'teardown':
        check_str = basename.lower().find('teardown') > -1
    else:
        raise TypeError('Unsupported `check_which` parameter.')
    return_val = True
    if not qfilename.endswith(".sql"):
        logging.warning(
            "Query filename "
            + qfilename
            + ' is invalid - does not end in ".sql". Skipping'
        )
        return_val = False
    elif not check_str:
        # quiet suppresses the mismatch warning (used during filtering)
        quiet = True if 'quiet' in kwargs and kwargs['quiet'] else False
        if not quiet:
            logging.warning(
                "Query filename "
                + qfilename
                + ' does not match "setup" or "teardown". Skipping'
            )
        return_val = False
    return return_val
417 
418 
def validate_query_file(**kwargs):
    """
    Validates query file. Currently only checks the query file name

    Kwargs:
        query_filename(str): Name of query file

    Returns:
        True(bool): Query succesfully validated
        False(bool): Query failed validation
    """
    query_filename = kwargs["query_filename"]
    if query_filename.endswith(".sql"):
        return True
    # Anything without a .sql extension is skipped with a warning
    logging.warning(
        "Query filename "
        + query_filename
        + ' is invalid - does not end in ".sql". Skipping'
    )
    return False
439 
440 
def execute_query(**kwargs):
    """
    Executes a query against the connected db using pymapd
    https://pymapd.readthedocs.io/en/latest/usage.html#querying

    Kwargs:
        query_name(str): Name of query
        query_mapdql(str): Query to run
        iteration(int): Iteration number
        con(class): Connection class

    Returns:
        query_execution(dict):::
            result_count(int): Number of results returned
            execution_time(float): Time (in ms) that pymapd reports
                                   backend spent on query.
            connect_time(float): Time (in ms) for overhead of query,
                                 calculated by subtracting backend execution
                                 time from time spent on the execution
                                 function.
            results_iter_time(float): Time (in ms) it took to for
                                      pymapd.fetchone() to iterate through
                                      all of the results.
            total_time(float): Time (in ms) from adding all above times.
        False(bool): The query failed. Exception should be logged.
    """
    exec_start = timeit.default_timer()
    try:
        # Run the query
        query_result = kwargs["con"].execute(kwargs["query_mapdql"])
    except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
        logging.exception(
            "Error running query "
            + kwargs["query_name"]
            + " during iteration "
            + str(kwargs["iteration"])
        )
        return False
    logging.debug(
        "Completed iteration "
        + str(kwargs["iteration"])
        + " of query "
        + kwargs["query_name"]
    )

    # Calculate times; backend execution time is reported by pymapd, the
    # remainder of the wall-clock elapsed time is attributed to "connect"
    query_elapsed_time = (timeit.default_timer() - exec_start) * 1000
    execution_time = query_result._result.execution_time_ms
    debug_info = query_result._result.debug
    connect_time = round((query_elapsed_time - execution_time), 1)
    # Iterate through each result from the query
    logging.debug(
        "Counting results from query"
        + kwargs["query_name"]
        + " iteration "
        + str(kwargs["iteration"])
    )
    result_count = 0
    fetch_start = timeit.default_timer()
    while query_result.fetchone():
        result_count += 1
    results_iter_time = round(
        ((timeit.default_timer() - fetch_start) * 1000), 1
    )
    query_execution = {
        "result_count": result_count,
        "execution_time": execution_time,
        "connect_time": connect_time,
        "results_iter_time": results_iter_time,
        "total_time": execution_time + connect_time + results_iter_time,
        "debug_info": debug_info,
    }
    logging.debug(
        "Execution results for query"
        + kwargs["query_name"]
        + " iteration "
        + str(kwargs["iteration"])
        + ": "
        + str(query_execution)
    )
    return query_execution
521 
522 
def calculate_query_times(**kwargs):
    """
    Calculates aggregate query times from all iteration times

    Kwargs:
        total_times(list): List of total time calculations
        execution_times(list): List of execution_time calculations
        results_iter_times(list): List of results_iter_time calculations
        connect_times(list): List of connect_time calculations
        trim(float): Amount to trim from iterations set to gather trimmed
                     values. Enter as decimal corresponding to percent to
                     trim - ex: 0.15 to trim 15%.

    Returns:
        query_execution(dict): Query times
        False(bool): The query failed. Exception should be logged.
    """
    # Number of entries to drop from each end for the trimmed metrics
    trim_size = int(kwargs["trim"] * len(kwargs["total_times"]))
    return {
        "total_time_avg": round(numpy.mean(kwargs["total_times"]), 1),
        "total_time_min": round(numpy.min(kwargs["total_times"]), 1),
        "total_time_max": round(numpy.max(kwargs["total_times"]), 1),
        "total_time_85": round(numpy.percentile(kwargs["total_times"], 85), 1),
        "total_time_trimmed_avg": round(
            numpy.mean(
                numpy.sort(kwargs["total_times"])[trim_size:-trim_size]
            ),
            1,
        )
        if trim_size
        else round(numpy.mean(kwargs["total_times"]), 1),
        "total_times": kwargs["total_times"],
        "execution_time_avg": round(numpy.mean(kwargs["execution_times"]), 1),
        "execution_time_min": round(numpy.min(kwargs["execution_times"]), 1),
        "execution_time_max": round(numpy.max(kwargs["execution_times"]), 1),
        "execution_time_85": round(
            numpy.percentile(kwargs["execution_times"], 85), 1
        ),
        "execution_time_25": round(
            numpy.percentile(kwargs["execution_times"], 25), 1
        ),
        "execution_time_std": round(numpy.std(kwargs["execution_times"]), 1),
        # Fixed: round to 1 decimal like every other metric (previously
        # rounded to an int, inconsistent with the untrimmed branch below)
        "execution_time_trimmed_avg": round(
            numpy.mean(
                numpy.sort(kwargs["execution_times"])[trim_size:-trim_size]
            ),
            1,
        )
        if trim_size > 0
        else round(numpy.mean(kwargs["execution_times"]), 1),
        "execution_time_trimmed_max": round(
            numpy.max(
                numpy.sort(kwargs["execution_times"])[trim_size:-trim_size]
            ),
            1,
        )
        if trim_size > 0
        else round(numpy.max(kwargs["execution_times"]), 1),
        "execution_times": kwargs["execution_times"],
        "connect_time_avg": round(numpy.mean(kwargs["connect_times"]), 1),
        "connect_time_min": round(numpy.min(kwargs["connect_times"]), 1),
        "connect_time_max": round(numpy.max(kwargs["connect_times"]), 1),
        "connect_time_85": round(
            numpy.percentile(kwargs["connect_times"], 85), 1
        ),
        "results_iter_time_avg": round(
            numpy.mean(kwargs["results_iter_times"]), 1
        ),
        "results_iter_time_min": round(
            numpy.min(kwargs["results_iter_times"]), 1
        ),
        "results_iter_time_max": round(
            numpy.max(kwargs["results_iter_times"]), 1
        ),
        "results_iter_time_85": round(
            numpy.percentile(kwargs["results_iter_times"], 85), 1
        ),
    }
599 
600 
def get_mem_usage(**kwargs):
    """
    Calculates memory statistics from mapd_server _client.get_memory call

    Kwargs:
        con(class 'pymapd.connection.Connection'): Mapd connection
        mem_type(str): [gpu, cpu] Type of memory to gather metrics for

    Returns:
        ramusage(dict):::
            usedram(float): Amount of memory (in MB) used
            freeram(float): Amount of memory (in MB) free
            totalallocated(float): Total amount of memory (in MB) allocated
            errormessage(str): Error if returned by get_memory call
            rawdata(list): Raw data returned from get_memory call
    """
    # Initialize before the try block so the except handler can always
    # attach an error message (previously a failure in get_memory raised
    # UnboundLocalError because ramusage was first assigned inside try).
    ramusage = {}
    try:
        con_mem_data_list = kwargs["con"]._client.get_memory(
            session=kwargs["con"]._session, memory_level=kwargs["mem_type"]
        )
        usedram = 0
        freeram = 0
        # Sum page counts per node, split by free/used
        for con_mem_data in con_mem_data_list:
            page_size = con_mem_data.page_size
            node_memory_data_list = con_mem_data.node_memory_data
            for node_memory_data in node_memory_data_list:
                ram = node_memory_data.num_pages * page_size
                is_free = node_memory_data.is_free
                if is_free:
                    freeram += ram
                else:
                    usedram += ram
        totalallocated = usedram + freeram
        if totalallocated > 0:
            # Convert bytes to MB (binary), one decimal place
            totalallocated = round(totalallocated / 1024 / 1024, 1)
            usedram = round(usedram / 1024 / 1024, 1)
            freeram = round(freeram / 1024 / 1024, 1)
        ramusage["usedram"] = usedram
        ramusage["freeram"] = freeram
        ramusage["totalallocated"] = totalallocated
        ramusage["errormessage"] = ""
    except Exception as e:
        errormessage = "Get memory failed with error: " + str(e)
        logging.error(errormessage)
        ramusage["errormessage"] = errormessage
    return ramusage
648 
649 
def run_query(**kwargs):
    """
    Takes query name, syntax, and iteration count and calls the
    execute_query function for each iteration. Reports total, iteration,
    and exec timings, memory usage, and failure status.

    Kwargs:
        query(dict):::
            name(str): Name of query
            mapdql(str): Query syntax to run
        iterations(int): Number of iterations of each query to run
        trim(float): Trim decimal to remove from top and bottom of results
        con(class 'pymapd.connection.Connection'): Mapd connection

    Returns:
        query_results(dict):::
            query_name(str): Name of query
            query_mapdql(str): Query to run
            query_id(str): Query ID
            query_succeeded(bool): Query succeeded
            query_error_info(str): Query error info
            result_count(int): Number of results returned
            initial_iteration_results(dict):::
                first_execution_time(float): Execution time for first query
                                             iteration
                first_connect_time(float): Connect time for first query
                                           iteration
                first_results_iter_time(float): Results iteration time for
                                                first query iteration
                first_total_time(float): Total time for first iteration
                first_cpu_mem_usage(float): CPU memory usage for first query
                                            iteration
                first_gpu_mem_usage(float): GPU memory usage for first query
                                            iteration
            noninitial_iteration_results(list):::
                execution_time(float): Time (in ms) that pymapd reports
                                       backend spent on query.
                connect_time(float): Time (in ms) for overhead of query,
                                     calculated by subtracting backend
                                     execution time from time spent on the
                                     execution function.
                results_iter_time(float): Time (in ms) it took to for
                                          pymapd.fetchone() to iterate
                                          through all of the results.
                total_time(float): Time (in ms) from adding all above times.
            query_total_elapsed_time(int): Total elapsed time for query
        False(bool): The query failed. Exception should be logged.
    """
    logging.info(
        "Running query: "
        + kwargs["query"]["name"]
        + " iterations: "
        + str(kwargs["iterations"])
    )
    query_id = kwargs["query"]["name"].rsplit(".")[
        0
    ]  # Query ID = filename without extention
    query_results = {
        "query_name": kwargs["query"]["name"],
        "query_mapdql": kwargs["query"]["mapdql"],
        "query_id": query_id,
        "query_succeeded": True,
        "query_error_info": "",
        "initial_iteration_results": {},
        "noninitial_iteration_results": [],
        "query_total_elapsed_time": 0,
    }
    query_total_start_time = timeit.default_timer()
    # Run iterations of query
    for iteration in range(kwargs["iterations"]):
        # Gather memory before running query iteration
        logging.debug("Getting pre-query memory usage on CPU")
        pre_query_cpu_mem_usage = get_mem_usage(
            con=kwargs["con"], mem_type="cpu"
        )
        logging.debug("Getting pre-query memory usage on GPU")
        pre_query_gpu_mem_usage = get_mem_usage(
            con=kwargs["con"], mem_type="gpu"
        )
        # Run query iteration
        logging.debug(
            "Running iteration "
            + str(iteration)
            + " of query "
            + kwargs["query"]["name"]
        )
        query_result = execute_query(
            query_name=kwargs["query"]["name"],
            query_mapdql=kwargs["query"]["mapdql"],
            iteration=iteration,
            con=kwargs["con"],
        )
        # Gather memory after running query iteration
        logging.debug("Getting post-query memory usage on CPU")
        post_query_cpu_mem_usage = get_mem_usage(
            con=kwargs["con"], mem_type="cpu"
        )
        logging.debug("Getting post-query memory usage on GPU")
        post_query_gpu_mem_usage = get_mem_usage(
            con=kwargs["con"], mem_type="gpu"
        )
        # Calculate total (post minus pre) memory usage after query iteration
        query_cpu_mem_usage = round(
            post_query_cpu_mem_usage["usedram"]
            - pre_query_cpu_mem_usage["usedram"],
            1,
        )
        query_gpu_mem_usage = round(
            post_query_gpu_mem_usage["usedram"]
            - pre_query_gpu_mem_usage["usedram"],
            1,
        )
        # execute_query returns False on failure, a timing dict on success
        if query_result:
            query_results.update(
                query_error_info=""  # TODO - interpret query error info
            )
            # Assign first query iteration times
            if iteration == 0:
                first_execution_time = round(query_result["execution_time"], 1)
                first_connect_time = round(query_result["connect_time"], 1)
                first_results_iter_time = round(
                    query_result["results_iter_time"], 1
                )
                first_total_time = (
                    first_execution_time
                    + first_connect_time
                    + first_results_iter_time
                )
                query_results.update(
                    initial_iteration_results={
                        "first_execution_time": first_execution_time,
                        "first_connect_time": first_connect_time,
                        "first_results_iter_time": first_results_iter_time,
                        "first_total_time": first_total_time,
                        "first_cpu_mem_usage": query_cpu_mem_usage,
                        "first_gpu_mem_usage": query_gpu_mem_usage,
                    }
                )
            else:
                # Put noninitial iterations into query_result list
                query_results["noninitial_iteration_results"].append(
                    query_result
                )
                # Verify no change in memory for noninitial iterations
                if query_cpu_mem_usage != 0.0:
                    logging.error(
                        (
                            "Noninitial iteration ({0}) of query ({1}) "
                            + "shows non-zero CPU memory usage: {2}"
                        ).format(
                            iteration,
                            kwargs["query"]["name"],
                            query_cpu_mem_usage,
                        )
                    )
                if query_gpu_mem_usage != 0.0:
                    logging.error(
                        (
                            "Noninitial iteration ({0}) of query ({1}) "
                            + "shows non-zero GPU memory usage: {2}"
                        ).format(
                            iteration,
                            kwargs["query"]["name"],
                            query_gpu_mem_usage,
                        )
                    )
        else:
            # One failed iteration marks the whole query as failed
            logging.warning(
                "Error detected during execution of query: "
                + kwargs["query"]["name"]
                + ". This query will be skipped and "
                + "times will not reported"
            )
            query_results.update(query_succeeded=False)
            break
    # Calculate time for all iterations to run
    query_total_elapsed_time = round(
        ((timeit.default_timer() - query_total_start_time) * 1000), 1
    )
    query_results.update(query_total_elapsed_time=query_total_elapsed_time)
    logging.info(
        "Completed all iterations of query " + kwargs["query"]["name"]
    )
    return query_results
832 
833 
def run_setup_teardown_query(**kwargs):
    """
    Convenience wrapper around `run_query` to run a setup or
    teardown query

    Kwargs:
        queries(query_list): List of queries to run
        do_run(bool): If true will run query, otherwise do nothing
        trim(float): Trim decimal to remove from top and bottom of results
        con(class 'pymapd.connection.Connection'): Mapd connection

    Returns:
        See return value for `run_query`

    query_list is described by:
        queries(list)
            query(dict):::
                name(str): Name of query
                mapdql(str): Query syntax to run
                [setup : queries(list)]
                [teardown : queries(list)]
    """
    # NOTE(review): the `def` line was lost in extraction; name
    # reconstructed from the docstring — confirm against upstream.
    query_results = list()
    if kwargs['do_run']:
        for query in kwargs['queries']:
            # Setup/teardown queries run exactly once
            result = run_query(
                query=query, iterations=1,
                trim=kwargs['trim'],
                con=kwargs['con']
            )
            if not result['query_succeeded']:
                logging.warning(
                    "Error setup or teardown query: "
                    + query["name"]
                    + ". did not complete."
                )
            else:
                query_results.append(result)
    return query_results
873 
874 
def json_format_handler(x):
    # Function to allow json to deal with datetime and numpy int
    # (passed as `default=` to json.dumps). NOTE(review): the `def` line
    # was lost in extraction; signature reconstructed — confirm upstream.
    if isinstance(x, datetime.datetime):
        # Serialize timestamps as ISO-8601 strings
        return x.isoformat()
    if isinstance(x, numpy.int64):
        return int(x)
    raise TypeError("Unknown type")
882 
883 
885  """
886  Create results dataset
887 
888  Kwargs:
889  run_guid(str): Run GUID
890  run_timestamp(datetime): Run timestamp
891  run_connection(str): Connection string
892  run_machine_name(str): Run machine name
893  run_machine_uname(str): Run machine uname
894  run_driver(str): Run driver
895  run_version(str): Version of DB
896  run_version_short(str): Shortened version of DB
897  label(str): Run label
898  source_db_gpu_count(int): Number of GPUs on run machine
899  source_db_gpu_driver_ver(str): GPU driver version
900  source_db_gpu_name(str): GPU name
901  source_db_gpu_mem(str): Amount of GPU mem on run machine
902  source_table(str): Table to run query against
903  trim(float): Trim decimal to remove from top and bottom of results
904  iterations(int): Number of iterations of each query to run
905  query_group(str): Query group, usually matches table name
906  query_results(dict):::
907  query_name(str): Name of query
908  query_mapdql(str): Query to run
909  query_id(str): Query ID
910  query_succeeded(bool): Query succeeded
911  query_error_info(str): Query error info
912  result_count(int): Number of results returned
913  initial_iteration_results(dict):::
914  first_execution_time(float): Execution time for first query
915  iteration
916  first_connect_time(float): Connect time for first query
917  iteration
918  first_results_iter_time(float): Results iteration time for
919  first query iteration
920  first_total_time(float): Total time for first iteration
921  first_cpu_mem_usage(float): CPU memory usage for first query
922  iteration
923  first_gpu_mem_usage(float): GPU memory usage for first query
924  iteration
925  noninitial_iteration_results(list):::
926  execution_time(float): Time (in ms) that pymapd reports
927  backend spent on query.
928  connect_time(float): Time (in ms) for overhead of query,
929  calculated by subtracting backend execution time from
930  time spent on the execution function.
931  results_iter_time(float): Time (in ms) it took to for
932  pymapd.fetchone() to iterate through all of the results.
933  total_time(float): Time (in ms) from adding all above times.
934  query_total_elapsed_time(int): Total elapsed time for query
935 
936  Returns:
937  results_dataset(list):::
938  result_dataset(dict): Query results dataset
939  """
940  results_dataset = []
941  for query_results in kwargs["queries_results"]:
942  if query_results["query_succeeded"]:
943  # Aggregate iteration values
944  execution_times, connect_times, results_iter_times, total_times = (
945  [],
946  [],
947  [],
948  [],
949  )
950  detailed_timing_last_iteration = {}
951  if len(query_results["noninitial_iteration_results"]) == 0:
952  # A single query run (most likely a setup or teardown query)
953  initial_result = query_results["initial_iteration_results"]
954  execution_times.append(initial_result["first_execution_time"])
955  connect_times.append(initial_result["first_connect_time"])
956  results_iter_times.append(
957  initial_result["first_results_iter_time"]
958  )
959  total_times.append(initial_result["first_total_time"])
960  # Special case
961  result_count = 1
962  else:
963  # More than one query run
964  for noninitial_result in query_results[
965  "noninitial_iteration_results"
966  ]:
967  execution_times.append(noninitial_result["execution_time"])
968  connect_times.append(noninitial_result["connect_time"])
969  results_iter_times.append(
970  noninitial_result["results_iter_time"]
971  )
972  total_times.append(noninitial_result["total_time"])
973  # Overwrite result count, same for each iteration
974  result_count = noninitial_result["result_count"]
975 
976  # If available, getting the last iteration's component-wise timing information as a json structure
977  if (
978  query_results["noninitial_iteration_results"][-1]["debug_info"]
979  ):
980  detailed_timing_last_iteration = json.loads(
981  query_results["noninitial_iteration_results"][-1][
982  "debug_info"
983  ]
984  )["timer"]
985  # Calculate query times
986  logging.debug(
987  "Calculating times from query " + query_results["query_id"]
988  )
989  query_times = calculate_query_times(
990  total_times=total_times,
991  execution_times=execution_times,
992  connect_times=connect_times,
993  results_iter_times=results_iter_times,
994  trim=kwargs[
995  "trim"
996  ], # Trim top and bottom n% for trimmed calculations
997  )
998  result_dataset = {
999  "name": query_results["query_name"],
1000  "mapdql": query_results["query_mapdql"],
1001  "succeeded": True,
1002  "results": {
1003  "run_guid": kwargs["run_guid"],
1004  "run_timestamp": kwargs["run_timestamp"],
1005  "run_connection": kwargs["run_connection"],
1006  "run_machine_name": kwargs["run_machine_name"],
1007  "run_machine_uname": kwargs["run_machine_uname"],
1008  "run_driver": kwargs["run_driver"],
1009  "run_version": kwargs["run_version"],
1010  "run_version_short": kwargs["run_version_short"],
1011  "run_label": kwargs["label"],
1012  "run_gpu_count": kwargs["source_db_gpu_count"],
1013  "run_gpu_driver_ver": kwargs["source_db_gpu_driver_ver"],
1014  "run_gpu_name": kwargs["source_db_gpu_name"],
1015  "run_gpu_mem_mb": kwargs["source_db_gpu_mem"],
1016  "run_table": kwargs["source_table"],
1017  "query_group": kwargs["query_group"],
1018  "query_id": query_results["query_id"],
1019  "query_result_set_count": result_count,
1020  "query_error_info": query_results["query_error_info"],
1021  "query_conn_first": query_results[
1022  "initial_iteration_results"
1023  ]["first_connect_time"],
1024  "query_conn_avg": query_times["connect_time_avg"],
1025  "query_conn_min": query_times["connect_time_min"],
1026  "query_conn_max": query_times["connect_time_max"],
1027  "query_conn_85": query_times["connect_time_85"],
1028  "query_exec_first": query_results[
1029  "initial_iteration_results"
1030  ]["first_execution_time"],
1031  "query_exec_avg": query_times["execution_time_avg"],
1032  "query_exec_min": query_times["execution_time_min"],
1033  "query_exec_max": query_times["execution_time_max"],
1034  "query_exec_85": query_times["execution_time_85"],
1035  "query_exec_25": query_times["execution_time_25"],
1036  "query_exec_stdd": query_times["execution_time_std"],
1037  "query_exec_trimmed_avg": query_times[
1038  "execution_time_trimmed_avg"
1039  ],
1040  "query_exec_trimmed_max": query_times[
1041  "execution_time_trimmed_max"
1042  ],
1043  # Render queries not supported yet
1044  "query_render_first": None,
1045  "query_render_avg": None,
1046  "query_render_min": None,
1047  "query_render_max": None,
1048  "query_render_85": None,
1049  "query_render_25": None,
1050  "query_render_stdd": None,
1051  "query_total_first": query_results[
1052  "initial_iteration_results"
1053  ]["first_total_time"],
1054  "query_total_avg": query_times["total_time_avg"],
1055  "query_total_min": query_times["total_time_min"],
1056  "query_total_max": query_times["total_time_max"],
1057  "query_total_85": query_times["total_time_85"],
1058  "query_total_all": query_results[
1059  "query_total_elapsed_time"
1060  ],
1061  "query_total_trimmed_avg": query_times[
1062  "total_time_trimmed_avg"
1063  ],
1064  "results_iter_count": kwargs["iterations"],
1065  "results_iter_first": query_results[
1066  "initial_iteration_results"
1067  ]["first_results_iter_time"],
1068  "results_iter_avg": query_times["results_iter_time_avg"],
1069  "results_iter_min": query_times["results_iter_time_min"],
1070  "results_iter_max": query_times["results_iter_time_max"],
1071  "results_iter_85": query_times["results_iter_time_85"],
1072  "cpu_mem_usage_mb": query_results[
1073  "initial_iteration_results"
1074  ]["first_cpu_mem_usage"],
1075  "gpu_mem_usage_mb": query_results[
1076  "initial_iteration_results"
1077  ]["first_gpu_mem_usage"],
1078  },
1079  "debug": {
1080  "query_exec_times": query_times["execution_times"],
1081  "query_total_times": query_times["total_times"],
1082  "detailed_timing_last_iteration": detailed_timing_last_iteration,
1083  },
1084  }
1085  elif not query_results["query_succeeded"]:
1086  result_dataset = {
1087  "name": query_results["query_name"],
1088  "mapdql": query_results["query_mapdql"],
1089  "succeeded": False,
1090  }
1091  results_dataset.append(result_dataset)
1092  logging.debug("All values set for query " + query_results["query_id"])
1093  return results_dataset
1094 
1095 
def send_results_db(**kwargs):
    """
    Send results dataset to a database using pymapd
    https://pymapd.readthedocs.io/en/latest/usage.html#connecting

    Kwargs:
        results_dataset(list):::
            result_dataset(dict): Query results dataset
        table(str): Results destination table name
        db_user(str): Results destination user name
        db_passwd(str): Results destination password
        db_server(str): Results destination server address
        db_port(int): Results destination server port
        db_name(str): Results destination database name
        table_schema_file(str): Path to destination database schema file

    Returns:
        True(bool): Sending results to destination database succeeded
        False(bool): Sending results to destination database failed. Exception
            should be logged.
    """
    # Create dataframe from list of query results
    logging.debug("Converting results list to pandas dataframe")
    results_df = DataFrame(kwargs["results_dataset"])
    # Establish connection to destination db
    logging.debug("Connecting to destination db")
    dest_con = get_connection(
        db_user=kwargs["db_user"],
        db_passwd=kwargs["db_passwd"],
        db_server=kwargs["db_server"],
        db_port=kwargs["db_port"],
        db_name=kwargs["db_name"],
    )
    if not dest_con:
        # Not inside an exception handler, so use error() rather than
        # exception() (which would log a spurious "NoneType: None" traceback).
        logging.error("Could not connect to destination db.")
        return False
    # Load results into db, creating table if it does not exist
    tables = dest_con.get_tables()
    if kwargs["table"] not in tables:
        logging.info("Destination table does not exist. Creating.")
        try:
            with open(kwargs["table_schema_file"], "r") as table_schema:
                logging.debug(
                    "Reading table_schema_file: " + kwargs["table_schema_file"]
                )
                # Flatten the schema to one statement and substitute the
                # destination table name for the ##TAB## placeholder.
                create_table_sql = table_schema.read().replace("\n", " ")
                create_table_sql = create_table_sql.replace(
                    "##TAB##", kwargs["table"]
                )
        except FileNotFoundError:
            logging.exception("Could not find destination table_schema_file.")
            return False
        try:
            logging.debug("Executing create destination table query")
            dest_con.execute(create_table_sql)
            logging.debug("Destination table created.")
        except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
            logging.exception("Error running destination table creation")
            return False
    logging.info("Loading results into destination db")
    try:
        dest_con.load_table_columnar(
            kwargs["table"],
            results_df,
            preserve_index=False,
            chunk_size_bytes=0,
            col_names_from_schema=True,
        )
    except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
        logging.exception("Error loading results into destination db")
        dest_con.close()
        return False
    dest_con.close()
    return True
1169 
1170 
1172  """
1173  Send results dataset to a local json file
1174 
1175  Kwargs:
1176  results_dataset_json(str): Json-formatted query results dataset
1177  output_file_json (str): Location of .json file output
1178 
1179  Returns:
1180  True(bool): Sending results to json file succeeded
1181  False(bool): Sending results to json file failed. Exception
1182  should be logged.
1183  """
1184  try:
1185  logging.debug("Opening json output file for writing")
1186  with open(kwargs["output_file_json"], "w") as file_json_open:
1187  logging.info(
1188  "Writing to output json file: " + kwargs["output_file_json"]
1189  )
1190  file_json_open.write(kwargs["results_dataset_json"])
1191  return True
1192  except IOError:
1193  logging.exception("Error writing results to json output file")
1194  return False
1195 
1196 
1198  """
1199  Send results dataset to a local json file formatted for use with jenkins
1200  benchmark plugin: https://github.com/jenkinsci/benchmark-plugin
1201 
1202  Kwargs:
1203  results_dataset(list):::
1204  result_dataset(dict): Query results dataset
1205  thresholds_name(str): Name to use for Jenkins result field
1206  thresholds_field(str): Field to use for query threshold in jenkins
1207  output_tag_jenkins(str): Jenkins benchmark result tag, for different
1208  sets from same table
1209  output_file_jenkins (str): Location of .json jenkins file output
1210 
1211  Returns:
1212  True(bool): Sending results to json file succeeded
1213  False(bool): Sending results to json file failed. Exception
1214  should be logged.
1215  """
1216  jenkins_bench_results = []
1217  for result_dataset in kwargs["results_dataset"]:
1218  logging.debug("Constructing output for jenkins benchmark plugin")
1219  jenkins_bench_results.append(
1220  {
1221  "name": result_dataset["query_id"],
1222  "description": "",
1223  "parameters": [],
1224  "results": [
1225  {
1226  "name": result_dataset["query_id"]
1227  + "_"
1228  + kwargs["thresholds_name"],
1229  "description": "",
1230  "unit": "ms",
1231  "dblValue": result_dataset[kwargs["thresholds_field"]],
1232  }
1233  ],
1234  }
1235  )
1236  jenkins_bench_json = json.dumps(
1237  {
1238  "groups": [
1239  {
1240  "name": result_dataset["run_table"]
1241  + kwargs["output_tag_jenkins"],
1242  "description": "Source table: "
1243  + result_dataset["run_table"],
1244  "tests": jenkins_bench_results,
1245  }
1246  ]
1247  }
1248  )
1249  try:
1250  logging.debug("Opening jenkins_bench json output file for writing")
1251  with open(kwargs["output_file_jenkins"], "w") as file_jenkins_open:
1252  logging.info(
1253  "Writing to jenkins_bench json file: "
1254  + kwargs["output_file_jenkins"]
1255  )
1256  file_jenkins_open.write(jenkins_bench_json)
1257  return True
1258  except IOError:
1259  logging.exception("Error writing results to jenkins json output file")
1260  return False
1261 
1262 
def send_results_output(**kwargs):
    """
    Print the json-formatted results dataset to standard output.

    Kwargs:
        results_dataset_json(str): Json-formatted query results dataset

    Returns:
        True(bool): Sending results to output succeeded
    """
    payload = kwargs["results_dataset_json"]
    logging.info("Printing query results to output")
    print(payload)
    return True
1276 
1277 
def process_arguments(input_arguments):
    """
    Parse and return command line arguments for the benchmark script.

    Args:
        input_arguments(list): Command line arguments, typically sys.argv[1:]

    Returns:
        args(Namespace): Parsed arguments with defaults applied
    """
    # Parse input parameters
    parser = ArgumentParser()
    # Pop and re-append the default optional group so that the "required
    # arguments" group is displayed first in --help output.
    optional = parser._action_groups.pop()
    required = parser.add_argument_group("required arguments")
    parser._action_groups.append(optional)
    optional.add_argument(
        "-v", "--verbose", action="store_true", help="Turn on debug logging"
    )
    optional.add_argument(
        "-q",
        "--quiet",
        action="store_true",
        help="Suppress script output " + "(except warnings and errors)",
    )
    required.add_argument(
        "-u",
        "--user",
        dest="user",
        default="mapd",
        help="Source database user",
    )
    required.add_argument(
        "-p",
        "--passwd",
        dest="passwd",
        default="HyperInteractive",
        help="Source database password",
    )
    required.add_argument(
        "-s",
        "--server",
        dest="server",
        default="localhost",
        help="Source database server hostname",
    )
    optional.add_argument(
        "-o",
        "--port",
        dest="port",
        type=int,
        default=6274,
        help="Source database server port",
    )
    required.add_argument(
        "-n",
        "--name",
        dest="name",
        default="mapd",
        help="Source database name",
    )
    required.add_argument(
        "-t",
        "--table",
        dest="table",
        required=True,
        help="Source db table name",
    )
    required.add_argument(
        "-l",
        "--label",
        dest="label",
        required=True,
        help="Benchmark run label",
    )
    required.add_argument(
        "-d",
        "--queries-dir",
        dest="queries_dir",
        help='Absolute path to dir with query files. \
            [Default: "queries" dir in same location as script]',
    )
    required.add_argument(
        "-i",
        "--iterations",
        dest="iterations",
        type=int,
        required=True,
        help="Number of iterations per query. Must be > 1",
    )
    optional.add_argument(
        "-g",
        "--gpu-count",
        dest="gpu_count",
        type=int,
        default=None,
        help="Number of GPUs. Not required when gathering local gpu info",
    )
    optional.add_argument(
        "-G",
        "--gpu-name",
        dest="gpu_name",
        type=str,
        default="",
        help="Name of GPU(s). Not required when gathering local gpu info",
    )
    optional.add_argument(
        "--no-gather-conn-gpu-info",
        dest="no_gather_conn_gpu_info",
        action="store_true",
        help="Do not gather source database GPU info fields "
        + "[run_gpu_count, run_gpu_mem_mb] "
        + "using pymapd connection info. "
        + "Use when testing a CPU-only server.",
    )
    optional.add_argument(
        "--no-gather-nvml-gpu-info",
        dest="no_gather_nvml_gpu_info",
        action="store_true",
        help="Do not gather source database GPU info fields "
        + "[gpu_driver_ver, run_gpu_name] "
        + "from local GPU using pynvml. "
        + 'Defaults to True when source server is not "localhost". '
        + "Use when testing a CPU-only server.",
    )
    optional.add_argument(
        "--gather-nvml-gpu-info",
        dest="gather_nvml_gpu_info",
        action="store_true",
        help="Gather source database GPU info fields "
        + "[gpu_driver_ver, run_gpu_name] "
        + "from local GPU using pynvml. "
        + 'Defaults to True when source server is "localhost". '
        + "Only use when benchmarking against same machine that this script "
        + "is run from.",
    )
    optional.add_argument(
        "-m",
        "--machine-name",
        dest="machine_name",
        help="Name of source machine",
    )
    optional.add_argument(
        "-a",
        "--machine-uname",
        dest="machine_uname",
        help="Uname info from " + "source machine",
    )
    optional.add_argument(
        "-e",
        "--destination",
        dest="destination",
        default="mapd_db",
        help="Destination type: [mapd_db, file_json, output, jenkins_bench] "
        + "Multiple values can be input separated by commas, "
        + 'ex: "mapd_db,file_json"',
    )
    optional.add_argument(
        "-U",
        "--dest-user",
        dest="dest_user",
        default="mapd",
        help="Destination mapd_db database user",
    )
    optional.add_argument(
        "-P",
        "--dest-passwd",
        dest="dest_passwd",
        default="HyperInteractive",
        help="Destination mapd_db database password",
    )
    optional.add_argument(
        "-S",
        "--dest-server",
        dest="dest_server",
        help="Destination mapd_db database server hostname"
        + ' (required if destination = "mapd_db")',
    )
    optional.add_argument(
        "-O",
        "--dest-port",
        dest="dest_port",
        type=int,
        default=6274,
        help="Destination mapd_db database server port",
    )
    optional.add_argument(
        "-N",
        "--dest-name",
        dest="dest_name",
        default="mapd",
        help="Destination mapd_db database name",
    )
    optional.add_argument(
        "-T",
        "--dest-table",
        dest="dest_table",
        default="results",
        help="Destination mapd_db table name",
    )
    optional.add_argument(
        "-C",
        "--dest-table-schema-file",
        dest="dest_table_schema_file",
        default="results_table_schemas/query-results.sql",
        help="Destination table schema file. This must be an executable "
        + "CREATE TABLE statement that matches the output of this script. It "
        + "is required when creating the results table. Default location is "
        + 'in "./results_table_schemas/query-results.sql"',
    )
    optional.add_argument(
        "-j",
        "--output-file-json",
        dest="output_file_json",
        help="Absolute path of .json output file "
        + '(required if destination = "file_json")',
    )
    optional.add_argument(
        "-J",
        "--output-file-jenkins",
        dest="output_file_jenkins",
        help="Absolute path of jenkins benchmark .json output file "
        + '(required if destination = "jenkins_bench")',
    )
    optional.add_argument(
        "-E",
        "--output-tag-jenkins",
        dest="output_tag_jenkins",
        default="",
        help="Jenkins benchmark result tag. "
        + 'Optional, appended to table name in "group" field',
    )
    optional.add_argument(
        "--setup-teardown-queries-dir",
        dest="setup_teardown_queries_dir",
        type=str,
        default=None,
        help='Absolute path to dir with setup & teardown query files. '
        'Query files with "setup" in the filename will be executed in '
        'the setup stage, likewise query files with "teardown" in '
        'the filename will be executed in the tear-down stage. Queries '
        'execute in lexical order. [Default: None, meaning this option is '
        'not used.]',
    )
    optional.add_argument(
        "--run-setup-teardown-per-query",
        dest="run_setup_teardown_per_query",
        action="store_true",
        help='Run setup & teardown steps per query. '
        'If set, setup-teardown-queries-dir must be specified. '
        'If not set, but setup-teardown-queries-dir is specified '
        'setup & tear-down queries will run globally, that is, '
        'once per script invocation.'
        ' [Default: False]'
    )
    optional.add_argument(
        "-F",
        "--foreign-table-filename",
        dest="foreign_table_filename",
        default=None,
        help="Path to file containing template for import query. "
        "Path must be relative to the FOREIGN SERVER. "
        "Occurrences of \"##FILE##\" within setup/teardown queries will be"
        " replaced with this. "
    )
    optional.add_argument(
        "--jenkins-thresholds-name",
        dest="jenkins_thresholds_name",
        default="average",
        help="Name of Jenkins output field.",
    )
    optional.add_argument(
        "--jenkins-thresholds-field",
        dest="jenkins_thresholds_field",
        default="query_exec_trimmed_avg",
        help="Field to report as jenkins output value.",
    )
    args = parser.parse_args(args=input_arguments)
    return args
1547 
1548 
def benchmark(input_arguments):
    """
    Top-level driver: parse arguments, run benchmark queries against the
    source database, and send the results dataset to the configured
    destination(s).

    Args:
        input_arguments(list): Command line arguments, typically sys.argv[1:]
    """
    # Set input args to vars
    args = process_arguments(input_arguments)
    verbose = args.verbose
    quiet = args.quiet
    source_db_user = args.user
    source_db_passwd = args.passwd
    source_db_server = args.server
    source_db_port = args.port
    source_db_name = args.name
    source_table = args.table
    label = args.label
    queries_dir = args.queries_dir
    iterations = args.iterations
    gpu_count = args.gpu_count
    gpu_name = args.gpu_name
    no_gather_conn_gpu_info = args.no_gather_conn_gpu_info
    no_gather_nvml_gpu_info = args.no_gather_nvml_gpu_info
    gather_nvml_gpu_info = args.gather_nvml_gpu_info
    machine_name = args.machine_name
    machine_uname = args.machine_uname
    destinations = args.destination
    dest_db_user = args.dest_user
    dest_db_passwd = args.dest_passwd
    dest_db_server = args.dest_server
    dest_db_port = args.dest_port
    dest_db_name = args.dest_name
    dest_table = args.dest_table
    dest_table_schema_file = args.dest_table_schema_file
    output_file_json = args.output_file_json
    output_file_jenkins = args.output_file_jenkins
    output_tag_jenkins = args.output_tag_jenkins
    setup_teardown_queries_dir = args.setup_teardown_queries_dir
    run_setup_teardown_per_query = args.run_setup_teardown_per_query
    foreign_table_filename = args.foreign_table_filename
    jenkins_thresholds_name = args.jenkins_thresholds_name
    jenkins_thresholds_field = args.jenkins_thresholds_field

    # Hard-coded vars
    trim = 0.15  # Trim top and bottom 15% for trimmed calculations

    # Set logging output level
    if verbose:
        logging.basicConfig(level=logging.DEBUG)
    elif quiet:
        logging.basicConfig(level=logging.WARNING)
    else:
        logging.basicConfig(level=logging.INFO)

    # Input validation
    if iterations <= 1:
        # Need > 1 iteration as first iteration is dropped from calculations
        logging.error("Iterations must be greater than 1")
        exit(1)
    if verify_destinations(
        destinations=destinations,
        dest_db_server=dest_db_server,
        output_file_json=output_file_json,
        output_file_jenkins=output_file_jenkins,
    ):
        logging.debug("Destination(s) have been verified.")
    else:
        logging.error("No valid destination(s) have been set. Exiting.")
        exit(1)

    # Establish connection to mapd db
    con = get_connection(
        db_user=source_db_user,
        db_passwd=source_db_passwd,
        db_server=source_db_server,
        db_port=source_db_port,
        db_name=source_db_name,
    )
    if not con:
        exit(1)  # Exit if cannot connect to db
    # Set run-specific variables (time, uid, etc.)
    run_vars = get_run_vars(con=con)
    # Set GPU info depending on availability
    gpu_info = get_gpu_info(
        gpu_name=gpu_name,
        no_gather_conn_gpu_info=no_gather_conn_gpu_info,
        con=con,
        conn_machine_name=run_vars["conn_machine_name"],
        no_gather_nvml_gpu_info=no_gather_nvml_gpu_info,
        gather_nvml_gpu_info=gather_nvml_gpu_info,
        gpu_count=gpu_count,
    )
    # Set run machine info
    machine_info = get_machine_info(
        conn_machine_name=run_vars["conn_machine_name"],
        machine_name=machine_name,
        machine_uname=machine_uname,
    )
    # Read queries from files, set to queries dir in PWD if not passed in
    if not queries_dir:
        queries_dir = os.path.join(os.path.dirname(__file__), "queries")
    query_list = read_query_files(
        queries_dir=queries_dir, source_table=source_table
    )
    if not query_list:
        exit(1)
    # Read setup/teardown queries if they exist
    setup_query_list, teardown_query_list = read_setup_teardown_query_files(
        queries_dir=setup_teardown_queries_dir,
        source_table=source_table,
        foreign_table_filename=foreign_table_filename,
    )
    # Check at what granularity we want to run setup or teardown queries at
    run_global_setup_queries = (
        setup_query_list is not None and not run_setup_teardown_per_query
    )
    run_per_query_setup_queries = (
        setup_query_list is not None and run_setup_teardown_per_query
    )
    run_global_teardown_queries = (
        teardown_query_list is not None and not run_setup_teardown_per_query
    )
    run_per_query_teardown_queries = (
        teardown_query_list is not None and run_setup_teardown_per_query
    )
    # Run global setup queries if they exist
    queries_results = []
    st_qr = run_setup_teardown_query(
        queries=setup_query_list,
        do_run=run_global_setup_queries,
        trim=trim,
        con=con,
    )
    queries_results.extend(st_qr)
    # Run queries
    for query in query_list["queries"]:
        # Run setup queries
        st_qr = run_setup_teardown_query(
            queries=setup_query_list,
            do_run=run_per_query_setup_queries,
            trim=trim,
            con=con,
        )
        queries_results.extend(st_qr)
        # Run benchmark query
        query_result = run_query(
            query=query, iterations=iterations, trim=trim, con=con
        )
        queries_results.append(query_result)
        # Run tear-down queries
        st_qr = run_setup_teardown_query(
            queries=teardown_query_list,
            do_run=run_per_query_teardown_queries,
            trim=trim,
            con=con,
        )
        queries_results.extend(st_qr)
    logging.info("Completed all queries.")
    # Run global tear-down queries if they exist
    st_qr = run_setup_teardown_query(
        queries=teardown_query_list,
        do_run=run_global_teardown_queries,
        trim=trim,
        con=con,
    )
    queries_results.extend(st_qr)
    logging.debug("Closing source db connection.")
    con.close()
    # Generate results dataset
    results_dataset = create_results_dataset(
        run_guid=run_vars["run_guid"],
        run_timestamp=run_vars["run_timestamp"],
        run_connection=run_vars["run_connection"],
        run_machine_name=machine_info["run_machine_name"],
        run_machine_uname=machine_info["run_machine_uname"],
        run_driver=run_vars["run_driver"],
        run_version=run_vars["run_version"],
        run_version_short=run_vars["run_version_short"],
        label=label,
        source_db_gpu_count=gpu_info["source_db_gpu_count"],
        source_db_gpu_driver_ver=gpu_info["source_db_gpu_driver_ver"],
        source_db_gpu_name=gpu_info["source_db_gpu_name"],
        source_db_gpu_mem=gpu_info["source_db_gpu_mem"],
        source_table=source_table,
        trim=trim,
        iterations=iterations,
        query_group=query_list["query_group"],
        queries_results=queries_results,
    )
    results_dataset_json = json.dumps(
        results_dataset, default=json_format_handler, indent=2
    )
    # Failed queries have no "results" payload; filter them out before
    # sending to db/jenkins destinations.
    successful_results_dataset = [
        x for x in results_dataset if x["succeeded"] is not False
    ]
    successful_results_dataset_results = []
    for results_dataset_entry in successful_results_dataset:
        successful_results_dataset_results.append(
            results_dataset_entry["results"]
        )
    # Send results to destination(s)
    sent_destination = True
    if "mapd_db" in destinations:
        if not send_results_db(
            results_dataset=successful_results_dataset_results,
            table=dest_table,
            db_user=dest_db_user,
            db_passwd=dest_db_passwd,
            db_server=dest_db_server,
            db_port=dest_db_port,
            db_name=dest_db_name,
            table_schema_file=dest_table_schema_file,
        ):
            sent_destination = False
    if "file_json" in destinations:
        if not send_results_file_json(
            results_dataset_json=results_dataset_json,
            output_file_json=output_file_json,
        ):
            sent_destination = False
    if "jenkins_bench" in destinations:
        if not send_results_jenkins_bench(
            results_dataset=successful_results_dataset_results,
            thresholds_name=jenkins_thresholds_name,
            thresholds_field=jenkins_thresholds_field,
            output_tag_jenkins=output_tag_jenkins,
            output_file_jenkins=output_file_jenkins,
        ):
            sent_destination = False
    if "output" in destinations:
        if not send_results_output(results_dataset_json=results_dataset_json):
            sent_destination = False
    if not sent_destination:
        logging.error("Sending results to one or more destinations failed")
        exit(1)
    else:
        logging.info(
            "Successfully loaded query results info into destination(s)"
        )
1759 
# Script entry point: forward the CLI arguments (minus the program name)
# to the benchmark driver.
if __name__ == "__main__":
    benchmark(sys.argv[1:])
std::string join(T const &container, std::string const &delim)
std::vector< std::string > split(std::string_view str, std::string_view delim, std::optional< size_t > maxsplit)
split apart a string into a vector of substrings
def validate_setup_teardown_query_file
def verify_destinations
size_t append(FILE *f, const size_t size, int8_t *buf)
Appends the specified number of bytes to the end of the file f from buf.
Definition: File.cpp:135
def create_results_dataset
def read_setup_teardown_query_files
def send_results_file_json
def calculate_query_times
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:82
def run_setup_teardown_query
def send_results_jenkins_bench