7 from multiprocessing
import Pool, cpu_count
8 from argparse
import ArgumentParser
12 def __init__(self, column_name, sql_type, lower, upper, step=1):
14 assert sql_type.upper()
in [
"INT",
"BIGINT"]
24 ),
"Generated values are larger than 32-bit signed integer." 27 if self.
sql_type in [
"INT",
"BIGINT"]:
30 assert False,
"SQL type " + self.
sql_type +
" not supported yet" 37 Returns the ColumnDetails as expected by pymapd's API 39 result =
"ColumnDetails(name='" 43 result +=
"', nullable=True, precision=0, scale=0, comp_param=0, encoding='NONE', is_array=False)" 51 table_name(str): synthetic table's name in the database 52 fragment_size(int): fragment size (number of entries per fragment) 53 num_fragment(int): total number of fragments for the synthetic table 54 db_user(str): database username 55 db_password(str): database password 56 db_port(int): database port 57 db_name(str): database name 58 db_server(str): database server name 59 data_dir_path(str): path to directory that will include the generated data 60 is_remote_server(Bool): if True, it indicates that this class is not created on the 61 same machine that is going to host the server. 88 "Data already exists in the database, proceeding to the queries:" 96 "Proper data does not exist in the remote server." 100 if not skip_data_generation:
102 current_time = str(datetime.datetime.now()).
split()
107 "Synthetic data created: " 115 print(
"Data imported into the database")
120 column_list.append(
Column(
"x10",
"INT", 1, 10))
121 column_list.append(
Column(
"y10",
"INT", 1, 10))
122 column_list.append(
Column(
"z10",
"INT", 1, 10))
123 column_list.append(
Column(
"x100",
"INT", 1, 100))
124 column_list.append(
Column(
"y100",
"INT", 1, 100))
125 column_list.append(
Column(
"z100",
"INT", 1, 100))
126 column_list.append(
Column(
"x1k",
"INT", 1, 1000))
127 column_list.append(
Column(
"x10k",
"INT", 1, 10000))
128 column_list.append(
Column(
"x100k",
"INT", 1, 100000))
129 column_list.append(
Column(
"x1m",
"INT", 1, 1000000))
130 column_list.append(
Column(
"x10m",
"INT", 1, 10000000))
134 column_list.append(
Column(
"x10k_s10k",
"BIGINT", 1, 10000, 10000))
135 column_list.append(
Column(
"x100k_s10k",
"BIGINT", 1, 100000, 10000))
136 column_list.append(
Column(
"x1m_s10k",
"BIGINT", 1, 1000000, 10000))
140 create_sql =
"CREATE TABLE " + self.
table_name +
" ( " 143 create_sql += column.column_name +
" " + column.sql_type
155 copy_sql =
"COPY " + self.
table_name +
" FROM '" 163 Single-thread random data generation based on the provided schema. 164 Data is stored in CSV format. 169 with
open(file_name,
"w")
as f:
170 for i
in range(size):
183 Uses all available CPU threads to generate random data based on the 184 provided schema. Data is stored in CSV format. 186 num_threads = cpu_count()
187 num_entries_per_thread = int(
190 thread_index = [i
for i
in range(0, num_threads)]
193 num_balanced_entries = [
194 num_entries_per_thread
for _
in range(num_threads)
196 if self.
num_entries != num_entries_per_thread * num_threads:
197 last_threads_portion = (
198 self.
num_entries - num_entries_per_thread * (num_threads - 1)
200 num_balanced_entries[-1] = last_threads_portion
202 arguments = zip(thread_index, num_balanced_entries)
204 with Pool(num_threads)
as pool:
209 Creates table details in the same format as expected 210 from pymapd's get_table_details 213 column.createColumnDetailsString()
for column
in self.
column_list 218 Verifies whether the existing table in the database has the expected 222 con = pymapd.connect(
230 raise Exception(
"Pymapd's connection to the server has failed.")
232 table_details = con.get_table_details(self.
table_name)
235 print(
"Table does not exist in the database")
239 str(table_detail)
for table_detail
in table_details
243 print(
"Schema does not match the expected one:")
245 "Observed table details: " 246 + str([str(table_detail)
for table_detail
in table_details])
249 "Expected table details: " 255 Verifies whether the existing table in the database has the expected 256 number of entries in it as in this class. 259 con = pymapd.connect(
266 result = con.execute(
267 "select count(*) from " + self.
table_name +
";" 272 print(
"Expected num rows did not match:")
275 raise Exception(
"Pymapd's connection to the server has failed.")
279 con = pymapd.connect(
287 con.execute(
"DROP TABLE IF EXISTS " + self.
table_name +
";")
291 raise Exception(
"Failure in creating a new table.")
295 con = pymapd.connect(
305 raise Exception(
"Failure in importing data into the table")
308 if __name__ ==
"__main__":
309 parser = ArgumentParser()
310 required = parser.add_argument_group(
"required arguments")
311 required.add_argument(
"--user", dest=
"user", default=
"admin")
312 required.add_argument(
313 "--password", dest=
"password", default=
"HyperInteractive" 315 required.add_argument(
316 "--table_name", dest=
"table_name", default=
"synthetic_test_table" 318 required.add_argument(
"--fragment_size", dest=
"fragment_size", default=
"1")
319 required.add_argument(
320 "--num_fragments", dest=
"num_fragments", default=
"128" 322 required.add_argument(
"--name", dest=
"name", default=
"omnisci")
323 required.add_argument(
"--server", dest=
"server", default=
"localhost")
324 required.add_argument(
"--port", dest=
"port", default=
"6274")
325 required.add_argument(
328 default=os.getcwd() +
"/../build/synthetic_data",
330 required.add_argument(
331 "--just_data_generation",
332 dest=
"just_data_generation",
334 help=
"Indicates that the code will only generates synthetic data, bypassing all " 335 +
"other capabilities. The generated data will be stored in DATA_DIR.",
337 required.add_argument(
338 "--just_data_import",
339 dest=
"just_data_import",
341 help=
"Indicates that the code assumes the data is generated and exists at data_dir/*.csv. " 342 +
"It then proceeds with table creation and data import. ",
344 args = parser.parse_args()
347 table_name=args.table_name,
348 fragment_size=int(args.fragment_size),
349 num_fragments=int(args.num_fragments),
352 db_password=args.password,
353 db_server=args.server,
354 db_port=int(args.port),
355 data_dir_path=args.data_dir,
356 is_remote_server=
False,
359 if (args.just_data_generation
is False)
and (
360 args.just_data_import
is False 362 synthetic_table.createDataAndImportTable()
363 elif args.just_data_generation
is True:
364 synthetic_table.generateDataParallel()
366 "Synthetic data created: " 367 + str(synthetic_table.num_entries)
371 synthetic_table.createDataAndImportTable(args.just_data_import)
int open(const char *path, int flags, int mode)
def createDataAndImportTable(self, skip_data_generation=False)
def __init__(self, column_name, sql_type, lower, upper, step=1)
def __init__(self, kwargs)
def importDataIntoTableInDB(self)
def createTableInDB(self)
def generateData(self, thread_idx, size)
def generateDataParallel(self)
def generateColumnsSchema(self)
def getCreateTableCommand(self)
def createColumnDetailsString(self)
def doesTableHasExpectedNumEntriesInDB(self)
def doesTableHasExpectedSchemaInDB(self)
def getCopyFromCommand(self)
def createExpectedTableDetails(self)