OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
bench_batch_streaming_ingest Namespace Reference

Classes

class  OmniCon
 

Functions

def getOptions
 
def create_table
 
def gen_data
 
def bench_streaming_sql_inserts
 
def bench_bulk_columnar
 
def bench_bulk_arrow
 
def bench_streaming_columnar
 
def main
 

Function Documentation

def bench_batch_streaming_ingest.bench_bulk_arrow (   omni_con,
  table_name,
  data 
)

Definition at line 67 of file bench_batch_streaming_ingest.py.

Referenced by main().

67 
def bench_bulk_arrow(omni_con, table_name, data):
    """Time one bulk load of ``data`` into ``table_name`` through Arrow.

    The DataFrame is converted to a pyarrow Table before the timer starts, so
    only the ``load_table_arrow`` call is measured. The outcome is printed;
    nothing is returned.

    Args:
        omni_con: Object whose ``con`` attribute provides ``load_table_arrow``.
        table_name: Name of the destination table.
        data: pandas DataFrame holding the rows to load.
    """
    num_rows = len(data.index)
    arrow_data = pa.Table.from_pandas(data)
    start_time = time.perf_counter()
    omni_con.con.load_table_arrow(table_name, arrow_data, preserve_index=False)
    time_diff = time.perf_counter() - start_time
    # A zero elapsed reading (empty input, coarse timer) would otherwise raise
    # ZeroDivisionError and abort the benchmark run.
    rows_per_second = num_rows / time_diff if time_diff > 0 else float("inf")
    print("Bulk load – Arrow: {0} rows in {1} seconds at {2} rows/sec".format(num_rows, time_diff, rows_per_second))

+ Here is the caller graph for this function:

def bench_batch_streaming_ingest.bench_bulk_columnar (   omni_con,
  table_name,
  data 
)

Definition at line 58 of file bench_batch_streaming_ingest.py.

Referenced by main().

58 
def bench_bulk_columnar(omni_con, table_name, data):
    """Time one bulk columnar load of ``data`` into ``table_name``.

    Only the ``load_table_columnar`` call is measured. The outcome is
    printed; nothing is returned.

    Args:
        omni_con: Object whose ``con`` attribute provides
            ``load_table_columnar``.
        table_name: Name of the destination table.
        data: Frame-like object with an ``index``; passed to the loader as is.
    """
    num_rows = len(data.index)
    start_time = time.perf_counter()
    omni_con.con.load_table_columnar(table_name, data, preserve_index=False)
    time_diff = time.perf_counter() - start_time
    # A zero elapsed reading (empty input, coarse timer) would otherwise raise
    # ZeroDivisionError and abort the benchmark run.
    rows_per_second = num_rows / time_diff if time_diff > 0 else float("inf")
    print("Bulk load – Columnar: {0} rows in {1} seconds at {2} rows/sec".format(num_rows, time_diff, rows_per_second))

+ Here is the caller graph for this function:

def bench_batch_streaming_ingest.bench_streaming_columnar (   omni_con,
  table_name,
  data 
)

Definition at line 77 of file bench_batch_streaming_ingest.py.

77 
def bench_streaming_columnar(omni_con, table_name, data):
    """Time row-at-a-time columnar loads of ``data`` into ``table_name``.

    One ``load_table_columnar`` call is issued per row, each with a
    single-row slice of ``data``. Slicing happens inside the timed region.
    The outcome is printed; nothing is returned.

    Args:
        omni_con: Object whose ``con`` attribute provides
            ``load_table_columnar``.
        table_name: Name of the destination table.
        data: Frame-like object supporting ``index`` and ``iloc`` slicing.
    """
    num_rows = len(data.index)
    start_time = time.perf_counter()
    for r in range(num_rows):
        row_df = data.iloc[r:r+1]
        omni_con.con.load_table_columnar(table_name, row_df, preserve_index=False)
    time_diff = time.perf_counter() - start_time
    # A zero elapsed reading (empty input, coarse timer) would otherwise raise
    # ZeroDivisionError and abort the benchmark run.
    rows_per_second = num_rows / time_diff if time_diff > 0 else float("inf")
    print("Streaming – Columnar: {0} rows in {1} seconds at {2} rows/sec".format(num_rows, time_diff, rows_per_second))
def bench_batch_streaming_ingest.bench_streaming_sql_inserts (   omni_con,
  table_name,
  data 
)

Definition at line 44 of file bench_batch_streaming_ingest.py.

Referenced by main().

44 
def bench_streaming_sql_inserts(omni_con, table_name, data):
    """Time inserting ``data`` into ``table_name`` one INSERT per row.

    The first four columns of each row are written to columns a, b, c and d.
    The outcome is printed; nothing is returned.

    Args:
        omni_con: Object providing ``query(sql)``.
        table_name: Name of the destination table.
        data: Frame-like object supporting ``index`` and ``iat[row, col]``.
    """
    num_rows = len(data.index)
    base_insert_sql = "INSERT INTO " + table_name + "(a, b, c, d) VALUES ({0}, {1}, {2}, {3})"
    # Statements are rendered before the timer starts so that only the
    # query calls are measured.
    insert_statements = [
        base_insert_sql.format(data.iat[r, 0], data.iat[r, 1], data.iat[r, 2], data.iat[r, 3])
        for r in range(num_rows)
    ]
    start_time = time.perf_counter()
    for statement in insert_statements:
        omni_con.query(statement)
    time_diff = time.perf_counter() - start_time
    # A zero elapsed reading (empty input, coarse timer) would otherwise raise
    # ZeroDivisionError and abort the benchmark run.
    rows_per_second = num_rows / time_diff if time_diff > 0 else float("inf")
    print("Streaming – SQL Inserts: {0} rows in {1} seconds at {2} rows/sec".format(num_rows, time_diff, rows_per_second))

