OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
create_table.py
Go to the documentation of this file.
1 import os
2 import sys
3 import subprocess
4 import random
5 import datetime
6 import pymapd
7 from multiprocessing import Pool, cpu_count
8 from argparse import ArgumentParser
9 
10 
class Column:
    """A synthetic integer column whose values are drawn at random.

    Each entry is an integer drawn uniformly from [lower, upper] (inclusive,
    via random.randint) and then multiplied by ``step``, so a column has at
    most (upper - lower + 1) distinct values.
    """

    def __init__(self, column_name, sql_type, lower, upper, step=1):
        """
        Args:
            column_name(str): column name used in the CREATE TABLE statement.
            sql_type(str): "INT" or "BIGINT", case-insensitive; stored
                upper-cased.
            lower(int): smallest value drawn, before multiplying by step.
            upper(int): largest value drawn, before multiplying by step; must
                be greater than lower.
            step(int): multiplier (>= 1) applied to every drawn value.

        Raises:
            AssertionError: on an unsupported type, an empty range, step < 1,
                or (for INT) scaled values outside the signed 32-bit range.
                NOTE: these checks are asserts and are skipped under python -O.
        """
        self.column_name = column_name
        assert sql_type.upper() in ["INT", "BIGINT"]
        self.sql_type = sql_type.upper()
        assert upper > lower
        self.lower_bound = lower
        self.upper_bound = upper
        assert step >= 1
        self.step = step
        if self.sql_type in ["INT"]:
            # The signed 32-bit range is [-2**31, 2**31 - 1]. The previous
            # check ("<= 2**31") accepted one value that does not fit, and
            # negative lower bounds were never checked at all.
            assert (
                self.upper_bound * step <= 2 ** 31 - 1
            ), "Generated values are larger than 32-bit signed integer."
            assert (
                self.lower_bound * step >= -(2 ** 31)
            ), "Generated values are smaller than 32-bit signed integer."

    def generateEntry(self):
        """Return one random value for this column (always a multiple of step)."""
        if self.sql_type in ["INT", "BIGINT"]:
            return self.generateInt() * self.step
        else:
            assert False, "SQL type " + self.sql_type + " not supported yet"

    def generateInt(self):
        """Return a uniformly random integer in [lower_bound, upper_bound]."""
        return int(random.randint(self.lower_bound, self.upper_bound))

    def createColumnDetailsString(self):
        """
        Returns the ColumnDetails as expected by pymapd's API

        The result is compared against str() of the entries returned by
        pymapd's get_table_details, so the text must match exactly.
        """
        return (
            "ColumnDetails(name='"
            + self.column_name
            + "', type='"
            + self.sql_type.upper()
            + "', nullable=True, precision=0, scale=0, comp_param=0, encoding='NONE', is_array=False)"
        )
45 
46 
48  def __init__(self, **kwargs):
49  """
 Reads the table and database settings from kwargs and creates
 data_dir_path (a single level, via os.mkdir) if it is not already a
 directory.

50  kwargs:
51  table_name(str): synthetic table's name in the database
52  fragment_size(int): fragment size (number of entries per fragment)
53  num_fragments(int): total number of fragments for the synthetic table
54  db_user(str): database username
55  db_password(str): database password
56  db_port(int): database port
57  db_name(str): database name
58  db_server(str): database server name
59  data_dir_path(str): path to directory that will include the generated data
60  is_remote_server(bool): if True, it indicates that this class is not created on the
61  same machine that is going to host the server.
62  """
 # NOTE(review): the enclosing class header (source line 47) is missing from
 # this listing; the class is instantiated as SyntheticTable at line 346.
63  self.table_name = kwargs["table_name"]
64  self.fragment_size = kwargs["fragment_size"]
65  self.num_fragments = kwargs["num_fragments"]
66  self.db_name = kwargs["db_name"]
67  self.db_user = kwargs["db_user"]
68  self.db_password = kwargs["db_password"]
69  self.db_server = kwargs["db_server"]
70  self.db_port = kwargs["db_port"]
71  self.data_dir_path = kwargs["data_dir_path"]
 # NOTE(review): source lines 72-73 are missing from this listing. Other
 # methods read self.column_list and self.num_entries, so those lines
 # presumably initialize them -- confirm against the original file.
 # NOTE(review): data_dir_path is assigned twice (lines 71 and 74); one of
 # the two assignments is redundant.
74  self.data_dir_path = kwargs["data_dir_path"]
75  self.is_remote_server = kwargs["is_remote_server"]
76  if not os.path.isdir(self.data_dir_path):
 # os.mkdir creates only the last path component; parent directories
 # must already exist.
77  os.mkdir(self.data_dir_path)
 # Prefix of the generated CSV part files (generateData appends
 # "_part<N>.csv"; createDataAndImportTable may first append a date).
