OmniSciDB  a47db9e897
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
run_benchmark_arrow.py
Go to the documentation of this file.
1 import os
2 import timeit
3 import logging
4 import uuid
5 import datetime
6 import json
7 import numpy
8 import pymapd
9 import re
10 import sys
11 from pandas import DataFrame
12 from run_benchmark import (
13  verify_destinations,
14  get_connection,
15  get_run_vars,
16  get_gpu_info,
17  get_machine_info,
18  read_query_files,
19  validate_query_file,
20  get_mem_usage,
21  json_format_handler,
22  send_results_db,
23  send_results_file_json,
24  send_results_jenkins_bench,
25  send_results_output,
26 )
27 from argparse import ArgumentParser
28 
29 
def execute_query(**kwargs):
    """
    Executes a query against the connected db using pymapd
    https://pymapd.readthedocs.io/en/latest/usage.html#querying

    Kwargs:
        query_name(str): Name of query
        query_mapdql(str): Query to run
        iteration(int): Iteration number
        con(class): Connection class
        arrow_cpu_output(bool): When True, results are fetched on CPU with
                                ``con.select_ipc``; otherwise the raw
                                client's ``sql_execute_gdf`` endpoint is
                                called with ``device_id=0``.
                                Defaults to False.

    Returns:
        query_execution(dict):::
            result_count(int): Number of rows returned. Only counted for
                               CPU Arrow output; always 0 otherwise.
            execution_time(float): Time (in ms) that pymapd reports
                                   backend spent on query.
            connect_time(float): Time (in ms) for overhead of query,
                                 calculated by subtracting backend execution
                                 time from time spent on the execution
                                 function (rounded to 1 decimal).
            arrow_conversion_time(float): Time (in ms) for converting and
                                          serializing results in arrow format
            total_time(float): Time (in ms) from adding all above times.
        False(bool): The query failed. Exception should be logged.
    """
    con = kwargs["con"]
    query_name = kwargs["query_name"]
    query_mapdql = kwargs["query_mapdql"]
    iteration = kwargs["iteration"]
    arrow_cpu_output = kwargs.get("arrow_cpu_output", False)

    start_time = timeit.default_timer()
    try:
        # Run the query
        if arrow_cpu_output:
            query_result = con.select_ipc(query_mapdql)
        else:
            query_result = con._client.sql_execute_gdf(
                con._session,
                query_mapdql,
                device_id=0,
                first_n=-1,
            )
        logging.debug(
            "Completed iteration "
            + str(iteration)
            + " of query "
            + query_name
        )
    except (pymapd.exceptions.ProgrammingError, pymapd.exceptions.Error):
        logging.exception(
            "Error running query "
            + query_name
            + " during iteration "
            + str(iteration)
        )
        return False

    # Calculate times. The wall-clock measurement covers the whole client
    # round trip of the call above.
    query_elapsed_time = (timeit.default_timer() - start_time) * 1000
    # The select_ipc result carries the backend timings on its ``_tdf``
    # attribute; the raw sql_execute_gdf result exposes them directly.
    timing_source = query_result._tdf if arrow_cpu_output else query_result
    execution_time = timing_source.execution_time_ms
    arrow_conversion_time = timing_source.arrow_conversion_time_ms
    connect_time = round((query_elapsed_time - execution_time), 1)
    # NOTE(review): connect_time is wall-clock minus backend execution time,
    # so it may already contain the server-side arrow conversion; if so,
    # total_time below counts that conversion twice. Confirm against the
    # server's timing semantics before changing the formula.

    logging.debug(
        "Counting results from query "
        + query_name
        + " iteration "
        + str(iteration)
    )
    # TODO(Wamsi): Add support for computing cuDF size, once cuDF is fixed.
    result_count = len(query_result.index) if arrow_cpu_output else 0
    query_execution = {
        "result_count": result_count,
        "execution_time": execution_time,
        "connect_time": connect_time,
        "arrow_conversion_time": arrow_conversion_time,
        "total_time": execution_time + connect_time + arrow_conversion_time,
    }
    logging.debug(
        "Execution results for query "
        + query_name
        + " iteration "
        + str(iteration)
        + ": "
        + str(query_execution)
    )
    return query_execution