+ Here is the caller graph for this function:

def bench_batch_streaming_ingest.create_table (   omni_con,
  table_name,
  is_temporary = False,
  max_rollback_epochs = -1 
)

Definition at line 31 of file bench_batch_streaming_ingest.py.

Referenced by main().

31 
def create_table(omni_con, table_name, is_temporary=False, max_rollback_epochs=-1):
    """Drop ``table_name`` if it exists, then recreate it with four INTEGER columns.

    Args:
        omni_con: Object providing ``query(sql)``.
        table_name: Name of the table to (re)create.
        is_temporary: When truthy, the TEMPORARY keyword is added.
        max_rollback_epochs: When >= 0, a ``WITH (max_rollback_epochs=...)``
            clause is appended; negative values omit the clause.
    """
    drop_statement = "DROP TABLE IF EXISTS " + table_name

    if is_temporary:
        temp_keyword = "TEMPORARY"
    else:
        temp_keyword = ""

    if max_rollback_epochs >= 0:
        with_clause = "WITH (max_rollback_epochs={max_rollback_epochs})".format(
            max_rollback_epochs=max_rollback_epochs
        )
    else:
        with_clause = ""

    create_template = "CREATE {optional_temp_stmt} TABLE {table_name} (a INTEGER, b INTEGER, c INTEGER, d INTEGER) {optional_max_rollback_stmt}"
    create_statement = create_template.format(
        optional_temp_stmt=temp_keyword,
        table_name=table_name,
        optional_max_rollback_stmt=with_clause,
    )

    # The drop must run first so the create starts from a clean slate.
    for statement in (drop_statement, create_statement):
        omni_con.query(statement)

+ Here is the caller graph for this function:

def bench_batch_streaming_ingest.gen_data (   num_rows)

Definition at line 39 of file bench_batch_streaming_ingest.py.

Referenced by main().

39 
def gen_data(num_rows):
    """Build a DataFrame of random int32 values for the benchmarks.

    Args:
        num_rows: Number of rows to generate.

    Returns:
        A pandas DataFrame with ``num_rows`` rows and columns a, b, c, d,
        each value drawn from ``np.random.randint(0, 100)``.
    """
    column_names = ['a', 'b', 'c', 'd']
    random_values = np.random.randint(0, 100, size=(num_rows, 4))
    frame = pd.DataFrame(random_values, columns=column_names)
    return frame.astype(np.int32)

+ Here is the caller graph for this function:

def bench_batch_streaming_ingest.getOptions (   args = None)

Definition at line 10 of file bench_batch_streaming_ingest.py.

Referenced by main().

10 
def _str_to_bool(value):
    """Convert a command-line token such as ``true`` or ``0`` to a bool."""
    if isinstance(value, bool):
        return value
    token = value.strip().lower()
    if token in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if token in ('', '0', 'false', 'f', 'no', 'n', 'off'):
        return False
    raise argparse.ArgumentTypeError('expected a boolean value, got {0!r}'.format(value))


def getOptions(args=None):
    """Parse the benchmark's command-line options.

    Args:
        args: Sequence of argument strings; ``None`` makes argparse read
            ``sys.argv[1:]``.

    Returns:
        An ``argparse.Namespace`` with ``host``, ``port``, ``db``, ``user``,
        ``password``, ``max_rollback_epochs``, ``temp_table`` and ``num_rows``.
    """
    parser = argparse.ArgumentParser(description='Benchmark HEAVY.AI batch and streaming table ingest')
    parser.add_argument('-s','--host', help='HEAVY.AI server address', default='localhost')
    parser.add_argument('-p','--port', help='HEAVY.AI server port', default='6273')
    parser.add_argument('-d','--db', help='HEAVY.AI database name', default='heavyai')
    parser.add_argument('-u','--user', help='HEAVY.AI user name', default='admin')
    parser.add_argument('-w','--password', help='HEAVY.AI password', default='HyperInteractive')
    parser.add_argument('-e','--max_rollback_epochs', help='Max Rollback Epochs', type=int, default=-1)
    # type=bool would turn every non-empty string, including "False", into
    # True, so the value is parsed explicitly instead.
    parser.add_argument('-t','--temp_table', help='Use temporary table', type=_str_to_bool, default=False)
    parser.add_argument('-r','--num_rows', help='Number of rows to benchmark with', type=int, default=10000)
    return parser.parse_args(args)
22 

+ Here is the caller graph for this function:

def bench_batch_streaming_ingest.main (   argv)

Definition at line 88 of file bench_batch_streaming_ingest.py.

References bench_bulk_arrow(), bench_bulk_columnar(), bench_streaming_sql_inserts(), create_table(), gen_data(), and getOptions().

88 
def main(argv):
    """Run each ingest benchmark against a freshly created table.

    Args:
        argv: Command-line argument strings, forwarded to ``getOptions``.
    """
    options = getOptions(argv)
    # NOTE(review): options.host and options.port are parsed but not passed
    # to OmniCon here - confirm that is intended.
    omni_con = OmniCon(options.user, options.password, options.db)

    data = gen_data(options.num_rows)

    # bench_streaming_columnar (table "stream_columnar") is left out on
    # purpose: it is too slow to bench at any real scale.
    benchmarks = (
        ("stream_insert_sql", bench_streaming_sql_inserts),
        ("bulk_columnar", bench_bulk_columnar),
        ("bulk_arrow", bench_bulk_arrow),
    )
    for table_name, run_benchmark in benchmarks:
        create_table(omni_con, table_name, options.temp_table, options.max_rollback_epochs)
        run_benchmark(omni_con, table_name, data)

+ Here is the call graph for this function: