OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
bench_batch_streaming_ingest.py
Go to the documentation of this file.
1 import sys
2 import pymapd
3 import pyarrow as pa
4 import pandas as pd
5 import numpy as np
6 import time
7 
8 import argparse
9 
def _str_to_bool(value):
    """Convert a command-line token into a bool.

    ``type=bool`` must not be used with argparse: ``bool("False")`` is True
    because every non-empty string is truthy. This converter maps the usual
    spellings explicitly instead.

    Args:
        value: The raw token from the command line (or an actual bool).

    Returns:
        True or False.

    Raises:
        argparse.ArgumentTypeError: If the token is not a recognised boolean
            spelling; argparse reports this as a normal usage error.
    """
    if isinstance(value, bool):
        return value
    token = value.strip().lower()
    if token in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    # The empty string stays False, matching what bool('') used to give.
    if token in ('', '0', 'false', 'f', 'no', 'n', 'off'):
        return False
    raise argparse.ArgumentTypeError(
        "expected a boolean value (true/false), got {0!r}".format(value))


def getOptions(args=None):
    """Parse the benchmark's command-line options.

    Args:
        args: List of argument strings to parse. When None, argparse falls
            back to ``sys.argv[1:]``.

    Returns:
        The argparse namespace with attributes ``host``, ``port``, ``db``,
        ``user``, ``password``, ``max_rollback_epochs``, ``temp_table`` and
        ``num_rows``. Note that ``port`` is kept as a string.
    """
    parser = argparse.ArgumentParser(description='Benchmark HEAVY.AI batch and streaming table ingest')
    parser.add_argument('-s', '--host', help='HEAVY.AI server address', default='localhost')
    parser.add_argument('-p', '--port', help='HEAVY.AI server port', default='6273')
    parser.add_argument('-d', '--db', help='HEAVY.AI database name', default='heavyai')
    parser.add_argument('-u', '--user', help='HEAVY.AI user name', default='admin')
    parser.add_argument('-w', '--password', help='HEAVY.AI password', default='HyperInteractive')
    parser.add_argument('-e', '--max_rollback_epochs', help='Max Rollback Epochs', type=int, default=-1)
    # nargs='?' + const=True lets a bare "-t" enable the option, while
    # "-t true" / "-t false" keep working and now mean what they say.
    parser.add_argument('-t', '--temp_table', help='Use temporary table', type=_str_to_bool,
                        nargs='?', const=True, default=False)
    parser.add_argument('-r', '--num_rows', help='Number of rows to benchmark with', type=int, default=10000)
    return parser.parse_args(args)
21 
22 
class OmniCon:
    """Small wrapper holding a pymapd connection and one cursor on it.

    Attributes are ``con`` (the pymapd connection) and ``cursor`` (a cursor
    obtained from that connection).
    """

    def __init__(self, user, pw, dbname, host="localhost", port=None):
        """Open the connection and create a cursor.

        Args:
            user: Database user name.
            pw: Password for ``user``.
            dbname: Name of the database to connect to.
            host: Server address. Defaults to "localhost", which is what this
                class previously hard-coded.
            port: Optional server port. When None the argument is not passed
                at all, so pymapd's own default port applies (the previous
                behaviour). Strings such as "6274" are converted to int.
        """
        connect_kwargs = dict(user=user, password=pw, dbname=dbname, host=host)
        if port is not None:
            connect_kwargs["port"] = int(port)
        self.con = pymapd.connect(**connect_kwargs)
        self.cursor = self.con.cursor()

    def query(self, sql):
        """Execute ``sql`` on the cursor and return whatever ``execute`` returns."""
        return self.cursor.execute(sql)
30 
def create_table(omni_con, table_name, is_temporary=False, max_rollback_epochs=-1):
    """Drop ``table_name`` if it exists and recreate it with four INTEGER columns.

    Args:
        omni_con: Object exposing ``query(sql)``; both statements are sent
            through it, the DROP first.
        table_name: Name of the table; inserted into the SQL text verbatim.
        is_temporary: When truthy, the TEMPORARY keyword is added.
        max_rollback_epochs: When >= 0, a ``WITH (max_rollback_epochs=...)``
            clause is appended; negative values omit the clause.
    """
    drop_statement = "DROP TABLE IF EXISTS " + table_name

    if is_temporary:
        temp_keyword = "TEMPORARY"
    else:
        temp_keyword = ""

    if max_rollback_epochs >= 0:
        rollback_clause = "WITH (max_rollback_epochs={0})".format(max_rollback_epochs)
    else:
        rollback_clause = ""

    # Empty optional pieces still contribute their separating space, so the
    # statement text is the same as a fixed template with blanks filled in.
    create_statement = " ".join([
        "CREATE",
        temp_keyword,
        "TABLE",
        table_name,
        "(a INTEGER, b INTEGER, c INTEGER, d INTEGER)",
        rollback_clause,
    ])

    for statement in (drop_statement, create_statement):
        omni_con.query(statement)
38 
def gen_data(num_rows):
    """Create a random test DataFrame with int32 columns a, b, c and d.

    Args:
        num_rows: Number of rows to generate.

    Returns:
        A DataFrame of shape (num_rows, 4) whose values come from
        ``np.random.randint(0, 100)``, cast to int32.
    """
    column_names = ['a', 'b', 'c', 'd']
    random_values = np.random.randint(0, 100, size=(num_rows, 4))
    frame = pd.DataFrame(random_values, columns=column_names)
    return frame.astype(np.int32)