126 
127 
def calculate_query_times(**kwargs):
    """
    Calculates aggregate query times from all iteration times

    Kwargs:
        total_times(list): List of total time calculations
        execution_times(list): List of execution_time calculations
        connect_times(list): List of connect_time calculations
        arrow_conversion_times(list): List of arrow_conversion_time
                                      calculations
        trim(float): Amount to trim from iterations set to gather trimmed
                     values. Enter as decimal corresponding to percent to
                     trim - ex: 0.15 to trim 15%. The number of samples
                     dropped from each end is derived from the length of
                     total_times.

    Returns:
        query_execution(dict): Query times. Every statistic is rounded to
                               one decimal place; the raw total_times and
                               execution_times lists are passed through
                               unchanged.
    """
    total_times = kwargs["total_times"]
    execution_times = kwargs["execution_times"]
    connect_times = kwargs["connect_times"]
    arrow_conversion_times = kwargs["arrow_conversion_times"]
    trim_size = int(kwargs["trim"] * len(total_times))

    def _trimmed(values):
        # Drop trim_size samples from both ends of the sorted values. Fall
        # back to the untrimmed values when there is nothing to trim or when
        # trimming would leave no samples (numpy.max of an empty array
        # raises, numpy.mean of one returns nan).
        if trim_size > 0 and len(values) > 2 * trim_size:
            return numpy.sort(values)[trim_size:-trim_size]
        return values

    def _r(value):
        # All reported statistics use one decimal place.
        return round(value, 1)

    return {
        "total_time_avg": _r(numpy.mean(total_times)),
        "total_time_min": _r(numpy.min(total_times)),
        "total_time_max": _r(numpy.max(total_times)),
        "total_time_85": _r(numpy.percentile(total_times, 85)),
        "total_time_trimmed_avg": _r(numpy.mean(_trimmed(total_times))),
        "total_times": total_times,
        "execution_time_avg": _r(numpy.mean(execution_times)),
        "execution_time_min": _r(numpy.min(execution_times)),
        "execution_time_max": _r(numpy.max(execution_times)),
        "execution_time_85": _r(numpy.percentile(execution_times, 85)),
        "execution_time_25": _r(numpy.percentile(execution_times, 25)),
        "execution_time_std": _r(numpy.std(execution_times)),
        "execution_time_trimmed_avg": _r(
            numpy.mean(_trimmed(execution_times))
        ),
        "execution_time_trimmed_max": _r(
            numpy.max(_trimmed(execution_times))
        ),
        "execution_times": execution_times,
        "connect_time_avg": _r(numpy.mean(connect_times)),
        "connect_time_min": _r(numpy.min(connect_times)),
        "connect_time_max": _r(numpy.max(connect_times)),
        "connect_time_85": _r(numpy.percentile(connect_times, 85)),
        "arrow_conversion_time_avg": _r(numpy.mean(arrow_conversion_times)),
        "arrow_conversion_time_min": _r(numpy.min(arrow_conversion_times)),
        "arrow_conversion_time_max": _r(numpy.max(arrow_conversion_times)),
        "arrow_conversion_time_85": _r(
            numpy.percentile(arrow_conversion_times, 85)
        ),
        "arrow_conversion_time_25": _r(
            numpy.percentile(arrow_conversion_times, 25)
        ),
        "arrow_conversion_time_std": _r(numpy.std(arrow_conversion_times)),
    }
