OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
benchmarks.py
Go to the documentation of this file.
1 # Copyright 2022 HEAVY.AI, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import conbench.runner
16 import json
17 import glob
18 import os
19 import re
20 import subprocess
21 import time
22 
23 # Required --run-name "A: B" will display A in the Reason column.
24 # Recommended example: --run-name "TPC_DS_10GB: $GIT_COMMIT"
25 # Assumes pwd = Benchmarks/conbench and build_dir is build-$GIT_COMMIT
26 
27 def conversionFactor(time_unit_to, time_unit_from):
28  # See GetTimeUnitString() in https://github.com/google/benchmark/blob/main/include/benchmark/benchmark.h
29  powers = { 's': 0, 'ms': 3, 'us': 6, 'ns': 9 }
30  return 10 ** (powers[time_unit_to] - powers[time_unit_from])
31 
32 # Name the derived class after the google benchmark executable.
34  '''Run google benchmarks and publish them to the network conbench server.'''
35 
36  external = True
37  description = 'Google Benchmark'
38 
39  def aggregateBenchmarks(self, benchmarks):
40  aggregates = {}
41  for benchmark in benchmarks:
42  if 'aggregate_name' not in benchmark:
43  aggregate = aggregates.get(benchmark['name'])
44  if aggregate == None:
45  aggregates[benchmark['name']] = {
46  'data': [ benchmark['real_time'] ],
47  'unit': benchmark['time_unit'] }
48  else:
49  conversion_factor = conversionFactor(aggregate['unit'], benchmark['time_unit'])
50  aggregate['data'].append(conversion_factor * benchmark['real_time'])
51  return aggregates
52 
53  def run(self, name, kwargs):
54  context = { 'benchmark_language': 'C++' }
55  commit = os.environ.get('GIT_COMMIT')
56  build_dir = '../../build-{0}'.format(commit)
57  benchmark_out = '{0}/{1}-{2}.json'.format(os.getcwd(), name, commit)
58 
59  command = ['./'+name, '--benchmark_repetitions=7', '--benchmark_out='+benchmark_out]
60  subprocess.run(command, cwd=build_dir+'/Tests')
61 
62  report = json.load(open(benchmark_out))
63  info = report['context']
64  info['branch'] = os.environ.get('GIT_BRANCH')
65  options = kwargs
66 
67  # Aggregate the benchmark_repetitions by benchmark name
68  aggregates = self.aggregateBenchmarks(report['benchmarks'])
69  for benchmark_name, result in aggregates.items():
70  # Different tags correspond to different 'Benchmarks' in conbench
71  tags = { 'benchmark_name': benchmark_name }
72  yield self.conbench.record(
73  result, name, context=context, options=options, output=result, tags=tags, info=info
74  )
75 
77  def __init__(self, bindir, datadir, port_main, port_http, port_calcite): # Start heavydb server
78  self.initDataDir(bindir, datadir)
79  self.heavydb = subprocess.Popen([bindir+'/heavydb', '--allowed-import-paths=["/"]',
80  '--allowed-export-paths=["/"]', '--enable-http-binary-server=0', '--port='+str(port_main),
81  '--http-port='+str(port_http), '--calcite-port='+str(port_calcite), datadir])
82  print('heavydb server started pid='+str(self.heavydb.pid))
83  def __del__(self): # Shutdown heavydb server
84  print('Shutting down heavydb server.')
85  self.heavydb.terminate() # Cleaner than kill()
86  print('Server return value=%d' % (self.heavydb.wait()))
87  def initDataDir(self, bindir, datadir):
88  if not os.path.isdir(datadir):
89  os.mkdir(datadir, mode=0o775)
90  initheavy = subprocess.run([bindir+'/initheavy', datadir])
91  assert initheavy.returncode == 0, 'initheavy returned {0}.'.format(initheavy)
92 
93 # Execute test query to count US states
94 def numberOfUsStates(bindir, port_main):
95  query = b'SELECT COUNT(*) FROM heavyai_us_states;'
96  print('Running test query: %s' % (query))
97  FAILED_TO_OPEN_TRANSPORT = b'Failed to open transport. Is heavydb running?'
98  stdout = FAILED_TO_OPEN_TRANSPORT
99  stderr = b''
100  attempts = 0
101  while stdout.startswith(FAILED_TO_OPEN_TRANSPORT):
102  time.sleep(1)
103  attempts += 1
104  print('Connection attempt {0}'.format(attempts))
105  if (120 < attempts):
106  print('Too many failed connection attempts. Returning -1.')
107  return -1
108  heavysql = subprocess.Popen([bindir+'/heavysql', '-p', 'HyperInteractive', '--port', str(port_main),
109  '--quiet'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
110  (stdout, stderr) = heavysql.communicate(query)
111  return int(stdout)
112 
113 def testConnection(bindir, port_main):
114  number_of_us_states = numberOfUsStates(bindir, port_main)
115  assert number_of_us_states in range(13,100), 'Incorrect number of US states(%d)' % (number_of_us_states)
116  print('Counted %d rows in table heavyai_us_states.' % (number_of_us_states))
117 
118 @conbench.runner.register_benchmark
119 class StringDictionaryBenchmark(conbench.runner.Benchmark, GoogleBenchmark):
120  name = __qualname__
121  def run(self, **kwargs):
122  yield from GoogleBenchmark.run(self, self.name, kwargs)
123 
124 @conbench.runner.register_benchmark
125 class TPC_DS_10GB(conbench.runner.Benchmark):
126  '''TPC-DS SQL tests'''
127 
128  SCALE='10'
129  external = True
130  description = 'TPC-DS SCALE=%s SQL tests' % (SCALE)
131  name = __qualname__
132 
133  BASENAME='TPC-DS_Tools_v3.2.0'
134  OUTPUT_DIR='%s_%sGB' % (BASENAME, SCALE)
135  PG_TGZ=OUTPUT_DIR + '.tgz'
136  DATADIR='storage'
137  PORT_MAIN=16274
138  PORT_HTTP=16278
139  PORT_CALCITE=16279
140  SENTINEL_FAILED_TIME=1234567.890 # 20 minutes and 34.56789 seconds is the sentinel failure value
141 
143  assets_dir = os.environ.get('TPCDS_ASSETS_DIR')
144  assert assets_dir != None, 'Please set env variable TPCDS_ASSETS_DIR to directory with %s.' % (self.PG_TGZ)
145  file = self.PG_TGZ
146  assert os.path.exists('%s/.conbench' % os.environ.get('HOME')
147  ), 'A .conbench file is required to submit results to a conbench server.'
148  assert os.path.exists('%s/%s' % (assets_dir, file)), 'File %s not found in %s.' % (file, assets_dir)
149  return assets_dir
150 
151  def setupAndChdirToWorkingDirectory(self, workingdir):
152  assets_dir = self.checkForRequiredFiles()
153  os.mkdir(workingdir, mode=0o775)
154  rakefile = os.path.realpath(os.path.join(os.getcwd(),'../rake/Rakefile'))
155  subprocess.run(['ln', '-s', rakefile, workingdir])
156  os.chdir(workingdir)
157  # Untar postgres results
158  subprocess.run(['tar', 'zxf', '%s/%s'%(assets_dir,self.PG_TGZ)])
159 
160  def run(self, **kwargs):
161  commit = os.environ.get('GIT_COMMIT')
162  build_dir = os.path.realpath(os.path.join(os.getcwd(),'../../build-%s'%(commit)))
163  self.setupAndChdirToWorkingDirectory(build_dir + '/conbench')
164  # Start server on new port
165  bindir = build_dir + '/bin'
166  heavy_db_server = HeavyDbServer(bindir, self.DATADIR, self.PORT_MAIN, self.PORT_HTTP, self.PORT_CALCITE)
167  testConnection(bindir, self.PORT_MAIN)
168  # Run rake task
169  env = { 'HEAVYSQL': '%s/heavysql -p HyperInteractive --port %d'%(bindir,self.PORT_MAIN)
170  , 'SCALE': self.SCALE, 'SKIP_PG': '1', 'PATH': os.environ.get('PATH') }
171  subprocess.run('rake tpcds:compare', env=env, shell=True)
172  # report to conbench server
173  context = { 'benchmark_language': 'C++' }
174  info = { 'branch': os.environ.get('GIT_BRANCH') }
175  options = kwargs
176  # Read and process output json files
177  def query_num(filename):
178  md = re.search('/query(\d+).json$', filename)
179  return int(md.group(1)) if md else 0
180  for filename in sorted(glob.glob(self.OUTPUT_DIR+'/query*.json'), key=query_num):
181  with open(filename) as file:
182  benchmark = json.load(file)
183  tags = { 'benchmark_name': '%02d' % query_num(filename) }
184  data = [ benchmark['time_ms'] ] if benchmark['success'] else [ self.SENTINEL_FAILED_TIME ]
185  result = { 'data': data, 'unit': 'ms' }
186  info['message'] = benchmark['message'][:4096] # truncate large messages
187  yield self.conbench.record(
188  result, self.name, context=context, options=options, output=result, tags=tags, info=info
189  )
def conversionFactor
Definition: benchmarks.py:27
size_t append(FILE *f, const size_t size, const int8_t *buf)
Appends the specified number of bytes to the end of the file f from buf.
Definition: File.cpp:158
def testConnection
Definition: benchmarks.py:113
int open(const char *path, int flags, int mode)
Definition: heavyai_fs.cpp:66
def numberOfUsStates
Definition: benchmarks.py:94