OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
run_benchmark.py
Go to the documentation of this file.
1 import os
2 import timeit
3 import logging
4 import uuid
5 import datetime
6 import json
7 import numpy
8 import pymapd
9 import re
10 import sys
11 from pandas import DataFrame
12 from argparse import ArgumentParser
13 
14 # For usage info, run: `./<script_name>.py --help`
15 
16 
def verify_destinations(**kwargs):
    """
    Verify script output destination(s)

    Kwargs:
      destinations (list): List of destinations
      dest_db_server (str): DB output destination server
      output_file_json (str): Location of .json file output
      output_file_jenkins (str): Location of .json jenkins file output

    Returns:
      True(bool): Destination(s) is/are valid
      False(bool): Destination(s) is/are not valid
    """
    # Start from False so an empty or unrecognized destination list is
    # reported as invalid. (Previously this flag was only assigned inside
    # the recognized-destination branches, raising NameError otherwise.)
    valid_destination_set = False
    if "mapd_db" in kwargs["destinations"]:
        valid_destination_set = True
        if kwargs["dest_db_server"] is None:
            # dest_server is required for the mapd_db destination
            logging.error(
                '"dest_server" is required when destination = "mapd_db"'
            )
            return False
    if "file_json" in kwargs["destinations"]:
        valid_destination_set = True
        if kwargs["output_file_json"] is None:
            # output_file_json is required for the file_json destination
            logging.error(
                '"output_file_json" is required when destination = "file_json"'
            )
            return False
    if "output" in kwargs["destinations"]:
        valid_destination_set = True
    if "jenkins_bench" in kwargs["destinations"]:
        valid_destination_set = True
        if kwargs["output_file_jenkins"] is None:
            # output_file_jenkins is required for the jenkins_bench destination
            logging.error(
                '"output_file_jenkins" is required '
                + 'when destination = "jenkins_bench"'
            )
            return False
    return valid_destination_set
59 
60 
def get_connection(**kwargs):
    """
    Open a pymapd connection to the database
    https://pymapd.readthedocs.io/en/latest/usage.html#connecting

    Kwargs:
      db_user(str): DB username
      db_passwd(str): DB password
      db_server(str): DB host
      db_port(int): DB port
      db_name(str): DB name

    Returns:
      con(class): Connection class
      False(bool): The connection failed. Exception should be logged.
    """
    logging.debug("Connecting to mapd db...")
    try:
        connection = pymapd.connect(
            user=kwargs["db_user"],
            password=kwargs["db_passwd"],
            host=kwargs["db_server"],
            port=kwargs["db_port"],
            dbname=kwargs["db_name"],
        )
    except (pymapd.exceptions.OperationalError, pymapd.exceptions.Error):
        # Caller checks for a falsy return value to detect failure
        logging.exception("Error connecting to database.")
        return False
    logging.info("Succesfully connected to mapd db")
    return connection
91 
92 
def get_run_vars(**kwargs):
    """
    Gather run-specific metadata: GUID, timestamp, connection string,
    and server version details.

    Kwargs:
      con(class 'pymapd.connection.Connection'): Mapd connection

    Returns:
      run_vars(dict):::
        run_guid(str): Run GUID
        run_timestamp(datetime): Run timestamp
        run_connection(str): Connection string
        run_driver(str): Run driver
        run_version(str): Version of DB
        run_version_short(str): Shortened version of DB
        conn_machine_name(str): Name of run machine
    """
    connection = kwargs["con"]
    guid = str(uuid.uuid4())
    logging.debug("Run guid: " + guid)
    timestamp = datetime.datetime.now()
    connection_string = str(connection)
    logging.debug("Connection string: " + connection_string)
    version = connection._client.get_version()
    # The short version drops any build suffix after the first dash
    short_version = version.split("-")[0] if "-" in version else version
    # Host name is the portion between "@" and the following ":" in the
    # pymapd connection string
    machine_name = re.search(r"@(.*?):", connection_string).group(1)
    return {
        "run_guid": guid,
        "run_timestamp": timestamp,
        "run_connection": connection_string,
        "run_driver": "",  # TODO
        "run_version": version,
        "run_version_short": short_version,
        "conn_machine_name": machine_name,
    }
132 
133 
def get_gpu_info(**kwargs):
    """
    Gets run machine GPU info

    Kwargs:
      gpu_name(str): GPU name from input param
      no_gather_conn_gpu_info(bool): Do not gather GPU info fields via con
      con(class 'pymapd.connection.Connection'): Mapd connection
      conn_machine_name(str): Name of run machine
      no_gather_nvml_gpu_info(bool): Do not gather GPU info using nvml
      gather_nvml_gpu_info(bool): Gather GPU info using nvml
      gpu_count(int): Number of GPUs on run machine

    Returns:
      gpu_info(dict):::
        conn_gpu_count(int): Number of GPUs gathered from pymapd con
        source_db_gpu_count(int): Number of GPUs on run machine
        source_db_gpu_mem(str): Amount of GPU mem on run machine
        source_db_gpu_driver_ver(str): GPU driver version
        source_db_gpu_name(str): GPU name
    """
    # Set GPU info fields
    conn_gpu_count = None
    source_db_gpu_count = None
    source_db_gpu_mem = None
    source_db_gpu_driver_ver = ""
    source_db_gpu_name = ""
    # Default from the input flag. Previously this local was assigned only
    # in the else branch below, so passing --no-gather-conn-gpu-info raised
    # NameError at the "if no_gather_nvml_gpu_info" check further down.
    no_gather_nvml_gpu_info = kwargs["no_gather_nvml_gpu_info"]
    if kwargs["no_gather_conn_gpu_info"]:
        logging.debug(
            "--no-gather-conn-gpu-info passed, "
            + "using blank values for source database GPU info fields "
            + "[run_gpu_count, run_gpu_mem_mb] "
        )
    else:
        logging.debug(
            "Gathering source database GPU info fields "
            + "[run_gpu_count, run_gpu_mem_mb] "
            + "using pymapd connection info. "
        )
        conn_hardware_info = kwargs["con"]._client.get_hardware_info(
            kwargs["con"]._session
        )
        conn_gpu_count = conn_hardware_info.hardware_info[0].num_gpu_allocated
        if conn_gpu_count == 0 or conn_gpu_count is None:
            # No GPUs reported by the server; skip nvml gathering too
            no_gather_nvml_gpu_info = True
            if conn_gpu_count == 0:
                logging.warning(
                    "0 GPUs detected from connection info, "
                    + "using blank values for source database GPU info fields "
                    + "If running against cpu-only server, make sure to set "
                    + "--no-gather-nvml-gpu-info and --no-gather-conn-gpu-info."
                )
        else:
            source_db_gpu_count = conn_gpu_count
            try:
                # Convert bytes to MB
                source_db_gpu_mem = int(
                    conn_hardware_info.hardware_info[0].gpu_info[0].memory
                    / 1000000
                )
            except IndexError:
                logging.error("GPU memory info not available from connection.")
    if no_gather_nvml_gpu_info:
        logging.debug(
            "--no-gather-nvml-gpu-info passed, "
            + "using blank values for source database GPU info fields "
            + "[gpu_driver_ver, run_gpu_name] "
        )
    elif (
        kwargs["conn_machine_name"] == "localhost"
        or kwargs["gather_nvml_gpu_info"]
    ):
        logging.debug(
            "Gathering source database GPU info fields "
            + "[gpu_driver_ver, run_gpu_name] "
            + "from local GPU using pynvml. "
        )
        import pynvml

        pynvml.nvmlInit()
        source_db_gpu_driver_ver = pynvml.nvmlSystemGetDriverVersion().decode()
        # source_db_gpu_count may still be None when connection GPU info was
        # skipped; treat that as zero devices instead of raising TypeError.
        for i in range(source_db_gpu_count or 0):
            handle = pynvml.nvmlDeviceGetHandleByIndex(i)
            # Assume all cards are the same, overwrite name value
            source_db_gpu_name = pynvml.nvmlDeviceGetName(handle).decode()
        pynvml.nvmlShutdown()
    # If gpu_count argument passed in, override gathered value
    if kwargs["gpu_count"]:
        source_db_gpu_count = kwargs["gpu_count"]
    if kwargs["gpu_name"]:
        source_db_gpu_name = kwargs["gpu_name"]
    gpu_info = {
        "conn_gpu_count": conn_gpu_count,
        "source_db_gpu_count": source_db_gpu_count,
        "source_db_gpu_mem": source_db_gpu_mem,
        "source_db_gpu_driver_ver": source_db_gpu_driver_ver,
        "source_db_gpu_name": source_db_gpu_name,
    }
    return gpu_info