43 
def bench_streaming_sql_inserts(omni_con, table_name, data):
    """Time inserting every row of ``data`` with one SQL INSERT per row.

    The INSERT statements are built before the clock starts, so only the
    ``omni_con.query`` calls are timed. The result is printed, not returned.

    Args:
        omni_con: Object exposing ``query(sql)``.
        table_name: Target table; inserted into the SQL text verbatim.
        data: DataFrame whose first four columns supply a, b, c and d.
    """
    row_count = len(data.index)
    template = "INSERT INTO " + table_name + "(a, b, c, d) VALUES ({0}, {1}, {2}, {3})"
    statements = [
        template.format(*[data.iat[row, col] for col in range(4)])
        for row in range(row_count)
    ]

    started = time.perf_counter()
    for statement in statements:
        omni_con.query(statement)
    elapsed = time.perf_counter() - started

    throughput = row_count / elapsed
    report = "Streaming – SQL Inserts: {0} rows in {1} seconds at {2} rows/sec"
    print(report.format(row_count, elapsed, throughput))
57 
def bench_bulk_columnar(omni_con, table_name, data):
    """Time a single ``load_table_columnar`` call that loads all of ``data``.

    The result is printed, not returned.

    Args:
        omni_con: Object whose ``con`` attribute exposes
            ``load_table_columnar(table_name, data, preserve_index=...)``.
        table_name: Target table name.
        data: DataFrame to load; its index length is the reported row count.
    """
    row_count = len(data.index)

    started = time.perf_counter()
    omni_con.con.load_table_columnar(table_name, data, preserve_index=False)
    elapsed = time.perf_counter() - started

    throughput = row_count / elapsed
    report = "Bulk load – Columnar: {0} rows in {1} seconds at {2} rows/sec"
    print(report.format(row_count, elapsed, throughput))
66 
def bench_bulk_arrow(omni_con, table_name, data):
    """Time a single ``load_table_arrow`` call that loads all of ``data``.

    The DataFrame is converted to an Arrow table before the clock starts, so
    the conversion is not part of the measurement. The result is printed,
    not returned.

    Args:
        omni_con: Object whose ``con`` attribute exposes
            ``load_table_arrow(table_name, arrow_table, preserve_index=...)``.
        table_name: Target table name.
        data: DataFrame to load; its index length is the reported row count.
    """
    row_count = len(data.index)
    arrow_table = pa.Table.from_pandas(data)

    started = time.perf_counter()
    omni_con.con.load_table_arrow(table_name, arrow_table, preserve_index=False)
    elapsed = time.perf_counter() - started

    throughput = row_count / elapsed
    report = "Bulk load – Arrow: {0} rows in {1} seconds at {2} rows/sec"
    print(report.format(row_count, elapsed, throughput))
76 
def bench_streaming_columnar(omni_con, table_name, data):
    """Time loading ``data`` one row at a time through ``load_table_columnar``.

    Slicing out each one-row frame happens inside the timed region. The
    result is printed, not returned.

    Args:
        omni_con: Object whose ``con`` attribute exposes
            ``load_table_columnar(table_name, data, preserve_index=...)``.
        table_name: Target table name.
        data: DataFrame to load; its index length is the reported row count.
    """
    row_count = len(data.index)

    started = time.perf_counter()
    for offset in range(row_count):
        # A one-row slice keeps the DataFrame type the loader is given.
        single_row = data.iloc[offset:offset + 1]
        omni_con.con.load_table_columnar(table_name, single_row, preserve_index=False)
    elapsed = time.perf_counter() - started

    throughput = row_count / elapsed
    report = "Streaming – Columnar: {0} rows in {1} seconds at {2} rows/sec"
    print(report.format(row_count, elapsed, throughput))
87 
def main(argv):
    """Run the ingest benchmarks against freshly created tables.

    Parses ``argv``, opens a connection, generates one random data set and
    then, for each benchmark in turn, recreates its table and runs it.

    Only the user, password and database options are handed to ``OmniCon``
    here; the parsed host and port options are not forwarded.

    Args:
        argv: Command-line arguments without the program name.
    """
    options = getOptions(argv)
    omni_con = OmniCon(options.user, options.password, options.db)
    data = gen_data(options.num_rows)

    # (table name, benchmark) pairs, executed in this order.
    # bench_streaming_columnar (table "stream_columnar") is deliberately left
    # out: it is too slow to bench at any real scale.
    benchmarks = (
        ("stream_insert_sql", bench_streaming_sql_inserts),
        ("bulk_columnar", bench_bulk_columnar),
        ("bulk_arrow", bench_bulk_arrow),
    )
    for table_name, run_benchmark in benchmarks:
        create_table(omni_con, table_name, options.temp_table, options.max_rollback_epochs)
        run_benchmark(omni_con, table_name, data)
110 
# Script entry point: run the benchmarks only when this file is executed
# directly, passing the command-line arguments without the program name.
if __name__ == "__main__":
    main(sys.argv[1:])