78  self.data_file_name_base = self.data_dir_path + "/data"
79 
80  def createDataAndImportTable(self, skip_data_generation=False):
 """
 Make sure the table exists in the database with the expected data.

 If the existence check succeeds, only a message is printed. Otherwise,
 for a remote server an Exception is raised. For a local server, CSV data
 is generated (unless skip_data_generation is True, in which case CSV
 files are expected to already exist under data_dir_path), the table is
 dropped and recreated, and the data is imported.

 NOTE(review): this listing omits the existence-check condition (source
 lines 84-85) and the import call (source line 114). The description of
 those parts relies on the surrounding comments and prints -- confirm
 against the full file.
 """
81  # deciding whether it is required to generate data and import it into the database
82  # or the data already exists there:
83  if (
86  ):
87  print(
88  "Data already exists in the database, proceeding to the queries:"
89  )
90  else:
91  if self.is_remote_server:
92  # at this point, we abort the procedure as the data is
93  # either not present in the remote server or the schema/number of rows
94  # does not match those indicated by this class.
95  raise Exception(
96  "Proper data does not exist in the remote server."
97  )
98  else:
99  # generate random synthetic data
100  if not skip_data_generation:
101  # choosing a relatively unique name for the generated csv files
 # str(datetime.now()).split()[0] is the date part (YYYY-MM-DD), so
 # runs on the same day reuse (and overwrite) the same file names.
102  current_time = str(datetime.datetime.now()).split()
103  self.data_file_name_base += "_" + current_time[0]
104 
105  self.generateDataParallel()
106  print(
107  "Synthetic data created: "
108  + str(self.num_entries)
109  + " rows"
110  )
111  # create a table on the database:
112  self.createTableInDB()
113  # import the generated data into the database:
115  print("Data imported into the database")
116 
118  column_list = []
 # NOTE(review): the def line of this method (source line 117) is missing
 # from this listing. The body builds the synthetic table's column schema;
 # its result is presumably what self.column_list holds -- confirm.
 # Column names encode the value count, e.g. "x1k" draws from 1..1000.
119  # columns with uniform distribution and step=1
120  column_list.append(Column("x10", "INT", 1, 10))
121  column_list.append(Column("y10", "INT", 1, 10))
122  column_list.append(Column("z10", "INT", 1, 10))
123  column_list.append(Column("x100", "INT", 1, 100))
124  column_list.append(Column("y100", "INT", 1, 100))
125  column_list.append(Column("z100", "INT", 1, 100))
126  column_list.append(Column("x1k", "INT", 1, 1000))
127  column_list.append(Column("x10k", "INT", 1, 10000))
128  column_list.append(Column("x100k", "INT", 1, 100000))
129  column_list.append(Column("x1m", "INT", 1, 1000000))
130  column_list.append(Column("x10m", "INT", 1, 10000000))
131 
132  # columns with step != 1
133  # cardinality = 10k / 100k / 1m, with ranges up to 100m / 1b / 10b respectively
134  column_list.append(Column("x10k_s10k", "BIGINT", 1, 10000, 10000))
135  column_list.append(Column("x100k_s10k", "BIGINT", 1, 100000, 10000))
136  column_list.append(Column("x1m_s10k", "BIGINT", 1, 1000000, 10000))
137  return column_list
138 
140  create_sql = "CREATE TABLE " + self.table_name + " ( "
 # NOTE(review): the def line (source line 139) is missing from this listing;
 # presumably this is getCreateTableCommand, called by createTableInDB.
 # Builds "CREATE TABLE <name> ( col TYPE, ... )" from self.column_list.
 # table_name is concatenated into the SQL unescaped -- only trusted names.
141  for column_idx in range(len(self.column_list)):
142  column = self.column_list[column_idx]
143  create_sql += column.column_name + " " + column.sql_type
144  if column_idx != (len(self.column_list) - 1):
145  create_sql += ", "
146  create_sql += ")"
 # The WITH clause is omitted when fragment_size is 32000000 (presumably
 # the server's default fragment size -- confirm).
147  if self.fragment_size != 32000000:
148  create_sql += (
149  " WITH (FRAGMENT_SIZE = " + str(self.fragment_size) + ")"
150  )
151  create_sql += ";"
152  return create_sql
153 
155  copy_sql = "COPY " + self.table_name + " FROM '"
 # NOTE(review): the def line (source line 154) is missing from this listing;
 # presumably this is getCopyFromCommand, called by the import method.
 # The "<data_file_name_base>*.csv" glob covers every "_part<N>.csv" file.
 # When data generation was skipped the base has no date suffix, so the
 # pattern matches every data*.csv file in the directory.