233 
234 
def get_machine_info(**kwargs):
    """
    Resolve the run machine's name and uname, preferring explicit
    command-line overrides and falling back to local OS values when the
    connection points at localhost.

    Kwargs:
      conn_machine_name(str): Name of machine from pymapd con
      machine_name(str): Name of machine if passed in
      machine_uname(str): Uname of machine if passed in

    Returns:
      machine_info(dict):::
        run_machine_name(str): Run machine name
        run_machine_uname(str): Run machine uname
    """
    is_local = kwargs["conn_machine_name"] == "localhost"
    # Only query the local OS when connected to this machine
    if is_local:
        local_uname = os.uname()
    # --machine-name overrides any pymapd-derived value
    if kwargs["machine_name"]:
        run_machine_name = kwargs["machine_name"]
    elif is_local:
        run_machine_name = local_uname.nodename.split(".")[0]
    else:
        run_machine_name = kwargs["conn_machine_name"]
    # --machine-uname overrides any locally gathered value
    if kwargs["machine_uname"]:
        run_machine_uname = kwargs["machine_uname"]
    elif is_local:
        run_machine_uname = " ".join(local_uname)
    else:
        run_machine_uname = ""
    return {
        "run_machine_name": run_machine_name,
        "run_machine_uname": run_machine_uname,
    }
273 
274 
def read_query_files(**kwargs):
    """
    Read the contents of every valid query file in a directory into a
    query list, substituting the source table into each query.

    Kwargs:
      queries_dir(str): Directory with query files
      source_table(str): Table to run query against

    Returns:
      query_list(dict):::
        query_group(str): Query group, usually matches table name
        queries(list)
          query(dict):::
            name(str): Name of query
            mapdql(str): Query syntax to run
      False(bool): Unable to find queries dir
    """
    queries_dir = kwargs["queries_dir"]
    # Query group defaults to the last path component of the queries dir
    query_list = {"query_group": queries_dir.split("/")[-1], "queries": []}
    logging.debug("Queries dir: " + queries_dir)
    try:
        for query_filename in sorted(os.listdir(queries_dir)):
            logging.debug("Validating query filename: " + query_filename)
            if not validate_query_file(query_filename=query_filename):
                continue
            with open(
                queries_dir + "/" + query_filename, "r"
            ) as query_filepath:
                logging.debug(
                    "Reading query with filename: " + query_filename
                )
                # Flatten the query to one line and fill in the table name
                query_mapdql = query_filepath.read().replace("\n", " ")
                query_mapdql = query_mapdql.replace(
                    "##TAB##", kwargs["source_table"]
                )
            query_list["queries"].append(
                {"name": query_filename, "mapdql": query_mapdql}
            )
        logging.info("Read all query files")
        return query_list
    except FileNotFoundError:
        logging.exception("Could not find queries directory.")
        return False
319 
320 
def validate_query_file(**kwargs):
    """
    Validates query file. Currently only checks the query file name

    Kwargs:
      query_filename(str): Name of query file

    Returns:
      True(bool): Query succesfully validated
      False(bool): Query failed validation
    """
    query_filename = kwargs["query_filename"]
    # Only files with a ".sql" extension are treated as runnable queries
    if query_filename.endswith(".sql"):
        return True
    logging.warning(
        "Query filename "
        + query_filename
        + ' is invalid - does not end in ".sql". Skipping'
    )
    return False
341 
342 
def execute_query(**kwargs):
    """
    Executes a query against the connected db using pymapd
    https://pymapd.readthedocs.io/en/latest/usage.html#querying

    Kwargs:
      query_name(str): Name of query
      query_mapdql(str): Query to run
      iteration(int): Iteration number
      con(class): Connection class

    Returns:
      query_execution(dict):::
        result_count(int): Number of results returned
        execution_time(float): Time (in ms) that pymapd reports
                               backend spent on query.
        connect_time(float): Time (in ms) for overhead of query, calculated
                             by subtracting backend execution time
                             from time spent on the execution function.
        results_iter_time(float): Time (in ms) it took to for
                                  pymapd.fetchone() to iterate through all
                                  of the results.
        total_time(float): Time (in ms) from adding all above times.
      False(bool): The query failed. Exception should be logged.
    """
    start_time = timeit.default_timer()
    try:
        # Run the query
        query_result = kwargs["con"].execute(kwargs["query_mapdql"])
        logging.debug(
            "Completed iteration "
            + str(kwargs["iteration"])
            + " of query "
            + kwargs["query_name"]
        )
    except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
        logging.exception(
            "Error running query "
            + kwargs["query_name"]
            + " during iteration "
            + str(kwargs["iteration"])
        )
        return False

    # Calculate times
    query_elapsed_time = (timeit.default_timer() - start_time) * 1000
    execution_time = query_result._result.execution_time_ms
    connect_time = round((query_elapsed_time - execution_time), 1)
    # Iterate through each result from the query
    # (fix: the log messages below were missing the space after "query",
    # producing run-together output like "...from queryq1.sql")
    logging.debug(
        "Counting results from query "
        + kwargs["query_name"]
        + " iteration "
        + str(kwargs["iteration"])
    )
    result_count = 0
    start_time = timeit.default_timer()
    while query_result.fetchone():
        result_count += 1
    results_iter_time = round(
        ((timeit.default_timer() - start_time) * 1000), 1
    )
    query_execution = {
        "result_count": result_count,
        "execution_time": execution_time,
        "connect_time": connect_time,
        "results_iter_time": results_iter_time,
        "total_time": execution_time + connect_time + results_iter_time,
    }
    logging.debug(
        "Execution results for query "
        + kwargs["query_name"]
        + " iteration "
        + str(kwargs["iteration"])
        + ": "
        + str(query_execution)
    )
    return query_execution