210 
def run_query(**kwargs):
    """
    Takes query name, syntax, and iteration count and calls the
    execute_query function for each iteration. Reports total, iteration,
    and exec timings, memory usage, and failure status.

    Kwargs:
        query(dict):::
            name(str): Name of query
            mapdql(str): Query syntax to run
        iterations(int): Number of iterations of each query to run
        trim(float): Trim decimal to remove from top and bottom of results
                     (accepted for interface compatibility; not used here)
        con(class 'pymapd.connection.Connection'): Mapd connection
        arrow_cpu_output(bool): Forwarded to execute_query.
                                Defaults to False.

    Returns:
        query_results(dict):::
            query_name(str): Name of query
            query_mapdql(str): Query to run
            query_id(str): Query ID (query name with its final extension
                           removed)
            query_succeeded(bool): Query succeeded
            query_error_info(str): Query error info
            initial_iteration_results(dict):::
                first_execution_time(float): Execution time for first query
                    iteration
                first_connect_time(float): Connect time for first query
                    iteration
                first_arrow_conversion_time(float): Arrow conversion time
                    for first query iteration
                first_total_time(float): Total time for first iteration
                    (execution + connect + arrow conversion)
                first_cpu_mem_usage(float): CPU memory usage for first query
                    iteration
                first_gpu_mem_usage(float): GPU memory usage for first query
                    iteration
            noninitial_iteration_results(list): execute_query result dicts
                for every iteration after the first:::
                result_count(int): Number of results returned
                execution_time(float): Time (in ms) that pymapd reports
                    backend spent on query.
                connect_time(float): Time (in ms) for overhead of query,
                    calculated by subtracting backend execution time from
                    time spent on the execution function.
                arrow_conversion_time(float): Time (in ms) for arrow
                    conversion and serialization of results.
                total_time(float): Time (in ms) from adding all above times.
            query_total_elapsed_time(float): Total elapsed time (in ms) for
                all iterations of the query
    """
    query_name = kwargs["query"]["name"]
    query_mapdql = kwargs["query"]["mapdql"]
    con = kwargs["con"]
    logging.info(
        "Running query: "
        + query_name
        + " iterations: "
        + str(kwargs["iterations"])
    )
    # Query ID = filename without its final extension. maxsplit=1 keeps any
    # earlier dots in the name (e.g. "q.v2.sql" -> "q.v2").
    query_id = query_name.rsplit(".", 1)[0]
    query_results = {
        "query_name": query_name,
        "query_mapdql": query_mapdql,
        "query_id": query_id,
        "query_succeeded": True,
        "query_error_info": "",
        "initial_iteration_results": {},
        "noninitial_iteration_results": [],
        "query_total_elapsed_time": 0,
    }
    query_total_start_time = timeit.default_timer()
    # Run iterations of query
    for iteration in range(kwargs["iterations"]):
        # Gather memory before running query iteration
        logging.debug("Getting pre-query memory usage on CPU")
        pre_query_cpu_mem_usage = get_mem_usage(con=con, mem_type="cpu")
        logging.debug("Getting pre-query memory usage on GPU")
        pre_query_gpu_mem_usage = get_mem_usage(con=con, mem_type="gpu")
        # Run query iteration
        logging.debug(
            "Running iteration "
            + str(iteration)
            + " of query "
            + query_name
        )
        query_result = execute_query(
            query_name=query_name,
            query_mapdql=query_mapdql,
            iteration=iteration,
            con=con,
            arrow_cpu_output=kwargs.get("arrow_cpu_output", False),
        )
        # Gather memory after running query iteration
        logging.debug("Getting post-query memory usage on CPU")
        post_query_cpu_mem_usage = get_mem_usage(con=con, mem_type="cpu")
        logging.debug("Getting post-query memory usage on GPU")
        post_query_gpu_mem_usage = get_mem_usage(con=con, mem_type="gpu")
        # Calculate total (post minus pre) memory usage after query iteration
        query_cpu_mem_usage = round(
            post_query_cpu_mem_usage["usedram"]
            - pre_query_cpu_mem_usage["usedram"],
            1,
        )
        query_gpu_mem_usage = round(
            post_query_gpu_mem_usage["usedram"]
            - pre_query_gpu_mem_usage["usedram"],
            1,
        )
        if not query_result:
            # execute_query returns False (after logging the exception) when
            # the query fails; stop iterating and mark the query as failed.
            logging.warning(
                "Error detected during execution of query: "
                + query_name
                + ". This query will be skipped and "
                + "times will not be reported"
            )
            query_results.update(query_succeeded=False)
            break
        query_results.update(
            query_error_info=""  # TODO - interpret query error info
        )
        if iteration == 0:
            # Assign first query iteration times
            first_execution_time = round(query_result["execution_time"], 1)
            first_connect_time = round(query_result["connect_time"], 1)
            first_arrow_conversion_time = round(
                query_result["arrow_conversion_time"], 1
            )
            # Include arrow conversion so the first total is computed the
            # same way as execute_query's total_time for later iterations.
            first_total_time = round(
                first_execution_time
                + first_connect_time
                + first_arrow_conversion_time,
                1,
            )
            query_results.update(
                initial_iteration_results={
                    "first_execution_time": first_execution_time,
                    "first_connect_time": first_connect_time,
                    "first_arrow_conversion_time": first_arrow_conversion_time,
                    "first_total_time": first_total_time,
                    "first_cpu_mem_usage": query_cpu_mem_usage,
                    "first_gpu_mem_usage": query_gpu_mem_usage,
                }
            )
        else:
            # Put noninitial iterations into query_result list
            query_results["noninitial_iteration_results"].append(query_result)
            # Verify no change in memory for noninitial iterations
            if query_cpu_mem_usage != 0.0:
                logging.error(
                    (
                        "Noninitial iteration ({0}) of query ({1}) "
                        + "shows non-zero CPU memory usage: {2}"
                    ).format(iteration, query_name, query_cpu_mem_usage)
                )
            if query_gpu_mem_usage != 0.0:
                logging.error(
                    (
                        "Noninitial iteration ({0}) of query ({1}) "
                        + "shows non-zero GPU memory usage: {2}"
                    ).format(iteration, query_name, query_gpu_mem_usage)
                )
    # Calculate time for all iterations to run
    query_total_elapsed_time = round(
        ((timeit.default_timer() - query_total_start_time) * 1000), 1
    )
    query_results.update(query_total_elapsed_time=query_total_elapsed_time)
    logging.info("Completed all iterations of query " + query_name)
    return query_results