156  copy_sql += (
157  self.data_file_name_base + "*.csv' WITH (header = 'false');"
158  )
159  return copy_sql
160 
def generateData(self, thread_idx, size):
    """
    Write ``size`` random rows to this worker's CSV part file.

    Each row holds one value per entry of ``self.column_list`` (from that
    column's ``generateEntry``), comma-separated, with no header line. The
    file is "<data_file_name_base>_part<thread_idx>.csv" and is overwritten
    if it already exists.
    """
    part_path = "{}_part{}.csv".format(self.data_file_name_base, thread_idx)
    with open(part_path, "w") as out:
        for _ in range(size):
            cells = [str(column.generateEntry()) for column in self.column_list]
            out.write(",".join(cells) + "\n")
180 
182  """
183  Uses one worker process per CPU reported by cpu_count() to generate random
184  data based on the provided schema. Data is stored in CSV format.

 NOTE(review): the def line (source line 181) is missing from this
 listing; presumably this is generateDataParallel (called at line 105).
185  """
186  num_threads = cpu_count()
 # Ceiling of num_entries / num_threads.
 # NOTE(review): "/" is float division; "//" would be exact for very large
 # row counts.
187  num_entries_per_thread = int(
188  (self.num_entries + num_threads - 1) / num_threads
189  )
190  thread_index = [i for i in range(0, num_threads)]
191 
192  # making sure we end up having as many rows as the user asked for
193  num_balanced_entries = [
194  num_entries_per_thread for _ in range(num_threads)
195  ]
 # The last worker gets the remainder.
 # NOTE(review): the remainder can be negative when num_entries is small
 # relative to num_threads (e.g. 5 rows on 4 CPUs gives [2, 2, 2, -1]).
 # generateData then writes 0 rows for it, and the total exceeds
 # num_entries.
196  if self.num_entries != num_entries_per_thread * num_threads:
197  last_threads_portion = (
198  self.num_entries - num_entries_per_thread * (num_threads - 1)
199  )
200  num_balanced_entries[-1] = last_threads_portion
201 
202  arguments = zip(thread_index, num_balanced_entries)
203 
 # starmap ships the bound method self.generateData to worker processes,
 # so this object must be picklable.
204  with Pool(num_threads) as pool:
205  pool.starmap(self.generateData, arguments)
206 
208  """
209  Creates table details in the same format as expected
210  from pymapd's get_table_details

 Returns one ColumnDetails string per entry of self.column_list, in order.
 NOTE(review): the def line (source line 207) is missing from this
 listing; presumably this is createExpectedTableDetails (called at line 240).
211  """
212  return [
213  column.createColumnDetailsString() for column in self.column_list
214  ]
215 
217  """
218  Verifies whether the existing table in the database has the expected
219  schema or not.

 Returns True when str() of every detail returned by get_table_details
 equals createExpectedTableDetails(), and False when get_table_details
 raises (treated as "table does not exist"). On a schema mismatch it
 prints both lists and falls through, returning None (falsy) rather than
 False. Raises Exception if connecting fails.
 NOTE(review): the def line (source line 216) is missing from this listing.
220  """
 # NOTE(review): the bare excepts below also catch KeyboardInterrupt and
 # drop the original error, and the connection is never closed.
221  try:
222  con = pymapd.connect(
223  user=self.db_user,
224  password=self.db_password,
225  host=self.db_server,
226  port=self.db_port,
227  dbname=self.db_name,
228  )
229  except:
230  raise Exception("Pymapd's connection to the server has failed.")
231  try:
232  table_details = con.get_table_details(self.table_name)
233  except:
234  # table does not exist
235  print("Table does not exist in the database")
236  return False
237 
238  if [
239  str(table_detail) for table_detail in table_details
240  ] == self.createExpectedTableDetails():
241  return True
242  else:
243  print("Schema does not match the expected one:")
244  print(
245  "Observed table details: "
246  + str([str(table_detail) for table_detail in table_details])
247  )
248  print(
249  "Expected table details: "
250  + str(self.createExpectedTableDetails())
251  )
252 
254  """
255  Verifies whether the existing table in the database has the expected
256  number of entries in it as in this class.

 Returns True when "select count(*)" equals self.num_entries, otherwise
 prints a message and returns False.
 NOTE(review): the def line (source line 253) is missing from this listing.
257  """
 # NOTE(review): the bare except wraps the whole body, so any failure
 # (including the count query, e.g. on a missing table) is re-raised with the
 # "connection ... has failed" message, and the original error is lost.
258  try:
259  con = pymapd.connect(
260  user=self.db_user,
261  password=self.db_password,
262  host=self.db_server,
263  port=self.db_port,
264  dbname=self.db_name,
265  )
266  result = con.execute(
267  "select count(*) from " + self.table_name + ";"
268  )
 # First column of the first row is the count.