421 
422 
def calculate_query_times(**kwargs):
    """
    Calculates aggregate query times from all iteration times

    Kwargs:
      total_times(list): List of total time calculations
      execution_times(list): List of execution_time calculations
      results_iter_times(list): List of results_iter_time calculations
      connect_times(list): List of connect_time calculations
      trim(float): Amount to trim from iterations set to gather trimmed
                   values. Enter as decimal corresponding to percent to
                   trim - ex: 0.15 to trim 15%.

    Returns:
      query_execution(dict): Aggregated query times, each rounded to
                             1 decimal place.
    """
    total_times = kwargs["total_times"]
    execution_times = kwargs["execution_times"]
    connect_times = kwargs["connect_times"]
    results_iter_times = kwargs["results_iter_times"]
    # Number of entries to drop from each end for the trimmed statistics
    trim_size = int(kwargs["trim"] * len(total_times))

    def _trimmed(times):
        # Sorted values with trim_size entries dropped from each end;
        # when there is nothing to trim, use the values unchanged.
        if trim_size > 0:
            return numpy.sort(times)[trim_size:-trim_size]
        return times

    # All stats are rounded to 1 decimal place. (Fix: the trimmed execution
    # stats previously used bare round(), truncating them to whole numbers,
    # inconsistent with every other field.)
    return {
        "total_time_avg": round(numpy.mean(total_times), 1),
        "total_time_min": round(numpy.min(total_times), 1),
        "total_time_max": round(numpy.max(total_times), 1),
        "total_time_85": round(numpy.percentile(total_times, 85), 1),
        "total_time_trimmed_avg": round(numpy.mean(_trimmed(total_times)), 1),
        "total_times": total_times,
        "execution_time_avg": round(numpy.mean(execution_times), 1),
        "execution_time_min": round(numpy.min(execution_times), 1),
        "execution_time_max": round(numpy.max(execution_times), 1),
        "execution_time_85": round(
            numpy.percentile(execution_times, 85), 1
        ),
        "execution_time_25": round(
            numpy.percentile(execution_times, 25), 1
        ),
        "execution_time_std": round(numpy.std(execution_times), 1),
        "execution_time_trimmed_avg": round(
            numpy.mean(_trimmed(execution_times)), 1
        ),
        "execution_time_trimmed_max": round(
            numpy.max(_trimmed(execution_times)), 1
        ),
        "execution_times": execution_times,
        "connect_time_avg": round(numpy.mean(connect_times), 1),
        "connect_time_min": round(numpy.min(connect_times), 1),
        "connect_time_max": round(numpy.max(connect_times), 1),
        "connect_time_85": round(
            numpy.percentile(connect_times, 85), 1
        ),
        "results_iter_time_avg": round(
            numpy.mean(results_iter_times), 1
        ),
        "results_iter_time_min": round(
            numpy.min(results_iter_times), 1
        ),
        "results_iter_time_max": round(
            numpy.max(results_iter_times), 1
        ),
        "results_iter_time_85": round(
            numpy.percentile(results_iter_times, 85), 1
        ),
    }
499 
500 
def get_mem_usage(**kwargs):
    """
    Calculates memory statistics from mapd_server _client.get_memory call

    Kwargs:
      con(class 'pymapd.connection.Connection'): Mapd connection
      mem_type(str): [gpu, cpu] Type of memory to gather metrics for

    Returns:
      ramusage(dict):::
        usedram(float): Amount of memory (in MB) used
        freeram(float): Amount of memory (in MB) free
        totalallocated(float): Total amount of memory (in MB) allocated
        errormessage(str): Error if returned by get_memory call
    """
    # Initialize up front so the error path can still return a complete
    # dict. (Previously "ramusage" was created inside the try block, so a
    # failure in get_memory raised UnboundLocalError in the except clause.)
    ramusage = {
        "usedram": 0,
        "freeram": 0,
        "totalallocated": 0,
        "errormessage": "",
    }
    try:
        con_mem_data_list = kwargs["con"]._client.get_memory(
            session=kwargs["con"]._session, memory_level=kwargs["mem_type"]
        )
        usedram = 0
        freeram = 0
        for con_mem_data in con_mem_data_list:
            page_size = con_mem_data.page_size
            node_memory_data_list = con_mem_data.node_memory_data
            for node_memory_data in node_memory_data_list:
                ram = node_memory_data.num_pages * page_size
                is_free = node_memory_data.is_free
                if is_free:
                    freeram += ram
                else:
                    usedram += ram
        totalallocated = usedram + freeram
        if totalallocated > 0:
            # Convert bytes to MB
            totalallocated = round(totalallocated / 1024 / 1024, 1)
            usedram = round(usedram / 1024 / 1024, 1)
            freeram = round(freeram / 1024 / 1024, 1)
        ramusage["usedram"] = usedram
        ramusage["freeram"] = freeram
        ramusage["totalallocated"] = totalallocated
    except Exception as e:
        errormessage = "Get memory failed with error: " + str(e)
        logging.error(errormessage)
        ramusage["errormessage"] = errormessage
    return ramusage