def create_results_dataset(**kwargs):
    """
    Create results dataset

    Kwargs:
        run_guid(str): Run GUID
        run_timestamp(datetime): Run timestamp
        run_connection(str): Connection string
        run_machine_name(str): Run machine name
        run_machine_uname(str): Run machine uname
        run_driver(str): Run driver
        run_version(str): Version of DB
        run_version_short(str): Shortened version of DB
        label(str): Run label
        source_db_gpu_count(int): Number of GPUs on run machine
        source_db_gpu_driver_ver(str): GPU driver version
        source_db_gpu_name(str): GPU name
        source_db_gpu_mem(str): Amount of GPU mem on run machine
        source_table(str): Table to run query against
        trim(float): Trim decimal to remove from top and bottom of results
        iterations(int): Number of iterations of each query to run
        query_group(str): Query group, usually matches table name
        queries_results(list): One run_query result dict per query:::
            query_name(str): Name of query
            query_mapdql(str): Query to run
            query_id(str): Query ID
            query_succeeded(bool): Query succeeded
            query_error_info(str): Query error info
            initial_iteration_results(dict):::
                first_execution_time(float): Execution time for first query
                    iteration
                first_connect_time(float): Connect time for first query
                    iteration
                first_total_time(float): Total time for first iteration
                first_cpu_mem_usage(float): CPU memory usage for first query
                    iteration
                first_gpu_mem_usage(float): GPU memory usage for first query
                    iteration
            noninitial_iteration_results(list):::
                result_count(int): Number of results returned
                execution_time(float): Time (in ms) that pymapd reports
                    backend spent on query.
                connect_time(float): Time (in ms) for overhead of query,
                    calculated by subtracting backend execution time from
                    time spent on the execution function.
                arrow_conversion_time(float): Time (in ms) it took for
                    arrow conversion and serialization of results.
                total_time(float): Time (in ms) from adding all above times.
            query_total_elapsed_time(int): Total elapsed time for query

    Returns:
        results_dataset(list):::
            result_dataset(dict): Query results dataset. Failed queries only
                carry "name", "mapdql" and "succeeded": False.
    """
    results_dataset = []
    for query_results in kwargs["queries_results"]:
        if query_results["query_succeeded"]:
            # Aggregate iteration values (the first iteration is reported
            # separately via initial_iteration_results).
            noninitial_results = query_results["noninitial_iteration_results"]
            execution_times = [r["execution_time"] for r in noninitial_results]
            connect_times = [r["connect_time"] for r in noninitial_results]
            arrow_conversion_times = [
                r["arrow_conversion_time"] for r in noninitial_results
            ]
            total_times = [r["total_time"] for r in noninitial_results]
            # Result count is the same for each iteration; use the last one.
            # Default to 0 so the name is always bound.
            result_count = (
                noninitial_results[-1]["result_count"]
                if noninitial_results
                else 0
            )
            # Calculate query times
            logging.debug(
                "Calculating times from query " + query_results["query_id"]
            )
            query_times = calculate_query_times(
                total_times=total_times,
                execution_times=execution_times,
                connect_times=connect_times,
                arrow_conversion_times=arrow_conversion_times,
                # Trim top and bottom n% for trimmed calculations
                trim=kwargs["trim"],
            )
            first_results = query_results["initial_iteration_results"]
            result_dataset = {
                "name": query_results["query_name"],
                "mapdql": query_results["query_mapdql"],
                "succeeded": True,
                "results": {
                    "run_guid": kwargs["run_guid"],
                    "run_timestamp": kwargs["run_timestamp"],
                    "run_connection": kwargs["run_connection"],
                    "run_machine_name": kwargs["run_machine_name"],
                    "run_machine_uname": kwargs["run_machine_uname"],
                    "run_driver": kwargs["run_driver"],
                    "run_version": kwargs["run_version"],
                    "run_version_short": kwargs["run_version_short"],
                    "run_label": kwargs["label"],
                    "run_gpu_count": kwargs["source_db_gpu_count"],
                    "run_gpu_driver_ver": kwargs["source_db_gpu_driver_ver"],
                    "run_gpu_name": kwargs["source_db_gpu_name"],
                    "run_gpu_mem_mb": kwargs["source_db_gpu_mem"],
                    "run_table": kwargs["source_table"],
                    "query_group": kwargs["query_group"],
                    "query_id": query_results["query_id"],
                    "query_result_set_count": result_count,
                    "query_error_info": query_results["query_error_info"],
                    "query_conn_first": first_results["first_connect_time"],
                    "query_conn_avg": query_times["connect_time_avg"],
                    "query_conn_min": query_times["connect_time_min"],
                    "query_conn_max": query_times["connect_time_max"],
                    "query_conn_85": query_times["connect_time_85"],
                    "query_exec_first": first_results["first_execution_time"],
                    "query_exec_avg": query_times["execution_time_avg"],
                    "query_exec_min": query_times["execution_time_min"],
                    "query_exec_max": query_times["execution_time_max"],
                    "query_exec_85": query_times["execution_time_85"],
                    "query_exec_25": query_times["execution_time_25"],
                    "query_exec_stdd": query_times["execution_time_std"],
                    "query_exec_trimmed_avg": query_times[
                        "execution_time_trimmed_avg"
                    ],
                    "query_exec_trimmed_max": query_times[
                        "execution_time_trimmed_max"
                    ],
                    "query_arrow_conversion_avg": query_times[
                        "arrow_conversion_time_avg"
                    ],
                    "query_arrow_conversion_min": query_times[
                        "arrow_conversion_time_min"
                    ],
                    "query_arrow_conversion_max": query_times[
                        "arrow_conversion_time_max"
                    ],
                    "query_arrow_conversion_85": query_times[
                        "arrow_conversion_time_85"
                    ],
                    "query_arrow_conversion_25": query_times[
                        "arrow_conversion_time_25"
                    ],
                    "query_arrow_conversion_stdd": query_times[
                        "arrow_conversion_time_std"
                    ],
                    "query_total_first": first_results["first_total_time"],
                    "query_total_avg": query_times["total_time_avg"],
                    "query_total_min": query_times["total_time_min"],
                    "query_total_max": query_times["total_time_max"],
                    "query_total_85": query_times["total_time_85"],
                    "query_total_all": query_results[
                        "query_total_elapsed_time"
                    ],
                    "query_total_trimmed_avg": query_times[
                        "total_time_trimmed_avg"
                    ],
                    "cpu_mem_usage_mb": first_results["first_cpu_mem_usage"],
                    "gpu_mem_usage_mb": first_results["first_gpu_mem_usage"],
                },
                "debug": {
                    "query_exec_times": query_times["execution_times"],
                    "query_total_times": query_times["total_times"],
                },
            }
        else:
            result_dataset = {
                "name": query_results["query_name"],
                "mapdql": query_results["query_mapdql"],
                "succeeded": False,
            }
        results_dataset.append(result_dataset)
        logging.debug("All values set for query " + query_results["query_id"])
    return results_dataset
