OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
LogRunner.java
Go to the documentation of this file.
1 /*
2  * Copyright 2015 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.logrunner;
17 
18 import org.apache.thrift.TException;
19 import org.apache.thrift.protocol.TJSONProtocol;
20 import org.apache.thrift.protocol.TProtocol;
21 import org.apache.thrift.transport.THttpClient;
22 import org.apache.thrift.transport.TTransport;
23 import org.apache.thrift.transport.TTransportException;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 
27 import java.io.BufferedReader;
28 import java.io.FileNotFoundException;
29 import java.io.FileOutputStream;
30 import java.io.FileReader;
31 import java.io.IOException;
32 import java.util.ArrayList;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.ArrayBlockingQueue;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.ThreadPoolExecutor;
40 import java.util.concurrent.TimeUnit;
41 import java.util.logging.Level;
42 
43 import ai.heavy.thrift.server.Heavy;
44 import ai.heavy.thrift.server.TColumn;
45 import ai.heavy.thrift.server.TColumnData;
46 import ai.heavy.thrift.server.TColumnType;
47 import ai.heavy.thrift.server.TDBException;
48 import ai.heavy.thrift.server.TDBInfo;
49 import ai.heavy.thrift.server.TDatum;
50 import ai.heavy.thrift.server.TExecuteMode;
51 import ai.heavy.thrift.server.TPixel;
52 import ai.heavy.thrift.server.TQueryResult;
53 import ai.heavy.thrift.server.TRenderResult;
54 import ai.heavy.thrift.server.TRow;
55 import ai.heavy.thrift.server.TRowSet;
56 import ai.heavy.thrift.server.TTableDetails;
57 
58 public class LogRunner {
59  final static Logger logger = LoggerFactory.getLogger(LogRunner.class);
60  private HashMap<Integer, String> sqlquery;
61  private HashMap<Integer, String> originalSql;
62  private HashMap<Integer, String> json;
63  private boolean gpuMode = false;
64  private boolean cpuMode = false;
65 
66  public static void main(String[] args) throws TException {
67  logger.info("Hello, World");
68 
69  LogRunner x = new LogRunner();
70  try {
71  x.doWork(args);
72  } catch (TTransportException ex) {
73  logger.error(ex.toString());
74  ex.printStackTrace();
75  }
76  }
77 
78  public LogRunner() {
79  sqlquery = new HashMap<Integer, String>();
80  originalSql = new HashMap<Integer, String>();
81  json = new HashMap<Integer, String>();
82  }
83 
84  void doWork(String[] args) throws TTransportException, TException {
85  logger.info("In doWork here");
86 
87  int numberThreads = 3;
88  // Runnable[] worker = new Runnable[numberThreads];
89  //
90  // for (int i = 0; i < numberThreads; i++){
91  //
92  Heavy.Client client = getClient(args[0], Integer.valueOf(args[1]));
93  String session = getSession(client);
94  // worker[i] = new myThread(client, session);
95  // }
96 
97  logger.info("got session");
98  try {
99  // ExecutorService executor = Executors.newFixedThreadPool(6);
100  ExecutorService executor = new ThreadPoolExecutor(numberThreads,
101  numberThreads,
102  0L,
103  TimeUnit.MILLISECONDS,
104  new ArrayBlockingQueue<Runnable>(15),
105  new ThreadPoolExecutor.CallerRunsPolicy());
106  while (true) {
107  // BufferedReader in = new BufferedReader(new
108  // FileReader("/data/logfiles/log1"));
109  BufferedReader in = new BufferedReader(new FileReader(args[2]));
110  String str;
111  int current = 0;
112  while ((str = in.readLine()) != null) {
113  Runnable worker = new myThread(str, client, session);
114  // executor.execute(worker);
115  worker.run();
116  }
117  in.close();
118  logger.info("############loop complete");
119  }
120  // executor.shutdown();
121  } catch (IOException e) {
122  logger.error("IOException " + e.getMessage());
123  }
124  }
125 
126  private Heavy.Client getClient(String hostname, int port) throws TTransportException {
127  TTransport transport = null;
128 
129  // transport = new TSocket("localhost", 6274);
130  transport = new THttpClient("http://" + hostname + ":" + port);
131 
132  transport.open();
133 
134  // TProtocol protocol = new TBinaryProtocol(transport);
135  TProtocol protocol = new TJSONProtocol(transport);
136  // TProtocol protocol = new TProtocol(transport);
137 
138  return new Heavy.Client(protocol);
139  }
140 
141  private String getSession(Heavy.Client client)
142  throws TTransportException, TDBException, TException {
143  String session = client.connect("mapd", "HyperInteractive", "mapd");
144  logger.info("Connected session is " + session);
145  return session;
146  }
147 
148  private void closeSession(Heavy.Client client, String session)
149  throws TDBException, TException {
150  // Now disconnect
151  logger.info("Trying to disconnect session " + session);
152  client.disconnect(session);
153  }
154 
155  private void theRest(Heavy.Client client, String session) throws TException {
156  // lets fetch databases from mapd
157  List<TDBInfo> dbs = client.get_databases(session);
158 
159  for (TDBInfo db : dbs) {
160  logger.info("db is " + db.toString());
161  }
162 
163  // lets fetch tables from mapd
164  List<String> tables = client.get_tables(session);
165 
166  for (String tab : tables) {
167  logger.info("Tables is " + tab);
168  }
169 
170  // lets get the version
171  logger.info("Version " + client.get_version());
172 
173  // get table_details
174  TTableDetails table_details = client.get_table_details(session, "flights");
175  for (TColumnType col : table_details.row_desc) {
176  logger.info("col name :" + col.col_name);
177  logger.info("\tcol encoding :" + col.col_type.encoding);
178  logger.info("\tcol is_array :" + col.col_type.is_array);
179  logger.info("\tcol nullable :" + col.col_type.nullable);
180  logger.info("\tcol type :" + col.col_type.type);
181  }
182 
183  // client.set_execution_mode(session, TExecuteMode.CPU);
184  logger.info(" -- before query -- ");
185 
186  TQueryResult sql_execute = client.sql_execute(session,
187  "Select uniquecarrier,flightnum from flights LIMIT 3;",
188  true,
189  null,
190  -1,
191  -1);
192  // client.send_sql_execute(session, "Select BRAND from ACV ;", true);
193  // logger.info(" -- before query recv -- ");
194  // TQueryResult sql_execute = client.recv_sql_execute();
195 
196  logger.info(" -- after query -- ");
197 
198  logger.info("TQueryResult execution time is " + sql_execute.getExecution_time_ms());
199  logger.info("TQueryResult is " + sql_execute.toString());
200  logger.info("TQueryResult getFieldValue is "
201  + sql_execute.getFieldValue(TQueryResult._Fields.ROW_SET));
202 
203  TRowSet row_set = sql_execute.getRow_set();
204  Object fieldValue = sql_execute.getFieldValue(TQueryResult._Fields.ROW_SET);
205 
206  logger.info("fieldValue " + fieldValue);
207 
208  logger.info("TRowSet is " + row_set.toString());
209 
210  logger.info("Get rows size " + row_set.getRowsSize());
211  logger.info("Get col size " + row_set.getRowsSize());
212 
213  List<TRow> rows = row_set.getRows();
214  int count = 1;
215  for (TRow row : rows) {
216  List<TDatum> cols = row.getCols();
217  if (cols != null) {
218  for (TDatum dat : cols) {
219  logger.info("ROW " + count + " " + dat.getFieldValue(TDatum._Fields.VAL));
220  }
221  count++;
222  }
223  }
224 
225  List<TColumn> columns = row_set.getColumns();
226 
227  logger.info("columns " + columns);
228  count = 1;
229  for (TColumn col : columns) {
230  TColumnData data = col.getData();
231  if (data != null) {
232  logger.info("COL " + count + " " + data.toString());
233  }
234  count++;
235  }
236  }
237 
238  public class myThread implements Runnable {
239  private String str;
240  private Heavy.Client client;
241  private String session;
242 
243  myThread(String str1, Heavy.Client client1, String session1) {
244  str = str1;
245  client = client1;
246  session = session1;
247  }
248 
249  @Override
250  public void run() {
251  int logStart = str.indexOf(']');
252  if (logStart != -1) {
253  String det = str.substring(logStart + 1).trim();
254  String header = str.substring(0, logStart).trim();
255 
256  String[] headDet = header.split(" .");
257  // logger.info("header "+ header + " count " + headDet.length + " detail " + det
258  // );
259  if (headDet.length != 4 || headDet[0].equals("Log")) {
260  return;
261  }
262  Integer pid = Integer.valueOf(headDet[2]);
263 
264  if (det.contains("sql_execute :")) {
265  logger.info("det " + det);
266  String sl[] = det.split(":query_str:");
267  logger.info("run query " + sl[1]);
268  try {
269  client.sql_execute(session, sl[1], true, null, -1, -1);
270  } catch (TDBException ex1) {
271  logger.error(
272  "Failed to execute " + sl[1] + " exception " + ex1.getError_msg());
273  } catch (TException ex) {
274  logger.error("Failed to execute " + sl[1] + " exception " + ex.toString());
275  }
276  return;
277  }
278 
279  // get_result_row_for_pixel
280  // :5pFFQUCKs17GLHOqI7ykK09U8mX7GnLF:widget_id:3:pixel.x:396:pixel.y:53:column_format:1
281  // :PixelRadius:2:table_col_names::points,dest,conv_4326_900913_x(dest_lon) as
282  // x,conv_4326_900913_y(dest_lat) as y,arrdelay as size
283  if (det.contains("get_result_row_for_pixel :")) {
284  logger.info("det " + det);
285  String ss[] = det.split(":");
286  String sl[] = det.split(":table_col_names:");
287  logger.info("run get_result_for_pixel " + sl[1]);
288  Map<String, List<String>> tcn = new HashMap<String, List<String>>();
289 
290  String tn[] = sl[1].split(":");
291  for (int i = 0; i < tn.length; i++) {
292  String name[] = tn[i].split(",");
293  List<String> col = new ArrayList<String>();
294  for (int j = 1; j < name.length; j++) {
295  col.add(name[j]);
296  }
297  tcn.put(name[0], col);
298  }
299  try {
300  client.get_result_row_for_pixel(session,
301  Integer.parseInt(ss[3]),
302  new TPixel(Integer.parseInt(ss[5]), Integer.parseInt(ss[7])),
303  tcn,
304  Boolean.TRUE,
305  Integer.parseInt(ss[11]),
306  null);
307  } catch (TDBException ex1) {
308  logger.error("Failed to execute get_result_row_for_pixel exception "
309  + ex1.getError_msg());
310  } catch (TException ex) {
311  logger.error("Failed to execute get_result_row_for_pixel exception "
312  + ex.toString());
313  }
314  return;
315  }
316 
317  if (det.contains("render_vega :")) {
318  logger.info("det " + det);
319  String ss[] = det.split(":");
320  String sl[] = det.split(":vega_json:");
321  json.put(pid, det.substring(det.indexOf("render_vega :") + 13, det.length()));
322  logger.info("JSON = " + sl[1]);
323  logger.info("widget = " + Integer.parseInt(ss[3]));
324  logger.info("compressionLevel = " + Integer.parseInt(ss[5]));
325  logger.info("run render_vega");
326  if (cpuMode) {
327  logger.info("In render: setting gpu mode as we were in CPU mode");
328  gpuMode = true;
329  cpuMode = false;
330  try {
331  client.set_execution_mode(session, TExecuteMode.GPU);
332  } catch (TException ex) {
333  logger.error("Failed to set_execution_mode exception " + ex.toString());
334  }
335  }
336  try {
337  TRenderResult fred = client.render_vega(session,
338  Integer.parseInt(ss[3]),
339  sl[1],
340  Integer.parseInt(ss[5]),
341  null);
342  if (false) {
343  try {
344  FileOutputStream fos;
345 
346  fos = new FileOutputStream("/tmp/png.png");
347 
348  fred.image.position(0);
349  byte[] tgxImageDataByte = new byte[fred.image.limit()];
350  fred.image.get(tgxImageDataByte);
351  fos.write(tgxImageDataByte);
352  fos.close();
353  } catch (FileNotFoundException ex) {
354  logger.error("Failed to create file exception " + ex.toString());
355  } catch (IOException ex) {
356  logger.error("Failed to create file exception " + ex.toString());
357  }
358  }
359 
360  } catch (TException ex) {
361  logger.error("Failed to execute render_vega exception " + ex.toString());
362  }
363  return;
364  }
365 
366  if (det.contains("User mapd sets CPU mode")) {
367  logger.info("Set cpu mode");
368  cpuMode = true;
369  gpuMode = false;
370  try {
371  client.set_execution_mode(session, TExecuteMode.CPU);
372  } catch (TException ex) {
373  logger.error("Failed to set_execution_mode exception " + ex.toString());
374  }
375  return;
376  }
377 
378  if (det.contains("User mapd sets GPU mode")) {
379  logger.info("Set gpu mode");
380  gpuMode = true;
381  cpuMode = false;
382  try {
383  client.set_execution_mode(session, TExecuteMode.GPU);
384  } catch (TException ex) {
385  logger.error(
386  "Failed to execute set_execution_mode exception " + ex.toString());
387  }
388  return;
389  }
390  }
391  }
392  }
393 }
myThread(String str1, Heavy.Client client1, String session1)
Definition: LogRunner.java:243
HashMap< Integer, String > sqlquery
Definition: LogRunner.java:60
void doWork(String[] args)
Definition: LogRunner.java:84
void theRest(Heavy.Client client, String session)
Definition: LogRunner.java:155
HashMap< Integer, String > originalSql
Definition: LogRunner.java:61
static final Logger logger
Definition: LogRunner.java:59
HashMap< Integer, String > json
Definition: LogRunner.java:62
String getSession(Heavy.Client client)
Definition: LogRunner.java:141
string name
Definition: setup.in.py:72
static void main(String[] args)
Definition: LogRunner.java:66
void closeSession(Heavy.Client client, String session)
Definition: LogRunner.java:148
Heavy.Client getClient(String hostname, int port)
Definition: LogRunner.java:126