548 
549 
def run_query(**kwargs):
    """
    Takes query name, syntax, and iteration count and calls the
    execute_query function for each iteration. Reports total, iteration,
    and exec timings, memory usage, and failure status.

    Kwargs:
      query(dict):::
        name(str): Name of query
        mapdql(str): Query syntax to run
      iterations(int): Number of iterations of each query to run
      trim(float): Trim decimal to remove from top and bottom of results
      con(class 'pymapd.connection.Connection'): Mapd connection

    Returns:
      query_results(dict):::
        query_name(str): Name of query
        query_mapdql(str): Query to run
        query_id(str): Query ID
        query_succeeded(bool): Query succeeded
        query_error_info(str): Query error info
        result_count(int): Number of results returned
        initial_iteration_results(dict):::
          first_execution_time(float): Execution time for first query
                                       iteration
          first_connect_time(float): Connect time for first query
                                     iteration
          first_results_iter_time(float): Results iteration time for
                                          first query iteration
          first_total_time(float): Total time for first iteration
          first_cpu_mem_usage(float): CPU memory usage for first query
                                      iteration
          first_gpu_mem_usage(float): GPU memory usage for first query
                                      iteration
        noninitial_iteration_results(list):::
          execution_time(float): Time (in ms) that pymapd reports
                                 backend spent on query.
          connect_time(float): Time (in ms) for overhead of query,
                               calculated by subtracting backend execution time from
                               time spent on the execution function.
          results_iter_time(float): Time (in ms) it took to for
                                    pymapd.fetchone() to iterate through all of the results.
          total_time(float): Time (in ms) from adding all above times.
        query_total_elapsed_time(int): Total elapsed time for query
      False(bool): The query failed. Exception should be logged.
    """
    logging.info(
        "Running query: "
        + kwargs["query"]["name"]
        + " iterations: "
        + str(kwargs["iterations"])
    )
    # Query ID = filename without extention.
    # NOTE(review): rsplit(".") with no maxsplit splits on every dot, so a
    # name like "a.b.sql" yields id "a", not "a.b" — likely intended
    # rsplit(".", 1)[0]; confirm before changing.
    query_id = kwargs["query"]["name"].rsplit(".")[
        0
    ]  # Query ID = filename without extention
    query_results = {
        "query_name": kwargs["query"]["name"],
        "query_mapdql": kwargs["query"]["mapdql"],
        "query_id": query_id,
        "query_succeeded": True,
        "query_error_info": "",
        "initial_iteration_results": {},
        "noninitial_iteration_results": [],
        "query_total_elapsed_time": 0,
    }
    query_total_start_time = timeit.default_timer()
    # Run iterations of query
    for iteration in range(kwargs["iterations"]):
        # Gather memory before running query iteration
        logging.debug("Getting pre-query memory usage on CPU")
        pre_query_cpu_mem_usage = get_mem_usage(
            con=kwargs["con"], mem_type="cpu"
        )
        logging.debug("Getting pre-query memory usage on GPU")
        pre_query_gpu_mem_usage = get_mem_usage(
            con=kwargs["con"], mem_type="gpu"
        )
        # Run query iteration
        logging.debug(
            "Running iteration "
            + str(iteration)
            + " of query "
            + kwargs["query"]["name"]
        )
        # execute_query returns a timing dict on success or False on failure
        query_result = execute_query(
            query_name=kwargs["query"]["name"],
            query_mapdql=kwargs["query"]["mapdql"],
            iteration=iteration,
            con=kwargs["con"],
        )
        # Gather memory after running query iteration
        logging.debug("Getting post-query memory usage on CPU")
        post_query_cpu_mem_usage = get_mem_usage(
            con=kwargs["con"], mem_type="cpu"
        )
        logging.debug("Getting post-query memory usage on GPU")
        post_query_gpu_mem_usage = get_mem_usage(
            con=kwargs["con"], mem_type="gpu"
        )
        # Calculate total (post minus pre) memory usage after query iteration
        query_cpu_mem_usage = round(
            post_query_cpu_mem_usage["usedram"]
            - pre_query_cpu_mem_usage["usedram"],
            1,
        )
        query_gpu_mem_usage = round(
            post_query_gpu_mem_usage["usedram"]
            - pre_query_gpu_mem_usage["usedram"],
            1,
        )
        if query_result:
            query_results.update(
                query_error_info=""  # TODO - interpret query error info
            )
            # Assign first query iteration times
            if iteration == 0:
                first_execution_time = round(query_result["execution_time"], 1)
                first_connect_time = round(query_result["connect_time"], 1)
                first_results_iter_time = round(
                    query_result["results_iter_time"], 1
                )
                first_total_time = (
                    first_execution_time
                    + first_connect_time
                    + first_results_iter_time
                )
                # Memory deltas are only recorded for the first iteration,
                # which is where the query is expected to allocate
                query_results.update(
                    initial_iteration_results={
                        "first_execution_time": first_execution_time,
                        "first_connect_time": first_connect_time,
                        "first_results_iter_time": first_results_iter_time,
                        "first_total_time": first_total_time,
                        "first_cpu_mem_usage": query_cpu_mem_usage,
                        "first_gpu_mem_usage": query_gpu_mem_usage,
                    }
                )
            else:
                # Put noninitial iterations into query_result list
                query_results["noninitial_iteration_results"].append(
                    query_result
                )
                # Verify no change in memory for noninitial iterations
                if query_cpu_mem_usage != 0.0:
                    logging.error(
                        (
                            "Noninitial iteration ({0}) of query ({1}) "
                            + "shows non-zero CPU memory usage: {2}"
                        ).format(
                            iteration,
                            kwargs["query"]["name"],
                            query_cpu_mem_usage,
                        )
                    )
                if query_gpu_mem_usage != 0.0:
                    logging.error(
                        (
                            "Noninitial iteration ({0}) of query ({1}) "
                            + "shows non-zero GPU memory usage: {2}"
                        ).format(
                            iteration,
                            kwargs["query"]["name"],
                            query_gpu_mem_usage,
                        )
                    )
        else:
            # A failed iteration aborts the remaining iterations; the
            # caller filters out queries with query_succeeded=False
            logging.warning(
                "Error detected during execution of query: "
                + kwargs["query"]["name"]
                + ". This query will be skipped and "
                + "times will not reported"
            )
            query_results.update(query_succeeded=False)
            break
    # Calculate time for all iterations to run
    query_total_elapsed_time = round(
        ((timeit.default_timer() - query_total_start_time) * 1000), 1
    )
    query_results.update(query_total_elapsed_time=query_total_elapsed_time)
    logging.info(
        "Completed all iterations of query " + kwargs["query"]["name"]
    )
    return query_results