580 
581 
def process_arguments(input_arguments):
    """
    Parse command-line arguments for the Arrow benchmark runner.

    Args:
        input_arguments(list): Argument strings, without the program name.

    Returns:
        argparse.Namespace: Parsed arguments. Exits via argparse on invalid
        or missing required arguments.
    """
    parser = ArgumentParser()
    # Re-order argument groups so "required arguments" is listed before
    # "optional arguments" in --help output.
    optional = parser._action_groups.pop()
    required = parser.add_argument_group("required arguments")
    parser._action_groups.append(optional)
    optional.add_argument(
        "-v", "--verbose", action="store_true", help="Turn on debug logging"
    )
    optional.add_argument(
        "-q",
        "--quiet",
        action="store_true",
        help="Suppress script output (except warnings and errors)",
    )
    # NOTE: several options in the "required" group have defaults; only
    # --table, --label and --iterations are actually enforced by argparse.
    required.add_argument(
        "-u",
        "--user",
        dest="user",
        default="mapd",
        help="Source database user",
    )
    required.add_argument(
        "-p",
        "--passwd",
        dest="passwd",
        default="HyperInteractive",
        help="Source database password",
    )
    required.add_argument(
        "-s",
        "--server",
        dest="server",
        default="localhost",
        help="Source database server hostname",
    )
    optional.add_argument(
        "-o",
        "--port",
        dest="port",
        type=int,
        default=6274,
        help="Source database server port",
    )
    required.add_argument(
        "-n",
        "--name",
        dest="name",
        default="mapd",
        help="Source database name",
    )
    required.add_argument(
        "-t",
        "--table",
        dest="table",
        required=True,
        help="Source db table name",
    )
    required.add_argument(
        "-l",
        "--label",
        dest="label",
        required=True,
        help="Benchmark run label",
    )
    required.add_argument(
        "-d",
        "--queries-dir",
        dest="queries_dir",
        help="Absolute path to dir with query files. "
        + '[Default: "queries" dir in same location as script]',
    )
    required.add_argument(
        "-i",
        "--iterations",
        dest="iterations",
        type=int,
        required=True,
        help="Number of iterations per query. Must be > 1",
    )
    optional.add_argument(
        "-g",
        "--gpu-count",
        dest="gpu_count",
        type=int,
        default=None,
        help="Number of GPUs. Not required when gathering local gpu info",
    )
    optional.add_argument(
        "-G",
        "--gpu-name",
        dest="gpu_name",
        type=str,
        default="",
        help="Name of GPU(s). Not required when gathering local gpu info",
    )
    optional.add_argument(
        "--no-gather-conn-gpu-info",
        dest="no_gather_conn_gpu_info",
        action="store_true",
        help="Do not gather source database GPU info fields "
        + "[run_gpu_count, run_gpu_mem_mb] "
        + "using pymapd connection info. "
        + "Use when testing a CPU-only server.",
    )
    optional.add_argument(
        "--no-gather-nvml-gpu-info",
        dest="no_gather_nvml_gpu_info",
        action="store_true",
        help="Do not gather source database GPU info fields "
        + "[gpu_driver_ver, run_gpu_name] "
        + "from local GPU using pynvml. "
        + 'Defaults to True when source server is not "localhost". '
        + "Use when testing a CPU-only server.",
    )
    optional.add_argument(
        "--gather-nvml-gpu-info",
        dest="gather_nvml_gpu_info",
        action="store_true",
        help="Gather source database GPU info fields "
        + "[gpu_driver_ver, run_gpu_name] "
        + "from local GPU using pynvml. "
        + 'Defaults to True when source server is "localhost". '
        + "Only use when benchmarking against same machine that this script "
        + "is run from.",
    )
    optional.add_argument(
        "-m",
        "--machine-name",
        dest="machine_name",
        help="Name of source machine",
    )
    optional.add_argument(
        "-a",
        "--machine-uname",
        dest="machine_uname",
        help="Uname info from source machine",
    )
    optional.add_argument(
        "-e",
        "--destination",
        dest="destination",
        default="mapd_db",
        help="Destination type: [mapd_db, file_json, output, jenkins_bench] "
        + "Multiple values can be input separated by commas, "
        + 'ex: "mapd_db,file_json"',
    )
    optional.add_argument(
        "-U",
        "--dest-user",
        dest="dest_user",
        default="mapd",
        help="Destination mapd_db database user",
    )
    optional.add_argument(
        "-P",
        "--dest-passwd",
        dest="dest_passwd",
        default="HyperInteractive",
        help="Destination mapd_db database password",
    )
    optional.add_argument(
        "-S",
        "--dest-server",
        dest="dest_server",
        help="Destination mapd_db database server hostname"
        + ' (required if destination = "mapd_db")',
    )
    optional.add_argument(
        "-O",
        "--dest-port",
        dest="dest_port",
        type=int,
        default=6274,
        help="Destination mapd_db database server port",
    )
    optional.add_argument(
        "-N",
        "--dest-name",
        dest="dest_name",
        default="mapd",
        help="Destination mapd_db database name",
    )
    optional.add_argument(
        "-T",
        "--dest-table",
        dest="dest_table",
        default="results_arrow",
        help="Destination mapd_db table name",
    )
    optional.add_argument(
        "-C",
        "--dest-table-schema-file",
        dest="dest_table_schema_file",
        default="results_table_schemas/arrow-results.sql",
        help="Destination table schema file. This must be an executable "
        + "CREATE TABLE statement that matches the output of this script. It "
        + "is required when creating the results_arrow table. Default location is "
        + 'in "./results_table_schemas/arrow-results.sql"',
    )
    optional.add_argument(
        "-j",
        "--output-file-json",
        dest="output_file_json",
        help="Absolute path of .json output file "
        + '(required if destination = "file_json")',
    )
    optional.add_argument(
        "-J",
        "--output-file-jenkins",
        dest="output_file_jenkins",
        help="Absolute path of jenkins benchmark .json output file "
        + '(required if destination = "jenkins_bench")',
    )
    optional.add_argument(
        "-E",
        "--output-tag-jenkins",
        dest="output_tag_jenkins",
        default="",
        help="Jenkins benchmark result tag. "
        + 'Optional, appended to table name in "group" field',
    )
    optional.add_argument(
        "--enable-arrow-cpu-output",
        dest="arrow_cpu_output",
        action="store_true",
        help="Output results in Apache Arrow Serialized format on CPU",
    )
    return parser.parse_args(args=input_arguments)