269  if list(result)[0][0] == self.num_entries:
270  return True
271  else:
272  print("Expected num rows did not match:")
273  return False
274  except:
275  raise Exception("Pymapd's connection to the server has failed.")
276 
def createTableInDB(self):
    """
    Drop ``self.table_name`` if it exists and create it again, empty, with
    the statement returned by ``self.getCreateTableCommand()``.

    Raises:
        Exception: "Failure in creating a new table." if connecting,
            dropping, or creating fails. The underlying error is chained as
            ``__cause__`` so it is not lost.
    """
    con = None
    try:
        con = pymapd.connect(
            user=self.db_user,
            password=self.db_password,
            host=self.db_server,
            port=self.db_port,
            dbname=self.db_name,
        )
        # drop the current table if exists:
        con.execute("DROP TABLE IF EXISTS " + self.table_name + ";")
        # create a new table:
        con.execute(self.getCreateTableCommand())
    except Exception as err:
        # Narrowed from a bare "except:" so Ctrl-C / SystemExit still
        # propagate, and chained so the real database error stays visible.
        raise Exception("Failure in creating a new table.") from err
    finally:
        # The original left the session open; release it explicitly.
        if con is not None:
            con.close()
292 
294  try:
 # NOTE(review): the def line (source line 293) is missing from this listing.
 # This body runs the COPY statement from getCopyFromCommand() and raises
 # Exception("Failure in importing data into the table") on any error. The
 # bare except also catches KeyboardInterrupt and drops the original error,
 # and the connection is never closed.
295  con = pymapd.connect(
296  user=self.db_user,
297  password=self.db_password,
298  host=self.db_server,
299  port=self.db_port,
300  dbname=self.db_name,
301  )
302  # import generated data:
303  con.execute(self.getCopyFromCommand())
304  except:
305  raise Exception("Failure in importing data into the table")
306 
307 
if __name__ == "__main__":
    # Command-line entry point: generate synthetic CSV data, import it into
    # the database, or both (the default). Numeric options use type=int, so
    # malformed values produce an argparse usage error instead of a
    # ValueError traceback from a later int() call.
    parser = ArgumentParser()
    # NOTE: despite the group title, every option below has a default.
    required = parser.add_argument_group("required arguments")
    required.add_argument("--user", dest="user", default="admin")
    required.add_argument(
        "--password", dest="password", default="HyperInteractive"
    )
    required.add_argument(
        "--table_name", dest="table_name", default="synthetic_test_table"
    )
    required.add_argument(
        "--fragment_size", dest="fragment_size", type=int, default=1
    )
    required.add_argument(
        "--num_fragments", dest="num_fragments", type=int, default=128
    )
    required.add_argument("--name", dest="name", default="heavyai")
    required.add_argument("--server", dest="server", default="localhost")
    required.add_argument("--port", dest="port", type=int, default=6274)
    required.add_argument(
        "--data_dir",
        dest="data_dir",
        default=os.getcwd() + "/../build/synthetic_data",
    )
    required.add_argument(
        "--just_data_generation",
        dest="just_data_generation",
        action="store_true",
        help="Indicates that the code will only generates synthetic data, bypassing all "
        + "other capabilities. The generated data will be stored in DATA_DIR.",
    )
    required.add_argument(
        "--just_data_import",
        dest="just_data_import",
        action="store_true",
        help="Indicates that the code assumes the data is generated and exists at data_dir/*.csv. "
        + "It then proceeds with table creation and data import. ",
    )
    args = parser.parse_args()

    synthetic_table = SyntheticTable(
        table_name=args.table_name,
        fragment_size=args.fragment_size,
        num_fragments=args.num_fragments,
        db_name=args.name,
        db_user=args.user,
        db_password=args.password,
        db_server=args.server,
        db_port=args.port,
        data_dir_path=args.data_dir,
        is_remote_server=False,
    )

    if args.just_data_generation:
        # Generation only; this takes precedence if --just_data_import is
        # also given (same as before).
        synthetic_table.generateDataParallel()
        print(
            "Synthetic data created: "
            + str(synthetic_table.num_entries)
            + " rows"
        )
    elif args.just_data_import:
        # Reuse the CSV files already in data_dir: skip generation, then
        # create the table and import.
        synthetic_table.createDataAndImportTable(skip_data_generation=True)
    else:
        synthetic_table.createDataAndImportTable()
std::string join(T const &container, std::string const &delim)
std::vector< std::string > split(std::string_view str, std::string_view delim, std::optional< size_t > maxsplit)
split apart a string into a vector of substrings
int open(const char *path, int flags, int mode)
Definition: heavyai_fs.cpp:66