732 
733 
735  # Function to allow json to deal with datetime and numpy int
736  if isinstance(x, datetime.datetime):
737  return x.isoformat()
738  if isinstance(x, numpy.int64):
739  return int(x)
740  raise TypeError("Unknown type")
741 
742 
744  """
745  Create results dataset
746 
747  Kwargs:
748  run_guid(str): Run GUID
749  run_timestamp(datetime): Run timestamp
750  run_connection(str): Connection string
751  run_machine_name(str): Run machine name
752  run_machine_uname(str): Run machine uname
753  run_driver(str): Run driver
754  run_version(str): Version of DB
755  run_version_short(str): Shortened version of DB
756  label(str): Run label
757  source_db_gpu_count(int): Number of GPUs on run machine
758  source_db_gpu_driver_ver(str): GPU driver version
759  source_db_gpu_name(str): GPU name
760  source_db_gpu_mem(str): Amount of GPU mem on run machine
761  source_table(str): Table to run query against
762  trim(float): Trim decimal to remove from top and bottom of results
763  iterations(int): Number of iterations of each query to run
764  query_group(str): Query group, usually matches table name
765  query_results(dict):::
766  query_name(str): Name of query
767  query_mapdql(str): Query to run
768  query_id(str): Query ID
769  query_succeeded(bool): Query succeeded
770  query_error_info(str): Query error info
771  result_count(int): Number of results returned
772  initial_iteration_results(dict):::
773  first_execution_time(float): Execution time for first query
774  iteration
775  first_connect_time(float): Connect time for first query
776  iteration
777  first_results_iter_time(float): Results iteration time for
778  first query iteration
779  first_total_time(float): Total time for first iteration
780  first_cpu_mem_usage(float): CPU memory usage for first query
781  iteration
782  first_gpu_mem_usage(float): GPU memory usage for first query
783  iteration
784  noninitial_iteration_results(list):::
785  execution_time(float): Time (in ms) that pymapd reports
786  backend spent on query.
787  connect_time(float): Time (in ms) for overhead of query,
788  calculated by subtracting backend execution time from
789  time spent on the execution function.
790  results_iter_time(float): Time (in ms) it took to for
791  pymapd.fetchone() to iterate through all of the results.
792  total_time(float): Time (in ms) from adding all above times.
793  query_total_elapsed_time(int): Total elapsed time for query
794 
795  Returns:
796  results_dataset(list):::
797  result_dataset(dict): Query results dataset
798  """
799  results_dataset = []
800  for query_results in kwargs["queries_results"]:
801  if query_results["query_succeeded"]:
802  # Aggregate iteration values
803  execution_times, connect_times, results_iter_times, total_times = (
804  [],
805  [],
806  [],
807  [],
808  )
809  for noninitial_result in query_results[
810  "noninitial_iteration_results"
811  ]:
812  execution_times.append(noninitial_result["execution_time"])
813  connect_times.append(noninitial_result["connect_time"])
814  results_iter_times.append(
815  noninitial_result["results_iter_time"]
816  )
817  total_times.append(noninitial_result["total_time"])
818  # Overwrite result count, same for each iteration
819  result_count = noninitial_result["result_count"]
820  # Calculate query times
821  logging.debug(
822  "Calculating times from query " + query_results["query_id"]
823  )
824  query_times = calculate_query_times(
825  total_times=total_times,
826  execution_times=execution_times,
827  connect_times=connect_times,
828  results_iter_times=results_iter_times,
829  trim=kwargs[
830  "trim"
831  ], # Trim top and bottom n% for trimmed calculations
832  )
833  result_dataset = {
834  "name": query_results["query_name"],
835  "mapdql": query_results["query_mapdql"],
836  "succeeded": True,
837  "results": {
838  "run_guid": kwargs["run_guid"],
839  "run_timestamp": kwargs["run_timestamp"],
840  "run_connection": kwargs["run_connection"],
841  "run_machine_name": kwargs["run_machine_name"],
842  "run_machine_uname": kwargs["run_machine_uname"],
843  "run_driver": kwargs["run_driver"],
844  "run_version": kwargs["run_version"],
845  "run_version_short": kwargs["run_version_short"],
846  "run_label": kwargs["label"],
847  "run_gpu_count": kwargs["source_db_gpu_count"],
848  "run_gpu_driver_ver": kwargs["source_db_gpu_driver_ver"],
849  "run_gpu_name": kwargs["source_db_gpu_name"],
850  "run_gpu_mem_mb": kwargs["source_db_gpu_mem"],
851  "run_table": kwargs["source_table"],
852  "query_group": kwargs["query_group"],
853  "query_id": query_results["query_id"],
854  "query_result_set_count": result_count,
855  "query_error_info": query_results["query_error_info"],
856  "query_conn_first": query_results[
857  "initial_iteration_results"
858  ]["first_connect_time"],
859  "query_conn_avg": query_times["connect_time_avg"],
860  "query_conn_min": query_times["connect_time_min"],
861  "query_conn_max": query_times["connect_time_max"],
862  "query_conn_85": query_times["connect_time_85"],
863  "query_exec_first": query_results[
864  "initial_iteration_results"
865  ]["first_execution_time"],
866  "query_exec_avg": query_times["execution_time_avg"],
867  "query_exec_min": query_times["execution_time_min"],
868  "query_exec_max": query_times["execution_time_max"],
869  "query_exec_85": query_times["execution_time_85"],
870  "query_exec_25": query_times["execution_time_25"],
871  "query_exec_stdd": query_times["execution_time_std"],
872  "query_exec_trimmed_avg": query_times[
873  "execution_time_trimmed_avg"
874  ],
875  "query_exec_trimmed_max": query_times[
876  "execution_time_trimmed_max"
877  ],
878  # Render queries not supported yet
879  "query_render_first": None,
880  "query_render_avg": None,
881  "query_render_min": None,
882  "query_render_max": None,
883  "query_render_85": None,
884  "query_render_25": None,
885  "query_render_stdd": None,
886  "query_total_first": query_results[
887  "initial_iteration_results"
888  ]["first_total_time"],
889  "query_total_avg": query_times["total_time_avg"],
890  "query_total_min": query_times["total_time_min"],
891  "query_total_max": query_times["total_time_max"],
892  "query_total_85": query_times["total_time_85"],
893  "query_total_all": query_results[
894  "query_total_elapsed_time"
895  ],
896  "query_total_trimmed_avg": query_times[
897  "total_time_trimmed_avg"
898  ],
899  "results_iter_count": kwargs["iterations"],
900  "results_iter_first": query_results[
901  "initial_iteration_results"
902  ]["first_results_iter_time"],
903  "results_iter_avg": query_times["results_iter_time_avg"],
904  "results_iter_min": query_times["results_iter_time_min"],
905  "results_iter_max": query_times["results_iter_time_max"],
906  "results_iter_85": query_times["results_iter_time_85"],
907  "cpu_mem_usage_mb": query_results[
908  "initial_iteration_results"
909  ]["first_cpu_mem_usage"],
910  "gpu_mem_usage_mb": query_results[
911  "initial_iteration_results"
912  ]["first_gpu_mem_usage"],
913  },
914  "debug": {
915  "query_exec_times": query_times["execution_times"],
916  "query_total_times": query_times["total_times"],
917  },
918  }
919  elif not query_results["query_succeeded"]:
920  result_dataset = {
921  "name": query_results["query_name"],
922  "mapdql": query_results["query_mapdql"],
923  "succeeded": False,
924  }
925  results_dataset.append(result_dataset)
926  logging.debug("All values set for query " + query_results["query_id"])
927  return results_dataset
928 
929 
def send_results_db(**kwargs):
    """
    Send results dataset to a database using pymapd
    https://pymapd.readthedocs.io/en/latest/usage.html#loading-data

    Kwargs:
      results_dataset(list):::
        result_dataset(dict): Query results dataset
      table(str): Results destination table name
      db_user(str): Results destination user name
      db_passwd(str): Results destination password
      db_server(str): Results destination server address
      db_port(int): Results destination server port
      db_name(str): Results destination database name
      table_schema_file(str): Path to destination database schema file

    Returns:
      True(bool): Sending results to destination database succeeded
      False(bool): Sending results to destination database failed. Exception
          should be logged.
    """
    # Create dataframe from list of query results
    logging.debug("Converting results list to pandas dataframe")
    results_df = DataFrame(kwargs["results_dataset"])
    # Establish connection to destination db
    logging.debug("Connecting to destination db")
    dest_con = get_connection(
        db_user=kwargs["db_user"],
        db_passwd=kwargs["db_passwd"],
        db_server=kwargs["db_server"],
        db_port=kwargs["db_port"],
        db_name=kwargs["db_name"],
    )
    if not dest_con:
        # logging.error, not logging.exception: there is no active exception
        # here (get_connection returned a falsy value), so exception() would
        # log a spurious "NoneType: None" traceback.
        logging.error("Could not connect to destination db.")
        return False
    # Ensure the connection is closed on every exit path, including the
    # early returns on table-creation failure (previously leaked).
    try:
        # Load results into db, creating table if it does not exist
        tables = dest_con.get_tables()
        if kwargs["table"] not in tables:
            logging.info("Destination table does not exist. Creating.")
            try:
                with open(kwargs["table_schema_file"], "r") as table_schema:
                    logging.debug(
                        "Reading table_schema_file: "
                        + kwargs["table_schema_file"]
                    )
                    create_table_sql = table_schema.read().replace("\n", " ")
                    create_table_sql = create_table_sql.replace(
                        "##TAB##", kwargs["table"]
                    )
            except FileNotFoundError:
                logging.exception(
                    "Could not find destination table_schema_file."
                )
                return False
            try:
                logging.debug("Executing create destination table query")
                dest_con.execute(create_table_sql)
                logging.debug("Destination table created.")
            except (
                pymapd.exceptions.ProgrammingError,
                pymapd.exceptions.Error,
            ):
                logging.exception("Error running destination table creation")
                return False
        logging.info("Loading results into destination db")
        try:
            dest_con.load_table_columnar(
                kwargs["table"],
                results_df,
                preserve_index=False,
                chunk_size_bytes=0,
                col_names_from_schema=True,
            )
        except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
            logging.exception("Error loading results into destination db")
            return False
    finally:
        dest_con.close()
    return True