812 
813 
def benchmark(input_arguments):
    """
    Run the Arrow benchmark end to end.

    Parses the command line, connects to the source database, runs every
    query file for the requested number of iterations, builds the results
    dataset and sends it to each requested destination.

    Args:
        input_arguments(list): Command-line arguments without the program
                               name (e.g. sys.argv[1:]).

    Exits the process with status 1 (sys.exit) when input validation, the
    source connection, reading query files, or sending results fails.
    """
    # Set input args to vars
    args = process_arguments(input_arguments)
    source_table = args.table
    iterations = args.iterations
    destinations = args.destination
    dest_db_server = args.dest_server
    output_file_json = args.output_file_json
    output_file_jenkins = args.output_file_jenkins
    arrow_cpu_output = args.arrow_cpu_output

    # Hard-coded vars
    trim = 0.15
    jenkins_thresholds_name = "average"
    jenkins_thresholds_field = "query_exec_avg"

    # Set logging output level
    if args.verbose:
        log_level = logging.DEBUG
    elif args.quiet:
        log_level = logging.WARNING
    else:
        log_level = logging.INFO
    logging.basicConfig(level=log_level)

    # Input validation
    if iterations <= 1:
        # Need > 1 iteration as first iteration is dropped from calculations
        logging.error("Iterations must be greater than 1")
        sys.exit(1)
    if verify_destinations(
        destinations=destinations,
        dest_db_server=dest_db_server,
        output_file_json=output_file_json,
        output_file_jenkins=output_file_jenkins,
    ):
        logging.debug("Destination(s) have been verified.")
    else:
        logging.error("No valid destination(s) have been set. Exiting.")
        sys.exit(1)

    # Establish connection to mapd db
    con = get_connection(
        db_user=args.user,
        db_passwd=args.passwd,
        db_server=args.server,
        db_port=args.port,
        db_name=args.name,
    )
    if not con:
        sys.exit(1)  # Exit if cannot connect to db
    # Close the source connection on every path out of this section,
    # including the sys.exit() below and unexpected exceptions.
    try:
        # Set run-specific variables (time, uid, etc.)
        run_vars = get_run_vars(con=con)
        # Set GPU info depending on availability
        gpu_info = get_gpu_info(
            gpu_name=args.gpu_name,
            no_gather_conn_gpu_info=args.no_gather_conn_gpu_info,
            con=con,
            conn_machine_name=run_vars["conn_machine_name"],
            no_gather_nvml_gpu_info=args.no_gather_nvml_gpu_info,
            gather_nvml_gpu_info=args.gather_nvml_gpu_info,
            gpu_count=args.gpu_count,
        )
        # Set run machine info
        machine_info = get_machine_info(
            conn_machine_name=run_vars["conn_machine_name"],
            machine_name=args.machine_name,
            machine_uname=args.machine_uname,
        )
        # Read queries from files, set to queries dir next to this script
        # if not passed in
        queries_dir = args.queries_dir or os.path.join(
            os.path.dirname(__file__), "queries"
        )
        query_list = read_query_files(
            queries_dir=queries_dir, source_table=source_table
        )
        if not query_list:
            sys.exit(1)
        # Run queries
        queries_results = [
            run_query(
                query=query,
                iterations=iterations,
                trim=trim,
                con=con,
                arrow_cpu_output=arrow_cpu_output,
            )
            for query in query_list["queries"]
        ]
        logging.info("Completed all queries.")
    finally:
        logging.debug("Closing source db connection.")
        con.close()

    # Generate results dataset
    results_dataset = create_results_dataset(
        run_guid=run_vars["run_guid"],
        run_timestamp=run_vars["run_timestamp"],
        run_connection=run_vars["run_connection"],
        run_machine_name=machine_info["run_machine_name"],
        run_machine_uname=machine_info["run_machine_uname"],
        run_driver=run_vars["run_driver"],
        run_version=run_vars["run_version"],
        run_version_short=run_vars["run_version_short"],
        label=args.label,
        source_db_gpu_count=gpu_info["source_db_gpu_count"],
        source_db_gpu_driver_ver=gpu_info["source_db_gpu_driver_ver"],
        source_db_gpu_name=gpu_info["source_db_gpu_name"],
        source_db_gpu_mem=gpu_info["source_db_gpu_mem"],
        source_table=source_table,
        trim=trim,
        iterations=iterations,
        query_group=query_list["query_group"],
        queries_results=queries_results,
    )
    results_dataset_json = json.dumps(
        results_dataset, default=json_format_handler, indent=2
    )
    # Only successful queries carry a "results" entry.
    successful_results_dataset_results = [
        entry["results"]
        for entry in results_dataset
        if entry["succeeded"] is not False
    ]

    # Send results to destination(s)
    sent_destination = True
    if "mapd_db" in destinations:
        if not send_results_db(
            results_dataset=successful_results_dataset_results,
            table=args.dest_table,
            db_user=args.dest_user,
            db_passwd=args.dest_passwd,
            db_server=dest_db_server,
            db_port=args.dest_port,
            db_name=args.dest_name,
            table_schema_file=args.dest_table_schema_file,
        ):
            sent_destination = False
    if "file_json" in destinations:
        if not send_results_file_json(
            results_dataset_json=results_dataset_json,
            output_file_json=output_file_json,
        ):
            sent_destination = False
    if "jenkins_bench" in destinations:
        if not send_results_jenkins_bench(
            results_dataset=successful_results_dataset_results,
            thresholds_name=jenkins_thresholds_name,
            thresholds_field=jenkins_thresholds_field,
            output_tag_jenkins=args.output_tag_jenkins,
            output_file_jenkins=output_file_jenkins,
        ):
            sent_destination = False
    if "output" in destinations:
        if not send_results_output(results_dataset_json=results_dataset_json):
            sent_destination = False
    if not sent_destination:
        logging.error("Sending results to one or more destinations failed")
        sys.exit(1)
    logging.info("Successfully loaded query results info into destination(s)")
998 
999 
if __name__ == "__main__":
    # Script entry point: hand every CLI argument after the program name
    # to the benchmark driver.
    cli_arguments = sys.argv[1:]
    benchmark(cli_arguments)
def verify_destinations
size_t append(FILE *f, const size_t size, int8_t *buf)
Appends the specified number of bytes to the end of the file f from buf.
Definition: File.cpp:136
def send_results_file_json
def send_results_jenkins_bench