OmniSciDB  fe05a0c208
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
CalciteServerHandler.java
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.parser.server;
17 
18 import static com.mapd.calcite.parser.MapDParser.CURRENT_PARSER;
19 
24 import com.omnisci.thrift.calciteserver.CalciteServer;
25 import com.omnisci.thrift.calciteserver.InvalidParseRequest;
26 import com.omnisci.thrift.calciteserver.TAccessedQueryObjects;
27 import com.omnisci.thrift.calciteserver.TCompletionHint;
28 import com.omnisci.thrift.calciteserver.TCompletionHintType;
29 import com.omnisci.thrift.calciteserver.TExtArgumentType;
30 import com.omnisci.thrift.calciteserver.TFilterPushDownInfo;
31 import com.omnisci.thrift.calciteserver.TPlanResult;
32 import com.omnisci.thrift.calciteserver.TRestriction;
33 import com.omnisci.thrift.calciteserver.TUserDefinedFunction;
34 import com.omnisci.thrift.calciteserver.TUserDefinedTableFunction;
35 
39 import org.apache.calcite.runtime.CalciteContextException;
40 import org.apache.calcite.sql.SqlNode;
41 import org.apache.calcite.sql.parser.SqlParseException;
42 import org.apache.calcite.sql.validate.SqlMoniker;
43 import org.apache.calcite.sql.validate.SqlMonikerType;
44 import org.apache.calcite.tools.RelConversionException;
45 import org.apache.calcite.tools.ValidationException;
46 import org.apache.calcite.util.Pair;
47 import org.apache.commons.pool.PoolableObjectFactory;
48 import org.apache.commons.pool.impl.GenericObjectPool;
49 import org.apache.thrift.TException;
50 import org.apache.thrift.server.TServer;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 
54 import java.io.IOException;
55 import java.util.ArrayList;
56 import java.util.HashMap;
57 import java.util.List;
58 import java.util.Map;
59 
64 public class CalciteServerHandler implements CalciteServer.Iface {
65  final static Logger MAPDLOGGER = LoggerFactory.getLogger(CalciteServerHandler.class);
66  private TServer server;
67 
68  private final int mapdPort;
69 
70  private volatile long callCount;
71 
72  private final GenericObjectPool parserPool;
73 
75 
76  private final String extSigsJson;
77 
78  private final String udfSigsJson;
79 
80  private String udfRTSigsJson = "";
81  Map<String, ExtensionFunction> udfRTSigs = null;
82 
83  Map<String, ExtensionFunction> udtfSigs = null;
84 
86  private Map<String, ExtensionFunction> extSigs = null;
87  private String dataDir;
88 
89  // TODO MAT we need to merge this into common code base for these functions with
90  // CalciteDirect since we are not deprecating this stuff yet
92  String dataDir,
93  String extensionFunctionsAstFile,
95  String udfAstFile) {
96  this.mapdPort = mapdPort;
97  this.dataDir = dataDir;
98 
99  Map<String, ExtensionFunction> udfSigs = null;
100 
101  try {
102  extSigs = ExtensionFunctionSignatureParser.parse(extensionFunctionsAstFile);
103  } catch (IOException ex) {
104  MAPDLOGGER.error(
105  "Could not load extension function signatures: " + ex.getMessage(), ex);
106  }
107  extSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(extSigs);
108 
109  try {
110  if (!udfAstFile.isEmpty()) {
111  udfSigs = ExtensionFunctionSignatureParser.parseUdfAst(udfAstFile);
112  }
113  } catch (IOException ex) {
114  MAPDLOGGER.error("Could not load udf function signatures: " + ex.getMessage(), ex);
115  }
116  udfSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(udfSigs);
117 
118  // Put all the udf functions signatures in extSigs so Calcite has a view of
119  // extension functions and udf functions
120  if (!udfAstFile.isEmpty()) {
121  extSigs.putAll(udfSigs);
122  }
123 
124  calciteParserFactory = new CalciteParserFactory(dataDir, extSigs, mapdPort, skT);
125 
126  // GenericObjectPool::setFactory is deprecated
127  this.parserPool = new GenericObjectPool(calciteParserFactory);
128  }
129 
130  @Override
131  public void ping() throws TException {
132  MAPDLOGGER.debug("Ping hit");
133  }
134 
135  @Override
136  public TPlanResult process(String user,
137  String session,
138  String catalog,
139  String queryText,
140  java.util.List<TFilterPushDownInfo> thriftFilterPushDownInfo,
141  boolean legacySyntax,
142  boolean isExplain,
143  boolean isViewOptimize,
144  TRestriction restriction) throws InvalidParseRequest, TException {
145  long timer = System.currentTimeMillis();
146  callCount++;
147 
149  try {
150  parser = (MapDParser) parserPool.borrowObject();
151  parser.clearMemo();
152  } catch (Exception ex) {
153  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
154  MAPDLOGGER.error(msg, ex);
155  throw new InvalidParseRequest(-1, msg);
156  }
157  Restriction rest = null;
158  if (restriction != null && !restriction.column.isEmpty()) {
159  rest = new Restriction(restriction.column, restriction.values);
160  }
161  MapDUser mapDUser = new MapDUser(user, session, catalog, mapdPort, rest);
162  MAPDLOGGER.debug("process was called User: " + user + " Catalog: " + catalog
163  + " sql: " + queryText);
164  parser.setUser(mapDUser);
165  CURRENT_PARSER.set(parser);
166 
167  // need to trim the sql string as it seems it is not trimed prior to here
168  boolean isRAQuery = false;
169 
170  if (queryText.startsWith("execute calcite")) {
171  queryText = queryText.replaceFirst("execute calcite", "");
172  isRAQuery = true;
173  }
174 
175  queryText = queryText.trim();
176  // remove last charcter if it is a ;
177  if (queryText.length() > 0 && queryText.charAt(queryText.length() - 1) == ';') {
178  queryText = queryText.substring(0, queryText.length() - 1);
179  }
180  String jsonResult;
181  SqlIdentifierCapturer capturer;
182  TAccessedQueryObjects primaryAccessedObjects = new TAccessedQueryObjects();
183  TAccessedQueryObjects resolvedAccessedObjects = new TAccessedQueryObjects();
184  try {
185  final List<MapDParserOptions.FilterPushDownInfo> filterPushDownInfo =
186  new ArrayList<>();
187  for (final TFilterPushDownInfo req : thriftFilterPushDownInfo) {
188  filterPushDownInfo.add(new MapDParserOptions.FilterPushDownInfo(
189  req.input_prev, req.input_start, req.input_next));
190  }
191  MapDParserOptions parserOptions = new MapDParserOptions(
192  filterPushDownInfo, legacySyntax, isExplain, isViewOptimize);
193 
194  if (!isRAQuery) {
195  Pair<String, SqlIdentifierCapturer> res;
196  SqlNode node;
197 
198  res = parser.process(queryText, parserOptions);
199  jsonResult = res.left;
200  capturer = res.right;
201 
202  primaryAccessedObjects.tables_selected_from = new ArrayList<>(capturer.selects);
203  primaryAccessedObjects.tables_inserted_into = new ArrayList<>(capturer.inserts);
204  primaryAccessedObjects.tables_updated_in = new ArrayList<>(capturer.updates);
205  primaryAccessedObjects.tables_deleted_from = new ArrayList<>(capturer.deletes);
206 
207  // also resolve all the views in the select part
208  // resolution of the other parts is not
209  // necessary as these cannot be views
210  resolvedAccessedObjects.tables_selected_from =
211  new ArrayList<>(parser.resolveSelectIdentifiers(capturer));
212  resolvedAccessedObjects.tables_inserted_into = new ArrayList<>(capturer.inserts);
213  resolvedAccessedObjects.tables_updated_in = new ArrayList<>(capturer.updates);
214  resolvedAccessedObjects.tables_deleted_from = new ArrayList<>(capturer.deletes);
215 
216  } else {
217  jsonResult = parser.optimizeRAQuery(queryText, parserOptions);
218  }
219  } catch (SqlParseException ex) {
220  String msg = "SQL Error: " + ex.getMessage();
221  MAPDLOGGER.error(msg);
222  throw new InvalidParseRequest(-2, msg);
223  } catch (org.apache.calcite.tools.ValidationException ex) {
224  String msg = "SQL Error: " + ex.getMessage();
225  if (ex.getCause() != null
226  && (ex.getCause().getClass() == CalciteContextException.class)) {
227  msg = "SQL Error: " + ex.getCause().getMessage();
228  }
229  MAPDLOGGER.error(msg);
230  throw new InvalidParseRequest(-3, msg);
231  } catch (CalciteContextException ex) {
232  String msg = ex.getMessage();
233  MAPDLOGGER.error(msg);
234  throw new InvalidParseRequest(-6, msg);
235  } catch (RelConversionException ex) {
236  String msg = "Failed to generate relational algebra for query " + ex.getMessage();
237  MAPDLOGGER.error(msg, ex);
238  throw new InvalidParseRequest(-5, msg);
239  } catch (Throwable ex) {
240  MAPDLOGGER.error(ex.getClass().toString());
241  String msg = "Exception occurred: " + ex.getMessage();
242  MAPDLOGGER.error(msg, ex);
243  throw new InvalidParseRequest(-4, msg);
244  } finally {
245  CURRENT_PARSER.set(null);
246  try {
247  // put parser object back in pool for others to use
248  parserPool.returnObject(parser);
249  } catch (Exception ex) {
250  String msg = "Could not return parse object: " + ex.getMessage();
251  MAPDLOGGER.error(msg, ex);
252  throw new InvalidParseRequest(-7, msg);
253  }
254  }
255 
256  TPlanResult result = new TPlanResult();
257  result.primary_accessed_objects = primaryAccessedObjects;
258  result.resolved_accessed_objects = resolvedAccessedObjects;
259  result.plan_result = jsonResult;
260  result.execution_time_ms = System.currentTimeMillis() - timer;
261 
262  return result;
263  }
264 
265  @Override
266  public void shutdown() throws TException {
267  // received request to shutdown
268  MAPDLOGGER.debug("Shutdown calcite java server");
269  server.stop();
270  }
271 
272  @Override
274  return this.extSigsJson;
275  }
276 
277  @Override
279  return this.udfSigsJson;
280  }
281 
282  @Override
284  return this.udfRTSigsJson;
285  }
286 
287  void setServer(TServer s) {
288  server = s;
289  }
290 
291  @Override
292  public void updateMetadata(String catalog, String table) throws TException {
293  MAPDLOGGER.debug("Received invalidation from server for " + catalog + " : " + table);
294  long timer = System.currentTimeMillis();
295  callCount++;
297  try {
298  parser = (MapDParser) parserPool.borrowObject();
299  } catch (Exception ex) {
300  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
301  MAPDLOGGER.error(msg, ex);
302  return;
303  }
304  CURRENT_PARSER.set(parser);
305  try {
306  parser.updateMetaData(catalog, table);
307  } finally {
308  CURRENT_PARSER.set(null);
309  try {
310  // put parser object back in pool for others to use
311  MAPDLOGGER.debug("Returning object to pool");
312  parserPool.returnObject(parser);
313  } catch (Exception ex) {
314  String msg = "Could not return parse object: " + ex.getMessage();
315  MAPDLOGGER.error(msg, ex);
316  }
317  }
318  }
319 
320  @Override
321  public List<TCompletionHint> getCompletionHints(String user,
322  String session,
323  String catalog,
324  List<String> visible_tables,
325  String sql,
326  int cursor) throws TException {
327  callCount++;
329  try {
330  parser = (MapDParser) parserPool.borrowObject();
331  } catch (Exception ex) {
332  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
333  MAPDLOGGER.error(msg, ex);
334  throw new TException(msg);
335  }
336  MapDUser mapDUser = new MapDUser(user, session, catalog, mapdPort, null);
337  MAPDLOGGER.debug("getCompletionHints was called User: " + user
338  + " Catalog: " + catalog + " sql: " + sql);
339  parser.setUser(mapDUser);
340  CURRENT_PARSER.set(parser);
341 
342  MapDPlanner.CompletionResult completion_result;
343  try {
344  completion_result = parser.getCompletionHints(sql, cursor, visible_tables);
345  } catch (Exception ex) {
346  String msg = "Could not retrieve completion hints: " + ex.getMessage();
347  MAPDLOGGER.error(msg, ex);
348  return new ArrayList<>();
349  } finally {
350  CURRENT_PARSER.set(null);
351  try {
352  // put parser object back in pool for others to use
353  parserPool.returnObject(parser);
354  } catch (Exception ex) {
355  String msg = "Could not return parse object: " + ex.getMessage();
356  MAPDLOGGER.error(msg, ex);
357  throw new InvalidParseRequest(-4, msg);
358  }
359  }
360  List<TCompletionHint> result = new ArrayList<>();
361  for (final SqlMoniker hint : completion_result.hints) {
362  result.add(new TCompletionHint(hintTypeToThrift(hint.getType()),
363  hint.getFullyQualifiedNames(),
364  completion_result.replaced));
365  }
366  return result;
367  }
368 
369  @Override
370  public void setRuntimeExtensionFunctions(List<TUserDefinedFunction> udfs,
371  List<TUserDefinedTableFunction> udtfs,
372  boolean isruntime) {
373  if (isruntime) {
374  // Clean up previously defined Runtime UDFs
375  if (udfRTSigs != null) {
376  for (String name : udfRTSigs.keySet()) extSigs.remove(name);
377  udfRTSigsJson = "";
378  udfRTSigs.clear();
379  } else {
380  udfRTSigs = new HashMap<String, ExtensionFunction>();
381  }
382 
383  for (TUserDefinedFunction udf : udfs) {
384  udfRTSigs.put(udf.name, toExtensionFunction(udf));
385  }
386 
387  for (TUserDefinedTableFunction udtf : udtfs) {
388  udfRTSigs.put(udtf.name, toExtensionFunction(udtf));
389  }
390 
391  // Avoid overwritting compiled and Loadtime UDFs:
392  for (String name : udfRTSigs.keySet()) {
393  if (extSigs.containsKey(name)) {
394  MAPDLOGGER.error("Extension function `" + name
395  + "` exists. Skipping runtime extenension function with the same name.");
396  udfRTSigs.remove(name);
397  }
398  }
399  // udfRTSigsJson will contain only the signatures of UDFs:
400  udfRTSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(udfRTSigs);
401  // Expose RT UDFs to Calcite server:
402  extSigs.putAll(udfRTSigs);
403  } else {
404  // currently only LoadTime UDTFs can be registered via calcite thrift interface
405  if (udtfSigs == null) {
406  udtfSigs = new HashMap<String, ExtensionFunction>();
407  }
408 
409  for (TUserDefinedTableFunction udtf : udtfs) {
410  udtfSigs.put(udtf.name, toExtensionFunction(udtf));
411  }
412 
413  extSigs.putAll(udtfSigs);
414  }
415 
416  calciteParserFactory.updateOperatorTable();
417  }
418 
419  private static ExtensionFunction toExtensionFunction(TUserDefinedFunction udf) {
420  List<ExtensionFunction.ExtArgumentType> args =
421  new ArrayList<ExtensionFunction.ExtArgumentType>();
422  for (TExtArgumentType atype : udf.argTypes) {
423  final ExtensionFunction.ExtArgumentType arg_type = toExtArgumentType(atype);
424  if (arg_type != ExtensionFunction.ExtArgumentType.Void) {
425  args.add(arg_type);
426  }
427  }
428  return new ExtensionFunction(args, toExtArgumentType(udf.retType));
429  }
430 
431  private static ExtensionFunction toExtensionFunction(TUserDefinedTableFunction udtf) {
432  List<ExtensionFunction.ExtArgumentType> args =
433  new ArrayList<ExtensionFunction.ExtArgumentType>();
434  for (TExtArgumentType atype : udtf.sqlArgTypes) {
435  args.add(toExtArgumentType(atype));
436  }
437  List<ExtensionFunction.ExtArgumentType> outs =
438  new ArrayList<ExtensionFunction.ExtArgumentType>();
439  for (TExtArgumentType otype : udtf.outputArgTypes) {
440  outs.add(toExtArgumentType(otype));
441  }
442  return new ExtensionFunction(args, outs);
443  }
444 
445  private static ExtensionFunction.ExtArgumentType toExtArgumentType(
446  TExtArgumentType type) {
447  switch (type) {
448  case Int8:
449  return ExtensionFunction.ExtArgumentType.Int8;
450  case Int16:
451  return ExtensionFunction.ExtArgumentType.Int16;
452  case Int32:
453  return ExtensionFunction.ExtArgumentType.Int32;
454  case Int64:
455  return ExtensionFunction.ExtArgumentType.Int64;
456  case Float:
457  return ExtensionFunction.ExtArgumentType.Float;
458  case Double:
460  case Void:
461  return ExtensionFunction.ExtArgumentType.Void;
462  case PInt8:
463  return ExtensionFunction.ExtArgumentType.PInt8;
464  case PInt16:
465  return ExtensionFunction.ExtArgumentType.PInt16;
466  case PInt32:
467  return ExtensionFunction.ExtArgumentType.PInt32;
468  case PInt64:
469  return ExtensionFunction.ExtArgumentType.PInt64;
470  case PFloat:
471  return ExtensionFunction.ExtArgumentType.PFloat;
472  case PDouble:
473  return ExtensionFunction.ExtArgumentType.PDouble;
474  case PBool:
475  return ExtensionFunction.ExtArgumentType.PBool;
476  case Bool:
477  return ExtensionFunction.ExtArgumentType.Bool;
478  case ArrayInt8:
479  return ExtensionFunction.ExtArgumentType.ArrayInt8;
480  case ArrayInt16:
481  return ExtensionFunction.ExtArgumentType.ArrayInt16;
482  case ArrayInt32:
483  return ExtensionFunction.ExtArgumentType.ArrayInt32;
484  case ArrayInt64:
485  return ExtensionFunction.ExtArgumentType.ArrayInt64;
486  case ArrayFloat:
487  return ExtensionFunction.ExtArgumentType.ArrayFloat;
488  case ArrayDouble:
489  return ExtensionFunction.ExtArgumentType.ArrayDouble;
490  case ArrayBool:
491  return ExtensionFunction.ExtArgumentType.ArrayBool;
492  case ColumnInt8:
493  return ExtensionFunction.ExtArgumentType.ColumnInt8;
494  case ColumnInt16:
495  return ExtensionFunction.ExtArgumentType.ColumnInt16;
496  case ColumnInt32:
497  return ExtensionFunction.ExtArgumentType.ColumnInt32;
498  case ColumnInt64:
499  return ExtensionFunction.ExtArgumentType.ColumnInt64;
500  case ColumnFloat:
501  return ExtensionFunction.ExtArgumentType.ColumnFloat;
502  case ColumnDouble:
503  return ExtensionFunction.ExtArgumentType.ColumnDouble;
504  case ColumnBool:
505  return ExtensionFunction.ExtArgumentType.ColumnBool;
506  case GeoPoint:
507  return ExtensionFunction.ExtArgumentType.GeoPoint;
508  case GeoLineString:
509  return ExtensionFunction.ExtArgumentType.GeoLineString;
510  case Cursor:
511  return ExtensionFunction.ExtArgumentType.Cursor;
512  case GeoPolygon:
513  return ExtensionFunction.ExtArgumentType.GeoPolygon;
514  case GeoMultiPolygon:
515  return ExtensionFunction.ExtArgumentType.GeoMultiPolygon;
516  case TextEncodingNone:
517  return ExtensionFunction.ExtArgumentType.TextEncodingNone;
518  case TextEncodingDict8:
519  return ExtensionFunction.ExtArgumentType.TextEncodingDict8;
520  case TextEncodingDict16:
521  return ExtensionFunction.ExtArgumentType.TextEncodingDict16;
522  case TextEncodingDict32:
523  return ExtensionFunction.ExtArgumentType.TextEncodingDict32;
524  case ColumnListInt8:
525  return ExtensionFunction.ExtArgumentType.ColumnListInt8;
526  case ColumnListInt16:
527  return ExtensionFunction.ExtArgumentType.ColumnListInt16;
528  case ColumnListInt32:
529  return ExtensionFunction.ExtArgumentType.ColumnListInt32;
530  case ColumnListInt64:
531  return ExtensionFunction.ExtArgumentType.ColumnListInt64;
532  case ColumnListFloat:
533  return ExtensionFunction.ExtArgumentType.ColumnListFloat;
534  case ColumnListDouble:
535  return ExtensionFunction.ExtArgumentType.ColumnListDouble;
536  case ColumnListBool:
537  return ExtensionFunction.ExtArgumentType.ColumnListBool;
538  default:
539  MAPDLOGGER.error("toExtArgumentType: unknown type " + type);
540  return null;
541  }
542  }
543 
544  private static TCompletionHintType hintTypeToThrift(final SqlMonikerType type) {
545  switch (type) {
546  case COLUMN:
548  case TABLE:
549  return TCompletionHintType.TABLE;
550  case VIEW:
552  case SCHEMA:
554  case CATALOG:
555  return TCompletionHintType.CATALOG;
556  case REPOSITORY:
557  return TCompletionHintType.REPOSITORY;
558  case FUNCTION:
559  return TCompletionHintType.FUNCTION;
560  case KEYWORD:
561  return TCompletionHintType.KEYWORD;
562  default:
563  return null;
564  }
565  }
566 }
std::string toString(const ExtArgumentType &sig_type)
string name
Definition: setup.in.py:72
TPlanResult process(String user, String session, String catalog, String queryText, java.util.List< TFilterPushDownInfo > thriftFilterPushDownInfo, boolean legacySyntax, boolean isExplain, boolean isViewOptimize, TRestriction restriction)
#define VIEW
void setRuntimeExtensionFunctions(List< TUserDefinedFunction > udfs, List< TUserDefinedTableFunction > udtfs, boolean isruntime)
static ExtensionFunction.ExtArgumentType toExtArgumentType(TExtArgumentType type)
void updateMetadata(String catalog, String table)
#define COLUMN
Map< String, ExtensionFunction > udfRTSigs
#define SCHEMA
static TCompletionHintType hintTypeToThrift(final SqlMonikerType type)
static ExtensionFunction toExtensionFunction(TUserDefinedTableFunction udtf)
static ExtensionFunction toExtensionFunction(TUserDefinedFunction udf)
#define TABLE
CalciteServerHandler(int mapdPort, String dataDir, String extensionFunctionsAstFile, SockTransportProperties skT, String udfAstFile)
List< TCompletionHint > getCompletionHints(String user, String session, String catalog, List< String > visible_tables, String sql, int cursor)