1003 
1004 
1006  """
1007  Send results dataset to a local json file
1008 
1009  Kwargs:
1010  results_dataset_json(str): Json-formatted query results dataset
1011  output_file_json (str): Location of .json file output
1012 
1013  Returns:
1014  True(bool): Sending results to json file succeeded
1015  False(bool): Sending results to json file failed. Exception
1016  should be logged.
1017  """
1018  try:
1019  logging.debug("Opening json output file for writing")
1020  with open(kwargs["output_file_json"], "w") as file_json_open:
1021  logging.info(
1022  "Writing to output json file: " + kwargs["output_file_json"]
1023  )
1024  file_json_open.write(kwargs["results_dataset_json"])
1025  return True
1026  except IOError:
1027  logging.exception("Error writing results to json output file")
1028  return False
1029 
1030 
1032  """
1033  Send results dataset to a local json file formatted for use with jenkins
1034  benchmark plugin: https://github.com/jenkinsci/benchmark-plugin
1035 
1036  Kwargs:
1037  results_dataset(list):::
1038  result_dataset(dict): Query results dataset
1039  thresholds_name(str): Name to use for Jenkins result field
1040  thresholds_field(str): Field to use for query threshold in jenkins
1041  output_tag_jenkins(str): Jenkins benchmark result tag, for different
1042  sets from same table
1043  output_file_jenkins (str): Location of .json jenkins file output
1044 
1045  Returns:
1046  True(bool): Sending results to json file succeeded
1047  False(bool): Sending results to json file failed. Exception
1048  should be logged.
1049  """
1050  jenkins_bench_results = []
1051  for result_dataset in kwargs["results_dataset"]:
1052  logging.debug("Constructing output for jenkins benchmark plugin")
1053  jenkins_bench_results.append(
1054  {
1055  "name": result_dataset["query_id"],
1056  "description": "",
1057  "parameters": [],
1058  "results": [
1059  {
1060  "name": result_dataset["query_id"]
1061  + "_"
1062  + kwargs["thresholds_name"],
1063  "description": "",
1064  "unit": "ms",
1065  "dblValue": result_dataset[kwargs["thresholds_field"]],
1066  }
1067  ],
1068  }
1069  )
1070  jenkins_bench_json = json.dumps(
1071  {
1072  "groups": [
1073  {
1074  "name": result_dataset["run_table"]
1075  + kwargs["output_tag_jenkins"],
1076  "description": "Source table: "
1077  + result_dataset["run_table"],
1078  "tests": jenkins_bench_results,
1079  }
1080  ]
1081  }
1082  )
1083  try:
1084  logging.debug("Opening jenkins_bench json output file for writing")
1085  with open(kwargs["output_file_jenkins"], "w") as file_jenkins_open:
1086  logging.info(
1087  "Writing to jenkins_bench json file: "
1088  + kwargs["output_file_jenkins"]
1089  )
1090  file_jenkins_open.write(jenkins_bench_json)
1091  return True
1092  except IOError:
1093  logging.exception("Error writing results to jenkins json output file")
1094  return False
1095 
1096 
def send_results_output(**kwargs):
    """
    Print the results dataset to script output.

    Kwargs:
      results_dataset_json(str): Json-formatted query results dataset

    Returns:
      True(bool): Sending results to output succeeded
    """
    payload = kwargs["results_dataset_json"]
    logging.info("Printing query results to output")
    print(payload)
    return True
