OmniSciDB  471d68cefb
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
LogRunner.java
Go to the documentation of this file.
1 /*
2  * Copyright 2015 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.logrunner;
17 
18 import com.omnisci.thrift.server.OmniSci;
19 import com.omnisci.thrift.server.TColumn;
20 import com.omnisci.thrift.server.TColumnData;
21 import com.omnisci.thrift.server.TColumnType;
22 import com.omnisci.thrift.server.TDBInfo;
23 import com.omnisci.thrift.server.TDatum;
24 import com.omnisci.thrift.server.TExecuteMode;
25 import com.omnisci.thrift.server.TOmniSciException;
26 import com.omnisci.thrift.server.TPixel;
27 import com.omnisci.thrift.server.TQueryResult;
28 import com.omnisci.thrift.server.TRenderResult;
29 import com.omnisci.thrift.server.TRow;
30 import com.omnisci.thrift.server.TRowSet;
31 import com.omnisci.thrift.server.TTableDetails;
32 
33 import org.apache.thrift.TException;
34 import org.apache.thrift.protocol.TJSONProtocol;
35 import org.apache.thrift.protocol.TProtocol;
36 import org.apache.thrift.transport.THttpClient;
37 import org.apache.thrift.transport.TTransport;
38 import org.apache.thrift.transport.TTransportException;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 
42 import java.io.BufferedReader;
43 import java.io.FileNotFoundException;
44 import java.io.FileOutputStream;
45 import java.io.FileReader;
46 import java.io.IOException;
47 import java.util.ArrayList;
48 import java.util.HashMap;
49 import java.util.List;
50 import java.util.Map;
51 import java.util.concurrent.ArrayBlockingQueue;
52 import java.util.concurrent.ExecutorService;
53 import java.util.concurrent.Executors;
54 import java.util.concurrent.ThreadPoolExecutor;
55 import java.util.concurrent.TimeUnit;
56 import java.util.logging.Level;
57 
62 public class LogRunner {
63  final static Logger logger = LoggerFactory.getLogger(LogRunner.class);
64  private HashMap<Integer, String> sqlquery;
65  private HashMap<Integer, String> originalSql;
66  private HashMap<Integer, String> json;
67  private boolean gpuMode = false;
68  private boolean cpuMode = false;
69 
70  public static void main(String[] args) throws TException {
71  logger.info("Hello, World");
72 
73  LogRunner x = new LogRunner();
74  try {
75  x.doWork(args);
76  } catch (TTransportException ex) {
77  logger.error(ex.toString());
78  ex.printStackTrace();
79  }
80  }
81 
82  public LogRunner() {
83  sqlquery = new HashMap<Integer, String>();
84  originalSql = new HashMap<Integer, String>();
85  json = new HashMap<Integer, String>();
86  }
87 
88  void doWork(String[] args) throws TTransportException, TException {
89  logger.info("In doWork here");
90 
91  int numberThreads = 3;
92  // Runnable[] worker = new Runnable[numberThreads];
93  //
94  // for (int i = 0; i < numberThreads; i++){
95  //
96  OmniSci.Client client = getClient(args[0], Integer.valueOf(args[1]));
97  String session = getSession(client);
98  // worker[i] = new myThread(client, session);
99  // }
100 
101  logger.info("got session");
102  try {
103  // ExecutorService executor = Executors.newFixedThreadPool(6);
104  ExecutorService executor = new ThreadPoolExecutor(numberThreads,
105  numberThreads,
106  0L,
107  TimeUnit.MILLISECONDS,
108  new ArrayBlockingQueue<Runnable>(15),
109  new ThreadPoolExecutor.CallerRunsPolicy());
110  while (true) {
111  // BufferedReader in = new BufferedReader(new FileReader("/data/logfiles/log1"));
112  BufferedReader in = new BufferedReader(new FileReader(args[2]));
113  String str;
114  int current = 0;
115  while ((str = in.readLine()) != null) {
116  Runnable worker = new myThread(str, client, session);
117  // executor.execute(worker);
118  worker.run();
119  }
120  in.close();
121  logger.info("############loop complete");
122  }
123  // executor.shutdown();
124  } catch (IOException e) {
125  logger.error("IOException " + e.getMessage());
126  }
127  }
128 
129  private OmniSci.Client getClient(String hostname, int port) throws TTransportException {
130  TTransport transport = null;
131 
132  // transport = new TSocket("localhost", 6274);
133  transport = new THttpClient("http://" + hostname + ":" + port);
134 
135  transport.open();
136 
137  // TProtocol protocol = new TBinaryProtocol(transport);
138  TProtocol protocol = new TJSONProtocol(transport);
139  // TProtocol protocol = new TProtocol(transport);
140 
141  return new OmniSci.Client(protocol);
142  }
143 
144  private String getSession(OmniSci.Client client)
145  throws TTransportException, TOmniSciException, TException {
146  String session = client.connect("mapd", "HyperInteractive", "mapd");
147  logger.info("Connected session is " + session);
148  return session;
149  }
150 
151  private void closeSession(OmniSci.Client client, String session)
152  throws TOmniSciException, TException {
153  // Now disconnect
154  logger.info("Trying to disconnect session " + session);
155  client.disconnect(session);
156  }
157 
158  private void theRest(OmniSci.Client client, String session) throws TException {
159  // lets fetch databases from mapd
160  List<TDBInfo> dbs = client.get_databases(session);
161 
162  for (TDBInfo db : dbs) {
163  logger.info("db is " + db.toString());
164  }
165 
166  // lets fetch tables from mapd
167  List<String> tables = client.get_tables(session);
168 
169  for (String tab : tables) {
170  logger.info("Tables is " + tab);
171  }
172 
173  // lets get the version
174  logger.info("Version " + client.get_version());
175 
176  // get table_details
177  TTableDetails table_details = client.get_table_details(session, "flights");
178  for (TColumnType col : table_details.row_desc) {
179  logger.info("col name :" + col.col_name);
180  logger.info("\tcol encoding :" + col.col_type.encoding);
181  logger.info("\tcol is_array :" + col.col_type.is_array);
182  logger.info("\tcol nullable :" + col.col_type.nullable);
183  logger.info("\tcol type :" + col.col_type.type);
184  }
185 
186  // client.set_execution_mode(session, TExecuteMode.CPU);
187  logger.info(" -- before query -- ");
188 
189  TQueryResult sql_execute = client.sql_execute(session,
190  "Select uniquecarrier,flightnum from flights LIMIT 3;",
191  true,
192  null,
193  -1,
194  -1);
195  // client.send_sql_execute(session, "Select BRAND from ACV ;", true);
196  // logger.info(" -- before query recv -- ");
197  // TQueryResult sql_execute = client.recv_sql_execute();
198 
199  logger.info(" -- after query -- ");
200 
201  logger.info("TQueryResult execution time is " + sql_execute.getExecution_time_ms());
202  logger.info("TQueryResult is " + sql_execute.toString());
203  logger.info("TQueryResult getFieldValue is "
204  + sql_execute.getFieldValue(TQueryResult._Fields.ROW_SET));
205 
206  TRowSet row_set = sql_execute.getRow_set();
207  Object fieldValue = sql_execute.getFieldValue(TQueryResult._Fields.ROW_SET);
208 
209  logger.info("fieldValue " + fieldValue);
210 
211  logger.info("TRowSet is " + row_set.toString());
212 
213  logger.info("Get rows size " + row_set.getRowsSize());
214  logger.info("Get col size " + row_set.getRowsSize());
215 
216  List<TRow> rows = row_set.getRows();
217  int count = 1;
218  for (TRow row : rows) {
219  List<TDatum> cols = row.getCols();
220  if (cols != null) {
221  for (TDatum dat : cols) {
222  logger.info("ROW " + count + " " + dat.getFieldValue(TDatum._Fields.VAL));
223  }
224  count++;
225  }
226  }
227 
228  List<TColumn> columns = row_set.getColumns();
229 
230  logger.info("columns " + columns);
231  count = 1;
232  for (TColumn col : columns) {
233  TColumnData data = col.getData();
234  if (data != null) {
235  logger.info("COL " + count + " " + data.toString());
236  }
237  count++;
238  }
239  }
240 
241  public class myThread implements Runnable {
242  private String str;
243  private OmniSci.Client client;
244  private String session;
245 
246  myThread(String str1, OmniSci.Client client1, String session1) {
247  str = str1;
248  client = client1;
249  session = session1;
250  }
251 
252  @Override
253  public void run() {
254  int logStart = str.indexOf(']');
255  if (logStart != -1) {
256  String det = str.substring(logStart + 1).trim();
257  String header = str.substring(0, logStart).trim();
258 
259  String[] headDet = header.split(" .");
260  // logger.info("header "+ header + " count " + headDet.length + " detail " + det
261  // );
262  if (headDet.length != 4 || headDet[0].equals("Log")) {
263  return;
264  }
265  Integer pid = Integer.valueOf(headDet[2]);
266 
267  if (det.contains("sql_execute :")) {
268  logger.info("det " + det);
269  String sl[] = det.split(":query_str:");
270  logger.info("run query " + sl[1]);
271  try {
272  client.sql_execute(session, sl[1], true, null, -1, -1);
273  } catch (TOmniSciException ex1) {
274  logger.error(
275  "Failed to execute " + sl[1] + " exception " + ex1.getError_msg());
276  } catch (TException ex) {
277  logger.error("Failed to execute " + sl[1] + " exception " + ex.toString());
278  }
279  return;
280  }
281 
282  // get_result_row_for_pixel
283  // :5pFFQUCKs17GLHOqI7ykK09U8mX7GnLF:widget_id:3:pixel.x:396:pixel.y:53:column_format:1
284  //:PixelRadius:2:table_col_names::points,dest,conv_4326_900913_x(dest_lon) as
285  // x,conv_4326_900913_y(dest_lat) as y,arrdelay as size
286  if (det.contains("get_result_row_for_pixel :")) {
287  logger.info("det " + det);
288  String ss[] = det.split(":");
289  String sl[] = det.split(":table_col_names:");
290  logger.info("run get_result_for_pixel " + sl[1]);
291  Map<String, List<String>> tcn = new HashMap<String, List<String>>();
292 
293  String tn[] = sl[1].split(":");
294  for (int i = 0; i < tn.length; i++) {
295  String name[] = tn[i].split(",");
296  List<String> col = new ArrayList<String>();
297  for (int j = 1; j < name.length; j++) {
298  col.add(name[j]);
299  }
300  tcn.put(name[0], col);
301  }
302  try {
303  client.get_result_row_for_pixel(session,
304  Integer.parseInt(ss[3]),
305  new TPixel(Integer.parseInt(ss[5]), Integer.parseInt(ss[7])),
306  tcn,
307  Boolean.TRUE,
308  Integer.parseInt(ss[11]),
309  null);
310  } catch (TOmniSciException ex1) {
311  logger.error("Failed to execute get_result_row_for_pixel exception "
312  + ex1.getError_msg());
313  } catch (TException ex) {
314  logger.error("Failed to execute get_result_row_for_pixel exception "
315  + ex.toString());
316  }
317  return;
318  }
319 
320  if (det.contains("render_vega :")) {
321  logger.info("det " + det);
322  String ss[] = det.split(":");
323  String sl[] = det.split(":vega_json:");
324  json.put(pid, det.substring(det.indexOf("render_vega :") + 13, det.length()));
325  logger.info("JSON = " + sl[1]);
326  logger.info("widget = " + Integer.parseInt(ss[3]));
327  logger.info("compressionLevel = " + Integer.parseInt(ss[5]));
328  logger.info("run render_vega");
329  if (cpuMode) {
330  logger.info("In render: setting gpu mode as we were in CPU mode");
331  gpuMode = true;
332  cpuMode = false;
333  try {
334  client.set_execution_mode(session, TExecuteMode.GPU);
335  } catch (TException ex) {
336  logger.error("Failed to set_execution_mode exception " + ex.toString());
337  }
338  }
339  try {
340  TRenderResult fred = client.render_vega(session,
341  Integer.parseInt(ss[3]),
342  sl[1],
343  Integer.parseInt(ss[5]),
344  null);
345  if (false) {
346  try {
347  FileOutputStream fos;
348 
349  fos = new FileOutputStream("/tmp/png.png");
350 
351  fred.image.position(0);
352  byte[] tgxImageDataByte = new byte[fred.image.limit()];
353  fred.image.get(tgxImageDataByte);
354  fos.write(tgxImageDataByte);
355  fos.close();
356  } catch (FileNotFoundException ex) {
357  logger.error("Failed to create file exception " + ex.toString());
358  } catch (IOException ex) {
359  logger.error("Failed to create file exception " + ex.toString());
360  }
361  }
362 
363  } catch (TException ex) {
364  logger.error("Failed to execute render_vega exception " + ex.toString());
365  }
366  return;
367  }
368 
369  if (det.contains("User mapd sets CPU mode")) {
370  logger.info("Set cpu mode");
371  cpuMode = true;
372  gpuMode = false;
373  try {
374  client.set_execution_mode(session, TExecuteMode.CPU);
375  } catch (TException ex) {
376  logger.error("Failed to set_execution_mode exception " + ex.toString());
377  }
378  return;
379  }
380 
381  if (det.contains("User mapd sets GPU mode")) {
382  logger.info("Set gpu mode");
383  gpuMode = true;
384  cpuMode = false;
385  try {
386  client.set_execution_mode(session, TExecuteMode.GPU);
387  } catch (TException ex) {
388  logger.error(
389  "Failed to execute set_execution_mode exception " + ex.toString());
390  }
391  return;
392  }
393  }
394  }
395  }
396 }
string name
Definition: setup.in.py:72
HashMap< Integer, String > sqlquery
Definition: LogRunner.java:64
String getSession(OmniSci.Client client)
Definition: LogRunner.java:144
void theRest(OmniSci.Client client, String session)
Definition: LogRunner.java:158
void doWork(String[] args)
Definition: LogRunner.java:88
myThread(String str1, OmniSci.Client client1, String session1)
Definition: LogRunner.java:246
int count
OmniSci.Client getClient(String hostname, int port)
Definition: LogRunner.java:129
HashMap< Integer, String > originalSql
Definition: LogRunner.java:65
static final Logger logger
Definition: LogRunner.java:63
HashMap< Integer, String > json
Definition: LogRunner.java:66
static void main(String[] args)
Definition: LogRunner.java:70
void closeSession(OmniSci.Client client, String session)
Definition: LogRunner.java:151