OmniSciDB  a575cb28ea
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
CalciteServerHandler.java
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.parser.server;
17 
18 import static com.mapd.calcite.parser.MapDParser.CURRENT_PARSER;
19 
24 import com.omnisci.thrift.calciteserver.CalciteServer;
25 import com.omnisci.thrift.calciteserver.InvalidParseRequest;
26 import com.omnisci.thrift.calciteserver.TAccessedQueryObjects;
27 import com.omnisci.thrift.calciteserver.TCompletionHint;
28 import com.omnisci.thrift.calciteserver.TCompletionHintType;
29 import com.omnisci.thrift.calciteserver.TExtArgumentType;
30 import com.omnisci.thrift.calciteserver.TFilterPushDownInfo;
31 import com.omnisci.thrift.calciteserver.TPlanResult;
32 import com.omnisci.thrift.calciteserver.TRestriction;
33 import com.omnisci.thrift.calciteserver.TUserDefinedFunction;
34 import com.omnisci.thrift.calciteserver.TUserDefinedTableFunction;
35 
39 import org.apache.calcite.runtime.CalciteContextException;
40 import org.apache.calcite.sql.SqlNode;
41 import org.apache.calcite.sql.parser.SqlParseException;
42 import org.apache.calcite.sql.validate.SqlMoniker;
43 import org.apache.calcite.sql.validate.SqlMonikerType;
44 import org.apache.calcite.tools.RelConversionException;
45 import org.apache.calcite.tools.ValidationException;
46 import org.apache.calcite.util.Pair;
47 import org.apache.commons.pool.PoolableObjectFactory;
48 import org.apache.commons.pool.impl.GenericObjectPool;
49 import org.apache.thrift.TException;
50 import org.apache.thrift.server.TServer;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 
54 import java.io.IOException;
55 import java.util.ArrayList;
56 import java.util.HashMap;
57 import java.util.List;
58 import java.util.Map;
59 
64 public class CalciteServerHandler implements CalciteServer.Iface {
// SLF4J logger shared by every method of this handler.
65  final static Logger MAPDLOGGER = LoggerFactory.getLogger(CalciteServerHandler.class);
// Thrift server owning this handler; injected through setServer() and stopped by shutdown().
66  private TServer server;
67 
// Port value received by the constructor; forwarded to every MapDUser and to the parser factory.
68  private final int mapdPort;
69 
// Incremented on entry to process(), updateMetadata() and getCompletionHints(); never read in
// the visible code. NOTE(review): `callCount++` on a volatile field is not an atomic update.
70  private volatile long callCount;
71 
// Pool from which MapDParser instances are borrowed per call and returned afterwards.
72  private final GenericObjectPool parserPool;
73 
// NOTE(review): the constructor assigns `calciteParserFactory`, but no declaration for it is
// visible here (the listing's numbering jumps from 73 to 75) -- presumably dropped during
// extraction; confirm against the original file.
75 
// JSON rendering of the extension function signatures, computed once in the constructor.
76  private final String extSigsJson;
77 
// JSON rendering of the load-time UDF signatures, computed once in the constructor.
78  private final String udfSigsJson;
79 
// JSON rendering of the runtime UDF signatures; rebuilt by setRuntimeExtensionFunctions().
80  private String udfRTSigsJson = "";
// Runtime UDF/UDTF signatures keyed by function name; null until the first runtime registration.
81  Map<String, ExtensionFunction> udfRTSigs = null;
82 
// Load-time UDTF signatures keyed by function name; null until the first load-time registration.
83  Map<String, ExtensionFunction> udtfSigs = null;
84 
// Signature map handed to the parser factory; the constructor and
// setRuntimeExtensionFunctions() merge the other signature maps into it.
86  private Map<String, ExtensionFunction> extSigs = null;
// Data directory received by the constructor and forwarded to the parser factory.
87  private String dataDir;
88 
88 
89  // TODO MAT we need to merge this into common code base for these functions with
90  // CalciteDirect since we are not deprecating this stuff yet
92  String dataDir,
93  String extensionFunctionsAstFile,
95  String udfAstFile) {
96  this.mapdPort = mapdPort;
97  this.dataDir = dataDir;
98 
99  Map<String, ExtensionFunction> udfSigs = null;
100 
101  try {
102  extSigs = ExtensionFunctionSignatureParser.parse(extensionFunctionsAstFile);
103  } catch (IOException ex) {
104  MAPDLOGGER.error(
105  "Could not load extension function signatures: " + ex.getMessage(), ex);
106  }
107  extSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(extSigs);
108 
109  try {
110  if (!udfAstFile.isEmpty()) {
111  udfSigs = ExtensionFunctionSignatureParser.parseUdfAst(udfAstFile);
112  }
113  } catch (IOException ex) {
114  MAPDLOGGER.error("Could not load udf function signatures: " + ex.getMessage(), ex);
115  }
116  udfSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(udfSigs);
117 
118  // Put all the udf functions signatures in extSigs so Calcite has a view of
119  // extension functions and udf functions
120  if (!udfAstFile.isEmpty()) {
121  extSigs.putAll(udfSigs);
122  }
123 
124  calciteParserFactory = new CalciteParserFactory(dataDir, extSigs, mapdPort, skT);
125 
126  // GenericObjectPool::setFactory is deprecated
127  this.parserPool = new GenericObjectPool(calciteParserFactory);
128  }
129 
130  @Override
131  public void ping() throws TException {
132  MAPDLOGGER.debug("Ping hit");
133  }
134 
135  @Override
136  public TPlanResult process(String user,
137  String session,
138  String catalog,
139  String queryText,
140  java.util.List<TFilterPushDownInfo> thriftFilterPushDownInfo,
141  boolean legacySyntax,
142  boolean isExplain,
143  boolean isViewOptimize,
144  TRestriction restriction) throws InvalidParseRequest, TException {
145  long timer = System.currentTimeMillis();
146  callCount++;
147 
149  try {
150  parser = (MapDParser) parserPool.borrowObject();
151  parser.clearMemo();
152  } catch (Exception ex) {
153  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
154  MAPDLOGGER.error(msg, ex);
155  throw new InvalidParseRequest(-1, msg);
156  }
157  Restriction rest = null;
158  if (restriction != null && !restriction.column.isEmpty()) {
159  rest = new Restriction(restriction.column, restriction.values);
160  }
161  MapDUser mapDUser = new MapDUser(user, session, catalog, mapdPort, rest);
162  MAPDLOGGER.debug("process was called User: " + user + " Catalog: " + catalog
163  + " sql: " + queryText);
164  parser.setUser(mapDUser);
165  CURRENT_PARSER.set(parser);
166 
167  // need to trim the sql string as it seems it is not trimed prior to here
168  boolean isRAQuery = false;
169 
170  if (queryText.startsWith("execute calcite")) {
171  queryText = queryText.replaceFirst("execute calcite", "");
172  isRAQuery = true;
173  }
174 
175  queryText = queryText.trim();
176  // remove last charcter if it is a ;
177  if (queryText.length() > 0 && queryText.charAt(queryText.length() - 1) == ';') {
178  queryText = queryText.substring(0, queryText.length() - 1);
179  }
180  String jsonResult;
181  SqlIdentifierCapturer capturer;
182  TAccessedQueryObjects primaryAccessedObjects = new TAccessedQueryObjects();
183  TAccessedQueryObjects resolvedAccessedObjects = new TAccessedQueryObjects();
184  try {
185  if (!isRAQuery) {
186  final List<MapDParserOptions.FilterPushDownInfo> filterPushDownInfo =
187  new ArrayList<>();
188  for (final TFilterPushDownInfo req : thriftFilterPushDownInfo) {
189  filterPushDownInfo.add(new MapDParserOptions.FilterPushDownInfo(
190  req.input_prev, req.input_start, req.input_next));
191  }
192  Pair<String, SqlIdentifierCapturer> res;
193  SqlNode node;
194  try {
195  MapDParserOptions parserOptions = new MapDParserOptions(
196  filterPushDownInfo, legacySyntax, isExplain, isViewOptimize);
197  res = parser.process(queryText, parserOptions);
198  jsonResult = res.left;
199  capturer = res.right;
200 
201  primaryAccessedObjects.tables_selected_from = new ArrayList<>(capturer.selects);
202  primaryAccessedObjects.tables_inserted_into = new ArrayList<>(capturer.inserts);
203  primaryAccessedObjects.tables_updated_in = new ArrayList<>(capturer.updates);
204  primaryAccessedObjects.tables_deleted_from = new ArrayList<>(capturer.deletes);
205 
206  // also resolve all the views in the select part
207  // resolution of the other parts is not
208  // necessary as these cannot be views
209  resolvedAccessedObjects.tables_selected_from =
210  new ArrayList<>(parser.resolveSelectIdentifiers(capturer));
211  resolvedAccessedObjects.tables_inserted_into =
212  new ArrayList<>(capturer.inserts);
213  resolvedAccessedObjects.tables_updated_in = new ArrayList<>(capturer.updates);
214  resolvedAccessedObjects.tables_deleted_from = new ArrayList<>(capturer.deletes);
215  } catch (ValidationException ex) {
216  String msg = "Validation: " + ex.getMessage();
217  MAPDLOGGER.error(msg, ex);
218  throw ex;
219  } catch (RelConversionException ex) {
220  String msg = " RelConversion failed: " + ex.getMessage();
221  MAPDLOGGER.error(msg, ex);
222  throw ex;
223  }
224  } else {
225  jsonResult = parser.optimizeRAQuery(queryText);
226  }
227  } catch (SqlParseException ex) {
228  String msg = "Parse failed: " + ex.getMessage();
229  MAPDLOGGER.error(msg, ex);
230  throw new InvalidParseRequest(-2, msg);
231  } catch (CalciteContextException ex) {
232  String msg = "Validate failed: " + ex.getMessage();
233  MAPDLOGGER.error(msg, ex);
234  throw new InvalidParseRequest(-3, msg);
235  } catch (Throwable ex) {
236  String msg = "Exception occurred: " + ex.getMessage();
237  MAPDLOGGER.error(msg, ex);
238  throw new InvalidParseRequest(-4, msg);
239  } finally {
240  CURRENT_PARSER.set(null);
241  try {
242  // put parser object back in pool for others to use
243  parserPool.returnObject(parser);
244  } catch (Exception ex) {
245  String msg = "Could not return parse object: " + ex.getMessage();
246  MAPDLOGGER.error(msg, ex);
247  throw new InvalidParseRequest(-4, msg);
248  }
249  }
250 
251  TPlanResult result = new TPlanResult();
252  result.primary_accessed_objects = primaryAccessedObjects;
253  result.resolved_accessed_objects = resolvedAccessedObjects;
254  result.plan_result = jsonResult;
255  result.execution_time_ms = System.currentTimeMillis() - timer;
256 
257  return result;
258  }
259 
260  @Override
261  public void shutdown() throws TException {
262  // received request to shutdown
263  MAPDLOGGER.debug("Shutdown calcite java server");
264  server.stop();
265  }
266 
267  @Override
269  return this.extSigsJson;
270  }
271 
272  @Override
274  return this.udfSigsJson;
275  }
276 
277  @Override
279  return this.udfRTSigsJson;
280  }
281 
282  void setServer(TServer s) {
283  server = s;
284  }
285 
286  @Override
287  public void updateMetadata(String catalog, String table) throws TException {
288  MAPDLOGGER.debug("Received invalidation from server for " + catalog + " : " + table);
289  long timer = System.currentTimeMillis();
290  callCount++;
292  try {
293  parser = (MapDParser) parserPool.borrowObject();
294  } catch (Exception ex) {
295  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
296  MAPDLOGGER.error(msg, ex);
297  return;
298  }
299  CURRENT_PARSER.set(parser);
300  try {
301  parser.updateMetaData(catalog, table);
302  } finally {
303  CURRENT_PARSER.set(null);
304  try {
305  // put parser object back in pool for others to use
306  MAPDLOGGER.debug("Returning object to pool");
307  parserPool.returnObject(parser);
308  } catch (Exception ex) {
309  String msg = "Could not return parse object: " + ex.getMessage();
310  MAPDLOGGER.error(msg, ex);
311  }
312  }
313  }
314 
315  @Override
316  public List<TCompletionHint> getCompletionHints(String user,
317  String session,
318  String catalog,
319  List<String> visible_tables,
320  String sql,
321  int cursor) throws TException {
322  callCount++;
324  try {
325  parser = (MapDParser) parserPool.borrowObject();
326  } catch (Exception ex) {
327  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
328  MAPDLOGGER.error(msg, ex);
329  throw new TException(msg);
330  }
331  MapDUser mapDUser = new MapDUser(user, session, catalog, mapdPort, null);
332  MAPDLOGGER.debug("getCompletionHints was called User: " + user
333  + " Catalog: " + catalog + " sql: " + sql);
334  parser.setUser(mapDUser);
335  CURRENT_PARSER.set(parser);
336 
337  MapDPlanner.CompletionResult completion_result;
338  try {
339  completion_result = parser.getCompletionHints(sql, cursor, visible_tables);
340  } catch (Exception ex) {
341  String msg = "Could not retrieve completion hints: " + ex.getMessage();
342  MAPDLOGGER.error(msg, ex);
343  return new ArrayList<>();
344  } finally {
345  CURRENT_PARSER.set(null);
346  try {
347  // put parser object back in pool for others to use
348  parserPool.returnObject(parser);
349  } catch (Exception ex) {
350  String msg = "Could not return parse object: " + ex.getMessage();
351  MAPDLOGGER.error(msg, ex);
352  throw new InvalidParseRequest(-4, msg);
353  }
354  }
355  List<TCompletionHint> result = new ArrayList<>();
356  for (final SqlMoniker hint : completion_result.hints) {
357  result.add(new TCompletionHint(hintTypeToThrift(hint.getType()),
358  hint.getFullyQualifiedNames(),
359  completion_result.replaced));
360  }
361  return result;
362  }
363 
364  @Override
365  public void setRuntimeExtensionFunctions(List<TUserDefinedFunction> udfs,
366  List<TUserDefinedTableFunction> udtfs,
367  boolean isruntime) {
368  if (isruntime) {
369  // Clean up previously defined Runtime UDFs
370  if (udfRTSigs != null) {
371  for (String name : udfRTSigs.keySet()) extSigs.remove(name);
372  udfRTSigsJson = "";
373  udfRTSigs.clear();
374  } else {
375  udfRTSigs = new HashMap<String, ExtensionFunction>();
376  }
377 
378  for (TUserDefinedFunction udf : udfs) {
379  udfRTSigs.put(udf.name, toExtensionFunction(udf));
380  }
381 
382  for (TUserDefinedTableFunction udtf : udtfs) {
383  udfRTSigs.put(udtf.name, toExtensionFunction(udtf));
384  }
385 
386  // Avoid overwritting compiled and Loadtime UDFs:
387  for (String name : udfRTSigs.keySet()) {
388  if (extSigs.containsKey(name)) {
389  MAPDLOGGER.error("Extension function `" + name
390  + "` exists. Skipping runtime extenension function with the same name.");
391  udfRTSigs.remove(name);
392  }
393  }
394  // udfRTSigsJson will contain only the signatures of UDFs:
395  udfRTSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(udfRTSigs);
396  // Expose RT UDFs to Calcite server:
397  extSigs.putAll(udfRTSigs);
398  } else {
399  // currently only LoadTime UDTFs can be registered via calcite thrift interface
400  if (udtfSigs == null) {
401  udtfSigs = new HashMap<String, ExtensionFunction>();
402  }
403 
404  for (TUserDefinedTableFunction udtf : udtfs) {
405  udtfSigs.put(udtf.name, toExtensionFunction(udtf));
406  }
407 
408  extSigs.putAll(udtfSigs);
409  }
410 
411  calciteParserFactory.updateOperatorTable();
412  }
413 
414  private static ExtensionFunction toExtensionFunction(TUserDefinedFunction udf) {
415  List<ExtensionFunction.ExtArgumentType> args =
416  new ArrayList<ExtensionFunction.ExtArgumentType>();
417  for (TExtArgumentType atype : udf.argTypes) {
418  final ExtensionFunction.ExtArgumentType arg_type = toExtArgumentType(atype);
419  if (arg_type != ExtensionFunction.ExtArgumentType.Void) {
420  args.add(arg_type);
421  }
422  }
423  return new ExtensionFunction(args, toExtArgumentType(udf.retType));
424  }
425 
426  private static ExtensionFunction toExtensionFunction(TUserDefinedTableFunction udtf) {
427  List<ExtensionFunction.ExtArgumentType> args =
428  new ArrayList<ExtensionFunction.ExtArgumentType>();
429  for (TExtArgumentType atype : udtf.sqlArgTypes) {
430  args.add(toExtArgumentType(atype));
431  }
432  List<ExtensionFunction.ExtArgumentType> outs =
433  new ArrayList<ExtensionFunction.ExtArgumentType>();
434  for (TExtArgumentType otype : udtf.outputArgTypes) {
435  outs.add(toExtArgumentType(otype));
436  }
437  return new ExtensionFunction(args, outs);
438  }
439 
440  private static ExtensionFunction.ExtArgumentType toExtArgumentType(
441  TExtArgumentType type) {
442  switch (type) {
443  case Int8:
444  return ExtensionFunction.ExtArgumentType.Int8;
445  case Int16:
446  return ExtensionFunction.ExtArgumentType.Int16;
447  case Int32:
448  return ExtensionFunction.ExtArgumentType.Int32;
449  case Int64:
450  return ExtensionFunction.ExtArgumentType.Int64;
451  case Float:
452  return ExtensionFunction.ExtArgumentType.Float;
453  case Double:
455  case Void:
456  return ExtensionFunction.ExtArgumentType.Void;
457  case PInt8:
458  return ExtensionFunction.ExtArgumentType.PInt8;
459  case PInt16:
460  return ExtensionFunction.ExtArgumentType.PInt16;
461  case PInt32:
462  return ExtensionFunction.ExtArgumentType.PInt32;
463  case PInt64:
464  return ExtensionFunction.ExtArgumentType.PInt64;
465  case PFloat:
466  return ExtensionFunction.ExtArgumentType.PFloat;
467  case PDouble:
468  return ExtensionFunction.ExtArgumentType.PDouble;
469  case PBool:
470  return ExtensionFunction.ExtArgumentType.PBool;
471  case Bool:
472  return ExtensionFunction.ExtArgumentType.Bool;
473  case ArrayInt8:
474  return ExtensionFunction.ExtArgumentType.ArrayInt8;
475  case ArrayInt16:
476  return ExtensionFunction.ExtArgumentType.ArrayInt16;
477  case ArrayInt32:
478  return ExtensionFunction.ExtArgumentType.ArrayInt32;
479  case ArrayInt64:
480  return ExtensionFunction.ExtArgumentType.ArrayInt64;
481  case ArrayFloat:
482  return ExtensionFunction.ExtArgumentType.ArrayFloat;
483  case ArrayDouble:
484  return ExtensionFunction.ExtArgumentType.ArrayDouble;
485  case ArrayBool:
486  return ExtensionFunction.ExtArgumentType.ArrayBool;
487  case ColumnInt8:
488  return ExtensionFunction.ExtArgumentType.ColumnInt8;
489  case ColumnInt16:
490  return ExtensionFunction.ExtArgumentType.ColumnInt16;
491  case ColumnInt32:
492  return ExtensionFunction.ExtArgumentType.ColumnInt32;
493  case ColumnInt64:
494  return ExtensionFunction.ExtArgumentType.ColumnInt64;
495  case ColumnFloat:
496  return ExtensionFunction.ExtArgumentType.ColumnFloat;
497  case ColumnDouble:
498  return ExtensionFunction.ExtArgumentType.ColumnDouble;
499  case ColumnBool:
500  return ExtensionFunction.ExtArgumentType.ColumnBool;
501  case GeoPoint:
502  return ExtensionFunction.ExtArgumentType.GeoPoint;
503  case GeoLineString:
504  return ExtensionFunction.ExtArgumentType.GeoLineString;
505  case Cursor:
506  return ExtensionFunction.ExtArgumentType.Cursor;
507  case GeoPolygon:
508  return ExtensionFunction.ExtArgumentType.GeoPolygon;
509  case GeoMultiPolygon:
510  return ExtensionFunction.ExtArgumentType.GeoMultiPolygon;
511  case TextEncodingNone:
512  return ExtensionFunction.ExtArgumentType.TextEncodingNone;
513  case TextEncodingDict8:
514  return ExtensionFunction.ExtArgumentType.TextEncodingDict8;
515  case TextEncodingDict16:
516  return ExtensionFunction.ExtArgumentType.TextEncodingDict16;
517  case TextEncodingDict32:
518  return ExtensionFunction.ExtArgumentType.TextEncodingDict32;
519  default:
520  MAPDLOGGER.error("toExtArgumentType: unknown type " + type);
521  return null;
522  }
523  }
524 
525  private static TCompletionHintType hintTypeToThrift(final SqlMonikerType type) {
526  switch (type) {
527  case COLUMN:
528  return TCompletionHintType.COLUMN;
529  case TABLE:
530  return TCompletionHintType.TABLE;
531  case VIEW:
532  return TCompletionHintType.VIEW;
533  case SCHEMA:
534  return TCompletionHintType.SCHEMA;
535  case CATALOG:
536  return TCompletionHintType.CATALOG;
537  case REPOSITORY:
538  return TCompletionHintType.REPOSITORY;
539  case FUNCTION:
540  return TCompletionHintType.FUNCTION;
541  case KEYWORD:
542  return TCompletionHintType.KEYWORD;
543  default:
544  return null;
545  }
546  }
547 }
TPlanResult process(String user, String session, String catalog, String queryText, java.util.List< TFilterPushDownInfo > thriftFilterPushDownInfo, boolean legacySyntax, boolean isExplain, boolean isViewOptimize, TRestriction restriction)
void setRuntimeExtensionFunctions(List< TUserDefinedFunction > udfs, List< TUserDefinedTableFunction > udtfs, boolean isruntime)
static ExtensionFunction.ExtArgumentType toExtArgumentType(TExtArgumentType type)
void updateMetadata(String catalog, String table)
Map< String, ExtensionFunction > udfRTSigs
static TCompletionHintType hintTypeToThrift(final SqlMonikerType type)
static ExtensionFunction toExtensionFunction(TUserDefinedTableFunction udtf)
static ExtensionFunction toExtensionFunction(TUserDefinedFunction udf)
string name
Definition: setup.py:44
CalciteServerHandler(int mapdPort, String dataDir, String extensionFunctionsAstFile, SockTransportProperties skT, String udfAstFile)
List< TCompletionHint > getCompletionHints(String user, String session, String catalog, List< String > visible_tables, String sql, int cursor)