1110 
1111 
def process_arguments(input_arguments):
    """
    Parse command line arguments into a Namespace.

    Args:
      input_arguments(list): Command line argument strings, e.g. sys.argv[1:]

    Returns:
      args(Namespace): Parsed arguments
    """
    # Parse input parameters. The _action_groups shuffle makes the
    # "required arguments" group print before the optional one in --help.
    parser = ArgumentParser()
    optional = parser._action_groups.pop()
    required = parser.add_argument_group("required arguments")
    parser._action_groups.append(optional)
    optional.add_argument(
        "-v", "--verbose", action="store_true", help="Turn on debug logging"
    )
    optional.add_argument(
        "-q",
        "--quiet",
        action="store_true",
        # Typo fix: "outuput" -> "output"
        help="Suppress script output " + "(except warnings and errors)",
    )
    required.add_argument(
        "-u",
        "--user",
        dest="user",
        default="mapd",
        help="Source database user",
    )
    required.add_argument(
        "-p",
        "--passwd",
        dest="passwd",
        default="HyperInteractive",
        help="Source database password",
    )
    required.add_argument(
        "-s",
        "--server",
        dest="server",
        default="localhost",
        help="Source database server hostname",
    )
    optional.add_argument(
        "-o",
        "--port",
        dest="port",
        type=int,
        default=6274,
        help="Source database server port",
    )
    required.add_argument(
        "-n",
        "--name",
        dest="name",
        default="mapd",
        help="Source database name",
    )
    required.add_argument(
        "-t",
        "--table",
        dest="table",
        required=True,
        help="Source db table name",
    )
    required.add_argument(
        "-l",
        "--label",
        dest="label",
        required=True,
        help="Benchmark run label",
    )
    required.add_argument(
        "-d",
        "--queries-dir",
        dest="queries_dir",
        help='Absolute path to dir with query files. \
                        [Default: "queries" dir in same location as script]',
    )
    required.add_argument(
        "-i",
        "--iterations",
        dest="iterations",
        type=int,
        required=True,
        help="Number of iterations per query. Must be > 1",
    )
    optional.add_argument(
        "-g",
        "--gpu-count",
        dest="gpu_count",
        type=int,
        default=None,
        help="Number of GPUs. Not required when gathering local gpu info",
    )
    optional.add_argument(
        "-G",
        "--gpu-name",
        dest="gpu_name",
        type=str,
        default="",
        help="Name of GPU(s). Not required when gathering local gpu info",
    )
    optional.add_argument(
        "--no-gather-conn-gpu-info",
        dest="no_gather_conn_gpu_info",
        action="store_true",
        help="Do not gather source database GPU info fields "
        + "[run_gpu_count, run_gpu_mem_mb] "
        + "using pymapd connection info. "
        + "Use when testing a CPU-only server.",
    )
    optional.add_argument(
        "--no-gather-nvml-gpu-info",
        dest="no_gather_nvml_gpu_info",
        action="store_true",
        help="Do not gather source database GPU info fields "
        + "[gpu_driver_ver, run_gpu_name] "
        + "from local GPU using pynvml. "
        + 'Defaults to True when source server is not "localhost". '
        + "Use when testing a CPU-only server.",
    )
    optional.add_argument(
        "--gather-nvml-gpu-info",
        dest="gather_nvml_gpu_info",
        action="store_true",
        help="Gather source database GPU info fields "
        + "[gpu_driver_ver, run_gpu_name] "
        + "from local GPU using pynvml. "
        + 'Defaults to True when source server is "localhost". '
        + "Only use when benchmarking against same machine that this script "
        + "is run from.",
    )
    optional.add_argument(
        "-m",
        "--machine-name",
        dest="machine_name",
        help="Name of source machine",
    )
    optional.add_argument(
        "-a",
        "--machine-uname",
        dest="machine_uname",
        help="Uname info from " + "source machine",
    )
    optional.add_argument(
        "-e",
        "--destination",
        dest="destination",
        default="mapd_db",
        # Typo fix: "seperated" -> "separated"
        help="Destination type: [mapd_db, file_json, output, jenkins_bench] "
        + "Multiple values can be input separated by commas, "
        + 'ex: "mapd_db,file_json"',
    )
    optional.add_argument(
        "-U",
        "--dest-user",
        dest="dest_user",
        default="mapd",
        help="Destination mapd_db database user",
    )
    optional.add_argument(
        "-P",
        "--dest-passwd",
        dest="dest_passwd",
        default="HyperInteractive",
        help="Destination mapd_db database password",
    )
    optional.add_argument(
        "-S",
        "--dest-server",
        dest="dest_server",
        help="Destination mapd_db database server hostname"
        + ' (required if destination = "mapd_db")',
    )
    optional.add_argument(
        "-O",
        "--dest-port",
        dest="dest_port",
        type=int,
        default=6274,
        help="Destination mapd_db database server port",
    )
    optional.add_argument(
        "-N",
        "--dest-name",
        dest="dest_name",
        default="mapd",
        help="Destination mapd_db database name",
    )
    optional.add_argument(
        "-T",
        "--dest-table",
        dest="dest_table",
        default="results",
        help="Destination mapd_db table name",
    )
    optional.add_argument(
        "-C",
        "--dest-table-schema-file",
        dest="dest_table_schema_file",
        default="results_table_schemas/query-results.sql",
        help="Destination table schema file. This must be an executable "
        + "CREATE TABLE statement that matches the output of this script. It "
        + "is required when creating the results table. Default location is "
        + 'in "./results_table_schemas/query-results.sql"',
    )
    optional.add_argument(
        "-j",
        "--output-file-json",
        dest="output_file_json",
        help="Absolute path of .json output file "
        + '(required if destination = "file_json")',
    )
    optional.add_argument(
        "-J",
        "--output-file-jenkins",
        dest="output_file_jenkins",
        help="Absolute path of jenkins benchmark .json output file "
        + '(required if destination = "jenkins_bench")',
    )
    optional.add_argument(
        "-E",
        "--output-tag-jenkins",
        dest="output_tag_jenkins",
        default="",
        help="Jenkins benchmark result tag. "
        + 'Optional, appended to table name in "group" field',
    )
    args = parser.parse_args(args=input_arguments)
    return args
