OmniSciDB  340b00dbf6
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
CalciteServerHandler.java
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.parser.server;
17 
18 import static com.mapd.calcite.parser.MapDParser.CURRENT_PARSER;
19 
24 import com.omnisci.thrift.calciteserver.CalciteServer;
25 import com.omnisci.thrift.calciteserver.InvalidParseRequest;
26 import com.omnisci.thrift.calciteserver.TAccessedQueryObjects;
27 import com.omnisci.thrift.calciteserver.TCompletionHint;
28 import com.omnisci.thrift.calciteserver.TCompletionHintType;
29 import com.omnisci.thrift.calciteserver.TExtArgumentType;
30 import com.omnisci.thrift.calciteserver.TFilterPushDownInfo;
31 import com.omnisci.thrift.calciteserver.TPlanResult;
32 import com.omnisci.thrift.calciteserver.TUserDefinedFunction;
33 import com.omnisci.thrift.calciteserver.TUserDefinedTableFunction;
34 
37 import org.apache.calcite.runtime.CalciteContextException;
38 import org.apache.calcite.sql.SqlNode;
39 import org.apache.calcite.sql.parser.SqlParseException;
40 import org.apache.calcite.sql.validate.SqlMoniker;
41 import org.apache.calcite.sql.validate.SqlMonikerType;
42 import org.apache.calcite.tools.RelConversionException;
43 import org.apache.calcite.tools.ValidationException;
44 import org.apache.calcite.util.Pair;
45 import org.apache.commons.pool.PoolableObjectFactory;
46 import org.apache.commons.pool.impl.GenericObjectPool;
47 import org.apache.thrift.TException;
48 import org.apache.thrift.server.TServer;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 
52 import java.io.IOException;
53 import java.util.ArrayList;
54 import java.util.HashMap;
55 import java.util.List;
56 import java.util.Map;
57 
62 public class CalciteServerHandler implements CalciteServer.Iface {
63  final static Logger MAPDLOGGER = LoggerFactory.getLogger(CalciteServerHandler.class);
64  private TServer server;
65 
66  private final int mapdPort;
67 
68  private volatile long callCount;
69 
70  private final GenericObjectPool parserPool;
71 
73 
74  private final String extSigsJson;
75 
76  private final String udfSigsJson;
77 
78  private String udfRTSigsJson = "";
79  Map<String, ExtensionFunction> udfRTSigs = null;
80 
82  private Map<String, ExtensionFunction> extSigs = null;
83  private String dataDir;
84 
85  // TODO MAT we need to merge this into common code base for these functions with
86  // CalciteDirect since we are not deprecating this stuff yet
88  String dataDir,
89  String extensionFunctionsAstFile,
91  String udfAstFile) {
92  this.mapdPort = mapdPort;
93  this.dataDir = dataDir;
94 
95  Map<String, ExtensionFunction> udfSigs = null;
96 
97  try {
98  extSigs = ExtensionFunctionSignatureParser.parse(extensionFunctionsAstFile);
99  } catch (IOException ex) {
100  MAPDLOGGER.error(
101  "Could not load extension function signatures: " + ex.getMessage(), ex);
102  }
103  extSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(extSigs);
104 
105  try {
106  if (!udfAstFile.isEmpty()) {
107  udfSigs = ExtensionFunctionSignatureParser.parseUdfAst(udfAstFile);
108  }
109  } catch (IOException ex) {
110  MAPDLOGGER.error("Could not load udf function signatures: " + ex.getMessage(), ex);
111  }
112  udfSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(udfSigs);
113 
114  // Put all the udf functions signatures in extSigs so Calcite has a view of
115  // extension functions and udf functions
116  if (!udfAstFile.isEmpty()) {
117  extSigs.putAll(udfSigs);
118  }
119 
120  calciteParserFactory = new CalciteParserFactory(dataDir, extSigs, mapdPort, skT);
121 
122  // GenericObjectPool::setFactory is deprecated
123  this.parserPool = new GenericObjectPool(calciteParserFactory);
124  }
125 
126  @Override
127  public void ping() throws TException {
128  MAPDLOGGER.debug("Ping hit");
129  }
130 
131  @Override
132  public TPlanResult process(String user,
133  String session,
134  String catalog,
135  String queryText,
136  java.util.List<TFilterPushDownInfo> thriftFilterPushDownInfo,
137  boolean legacySyntax,
138  boolean isExplain,
139  boolean isViewOptimize) throws InvalidParseRequest, TException {
140  long timer = System.currentTimeMillis();
141  callCount++;
142 
144  try {
145  parser = (MapDParser) parserPool.borrowObject();
146  parser.clearMemo();
147  } catch (Exception ex) {
148  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
149  MAPDLOGGER.error(msg, ex);
150  throw new InvalidParseRequest(-1, msg);
151  }
152  MapDUser mapDUser = new MapDUser(user, session, catalog, mapdPort);
153  MAPDLOGGER.debug("process was called User: " + user + " Catalog: " + catalog
154  + " sql: " + queryText);
155  parser.setUser(mapDUser);
156  CURRENT_PARSER.set(parser);
157 
158  // need to trim the sql string as it seems it is not trimed prior to here
159  boolean isRAQuery = false;
160 
161  if (queryText.startsWith("execute calcite")) {
162  queryText = queryText.replaceFirst("execute calcite", "");
163  isRAQuery = true;
164  }
165 
166  queryText = queryText.trim();
167  // remove last charcter if it is a ;
168  if (queryText.length() > 0 && queryText.charAt(queryText.length() - 1) == ';') {
169  queryText = queryText.substring(0, queryText.length() - 1);
170  }
171  String jsonResult;
172  SqlIdentifierCapturer capturer;
173  TAccessedQueryObjects primaryAccessedObjects = new TAccessedQueryObjects();
174  TAccessedQueryObjects resolvedAccessedObjects = new TAccessedQueryObjects();
175  try {
176  if (!isRAQuery) {
177  final List<MapDParserOptions.FilterPushDownInfo> filterPushDownInfo =
178  new ArrayList<>();
179  for (final TFilterPushDownInfo req : thriftFilterPushDownInfo) {
180  filterPushDownInfo.add(new MapDParserOptions.FilterPushDownInfo(
181  req.input_prev, req.input_start, req.input_next));
182  }
183  Pair<String, SqlIdentifierCapturer> res;
184  SqlNode node;
185  try {
186  MapDParserOptions parserOptions = new MapDParserOptions(
187  filterPushDownInfo, legacySyntax, isExplain, isViewOptimize);
188  res = parser.process(queryText, parserOptions);
189  jsonResult = res.left;
190  capturer = res.right;
191 
192  primaryAccessedObjects.tables_selected_from = new ArrayList<>(capturer.selects);
193  primaryAccessedObjects.tables_inserted_into = new ArrayList<>(capturer.inserts);
194  primaryAccessedObjects.tables_updated_in = new ArrayList<>(capturer.updates);
195  primaryAccessedObjects.tables_deleted_from = new ArrayList<>(capturer.deletes);
196 
197  // also resolve all the views in the select part
198  // resolution of the other parts is not
199  // necessary as these cannot be views
200  resolvedAccessedObjects.tables_selected_from =
201  new ArrayList<>(parser.resolveSelectIdentifiers(capturer));
202  resolvedAccessedObjects.tables_inserted_into =
203  new ArrayList<>(capturer.inserts);
204  resolvedAccessedObjects.tables_updated_in = new ArrayList<>(capturer.updates);
205  resolvedAccessedObjects.tables_deleted_from = new ArrayList<>(capturer.deletes);
206  } catch (ValidationException ex) {
207  String msg = "Validation: " + ex.getMessage();
208  MAPDLOGGER.error(msg, ex);
209  throw ex;
210  } catch (RelConversionException ex) {
211  String msg = " RelConversion failed: " + ex.getMessage();
212  MAPDLOGGER.error(msg, ex);
213  throw ex;
214  }
215  } else {
216  jsonResult = parser.optimizeRAQuery(queryText);
217  }
218  } catch (SqlParseException ex) {
219  String msg = "Parse failed: " + ex.getMessage();
220  MAPDLOGGER.error(msg, ex);
221  throw new InvalidParseRequest(-2, msg);
222  } catch (CalciteContextException ex) {
223  String msg = "Validate failed: " + ex.getMessage();
224  MAPDLOGGER.error(msg, ex);
225  throw new InvalidParseRequest(-3, msg);
226  } catch (Throwable ex) {
227  String msg = "Exception occurred: " + ex.getMessage();
228  MAPDLOGGER.error(msg, ex);
229  throw new InvalidParseRequest(-4, msg);
230  } finally {
231  CURRENT_PARSER.set(null);
232  try {
233  // put parser object back in pool for others to use
234  parserPool.returnObject(parser);
235  } catch (Exception ex) {
236  String msg = "Could not return parse object: " + ex.getMessage();
237  MAPDLOGGER.error(msg, ex);
238  throw new InvalidParseRequest(-4, msg);
239  }
240  }
241 
242  TPlanResult result = new TPlanResult();
243  result.primary_accessed_objects = primaryAccessedObjects;
244  result.resolved_accessed_objects = resolvedAccessedObjects;
245  result.plan_result = jsonResult;
246  result.execution_time_ms = System.currentTimeMillis() - timer;
247 
248  return result;
249  }
250 
251  @Override
252  public void shutdown() throws TException {
253  // received request to shutdown
254  MAPDLOGGER.debug("Shutdown calcite java server");
255  server.stop();
256  }
257 
258  @Override
260  return this.extSigsJson;
261  }
262 
263  @Override
265  return this.udfSigsJson;
266  }
267 
268  @Override
270  return this.udfRTSigsJson;
271  }
272 
273  void setServer(TServer s) {
274  server = s;
275  }
276 
277  @Override
278  public void updateMetadata(String catalog, String table) throws TException {
279  MAPDLOGGER.debug("Received invalidation from server for " + catalog + " : " + table);
280  long timer = System.currentTimeMillis();
281  callCount++;
283  try {
284  parser = (MapDParser) parserPool.borrowObject();
285  } catch (Exception ex) {
286  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
287  MAPDLOGGER.error(msg, ex);
288  return;
289  }
290  CURRENT_PARSER.set(parser);
291  try {
292  parser.updateMetaData(catalog, table);
293  } finally {
294  CURRENT_PARSER.set(null);
295  try {
296  // put parser object back in pool for others to use
297  MAPDLOGGER.debug("Returning object to pool");
298  parserPool.returnObject(parser);
299  } catch (Exception ex) {
300  String msg = "Could not return parse object: " + ex.getMessage();
301  MAPDLOGGER.error(msg, ex);
302  }
303  }
304  }
305 
306  @Override
307  public List<TCompletionHint> getCompletionHints(String user,
308  String session,
309  String catalog,
310  List<String> visible_tables,
311  String sql,
312  int cursor) throws TException {
313  callCount++;
315  try {
316  parser = (MapDParser) parserPool.borrowObject();
317  } catch (Exception ex) {
318  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
319  MAPDLOGGER.error(msg, ex);
320  throw new TException(msg);
321  }
322  MapDUser mapDUser = new MapDUser(user, session, catalog, mapdPort);
323  MAPDLOGGER.debug("getCompletionHints was called User: " + user
324  + " Catalog: " + catalog + " sql: " + sql);
325  parser.setUser(mapDUser);
326  CURRENT_PARSER.set(parser);
327 
328  MapDPlanner.CompletionResult completion_result;
329  try {
330  completion_result = parser.getCompletionHints(sql, cursor, visible_tables);
331  } catch (Exception ex) {
332  String msg = "Could not retrieve completion hints: " + ex.getMessage();
333  MAPDLOGGER.error(msg, ex);
334  return new ArrayList<>();
335  } finally {
336  CURRENT_PARSER.set(null);
337  try {
338  // put parser object back in pool for others to use
339  parserPool.returnObject(parser);
340  } catch (Exception ex) {
341  String msg = "Could not return parse object: " + ex.getMessage();
342  MAPDLOGGER.error(msg, ex);
343  throw new InvalidParseRequest(-4, msg);
344  }
345  }
346  List<TCompletionHint> result = new ArrayList<>();
347  for (final SqlMoniker hint : completion_result.hints) {
348  result.add(new TCompletionHint(hintTypeToThrift(hint.getType()),
349  hint.getFullyQualifiedNames(),
350  completion_result.replaced));
351  }
352  return result;
353  }
354 
355  @Override
357  List<TUserDefinedFunction> udfs, List<TUserDefinedTableFunction> udtfs) {
358  // Clean up previously defined Runtime UDFs
359  if (udfRTSigs != null) {
360  for (String name : udfRTSigs.keySet()) extSigs.remove(name);
361  udfRTSigsJson = "";
362  udfRTSigs.clear();
363  } else {
364  udfRTSigs = new HashMap<String, ExtensionFunction>();
365  }
366 
367  for (TUserDefinedFunction udf : udfs) {
368  udfRTSigs.put(udf.name, toExtensionFunction(udf));
369  }
370 
371  for (TUserDefinedTableFunction udtf : udtfs) {
372  udfRTSigs.put(udtf.name, toExtensionFunction(udtf));
373  }
374 
375  // Avoid overwritting compiled and Loadtime UDFs:
376  for (String name : udfRTSigs.keySet()) {
377  if (extSigs.containsKey(name)) {
378  MAPDLOGGER.error("Extension function `" + name
379  + "` exists. Skipping runtime extenension function with the same name.");
380  udfRTSigs.remove(name);
381  }
382  }
383 
384  // udfRTSigsJson will contain only the signatures of UDFs:
385  udfRTSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(udfRTSigs);
386  // Expose RT UDFs to Calcite server:
387  extSigs.putAll(udfRTSigs);
388 
389  calciteParserFactory.updateOperatorTable();
390  }
391 
392  private static ExtensionFunction toExtensionFunction(TUserDefinedFunction udf) {
393  List<ExtensionFunction.ExtArgumentType> args =
394  new ArrayList<ExtensionFunction.ExtArgumentType>();
395  for (TExtArgumentType atype : udf.argTypes) {
396  final ExtensionFunction.ExtArgumentType arg_type = toExtArgumentType(atype);
397  if (arg_type != ExtensionFunction.ExtArgumentType.Void) {
398  args.add(arg_type);
399  }
400  }
401  return new ExtensionFunction(args, toExtArgumentType(udf.retType));
402  }
403 
404  private static ExtensionFunction toExtensionFunction(TUserDefinedTableFunction udtf) {
405  List<ExtensionFunction.ExtArgumentType> args =
406  new ArrayList<ExtensionFunction.ExtArgumentType>();
407  for (TExtArgumentType atype : udtf.sqlArgTypes) {
408  args.add(toExtArgumentType(atype));
409  }
410  List<ExtensionFunction.ExtArgumentType> outs =
411  new ArrayList<ExtensionFunction.ExtArgumentType>();
412  for (TExtArgumentType otype : udtf.outputArgTypes) {
413  outs.add(toExtArgumentType(otype));
414  }
415  return new ExtensionFunction(args, outs);
416  }
417 
418  private static ExtensionFunction.ExtArgumentType toExtArgumentType(
419  TExtArgumentType type) {
420  switch (type) {
421  case Int8:
422  return ExtensionFunction.ExtArgumentType.Int8;
423  case Int16:
424  return ExtensionFunction.ExtArgumentType.Int16;
425  case Int32:
426  return ExtensionFunction.ExtArgumentType.Int32;
427  case Int64:
428  return ExtensionFunction.ExtArgumentType.Int64;
429  case Float:
430  return ExtensionFunction.ExtArgumentType.Float;
431  case Double:
433  case Void:
434  return ExtensionFunction.ExtArgumentType.Void;
435  case PInt8:
436  return ExtensionFunction.ExtArgumentType.PInt8;
437  case PInt16:
438  return ExtensionFunction.ExtArgumentType.PInt16;
439  case PInt32:
440  return ExtensionFunction.ExtArgumentType.PInt32;
441  case PInt64:
442  return ExtensionFunction.ExtArgumentType.PInt64;
443  case PFloat:
444  return ExtensionFunction.ExtArgumentType.PFloat;
445  case PDouble:
446  return ExtensionFunction.ExtArgumentType.PDouble;
447  case PBool:
448  return ExtensionFunction.ExtArgumentType.PBool;
449  case Bool:
450  return ExtensionFunction.ExtArgumentType.Bool;
451  case ArrayInt8:
452  return ExtensionFunction.ExtArgumentType.ArrayInt8;
453  case ArrayInt16:
454  return ExtensionFunction.ExtArgumentType.ArrayInt16;
455  case ArrayInt32:
456  return ExtensionFunction.ExtArgumentType.ArrayInt32;
457  case ArrayInt64:
458  return ExtensionFunction.ExtArgumentType.ArrayInt64;
459  case ArrayFloat:
460  return ExtensionFunction.ExtArgumentType.ArrayFloat;
461  case ArrayDouble:
462  return ExtensionFunction.ExtArgumentType.ArrayDouble;
463  case ArrayBool:
464  return ExtensionFunction.ExtArgumentType.ArrayBool;
465  case ColumnInt8:
466  return ExtensionFunction.ExtArgumentType.ColumnInt8;
467  case ColumnInt16:
468  return ExtensionFunction.ExtArgumentType.ColumnInt16;
469  case ColumnInt32:
470  return ExtensionFunction.ExtArgumentType.ColumnInt32;
471  case ColumnInt64:
472  return ExtensionFunction.ExtArgumentType.ColumnInt64;
473  case ColumnFloat:
474  return ExtensionFunction.ExtArgumentType.ColumnFloat;
475  case ColumnDouble:
476  return ExtensionFunction.ExtArgumentType.ColumnDouble;
477  case ColumnBool:
478  return ExtensionFunction.ExtArgumentType.ColumnBool;
479  case GeoPoint:
480  return ExtensionFunction.ExtArgumentType.GeoPoint;
481  case GeoLineString:
482  return ExtensionFunction.ExtArgumentType.GeoLineString;
483  case Cursor:
484  return ExtensionFunction.ExtArgumentType.Cursor;
485  case GeoPolygon:
486  return ExtensionFunction.ExtArgumentType.GeoPolygon;
487  case GeoMultiPolygon:
488  return ExtensionFunction.ExtArgumentType.GeoMultiPolygon;
489  default:
490  MAPDLOGGER.error("toExtArgumentType: unknown type " + type);
491  return null;
492  }
493  }
494 
495  private static TCompletionHintType hintTypeToThrift(final SqlMonikerType type) {
496  switch (type) {
497  case COLUMN:
498  return TCompletionHintType.COLUMN;
499  case TABLE:
500  return TCompletionHintType.TABLE;
501  case VIEW:
502  return TCompletionHintType.VIEW;
503  case SCHEMA:
504  return TCompletionHintType.SCHEMA;
505  case CATALOG:
506  return TCompletionHintType.CATALOG;
507  case REPOSITORY:
508  return TCompletionHintType.REPOSITORY;
509  case FUNCTION:
510  return TCompletionHintType.FUNCTION;
511  case KEYWORD:
512  return TCompletionHintType.KEYWORD;
513  default:
514  return null;
515  }
516  }
517 }
static ExtensionFunction.ExtArgumentType toExtArgumentType(TExtArgumentType type)
void updateMetadata(String catalog, String table)
Map< String, ExtensionFunction > udfRTSigs
static TCompletionHintType hintTypeToThrift(final SqlMonikerType type)
TPlanResult process(String user, String session, String catalog, String queryText, java.util.List< TFilterPushDownInfo > thriftFilterPushDownInfo, boolean legacySyntax, boolean isExplain, boolean isViewOptimize)
static ExtensionFunction toExtensionFunction(TUserDefinedTableFunction udtf)
static ExtensionFunction toExtensionFunction(TUserDefinedFunction udf)
string name
Definition: setup.py:35
CalciteServerHandler(int mapdPort, String dataDir, String extensionFunctionsAstFile, SockTransportProperties skT, String udfAstFile)
void setRuntimeExtensionFunctions(List< TUserDefinedFunction > udfs, List< TUserDefinedTableFunction > udtfs)
List< TCompletionHint > getCompletionHints(String user, String session, String catalog, List< String > visible_tables, String sql, int cursor)