run_benchmark.py
1 import os
2 import timeit
3 import logging
4 import uuid
5 import datetime
6 import json
7 import numpy
8 import pymapd
9 import re
10 import sys
11 from pandas import DataFrame
12 from argparse import ArgumentParser
13 
14 # For usage info, run: `./<script_name>.py --help`
15 
16 
17 def verify_destinations(**kwargs):
18  """
19  Verify script output destination(s)
20 
21  Kwargs:
22  destinations (list): List of destinations
23  dest_db_server (str): DB output destination server
24  output_file_json (str): Location of .json file output
25  output_file_jenkins (str): Location of .json jenkins file output
26 
27  Returns:
28  True(bool): Destination(s) is/are valid
29  False(bool): Destination(s) is/are not valid
30  """
31  if "mapd_db" in kwargs["destinations"]:
32  valid_destination_set = True
33  if kwargs["dest_db_server"] is None:
34  # If dest_server is not set for mapd_db, then exit
35  logging.error(
36  '"dest_server" is required when destination = "mapd_db"'
37  )
38  if "file_json" in kwargs["destinations"]:
39  valid_destination_set = True
40  if kwargs["output_file_json"] is None:
41  # If output_file_json is not set for file_json, then exit
42  logging.error(
43  '"output_file_json" is required when destination = "file_json"'
44  )
45  if "output" in kwargs["destinations"]:
46  valid_destination_set = True
47  if "jenkins_bench" in kwargs["destinations"]:
48  valid_destination_set = True
49  if kwargs["output_file_jenkins"] is None:
50  # If output_file_jenkins is not set for jenkins_bench, then exit
51  logging.error(
52  '"output_file_jenkins" is required '
53  + 'when destination = "jenkins_bench"'
54  )
55  if not valid_destination_set:
56  return False
57  else:
58  return True
59 
60 
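# --- Editor's example (not in the original script): a minimal sketch of
# calling verify_destinations(). The destination list, host, and path
# below are hypothetical; the kwargs mirror those documented above.
def _example_verify_destinations():
    return verify_destinations(
        destinations=["mapd_db", "file_json"],
        dest_db_server="results-db.example.com",  # hypothetical host
        output_file_json="/tmp/results.json",  # hypothetical path
        output_file_jenkins=None,
    )
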
61 def get_connection(**kwargs):
62  """
63  Connects to the db using pymapd
64  https://pymapd.readthedocs.io/en/latest/usage.html#connecting
65 
66  Kwargs:
67  db_user(str): DB username
68  db_passwd(str): DB password
69  db_server(str): DB host
70  db_port(int): DB port
71  db_name(str): DB name
72 
73  Returns:
74  con(class): Connection class
75  False(bool): The connection failed. Exception should be logged.
76  """
77  try:
78  logging.debug("Connecting to mapd db...")
79  con = pymapd.connect(
80  user=kwargs["db_user"],
81  password=kwargs["db_passwd"],
82  host=kwargs["db_server"],
83  port=kwargs["db_port"],
84  dbname=kwargs["db_name"],
85  )
86  logging.info("Succesfully connected to mapd db")
87  return con
88  except (pymapd.exceptions.OperationalError, pymapd.exceptions.Error):
89  logging.exception("Error connecting to database.")
90  return False
91 
92 
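# --- Editor's example (not in the original script): opening a connection
# with the same defaults the argument parser below uses; a real deployment
# would substitute its own credentials.
def _example_get_connection():
    return get_connection(
        db_user="mapd",
        db_passwd="HyperInteractive",
        db_server="localhost",
        db_port=6274,
        db_name="mapd",
    )
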
93 def get_run_vars(**kwargs):
94  """
95  Gets/sets run-specific vars such as time, uid, etc.
96 
97  Kwargs:
98  con(class 'pymapd.connection.Connection'): Mapd connection
99 
100  Returns:
101  run_vars(dict):::
102  run_guid(str): Run GUID
103  run_timestamp(datetime): Run timestamp
104  run_connection(str): Connection string
105  run_driver(str): Run driver
106  run_version(str): Version of DB
107  run_version_short(str): Shortened version of DB
108  conn_machine_name(str): Name of run machine
109  """
110  run_guid = str(uuid.uuid4())
111  logging.debug("Run guid: " + run_guid)
112  run_timestamp = datetime.datetime.now()
113  run_connection = str(kwargs["con"])
114  logging.debug("Connection string: " + run_connection)
115  run_driver = "" # TODO
116  run_version = kwargs["con"]._client.get_version()
117  if "-" in run_version:
118  run_version_short = run_version.split("-")[0]
119  else:
120  run_version_short = run_version
121  conn_machine_name = re.search(r"@(.*?):", run_connection).group(1)
122  run_vars = {
123  "run_guid": run_guid,
124  "run_timestamp": run_timestamp,
125  "run_connection": run_connection,
126  "run_driver": run_driver,
127  "run_version": run_version,
128  "run_version_short": run_version_short,
129  "conn_machine_name": conn_machine_name,
130  }
131  return run_vars
132 
133 
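# --- Editor's note (not in the original script): get_run_vars() extracts
# conn_machine_name from str(con) with the regex r"@(.*?):", so it assumes
# pymapd's connection repr resembles the hypothetical string below.
def _example_conn_machine_name():
    run_connection = "Connection(mapd://mapd:***@localhost:6274/mapd)"
    return re.search(r"@(.*?):", run_connection).group(1)  # -> "localhost"
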
134 def get_gpu_info(**kwargs):
135  """
136  Gets run machine GPU info
137 
138  Kwargs:
139  gpu_name(str): GPU name from input param
140  no_gather_conn_gpu_info(bool): Do not gather GPU info fields via pymapd connection
141  con(class 'pymapd.connection.Connection'): Mapd connection
142  conn_machine_name(str): Name of run machine
143  no_gather_nvml_gpu_info(bool): Do not gather GPU info using nvml
144  gather_nvml_gpu_info(bool): Gather GPU info using nvml
145  gpu_count(int): Number of GPUs on run machine
146 
147  Returns:
148  gpu_info(dict):::
149  conn_gpu_count(int): Number of GPUs gathered from pymapd con
150  source_db_gpu_count(int): Number of GPUs on run machine
151  source_db_gpu_mem(str): Amount of GPU mem on run machine
152  source_db_gpu_driver_ver(str): GPU driver version
153  source_db_gpu_name(str): GPU name
154  """
155  # Set GPU info fields
156  conn_gpu_count = None
157  source_db_gpu_count = None
158  source_db_gpu_mem = None
159  source_db_gpu_driver_ver = ""
160  source_db_gpu_name = ""
161  if kwargs["no_gather_conn_gpu_info"]:
162  logging.debug(
163  "--no-gather-conn-gpu-info passed, "
164  + "using blank values for source database GPU info fields "
165  + "[run_gpu_count, run_gpu_mem_mb] "
166  )
167  else:
168  logging.debug(
169  "Gathering source database GPU info fields "
170  + "[run_gpu_count, run_gpu_mem_mb] "
171  + "using pymapd connection info. "
172  )
173  conn_hardware_info = kwargs["con"]._client.get_hardware_info(
174  kwargs["con"]._session
175  )
176  conn_gpu_count = conn_hardware_info.hardware_info[0].num_gpu_allocated
177  if conn_gpu_count == 0 or conn_gpu_count is None:
178  no_gather_nvml_gpu_info = True
179  if conn_gpu_count == 0:
180  logging.warning(
181  "0 GPUs detected from connection info, "
182  + "using blank values for source database GPU info fields "
183  + "If running against cpu-only server, make sure to set "
184  + "--no-gather-nvml-gpu-info and --no-gather-conn-gpu-info."
185  )
186  else:
187  no_gather_nvml_gpu_info = kwargs["no_gather_nvml_gpu_info"]
188  source_db_gpu_count = conn_gpu_count
189  try:
190  source_db_gpu_mem = int(
191  conn_hardware_info.hardware_info[0].gpu_info[0].memory
192  / 1000000
193  )
194  except IndexError:
195  logging.error("GPU memory info not available from connection.")
196  if no_gather_nvml_gpu_info:
197  logging.debug(
198  "--no-gather-nvml-gpu-info passed, "
199  + "using blank values for source database GPU info fields "
200  + "[gpu_driver_ver, run_gpu_name] "
201  )
202  elif (
203  kwargs["conn_machine_name"] == "localhost"
204  or kwargs["gather_nvml_gpu_info"]
205  ):
206  logging.debug(
207  "Gathering source database GPU info fields "
208  + "[gpu_driver_ver, run_gpu_name] "
209  + "from local GPU using pynvml. "
210  )
211  import pynvml
212 
213  pynvml.nvmlInit()
214  source_db_gpu_driver_ver = pynvml.nvmlSystemGetDriverVersion().decode()
215  for i in range(source_db_gpu_count):
216  handle = pynvml.nvmlDeviceGetHandleByIndex(i)
217  # Assume all cards are the same, overwrite name value
218  source_db_gpu_name = pynvml.nvmlDeviceGetName(handle).decode()
219  pynvml.nvmlShutdown()
220  # If gpu_count argument passed in, override gathered value
221  if kwargs["gpu_count"]:
222  source_db_gpu_count = kwargs["gpu_count"]
223  if kwargs["gpu_name"]:
224  source_db_gpu_name = kwargs["gpu_name"]
225  gpu_info = {
226  "conn_gpu_count": conn_gpu_count,
227  "source_db_gpu_count": source_db_gpu_count,
228  "source_db_gpu_mem": source_db_gpu_mem,
229  "source_db_gpu_driver_ver": source_db_gpu_driver_ver,
230  "source_db_gpu_name": source_db_gpu_name,
231  }
232  return gpu_info
233 
234 
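# --- Editor's example (not in the original script): a sketch of the
# CPU-only invocation of get_gpu_info(). With both "no gather" flags set,
# every GPU field is returned blank; con is a live pymapd connection.
def _example_get_gpu_info_cpu_only(con):
    return get_gpu_info(
        gpu_name="",
        no_gather_conn_gpu_info=True,
        con=con,
        conn_machine_name="localhost",
        no_gather_nvml_gpu_info=True,
        gather_nvml_gpu_info=False,
        gpu_count=None,
    )
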
235 def get_machine_info(**kwargs):
236  """
237  Gets run machine name and uname info
238 
239  Kwargs:
240  conn_machine_name(str): Name of machine from pymapd con
241  machine_name(str): Name of machine if passed in
242  machine_uname(str): Uname of machine if passed in
243 
244  Returns:
245  machine_info(dict):::
246  run_machine_name(str): Run machine name
247  run_machine_uname(str): Run machine uname
248  """
249  # Set machine names, using local info if connected to localhost
250  if kwargs["conn_machine_name"] == "localhost":
251  local_uname = os.uname()
252  # If --machine-name passed in, override pymapd con value
253  if kwargs["machine_name"]:
254  run_machine_name = kwargs["machine_name"]
255  else:
256  if kwargs["conn_machine_name"] == "localhost":
257  run_machine_name = local_uname.nodename.split(".")[0]
258  else:
259  run_machine_name = kwargs["conn_machine_name"]
260  # If --machine-uname passed in, override pymapd con value
261  if kwargs["machine_uname"]:
262  run_machine_uname = kwargs["machine_uname"]
263  else:
264  if kwargs["conn_machine_name"] == "localhost":
265  run_machine_uname = " ".join(local_uname)
266  else:
267  run_machine_uname = ""
268  machine_info = {
269  "run_machine_name": run_machine_name,
270  "run_machine_uname": run_machine_uname,
271  }
272  return machine_info
273 
274 
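# --- Editor's example (not in the original script): when connected to
# localhost with no overrides, both fields fall back to os.uname().
def _example_get_machine_info():
    return get_machine_info(
        conn_machine_name="localhost",
        machine_name=None,  # no --machine-name override
        machine_uname=None,  # no --machine-uname override
    )
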
275 def read_query_files(**kwargs):
276  """
277  Reads the query files in a directory into a query list
278 
279  Kwargs:
280  queries_dir(str): Directory with query files
281  source_table(str): Table to run query against
282 
283  Returns:
284  query_list(dict):::
285  query_group(str): Query group, usually matches table name
286  queries(list)
287  query(dict):::
288  name(str): Name of query
289  mapdql(str): Query syntax to run
290  False(bool): Unable to find queries dir
291  """
292  # Read query files contents and write to query_list
293  query_list = {"query_group": "", "queries": []}
294  query_group = kwargs["queries_dir"].split("/")[-1]
295  query_list.update(query_group=query_group)
296  logging.debug("Queries dir: " + kwargs["queries_dir"])
297  try:
298  for query_filename in sorted(os.listdir(kwargs["queries_dir"])):
299  logging.debug("Validating query filename: " + query_filename)
300  if validate_query_file(query_filename=query_filename):
301  with open(
302  kwargs["queries_dir"] + "/" + query_filename, "r"
303  ) as query_filepath:
304  logging.debug(
305  "Reading query with filename: " + query_filename
306  )
307  query_mapdql = query_filepath.read().replace("\n", " ")
308  query_mapdql = query_mapdql.replace(
309  "##TAB##", kwargs["source_table"]
310  )
311  query_list["queries"].append(
312  {"name": query_filename, "mapdql": query_mapdql}
313  )
314  logging.info("Read all query files")
315  return query_list
316  except FileNotFoundError:
317  logging.exception("Could not find queries directory.")
318  return False
319 
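# --- Editor's note (not in the original script): query files are plain
# .sql files in which the token ##TAB## stands for the source table; the
# file contents and table name below are hypothetical.
def _example_query_templating():
    query_mapdql = "SELECT COUNT(*) FROM ##TAB##;"  # hypothetical file body
    return query_mapdql.replace("##TAB##", "flights")  # hypothetical table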
320 
321 def read_setup_teardown_query_files(**kwargs):
322  """
323  Get queries to run for setup and teardown from directory
324 
325  Kwargs:
326  queries_dir(str): Directory with query files
327  source_table(str): Table to run query against
328  foreign_table_filename(str): File to create foreign table from
329 
330  Returns:
331  setup_queries(query_list): List of setup queries
332  teardown_queries(query_list): List of teardown queries
333  False(bool): Unable to find queries dir
334 
335  query_list is described by:
336  query_list(dict):::
337  query_group(str): Query group, usually matches table name
338  queries(list)
339  query(dict):::
340  name(str): Name of query
341  mapdql(str): Query syntax to run
342  """
343  setup_teardown_queries_dir = kwargs['queries_dir']
344  source_table = kwargs['source_table']
345  # Read setup/tear-down queries if they exist
346  setup_teardown_query_list = None
347  if setup_teardown_queries_dir is not None:
348  setup_teardown_query_list = read_query_files(
349  queries_dir=setup_teardown_queries_dir,
350  source_table=source_table
351  )
352  if kwargs["foreign_table_filename"] is not None:
353  for query in setup_teardown_query_list['queries']:
354  query['mapdql'] = query['mapdql'].replace(
355  "##FILE##", kwargs["foreign_table_filename"])
356  # Filter setup queries
357  setup_query_list = None
358  if setup_teardown_query_list is not None:
359  setup_query_list = filter(
360  lambda x: validate_setup_teardown_query_file(
361  query_filename=x['name'], check_which='setup', quiet=True),
362  setup_teardown_query_list['queries'])
363  setup_query_list = list(setup_query_list)
364  # Filter teardown queries
365  teardown_query_list = None
366  if setup_teardown_query_list is not None:
367  teardown_query_list = filter(
368  lambda x: validate_setup_teardown_query_file(
369  query_filename=x['name'], check_which='teardown', quiet=True),
370  setup_teardown_query_list['queries'])
371  teardown_query_list = list(teardown_query_list)
372  return setup_query_list, teardown_query_list
373 
374 
375 def validate_setup_teardown_query_file(**kwargs):
376  """
377  Validates query file. Currently only checks the query file name, and
378  checks for setup or teardown in basename
379 
380  Kwargs:
381  query_filename(str): Name of query file
382  check_which(str): either 'setup' or 'teardown', decides which to
383  check
384  quiet(bool): optional, if True, no warning is logged
385 
386  Returns:
387  True(bool): Query successfully validated
388  False(bool): Query failed validation
389  """
390  qfilename = kwargs["query_filename"]
391  basename = os.path.basename(qfilename)
392  check_str = False
393  if kwargs["check_which"] == 'setup':
394  check_str = basename.lower().find('setup') > -1
395  elif kwargs["check_which"] == 'teardown':
396  check_str = basename.lower().find('teardown') > -1
397  else:
398  raise TypeError('Unsupported `check_which` parameter.')
399  return_val = True
400  if not qfilename.endswith(".sql"):
401  logging.warning(
402  "Query filename "
403  + qfilename
404  + ' is invalid - does not end in ".sql". Skipping'
405  )
406  return_val = False
407  elif not check_str:
408  quiet = bool(kwargs.get('quiet'))
409  if not quiet:
410  logging.warning(
411  "Query filename "
412  + qfilename
413  + ' does not match "setup" or "teardown". Skipping'
414  )
415  return_val = False
416  return return_val
417 
418 
419 def validate_query_file(**kwargs):
420  """
421  Validates query file. Currently only checks the query file name
422 
423  Kwargs:
424  query_filename(str): Name of query file
425 
426  Returns:
427  True(bool): Query successfully validated
428  False(bool): Query failed validation
429  """
430  if not kwargs["query_filename"].endswith(".sql"):
431  logging.warning(
432  "Query filename "
433  + kwargs["query_filename"]
434  + ' is invalid - does not end in ".sql". Skipping'
435  )
436  return False
437  else:
438  return True
439 
440 
441 def execute_query(**kwargs):
442  """
443  Executes a query against the connected db using pymapd
444  https://pymapd.readthedocs.io/en/latest/usage.html#querying
445 
446  Kwargs:
447  query_name(str): Name of query
448  query_mapdql(str): Query to run
449  iteration(int): Iteration number
450  con(class): Connection class
451 
452  Returns:
453  query_execution(dict):::
454  result_count(int): Number of results returned
455  execution_time(float): Time (in ms) that pymapd reports
456  backend spent on query.
457  connect_time(float): Time (in ms) for overhead of query, calculated
458  by subtracting backend execution time
459  from time spent on the execution function.
460  results_iter_time(float): Time (in ms) it took for
461  pymapd.fetchone() to iterate through all
462  of the results.
463  total_time(float): Time (in ms) from adding all above times.
464  False(bool): The query failed. Exception should be logged.
465  """
466  start_time = timeit.default_timer()
467  try:
468  # Run the query
469  query_result = kwargs["con"].execute(kwargs["query_mapdql"])
470  logging.debug(
471  "Completed iteration "
472  + str(kwargs["iteration"])
473  + " of query "
474  + kwargs["query_name"]
475  )
476  except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
477  logging.exception(
478  "Error running query "
479  + kwargs["query_name"]
480  + " during iteration "
481  + str(kwargs["iteration"])
482  )
483  return False
484 
485  # Calculate times
486  query_elapsed_time = (timeit.default_timer() - start_time) * 1000
487  execution_time = query_result._result.execution_time_ms
488  debug_info = query_result._result.debug
489  connect_time = round((query_elapsed_time - execution_time), 1)
490  # Iterate through each result from the query
491  logging.debug(
492  "Counting results from query"
493  + kwargs["query_name"]
494  + " iteration "
495  + str(kwargs["iteration"])
496  )
497  result_count = 0
498  start_time = timeit.default_timer()
499  while query_result.fetchone():
500  result_count += 1
501  results_iter_time = round(
502  ((timeit.default_timer() - start_time) * 1000), 1
503  )
504  query_execution = {
505  "result_count": result_count,
506  "execution_time": execution_time,
507  "connect_time": connect_time,
508  "results_iter_time": results_iter_time,
509  "total_time": execution_time + connect_time + results_iter_time,
510  "debug_info": debug_info,
511  }
512  logging.debug(
513  "Execution results for query"
514  + kwargs["query_name"]
515  + " iteration "
516  + str(kwargs["iteration"])
517  + ": "
518  + str(query_execution)
519  )
520  return query_execution
521 
522 
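# --- Editor's note (not in the original script): a worked sketch of the
# timing decomposition execute_query() performs; all numbers are made up.
def _example_timing_decomposition():
    query_elapsed_time = 125.0  # wall clock (ms) around con.execute()
    execution_time = 100.0  # backend time (ms) reported by pymapd
    connect_time = round(query_elapsed_time - execution_time, 1)  # 25.0
    results_iter_time = 10.0  # ms spent in the fetchone() loop
    return execution_time + connect_time + results_iter_time  # 135.0 total
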
523 def calculate_query_times(**kwargs):
524  """
525  Calculates aggregate query times from all iteration times
526 
527  Kwargs:
528  total_times(list): List of total time calculations
529  execution_times(list): List of execution_time calculations
530  results_iter_times(list): List of results_iter_time calculations
531  connect_times(list): List of connect_time calculations
532  trim(float): Amount to trim from iterations set to gather trimmed
533  values. Enter as decimal corresponding to percent to
534  trim - ex: 0.15 to trim 15%.
535 
536  Returns:
537  query_execution(dict): Query times
538  False(bool): The query failed. Exception should be logged.
539  """
540  trim_size = int(kwargs["trim"] * len(kwargs["total_times"]))
541  return {
542  "total_time_avg": round(numpy.mean(kwargs["total_times"]), 1),
543  "total_time_min": round(numpy.min(kwargs["total_times"]), 1),
544  "total_time_max": round(numpy.max(kwargs["total_times"]), 1),
545  "total_time_85": round(numpy.percentile(kwargs["total_times"], 85), 1),
546  "total_time_trimmed_avg": round(
547  numpy.mean(
548  numpy.sort(kwargs["total_times"])[trim_size:-trim_size]
549  ),
550  1,
551  )
552  if trim_size
553  else round(numpy.mean(kwargs["total_times"]), 1),
554  "total_times": kwargs["total_times"],
555  "execution_time_avg": round(numpy.mean(kwargs["execution_times"]), 1),
556  "execution_time_min": round(numpy.min(kwargs["execution_times"]), 1),
557  "execution_time_max": round(numpy.max(kwargs["execution_times"]), 1),
558  "execution_time_85": round(
559  numpy.percentile(kwargs["execution_times"], 85), 1
560  ),
561  "execution_time_25": round(
562  numpy.percentile(kwargs["execution_times"], 25), 1
563  ),
564  "execution_time_std": round(numpy.std(kwargs["execution_times"]), 1),
565  "execution_time_trimmed_avg": round(
566  numpy.mean(
567  numpy.sort(kwargs["execution_times"])[trim_size:-trim_size]
568  ),
569  1)
570  if trim_size > 0
571  else round(numpy.mean(kwargs["execution_times"]), 1),
572  "execution_time_trimmed_max": round(
573  numpy.max(
574  numpy.sort(kwargs["execution_times"])[trim_size:-trim_size]
575  ),
576  1)
577  if trim_size > 0
578  else round(numpy.max(kwargs["execution_times"]), 1),
579  "execution_times": kwargs["execution_times"],
580  "connect_time_avg": round(numpy.mean(kwargs["connect_times"]), 1),
581  "connect_time_min": round(numpy.min(kwargs["connect_times"]), 1),
582  "connect_time_max": round(numpy.max(kwargs["connect_times"]), 1),
583  "connect_time_85": round(
584  numpy.percentile(kwargs["connect_times"], 85), 1
585  ),
586  "results_iter_time_avg": round(
587  numpy.mean(kwargs["results_iter_times"]), 1
588  ),
589  "results_iter_time_min": round(
590  numpy.min(kwargs["results_iter_times"]), 1
591  ),
592  "results_iter_time_max": round(
593  numpy.max(kwargs["results_iter_times"]), 1
594  ),
595  "results_iter_time_85": round(
596  numpy.percentile(kwargs["results_iter_times"], 85), 1
597  ),
598  }
599 
600 
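# --- Editor's note (not in the original script): an illustration of the
# trimming rule above. With trim=0.15 and ten samples, trim_size is
# int(0.15 * 10) == 1, so one low and one high outlier are dropped.
def _example_trimmed_avg():
    total_times = [90, 100, 101, 102, 103, 104, 105, 106, 107, 200]
    trim_size = int(0.15 * len(total_times))  # 1
    trimmed = numpy.sort(total_times)[trim_size:-trim_size]
    return round(numpy.mean(trimmed), 1)  # 103.5, outliers excluded
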
601 def clear_memory(**kwargs):
602  """
603  Clears CPU or GPU memory
604 
605  Kwargs:
606  con(class 'pymapd.connection.Connection'): Mapd connection
607  mem_type(str): [gpu, cpu] Type of memory to clear
608 
609  Returns:
610  None
611  """
612  try:
613  session = kwargs["con"]._session
614  mem_type = kwargs["mem_type"]
615  if mem_type == 'cpu':
616  kwargs["con"]._client.clear_cpu_memory(session)
617  elif mem_type == 'gpu':
618  kwargs["con"]._client.clear_gpu_memory(session)
619  else:
620  raise TypeError("Unkown mem_type '" + str(mem_type) + "' supplied to 'clear_memory'")
621  except Exception as e:
622  errormessage = "Clear memory failed with error: " + str(e)
623  logging.error(errormessage)
624 
625 
626 def clear_system_caches():
627  """
628  Clears system caches
629  """
630  try:
631  os.system('sudo sh -c "/bin/echo 3 > /proc/sys/vm/drop_caches"')
632  except Exception as e:
633  errormessage = "Clear system caches failed with error: " + str(e)
634  logging.error(errormessage)
635 
636 
637 def get_mem_usage(**kwargs):
638  """
639  Calculates memory statistics from mapd_server _client.get_memory call
640 
641  Kwargs:
642  con(class 'pymapd.connection.Connection'): Mapd connection
643  mem_type(str): [gpu, cpu] Type of memory to gather metrics for
644 
645  Returns:
646  ramusage(dict):::
647  usedram(float): Amount of memory (in MB) used
648  freeram(float): Amount of memory (in MB) free
649  totalallocated(float): Total amount of memory (in MB) allocated
650  errormessage(str): Error if returned by get_memory call
651  rawdata(list): Raw data returned from get_memory call
652  """
653  try:
654  con_mem_data_list = kwargs["con"]._client.get_memory(
655  session=kwargs["con"]._session, memory_level=kwargs["mem_type"]
656  )
657  usedram = 0
658  freeram = 0
659  for con_mem_data in con_mem_data_list:
660  page_size = con_mem_data.page_size
661  node_memory_data_list = con_mem_data.node_memory_data
662  for node_memory_data in node_memory_data_list:
663  ram = node_memory_data.num_pages * page_size
664  is_free = node_memory_data.is_free
665  if is_free:
666  freeram += ram
667  else:
668  usedram += ram
669  totalallocated = usedram + freeram
670  if totalallocated > 0:
671  totalallocated = round(totalallocated / 1024 / 1024, 1)
672  usedram = round(usedram / 1024 / 1024, 1)
673  freeram = round(freeram / 1024 / 1024, 1)
674  ramusage = {}
675  ramusage["usedram"] = usedram
676  ramusage["freeram"] = freeram
677  ramusage["totalallocated"] = totalallocated
678  ramusage["errormessage"] = ""
679  except Exception as e:
680  errormessage = "Get memory failed with error: " + str(e)
681  logging.error(errormessage)
682  ramusage = {"errormessage": errormessage}
683  return ramusage
684 
685 
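# --- Editor's note (not in the original script): get_mem_usage() sums
# num_pages * page_size per node, then converts bytes to MB by dividing
# by 1024 twice. A worked sketch with made-up numbers:
def _example_page_math():
    page_size = 512 * 1024 * 1024  # hypothetical 512 MB page
    num_pages = 4
    ram = num_pages * page_size  # bytes, as accumulated in the loop above
    return round(ram / 1024 / 1024, 1)  # 2048.0 MB
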
686 def run_query(**kwargs):
687  """
688  Takes query name, syntax, and iteration count and calls the
689  execute_query function for each iteration. Reports total, iteration,
690  and exec timings, memory usage, and failure status.
691 
692  Kwargs:
693  query(dict):::
694  name(str): Name of query
695  mapdql(str): Query syntax to run
696  iterations(int): Number of iterations of each query to run
697  trim(float): Trim decimal to remove from top and bottom of results
698  con(class 'pymapd.connection.Connection'): Mapd connection
699  clear_all_memory_pre_query(bool,optional): Flag to determine if memory is cleared
700  between query runs
701 
702  Returns:
703  query_results(dict):::
704  query_name(str): Name of query
705  query_mapdql(str): Query to run
706  query_id(str): Query ID
707  query_succeeded(bool): Query succeeded
708  query_error_info(str): Query error info
709  result_count(int): Number of results returned
710  initial_iteration_results(dict):::
711  first_execution_time(float): Execution time for first query
712  iteration
713  first_connect_time(float): Connect time for first query
714  iteration
715  first_results_iter_time(float): Results iteration time for
716  first query iteration
717  first_total_time(float): Total time for first iteration
718  first_cpu_mem_usage(float): CPU memory usage for first query
719  iteration
720  first_gpu_mem_usage(float): GPU memory usage for first query
721  iteration
722  noninitial_iteration_results(list):::
723  execution_time(float): Time (in ms) that pymapd reports
724  backend spent on query.
725  connect_time(float): Time (in ms) for overhead of query,
726  calculated by subtracting backend execution time from
727  time spent on the execution function.
728  results_iter_time(float): Time (in ms) it took for
729  pymapd.fetchone() to iterate through all of the results.
730  total_time(float): Time (in ms) from adding all above times.
731  query_total_elapsed_time(int): Total elapsed time for query
732  False(bool): The query failed. Exception should be logged.
733  """
734  logging.info(
735  "Running query: "
736  + kwargs["query"]["name"]
737  + " iterations: "
738  + str(kwargs["iterations"])
739  )
740  query_id = kwargs["query"]["name"].rsplit(".")[
741  0
742  ] # Query ID = filename without extension
743  query_results = {
744  "query_name": kwargs["query"]["name"],
745  "query_mapdql": kwargs["query"]["mapdql"],
746  "query_id": query_id,
747  "query_succeeded": True,
748  "query_error_info": "",
749  "initial_iteration_results": {},
750  "noninitial_iteration_results": [],
751  "query_total_elapsed_time": 0,
752  }
753  query_total_start_time = timeit.default_timer()
754  # Run iterations of query
755  for iteration in range(kwargs["iterations"]):
756  # Gather memory before running query iteration
757  logging.debug("Getting pre-query memory usage on CPU")
758  pre_query_cpu_mem_usage = get_mem_usage(
759  con=kwargs["con"], mem_type="cpu"
760  )
761  logging.debug("Getting pre-query memory usage on GPU")
762  pre_query_gpu_mem_usage = get_mem_usage(
763  con=kwargs["con"], mem_type="gpu"
764  )
765  if "clear_all_memory_pre_query" in kwargs and kwargs["clear_all_memory_pre_query"]:
766  # Clear GPU & CPU memory
767  clear_memory(
768  con=kwargs["con"], mem_type="cpu"
769  )
770  clear_memory(
771  con=kwargs["con"], mem_type="gpu"
772  )
773  clear_system_caches()
774  # Run query iteration
775  logging.debug(
776  "Running iteration "
777  + str(iteration)
778  + " of query "
779  + kwargs["query"]["name"]
780  )
781  query_result = execute_query(
782  query_name=kwargs["query"]["name"],
783  query_mapdql=kwargs["query"]["mapdql"],
784  iteration=iteration,
785  con=kwargs["con"],
786  )
787  # Gather memory after running query iteration
788  logging.debug("Getting post-query memory usage on CPU")
789  post_query_cpu_mem_usage = get_mem_usage(
790  con=kwargs["con"], mem_type="cpu"
791  )
792  logging.debug("Getting post-query memory usage on GPU")
793  post_query_gpu_mem_usage = get_mem_usage(
794  con=kwargs["con"], mem_type="gpu"
795  )
796  # Calculate total (post minus pre) memory usage after query iteration
797  query_cpu_mem_usage = round(
798  post_query_cpu_mem_usage["usedram"]
799  - pre_query_cpu_mem_usage["usedram"],
800  1,
801  )
802  query_gpu_mem_usage = round(
803  post_query_gpu_mem_usage["usedram"]
804  - pre_query_gpu_mem_usage["usedram"],
805  1,
806  )
807  if query_result:
808  query_results.update(
809  query_error_info="" # TODO - interpret query error info
810  )
811  # Assign first query iteration times
812  if iteration == 0:
813  first_execution_time = round(query_result["execution_time"], 1)
814  first_connect_time = round(query_result["connect_time"], 1)
815  first_results_iter_time = round(
816  query_result["results_iter_time"], 1
817  )
818  first_total_time = (
819  first_execution_time
820  + first_connect_time
821  + first_results_iter_time
822  )
823  query_results.update(
824  initial_iteration_results={
825  "first_execution_time": first_execution_time,
826  "first_connect_time": first_connect_time,
827  "first_results_iter_time": first_results_iter_time,
828  "first_total_time": first_total_time,
829  "first_cpu_mem_usage": query_cpu_mem_usage,
830  "first_gpu_mem_usage": query_gpu_mem_usage,
831  }
832  )
833  else:
834  # Put noninitial iterations into query_result list
835  query_results["noninitial_iteration_results"].append(
836  query_result
837  )
838  # Verify no change in memory for noninitial iterations
839  if query_cpu_mem_usage != 0.0:
840  logging.error(
841  (
842  "Noninitial iteration ({0}) of query ({1}) "
843  + "shows non-zero CPU memory usage: {2}"
844  ).format(
845  iteration,
846  kwargs["query"]["name"],
847  query_cpu_mem_usage,
848  )
849  )
850  if query_gpu_mem_usage != 0.0:
851  logging.error(
852  (
853  "Noninitial iteration ({0}) of query ({1}) "
854  + "shows non-zero GPU memory usage: {2}"
855  ).format(
856  iteration,
857  kwargs["query"]["name"],
858  query_gpu_mem_usage,
859  )
860  )
861  else:
862  logging.warning(
863  "Error detected during execution of query: "
864  + kwargs["query"]["name"]
865  + ". This query will be skipped and "
866  + "times will not reported"
867  )
868  query_results.update(query_succeeded=False)
869  break
870  # Calculate time for all iterations to run
871  query_total_elapsed_time = round(
872  ((timeit.default_timer() - query_total_start_time) * 1000), 1
873  )
874  query_results.update(query_total_elapsed_time=query_total_elapsed_time)
875  logging.info(
876  "Completed all iterations of query " + kwargs["query"]["name"]
877  )
878  return query_results
879 
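# --- Editor's example (not in the original script): a single run_query()
# call as benchmark() issues it; the query dict mirrors the shape built by
# read_query_files(), and the file name and SQL are hypothetical.
def _example_run_query(con):
    query = {
        "name": "query_01.sql",
        "mapdql": "SELECT COUNT(*) FROM flights;",
    }
    return run_query(query=query, iterations=2, trim=0.15, con=con)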
880 
881 def run_setup_teardown_query(**kwargs):
882  """
883  Convenience wrapper around `run_query` to run a setup or
884  teardown query
885 
886  Kwargs:
887  queries(query_list): List of queries to run
888  do_run(bool): If true will run query, otherwise do nothing
889  trim(float): Trim decimal to remove from top and bottom of results
890  con(class 'pymapd.connection.Connection'): Mapd connection
891 
892  Returns:
893  See return value for `run_query`
894 
895  query_list is described by:
896  queries(list)
897  query(dict):::
898  name(str): Name of query
899  mapdql(str): Query syntax to run
900  [setup : queries(list)]
901  [teardown : queries(list)]
902  """
903  query_results = list()
904  if kwargs['do_run']:
905  for query in kwargs['queries']:
906  result = run_query(
907  query=query, iterations=1,
908  trim=kwargs['trim'],
909  con=kwargs['con']
910  )
911  if not result['query_succeeded']:
912  logging.warning(
913  "Error setup or teardown query: "
914  + query["name"]
915  + ". did not complete."
916  )
917  else:
918  query_results.append(result)
919  return query_results
920 
921 
922 def json_format_handler(x):
923  # Function to allow json to deal with datetime and numpy int
924  if isinstance(x, datetime.datetime):
925  return x.isoformat()
926  if isinstance(x, numpy.int64):
927  return int(x)
928  raise TypeError("Unknown type")
929 
930 
931 def create_results_dataset(**kwargs):
932  """
933  Create results dataset
934 
935  Kwargs:
936  run_guid(str): Run GUID
937  run_timestamp(datetime): Run timestamp
938  run_connection(str): Connection string
939  run_machine_name(str): Run machine name
940  run_machine_uname(str): Run machine uname
941  run_driver(str): Run driver
942  run_version(str): Version of DB
943  run_version_short(str): Shortened version of DB
944  label(str): Run label
945  source_db_gpu_count(int): Number of GPUs on run machine
946  source_db_gpu_driver_ver(str): GPU driver version
947  source_db_gpu_name(str): GPU name
948  source_db_gpu_mem(str): Amount of GPU mem on run machine
949  source_table(str): Table to run query against
950  trim(float): Trim decimal to remove from top and bottom of results
951  iterations(int): Number of iterations of each query to run
952  query_group(str): Query group, usually matches table name
953  query_results(dict):::
954  query_name(str): Name of query
955  query_mapdql(str): Query to run
956  query_id(str): Query ID
957  query_succeeded(bool): Query succeeded
958  query_error_info(str): Query error info
959  result_count(int): Number of results returned
960  initial_iteration_results(dict):::
961  first_execution_time(float): Execution time for first query
962  iteration
963  first_connect_time(float): Connect time for first query
964  iteration
965  first_results_iter_time(float): Results iteration time for
966  first query iteration
967  first_total_time(float): Total time for first iteration
968  first_cpu_mem_usage(float): CPU memory usage for first query
969  iteration
970  first_gpu_mem_usage(float): GPU memory usage for first query
971  iteration
972  noninitial_iteration_results(list):::
973  execution_time(float): Time (in ms) that pymapd reports
974  backend spent on query.
975  connect_time(float): Time (in ms) for overhead of query,
976  calculated by subtracting backend execution time from
977  time spent on the execution function.
978  results_iter_time(float): Time (in ms) it took for
979  pymapd.fetchone() to iterate through all of the results.
980  total_time(float): Time (in ms) from adding all above times.
981  query_total_elapsed_time(int): Total elapsed time for query
982 
983  Returns:
984  results_dataset(list):::
985  result_dataset(dict): Query results dataset
986  """
987  results_dataset = []
988  for query_results in kwargs["queries_results"]:
989  if query_results["query_succeeded"]:
990  # Aggregate iteration values
991  execution_times, connect_times, results_iter_times, total_times = (
992  [],
993  [],
994  [],
995  [],
996  )
997  detailed_timing_last_iteration = {}
998  if len(query_results["noninitial_iteration_results"]) == 0:
999  # A single query run (most likely a setup or teardown query)
1000  initial_result = query_results["initial_iteration_results"]
1001  execution_times.append(initial_result["first_execution_time"])
1002  connect_times.append(initial_result["first_connect_time"])
1003  results_iter_times.append(
1004  initial_result["first_results_iter_time"]
1005  )
1006  total_times.append(initial_result["first_total_time"])
1007  # Special case: only a single iteration was run
1008  result_count = 1
1009  else:
1010  # More than one query run
1011  for noninitial_result in query_results[
1012  "noninitial_iteration_results"
1013  ]:
1014  execution_times.append(noninitial_result["execution_time"])
1015  connect_times.append(noninitial_result["connect_time"])
1016  results_iter_times.append(
1017  noninitial_result["results_iter_time"]
1018  )
1019  total_times.append(noninitial_result["total_time"])
1020  # Overwrite result count, same for each iteration
1021  result_count = noninitial_result["result_count"]
1022 
1023  # If available, get the last iteration's component-wise timing information as a json structure
1024  if (
1025  query_results["noninitial_iteration_results"][-1]["debug_info"]
1026  ):
1027  detailed_timing_last_iteration = json.loads(
1028  query_results["noninitial_iteration_results"][-1][
1029  "debug_info"
1030  ]
1031  )["timer"]
1032  # Calculate query times
1033  logging.debug(
1034  "Calculating times from query " + query_results["query_id"]
1035  )
1036  query_times = calculate_query_times(
1037  total_times=total_times,
1038  execution_times=execution_times,
1039  connect_times=connect_times,
1040  results_iter_times=results_iter_times,
1041  trim=kwargs[
1042  "trim"
1043  ], # Trim top and bottom n% for trimmed calculations
1044  )
1045  result_dataset = {
1046  "name": query_results["query_name"],
1047  "mapdql": query_results["query_mapdql"],
1048  "succeeded": True,
1049  "results": {
1050  "run_guid": kwargs["run_guid"],
1051  "run_timestamp": kwargs["run_timestamp"],
1052  "run_connection": kwargs["run_connection"],
1053  "run_machine_name": kwargs["run_machine_name"],
1054  "run_machine_uname": kwargs["run_machine_uname"],
1055  "run_driver": kwargs["run_driver"],
1056  "run_version": kwargs["run_version"],
1057  "run_version_short": kwargs["run_version_short"],
1058  "run_label": kwargs["label"],
1059  "run_gpu_count": kwargs["source_db_gpu_count"],
1060  "run_gpu_driver_ver": kwargs["source_db_gpu_driver_ver"],
1061  "run_gpu_name": kwargs["source_db_gpu_name"],
1062  "run_gpu_mem_mb": kwargs["source_db_gpu_mem"],
1063  "run_table": kwargs["source_table"],
1064  "query_group": kwargs["query_group"],
1065  "query_id": query_results["query_id"],
1066  "query_result_set_count": result_count,
1067  "query_error_info": query_results["query_error_info"],
1068  "query_conn_first": query_results[
1069  "initial_iteration_results"
1070  ]["first_connect_time"],
1071  "query_conn_avg": query_times["connect_time_avg"],
1072  "query_conn_min": query_times["connect_time_min"],
1073  "query_conn_max": query_times["connect_time_max"],
1074  "query_conn_85": query_times["connect_time_85"],
1075  "query_exec_first": query_results[
1076  "initial_iteration_results"
1077  ]["first_execution_time"],
1078  "query_exec_avg": query_times["execution_time_avg"],
1079  "query_exec_min": query_times["execution_time_min"],
1080  "query_exec_max": query_times["execution_time_max"],
1081  "query_exec_85": query_times["execution_time_85"],
1082  "query_exec_25": query_times["execution_time_25"],
1083  "query_exec_stdd": query_times["execution_time_std"],
1084  "query_exec_trimmed_avg": query_times[
1085  "execution_time_trimmed_avg"
1086  ],
1087  "query_exec_trimmed_max": query_times[
1088  "execution_time_trimmed_max"
1089  ],
1090  # Render queries not supported yet
1091  "query_render_first": None,
1092  "query_render_avg": None,
1093  "query_render_min": None,
1094  "query_render_max": None,
1095  "query_render_85": None,
1096  "query_render_25": None,
1097  "query_render_stdd": None,
1098  "query_total_first": query_results[
1099  "initial_iteration_results"
1100  ]["first_total_time"],
1101  "query_total_avg": query_times["total_time_avg"],
1102  "query_total_min": query_times["total_time_min"],
1103  "query_total_max": query_times["total_time_max"],
1104  "query_total_85": query_times["total_time_85"],
1105  "query_total_all": query_results[
1106  "query_total_elapsed_time"
1107  ],
1108  "query_total_trimmed_avg": query_times[
1109  "total_time_trimmed_avg"
1110  ],
1111  "results_iter_count": kwargs["iterations"],
1112  "results_iter_first": query_results[
1113  "initial_iteration_results"
1114  ]["first_results_iter_time"],
1115  "results_iter_avg": query_times["results_iter_time_avg"],
1116  "results_iter_min": query_times["results_iter_time_min"],
1117  "results_iter_max": query_times["results_iter_time_max"],
1118  "results_iter_85": query_times["results_iter_time_85"],
1119  "cpu_mem_usage_mb": query_results[
1120  "initial_iteration_results"
1121  ]["first_cpu_mem_usage"],
1122  "gpu_mem_usage_mb": query_results[
1123  "initial_iteration_results"
1124  ]["first_gpu_mem_usage"],
1125  },
1126  "debug": {
1127  "query_exec_times": query_times["execution_times"],
1128  "query_total_times": query_times["total_times"],
1129  "detailed_timing_last_iteration": detailed_timing_last_iteration,
1130  },
1131  }
1132  elif not query_results["query_succeeded"]:
1133  result_dataset = {
1134  "name": query_results["query_name"],
1135  "mapdql": query_results["query_mapdql"],
1136  "succeeded": False,
1137  }
1138  results_dataset.append(result_dataset)
1139  logging.debug("All values set for query " + query_results["query_id"])
1140  return results_dataset
1141 
1142 
1143 def send_results_db(**kwargs):
1144  """
1145  Send results dataset to a database using pymapd
1146 
1147  Kwargs:
1148  results_dataset(list):::
1149  result_dataset(dict): Query results dataset
1150  table(str): Results destination table name
1151  db_user(str): Results destination user name
1152  db_passwd(str): Results destination password
1153  db_server(str): Results destination server address
1154  db_port(int): Results destination server port
1155  db_name(str): Results destination database name
1156  table_schema_file(str): Path to destination database schema file
1157 
1158  Returns:
1159  True(bool): Sending results to destination database succeeded
1160  False(bool): Sending results to destination database failed. Exception
1161  should be logged.
1162  """
1163  # Create dataframe from list of query results
1164  logging.debug("Converting results list to pandas dataframe")
1165  results_df = DataFrame(kwargs["results_dataset"])
1166  # Establish connection to destination db
1167  logging.debug("Connecting to destination db")
1168  dest_con = get_connection(
1169  db_user=kwargs["db_user"],
1170  db_passwd=kwargs["db_passwd"],
1171  db_server=kwargs["db_server"],
1172  db_port=kwargs["db_port"],
1173  db_name=kwargs["db_name"],
1174  )
1175  if not dest_con:
1176  logging.exception("Could not connect to destination db.")
1177  return False
1178  # Load results into db, creating table if it does not exist
1179  tables = dest_con.get_tables()
1180  if kwargs["table"] not in tables:
1181  logging.info("Destination table does not exist. Creating.")
1182  try:
1183  with open(kwargs["table_schema_file"], "r") as table_schema:
1184  logging.debug(
1185  "Reading table_schema_file: " + kwargs["table_schema_file"]
1186  )
1187  create_table_sql = table_schema.read().replace("\n", " ")
1188  create_table_sql = create_table_sql.replace(
1189  "##TAB##", kwargs["table"]
1190  )
1191  except FileNotFoundError:
1192  logging.exception("Could not find destination table_schema_file.")
1193  return False
1194  try:
1195  logging.debug("Executing create destination table query")
1196  dest_con.execute(create_table_sql)
1197  logging.debug("Destination table created.")
1198  except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
1199  logging.exception("Error running destination table creation")
1200  return False
1201  logging.info("Loading results into destination db")
1202  try:
1203  dest_con.load_table_columnar(
1204  kwargs["table"],
1205  results_df,
1206  preserve_index=False,
1207  chunk_size_bytes=0,
1208  col_names_from_schema=True,
1209  )
1210  except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
1211  logging.exception("Error loading results into destination db")
1212  dest_con.close()
1213  return False
1214  dest_con.close()
1215  return True
1216 
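# --- Editor's example (not in the original script): sending a results
# dataset to a destination database with the parser defaults used below;
# the server hostname is hypothetical and results_dataset is the list
# produced by create_results_dataset().
def _example_send_results_db(results_dataset):
    return send_results_db(
        results_dataset=results_dataset,
        table="results",
        db_user="mapd",
        db_passwd="HyperInteractive",
        db_server="results-db.example.com",  # hypothetical destination
        db_port=6274,
        db_name="mapd",
        table_schema_file="results_table_schemas/query-results.sql",
    )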
1217 
1218 def send_results_file_json(**kwargs):
1219  """
1220  Send results dataset to a local json file
1221 
1222  Kwargs:
1223  results_dataset_json(str): Json-formatted query results dataset
1224  output_file_json (str): Location of .json file output
1225 
1226  Returns:
1227  True(bool): Sending results to json file succeeded
1228  False(bool): Sending results to json file failed. Exception
1229  should be logged.
1230  """
1231  try:
1232  logging.debug("Opening json output file for writing")
1233  with open(kwargs["output_file_json"], "w") as file_json_open:
1234  logging.info(
1235  "Writing to output json file: " + kwargs["output_file_json"]
1236  )
1237  file_json_open.write(kwargs["results_dataset_json"])
1238  return True
1239  except IOError:
1240  logging.exception("Error writing results to json output file")
1241  return False
1242 
1243 
1244 def send_results_jenkins_bench(**kwargs):
1245  """
1246  Send results dataset to a local json file formatted for use with jenkins
1247  benchmark plugin: https://github.com/jenkinsci/benchmark-plugin
1248 
1249  Kwargs:
1250  results_dataset(list):::
1251  result_dataset(dict): Query results dataset
1252  thresholds_name(str): Name to use for Jenkins result field
1253  thresholds_field(str): Field to use for query threshold in jenkins
1254  output_tag_jenkins(str): Jenkins benchmark result tag, for different
1255  sets from same table
1256  output_file_jenkins (str): Location of .json jenkins file output
1257 
1258  Returns:
1259  True(bool): Sending results to json file succeeded
1260  False(bool): Sending results to json file failed. Exception
1261  should be logged.
1262  """
1263  jenkins_bench_results = []
1264  for result_dataset in kwargs["results_dataset"]:
1265  logging.debug("Constructing output for jenkins benchmark plugin")
1266  jenkins_bench_results.append(
1267  {
1268  "name": result_dataset["query_id"],
1269  "description": "",
1270  "parameters": [],
1271  "results": [
1272  {
1273  "name": result_dataset["query_id"]
1274  + "_"
1275  + kwargs["thresholds_name"],
1276  "description": "",
1277  "unit": "ms",
1278  "dblValue": result_dataset[kwargs["thresholds_field"]],
1279  }
1280  ],
1281  }
1282  )
1283  jenkins_bench_json = json.dumps(
1284  {
1285  "groups": [
1286  {
1287  "name": result_dataset["run_table"]
1288  + kwargs["output_tag_jenkins"],
1289  "description": "Source table: "
1290  + result_dataset["run_table"],
1291  "tests": jenkins_bench_results,
1292  }
1293  ]
1294  }
1295  )
1296  try:
1297  logging.debug("Opening jenkins_bench json output file for writing")
1298  with open(kwargs["output_file_jenkins"], "w") as file_jenkins_open:
1299  logging.info(
1300  "Writing to jenkins_bench json file: "
1301  + kwargs["output_file_jenkins"]
1302  )
1303  file_jenkins_open.write(jenkins_bench_json)
1304  return True
1305  except IOError:
1306  logging.exception("Error writing results to jenkins json output file")
1307  return False
1308 
1309 
1310 def send_results_output(**kwargs):
1311  """
1312  Send results dataset script output
1313 
1314  Kwargs:
1315  results_dataset_json(str): Json-formatted query results dataset
1316 
1317  Returns:
1318  True(bool): Sending results to output succeeded
1319  """
1320  logging.info("Printing query results to output")
1321  print(kwargs["results_dataset_json"])
1322  return True
1323 
1324 
1325 def process_arguments(input_arguments):
1326  # Parse input parameters
1327  parser = ArgumentParser()
1328  optional = parser._action_groups.pop()
1329  required = parser.add_argument_group("required arguments")
1330  parser._action_groups.append(optional)
1331  optional.add_argument(
1332  "-v", "--verbose", action="store_true", help="Turn on debug logging"
1333  )
1334  optional.add_argument(
1335  "-q",
1336  "--quiet",
1337  action="store_true",
1338  help="Suppress script outuput " + "(except warnings and errors)",
1339  )
1340  required.add_argument(
1341  "-u",
1342  "--user",
1343  dest="user",
1344  default="mapd",
1345  help="Source database user",
1346  )
1347  required.add_argument(
1348  "-p",
1349  "--passwd",
1350  dest="passwd",
1351  default="HyperInteractive",
1352  help="Source database password",
1353  )
1354  required.add_argument(
1355  "-s",
1356  "--server",
1357  dest="server",
1358  default="localhost",
1359  help="Source database server hostname",
1360  )
1361  optional.add_argument(
1362  "-o",
1363  "--port",
1364  dest="port",
1365  type=int,
1366  default=6274,
1367  help="Source database server port",
1368  )
1369  required.add_argument(
1370  "-n",
1371  "--name",
1372  dest="name",
1373  default="mapd",
1374  help="Source database name",
1375  )
1376  required.add_argument(
1377  "-t",
1378  "--table",
1379  dest="table",
1380  required=True,
1381  help="Source db table name",
1382  )
1383  required.add_argument(
1384  "-l",
1385  "--label",
1386  dest="label",
1387  required=True,
1388  help="Benchmark run label",
1389  )
1390  required.add_argument(
1391  "-d",
1392  "--queries-dir",
1393  dest="queries_dir",
1394  help='Absolute path to dir with query files. \
1395  [Default: "queries" dir in same location as script]',
1396  )
1397  required.add_argument(
1398  "-i",
1399  "--iterations",
1400  dest="iterations",
1401  type=int,
1402  required=True,
1403  help="Number of iterations per query. Must be > 1",
1404  )
1405  optional.add_argument(
1406  "-g",
1407  "--gpu-count",
1408  dest="gpu_count",
1409  type=int,
1410  default=None,
1411  help="Number of GPUs. Not required when gathering local gpu info",
1412  )
1413  optional.add_argument(
1414  "-G",
1415  "--gpu-name",
1416  dest="gpu_name",
1417  type=str,
1418  default="",
1419  help="Name of GPU(s). Not required when gathering local gpu info",
1420  )
1421  optional.add_argument(
1422  "--no-gather-conn-gpu-info",
1423  dest="no_gather_conn_gpu_info",
1424  action="store_true",
1425  help="Do not gather source database GPU info fields "
1426  + "[run_gpu_count, run_gpu_mem_mb] "
1427  + "using pymapd connection info. "
1428  + "Use when testing a CPU-only server.",
1429  )
1430  optional.add_argument(
1431  "--no-gather-nvml-gpu-info",
1432  dest="no_gather_nvml_gpu_info",
1433  action="store_true",
1434  help="Do not gather source database GPU info fields "
1435  + "[gpu_driver_ver, run_gpu_name] "
1436  + "from local GPU using pynvml. "
1437  + 'Defaults to True when source server is not "localhost". '
1438  + "Use when testing a CPU-only server.",
1439  )
1440  optional.add_argument(
1441  "--gather-nvml-gpu-info",
1442  dest="gather_nvml_gpu_info",
1443  action="store_true",
1444  help="Gather source database GPU info fields "
1445  + "[gpu_driver_ver, run_gpu_name] "
1446  + "from local GPU using pynvml. "
1447  + 'Defaults to True when source server is "localhost". '
1448  + "Only use when benchmarking against same machine that this script "
1449  + "is run from.",
1450  )
1451  optional.add_argument(
1452  "-m",
1453  "--machine-name",
1454  dest="machine_name",
1455  help="Name of source machine",
1456  )
1457  optional.add_argument(
1458  "-a",
1459  "--machine-uname",
1460  dest="machine_uname",
1461  help="Uname info from " + "source machine",
1462  )
1463  optional.add_argument(
1464  "-e",
1465  "--destination",
1466  dest="destination",
1467  default="mapd_db",
1468  help="Destination type: [mapd_db, file_json, output, jenkins_bench] "
1469  + "Multiple values can be input seperated by commas, "
1470  + 'ex: "mapd_db,file_json"',
1471  )
1472  optional.add_argument(
1473  "-U",
1474  "--dest-user",
1475  dest="dest_user",
1476  default="mapd",
1477  help="Destination mapd_db database user",
1478  )
1479  optional.add_argument(
1480  "-P",
1481  "--dest-passwd",
1482  dest="dest_passwd",
1483  default="HyperInteractive",
1484  help="Destination mapd_db database password",
1485  )
1486  optional.add_argument(
1487  "-S",
1488  "--dest-server",
1489  dest="dest_server",
1490  help="Destination mapd_db database server hostname"
1491  + ' (required if destination = "mapd_db")',
1492  )
1493  optional.add_argument(
1494  "-O",
1495  "--dest-port",
1496  dest="dest_port",
1497  type=int,
1498  default=6274,
1499  help="Destination mapd_db database server port",
1500  )
1501  optional.add_argument(
1502  "-N",
1503  "--dest-name",
1504  dest="dest_name",
1505  default="mapd",
1506  help="Destination mapd_db database name",
1507  )
1508  optional.add_argument(
1509  "-T",
1510  "--dest-table",
1511  dest="dest_table",
1512  default="results",
1513  help="Destination mapd_db table name",
1514  )
1515  optional.add_argument(
1516  "-C",
1517  "--dest-table-schema-file",
1518  dest="dest_table_schema_file",
1519  default="results_table_schemas/query-results.sql",
1520  help="Destination table schema file. This must be an executable "
1521  + "CREATE TABLE statement that matches the output of this script. It "
1522  + "is required when creating the results table. Default location is "
1523  + 'in "./results_table_schemas/query-results.sql"',
1524  )
1525  optional.add_argument(
1526  "-j",
1527  "--output-file-json",
1528  dest="output_file_json",
1529  help="Absolute path of .json output file "
1530  + '(required if destination = "file_json")',
1531  )
1532  optional.add_argument(
1533  "-J",
1534  "--output-file-jenkins",
1535  dest="output_file_jenkins",
1536  help="Absolute path of jenkins benchmark .json output file "
1537  + '(required if destination = "jenkins_bench")',
1538  )
1539  optional.add_argument(
1540  "-E",
1541  "--output-tag-jenkins",
1542  dest="output_tag_jenkins",
1543  default="",
1544  help="Jenkins benchmark result tag. "
1545  + 'Optional, appended to table name in "group" field',
1546  )
1547  optional.add_argument(
1548  "--setup-teardown-queries-dir",
1549  dest="setup_teardown_queries_dir",
1550  type=str,
1551  default=None,
1552  help='Absolute path to dir with setup & teardown query files. '
1553  'Query files with "setup" in the filename will be executed in '
1554  'the setup stage, likewise query files with "teardown" in '
1555  'the filename will be executed in the tear-down stage. Queries '
1556  'execute in lexical order. [Default: None, meaning this option is '
1557  'not used.]',
1558  )
1559  optional.add_argument(
1560  "--clear-all-memory-pre-query",
1561  dest="clear_all_memory_pre_query",
1562  action="store_true",
1563  help='Clear gpu & cpu memory before every query.'
1564  ' [Default: False]'
1565  )
1566  optional.add_argument(
1567  "--run-setup-teardown-per-query",
1568  dest="run_setup_teardown_per_query",
1569  action="store_true",
1570  help='Run setup & teardown steps per query. '
1571  'If set, setup-teardown-queries-dir must be specified. '
1572  'If not set, but setup-teardown-queries-dir is specified '
1573  'setup & tear-down queries will run globally, that is, '
1574  'once per script invocation.'
1575  ' [Default: False]'
1576  )
1577  optional.add_argument(
1578  "-F",
1579  "--foreign-table-filename",
1580  dest="foreign_table_filename",
1581  default=None,
1582  help="Path to file containing template for import query. "
1583  "Path must be relative to the FOREIGN SERVER. "
1584  "Occurances of \"##FILE##\" within setup/teardown queries will be"
1585  " replaced with this. "
1586  )
1587  optional.add_argument(
1588  "--jenkins-thresholds-name",
1589  dest="jenkins_thresholds_name",
1590  default="average",
1591  help="Name of Jenkins output field.",
1592  )
1593  optional.add_argument(
1594  "--jenkins-thresholds-field",
1595  dest="jenkins_thresholds_field",
1596  default="query_exec_trimmed_avg",
1597  help="Field to report as jenkins output value.",
1598  )
1599  args = parser.parse_args(args=input_arguments)
1600  return args
1601 
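# --- Editor's example (not in the original script): a typical argument
# list as process_arguments() would parse it; the table and label values
# are hypothetical.
def _example_process_arguments():
    return process_arguments(
        [
            "--table", "flights",
            "--label", "nightly_run",
            "--iterations", "5",
            "--destination", "file_json",
            "--output-file-json", "/tmp/results.json",
        ]
    )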
1602 
1603 def benchmark(input_arguments):
1604  # Set input args to vars
1605  args = process_arguments(input_arguments)
1606  verbose = args.verbose
1607  quiet = args.quiet
1608  source_db_user = args.user
1609  source_db_passwd = args.passwd
1610  source_db_server = args.server
1611  source_db_port = args.port
1612  source_db_name = args.name
1613  source_table = args.table
1614  label = args.label
1615  queries_dir = args.queries_dir
1616  iterations = args.iterations
1617  gpu_count = args.gpu_count
1618  gpu_name = args.gpu_name
1619  no_gather_conn_gpu_info = args.no_gather_conn_gpu_info
1620  no_gather_nvml_gpu_info = args.no_gather_nvml_gpu_info
1621  gather_nvml_gpu_info = args.gather_nvml_gpu_info
1622  machine_name = args.machine_name
1623  machine_uname = args.machine_uname
1624  destinations = args.destination
1625  dest_db_user = args.dest_user
1626  dest_db_passwd = args.dest_passwd
1627  dest_db_server = args.dest_server
1628  dest_db_port = args.dest_port
1629  dest_db_name = args.dest_name
1630  dest_table = args.dest_table
1631  dest_table_schema_file = args.dest_table_schema_file
1632  output_file_json = args.output_file_json
1633  output_file_jenkins = args.output_file_jenkins
1634  output_tag_jenkins = args.output_tag_jenkins
1635  setup_teardown_queries_dir = args.setup_teardown_queries_dir
1636  run_setup_teardown_per_query = args.run_setup_teardown_per_query
1637  foreign_table_filename = args.foreign_table_filename
1638  jenkins_thresholds_name = args.jenkins_thresholds_name
1639  jenkins_thresholds_field = args.jenkins_thresholds_field
1640  clear_all_memory_pre_query = args.clear_all_memory_pre_query
1641 
1642  # Hard-coded vars
1643  trim = 0.15
1644 
1645  # Set logging output level
1646  if verbose:
1647  logging.basicConfig(level=logging.DEBUG)
1648  elif quiet:
1649  logging.basicConfig(level=logging.WARNING)
1650  else:
1651  logging.basicConfig(level=logging.INFO)
1652 
1653  # Input validation
1654  if iterations <= 1:
1655  # Need > 1 iteration as first iteration is dropped from calculations
1656  logging.error("Iterations must be greater than 1")
1657  exit(1)
1658  if verify_destinations(
1659  destinations=destinations,
1660  dest_db_server=dest_db_server,
1661  output_file_json=output_file_json,
1662  output_file_jenkins=output_file_jenkins,
1663  ):
1664  logging.debug("Destination(s) have been verified.")
1665  else:
1666  logging.error("No valid destination(s) have been set. Exiting.")
1667  exit(1)
1668 
1669  # Establish connection to mapd db
1670  con = get_connection(
1671  db_user=source_db_user,
1672  db_passwd=source_db_passwd,
1673  db_server=source_db_server,
1674  db_port=source_db_port,
1675  db_name=source_db_name,
1676  )
1677  if not con:
1678  exit(1) # Exit if cannot connect to db
1679  # Set run-specific variables (time, uid, etc.)
1680  run_vars = get_run_vars(con=con)
1681  # Set GPU info depending on availability
1682  gpu_info = get_gpu_info(
1683  gpu_name=gpu_name,
1684  no_gather_conn_gpu_info=no_gather_conn_gpu_info,
1685  con=con,
1686  conn_machine_name=run_vars["conn_machine_name"],
1687  no_gather_nvml_gpu_info=no_gather_nvml_gpu_info,
1688  gather_nvml_gpu_info=gather_nvml_gpu_info,
1689  gpu_count=gpu_count,
1690  )
1691  # Set run machine info
1692  machine_info = get_machine_info(
1693  conn_machine_name=run_vars["conn_machine_name"],
1694  machine_name=machine_name,
1695  machine_uname=machine_uname,
1696  )
1697  # Read queries from files, set to queries dir in PWD if not passed in
1698  if not queries_dir:
1699  queries_dir = os.path.join(os.path.dirname(__file__), "queries")
1700  query_list = read_query_files(
1701  queries_dir=queries_dir, source_table=source_table
1702  )
1703  if not query_list:
1704  exit(1)
1705  # Read setup/teardown queries if they exist
1706  setup_query_list, teardown_query_list =\
1707  read_setup_teardown_query_files(queries_dir=setup_teardown_queries_dir,
1708  source_table=source_table,
1709  foreign_table_filename=foreign_table_filename)
1710  # Check at what granularity we want to run setup or teardown queries at
1711  run_global_setup_queries = setup_query_list is not None and not run_setup_teardown_per_query
1712  run_per_query_setup_queries = setup_query_list is not None and run_setup_teardown_per_query
1713  run_global_teardown_queries = teardown_query_list is not None and not run_setup_teardown_per_query
1714  run_per_query_teardown_queries = teardown_query_list is not None and run_setup_teardown_per_query
1715  # Run global setup queries if they exist
1716  queries_results = []
1717  st_qr = run_setup_teardown_query(queries=setup_query_list,
1718  do_run=run_global_setup_queries, trim=trim, con=con)
1719  queries_results.extend(st_qr)
1720  # Run queries
1721  for query in query_list["queries"]:
1722  # Run setup queries
1723  st_qr = run_setup_teardown_query(
1724  queries=setup_query_list, do_run=run_per_query_setup_queries, trim=trim, con=con)
1725  queries_results.extend(st_qr)
1726  # Run benchmark query
1727  query_result = run_query(
1728  query=query, iterations=iterations, trim=trim, con=con, clear_all_memory_pre_query=clear_all_memory_pre_query
1729  )
1730  queries_results.append(query_result)
1731  # Run tear-down queries
1732  st_qr = run_setup_teardown_query(
1733  queries=teardown_query_list, do_run=run_per_query_teardown_queries, trim=trim, con=con)
1734  queries_results.extend(st_qr)
1735  logging.info("Completed all queries.")
1736  # Run global tear-down queries if they exist
1737  st_qr = run_setup_teardown_query(queries=teardown_query_list,
1738  do_run=run_global_teardown_queries, trim=trim, con=con)
1739  queries_results.extend(st_qr)
1740  logging.debug("Closing source db connection.")
1741  con.close()
1742  # Generate results dataset
1743  results_dataset = create_results_dataset(
1744  run_guid=run_vars["run_guid"],
1745  run_timestamp=run_vars["run_timestamp"],
1746  run_connection=run_vars["run_connection"],
1747  run_machine_name=machine_info["run_machine_name"],
1748  run_machine_uname=machine_info["run_machine_uname"],
1749  run_driver=run_vars["run_driver"],
1750  run_version=run_vars["run_version"],
1751  run_version_short=run_vars["run_version_short"],
1752  label=label,
1753  source_db_gpu_count=gpu_info["source_db_gpu_count"],
1754  source_db_gpu_driver_ver=gpu_info["source_db_gpu_driver_ver"],
1755  source_db_gpu_name=gpu_info["source_db_gpu_name"],
1756  source_db_gpu_mem=gpu_info["source_db_gpu_mem"],
1757  source_table=source_table,
1758  trim=trim,
1759  iterations=iterations,
1760  query_group=query_list["query_group"],
1761  queries_results=queries_results,
1762  )
1763  results_dataset_json = json.dumps(
1764  results_dataset, default=json_format_handler, indent=2
1765  )
1766  successful_results_dataset = [
1767  x for x in results_dataset if x["succeeded"] is not False
1768  ]
1769  successful_results_dataset_results = []
1770  for results_dataset_entry in successful_results_dataset:
1771  successful_results_dataset_results.append(
1772  results_dataset_entry["results"]
1773  )
1774  # Send results to destination(s)
1775  sent_destination = True
1776  if "mapd_db" in destinations:
1777  if not send_results_db(
1778  results_dataset=successful_results_dataset_results,
1779  table=dest_table,
1780  db_user=dest_db_user,
1781  db_passwd=dest_db_passwd,
1782  db_server=dest_db_server,
1783  db_port=dest_db_port,
1784  db_name=dest_db_name,
1785  table_schema_file=dest_table_schema_file,
1786  ):
1787  sent_destination = False
1788  if "file_json" in destinations:
1789  if not send_results_file_json(
1790  results_dataset_json=results_dataset_json,
1791  output_file_json=output_file_json,
1792  ):
1793  sent_destination = False
1794  if "jenkins_bench" in destinations:
1796  results_dataset=successful_results_dataset_results,
1797  thresholds_name=jenkins_thresholds_name,
1798  thresholds_field=jenkins_thresholds_field,
1799  output_tag_jenkins=output_tag_jenkins,
1800  output_file_jenkins=output_file_jenkins,
1801  ):
1802  sent_destination = False
1803  if "output" in destinations:
1804  if not send_results_output(results_dataset_json=results_dataset_json):
1805  sent_destination = False
1806  if not sent_destination:
1807  logging.error("Sending results to one or more destinations failed")
1808  exit(1)
1809  else:
1810  logging.info(
1811  "Succesfully loaded query results info into destination(s)"
1812  )
1813 
1814 
1815 if __name__ == "__main__":
1816  benchmark(sys.argv[1:])