1336 
1337 
def benchmark(input_arguments):
    """
    Run the benchmark end to end: parse arguments, connect to the source db,
    gather machine/GPU info, run each query, build the results dataset, and
    send it to the configured destination(s).

    Args:
      input_arguments(list): Command line argument strings, e.g. sys.argv[1:]
    """
    # Set input args to vars
    args = process_arguments(input_arguments)
    verbose = args.verbose
    quiet = args.quiet
    source_db_user = args.user
    source_db_passwd = args.passwd
    source_db_server = args.server
    source_db_port = args.port
    source_db_name = args.name
    source_table = args.table
    label = args.label
    queries_dir = args.queries_dir
    iterations = args.iterations
    gpu_count = args.gpu_count
    gpu_name = args.gpu_name
    no_gather_conn_gpu_info = args.no_gather_conn_gpu_info
    no_gather_nvml_gpu_info = args.no_gather_nvml_gpu_info
    gather_nvml_gpu_info = args.gather_nvml_gpu_info
    machine_name = args.machine_name
    machine_uname = args.machine_uname
    # destinations is a comma-separated string; membership tests below use
    # substring matching on it.
    destinations = args.destination
    dest_db_user = args.dest_user
    dest_db_passwd = args.dest_passwd
    dest_db_server = args.dest_server
    dest_db_port = args.dest_port
    dest_db_name = args.dest_name
    dest_table = args.dest_table
    dest_table_schema_file = args.dest_table_schema_file
    output_file_json = args.output_file_json
    output_file_jenkins = args.output_file_jenkins
    output_tag_jenkins = args.output_tag_jenkins

    # Hard-coded vars
    trim = 0.15  # fraction trimmed from each end for trimmed averages
    jenkins_thresholds_name = "average"
    jenkins_thresholds_field = "query_exec_avg"

    # Set logging output level
    if verbose:
        logging.basicConfig(level=logging.DEBUG)
    elif quiet:
        logging.basicConfig(level=logging.WARNING)
    else:
        logging.basicConfig(level=logging.INFO)

    # Input validation
    if iterations <= 1:
        # Need > 1 iteration as first iteration is dropped from calculations
        logging.error("Iterations must be greater than 1")
        sys.exit(1)
    # NOTE(review): the opening line of this call was dropped from the
    # extracted source; restored as verify_destinations, whose keyword
    # arguments match this call exactly.
    if verify_destinations(
        destinations=destinations,
        dest_db_server=dest_db_server,
        output_file_json=output_file_json,
        output_file_jenkins=output_file_jenkins,
    ):
        logging.debug("Destination(s) have been verified.")
    else:
        logging.error("No valid destination(s) have been set. Exiting.")
        sys.exit(1)

    # Establish connection to mapd db
    con = get_connection(
        db_user=source_db_user,
        db_passwd=source_db_passwd,
        db_server=source_db_server,
        db_port=source_db_port,
        db_name=source_db_name,
    )
    if not con:
        sys.exit(1)  # Exit if cannot connect to db
    # Set run-specific variables (time, uid, etc.)
    run_vars = get_run_vars(con=con)
    # Set GPU info depending on availability
    gpu_info = get_gpu_info(
        gpu_name=gpu_name,
        no_gather_conn_gpu_info=no_gather_conn_gpu_info,
        con=con,
        conn_machine_name=run_vars["conn_machine_name"],
        no_gather_nvml_gpu_info=no_gather_nvml_gpu_info,
        gather_nvml_gpu_info=gather_nvml_gpu_info,
        gpu_count=gpu_count,
    )
    # Set run machine info
    machine_info = get_machine_info(
        conn_machine_name=run_vars["conn_machine_name"],
        machine_name=machine_name,
        machine_uname=machine_uname,
    )
    # Read queries from files, set to queries dir in PWD if not passed in
    if not queries_dir:
        queries_dir = os.path.join(os.path.dirname(__file__), "queries")
    query_list = read_query_files(
        queries_dir=queries_dir, source_table=source_table
    )
    if not query_list:
        sys.exit(1)
    # Run queries
    queries_results = []
    for query in query_list["queries"]:
        query_result = run_query(
            query=query, iterations=iterations, trim=trim, con=con
        )
        queries_results.append(query_result)
    logging.info("Completed all queries.")
    logging.debug("Closing source db connection.")
    con.close()
    # Generate results dataset
    results_dataset = create_results_dataset(
        run_guid=run_vars["run_guid"],
        run_timestamp=run_vars["run_timestamp"],
        run_connection=run_vars["run_connection"],
        run_machine_name=machine_info["run_machine_name"],
        run_machine_uname=machine_info["run_machine_uname"],
        run_driver=run_vars["run_driver"],
        run_version=run_vars["run_version"],
        run_version_short=run_vars["run_version_short"],
        label=label,
        source_db_gpu_count=gpu_info["source_db_gpu_count"],
        source_db_gpu_driver_ver=gpu_info["source_db_gpu_driver_ver"],
        source_db_gpu_name=gpu_info["source_db_gpu_name"],
        source_db_gpu_mem=gpu_info["source_db_gpu_mem"],
        source_table=source_table,
        trim=trim,
        iterations=iterations,
        query_group=query_list["query_group"],
        queries_results=queries_results,
    )
    results_dataset_json = json.dumps(
        results_dataset, default=json_format_handler, indent=2
    )
    successful_results_dataset = [
        x for x in results_dataset if x["succeeded"] is not False
    ]
    successful_results_dataset_results = []
    for results_dataset_entry in successful_results_dataset:
        successful_results_dataset_results.append(
            results_dataset_entry["results"]
        )
    # Send results to destination(s)
    sent_destination = True
    if "mapd_db" in destinations:
        if not send_results_db(
            results_dataset=successful_results_dataset_results,
            table=dest_table,
            db_user=dest_db_user,
            db_passwd=dest_db_passwd,
            db_server=dest_db_server,
            db_port=dest_db_port,
            db_name=dest_db_name,
            table_schema_file=dest_table_schema_file,
        ):
            sent_destination = False
    if "file_json" in destinations:
        if not send_results_file_json(
            results_dataset_json=results_dataset_json,
            output_file_json=output_file_json,
        ):
            sent_destination = False
    if "jenkins_bench" in destinations:
        # NOTE(review): the opening line of this call was dropped from the
        # extracted source; restored as send_results_jenkins_bench, whose
        # keyword arguments match this call exactly.
        if not send_results_jenkins_bench(
            results_dataset=successful_results_dataset_results,
            thresholds_name=jenkins_thresholds_name,
            thresholds_field=jenkins_thresholds_field,
            output_tag_jenkins=output_tag_jenkins,
            output_file_jenkins=output_file_jenkins,
        ):
            sent_destination = False
    if "output" in destinations:
        if not send_results_output(results_dataset_json=results_dataset_json):
            sent_destination = False
    if not sent_destination:
        logging.error("Sending results to one or more destinations failed")
        sys.exit(1)
    else:
        logging.info(
            "Successfully loaded query results info into destination(s)"
        )
1517 
1518 
1519 if __name__ == "__main__":
1520  benchmark(sys.argv[1:])
std::string join(T const &container, std::string const &delim)
def verify_destinations
size_t append(FILE *f, const size_t size, int8_t *buf)
Appends the specified number of bytes to the end of the file f from buf.
Definition: File.cpp:136
def create_results_dataset
std::vector< std::string > split(const std::string &str, const std::string &delim)
split apart a string into a vector of substrings
def send_results_file_json
def calculate_query_times
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:83
def send_results_jenkins_bench