OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
CalciteServerHandler.java
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.parser.server;
17 
18 import static com.mapd.calcite.parser.MapDParser.CURRENT_PARSER;
19 
24 import com.omnisci.thrift.calciteserver.CalciteServer;
25 import com.omnisci.thrift.calciteserver.InvalidParseRequest;
26 import com.omnisci.thrift.calciteserver.TAccessedQueryObjects;
27 import com.omnisci.thrift.calciteserver.TCompletionHint;
28 import com.omnisci.thrift.calciteserver.TCompletionHintType;
29 import com.omnisci.thrift.calciteserver.TExtArgumentType;
30 import com.omnisci.thrift.calciteserver.TFilterPushDownInfo;
31 import com.omnisci.thrift.calciteserver.TPlanResult;
32 import com.omnisci.thrift.calciteserver.TRestriction;
33 import com.omnisci.thrift.calciteserver.TUserDefinedFunction;
34 import com.omnisci.thrift.calciteserver.TUserDefinedTableFunction;
35 
39 import org.apache.calcite.runtime.CalciteContextException;
40 import org.apache.calcite.sql.SqlNode;
41 import org.apache.calcite.sql.parser.SqlParseException;
42 import org.apache.calcite.sql.validate.SqlMoniker;
43 import org.apache.calcite.sql.validate.SqlMonikerType;
44 import org.apache.calcite.tools.RelConversionException;
45 import org.apache.calcite.tools.ValidationException;
46 import org.apache.calcite.util.Pair;
47 import org.apache.commons.pool.PoolableObjectFactory;
48 import org.apache.commons.pool.impl.GenericObjectPool;
49 import org.apache.thrift.TException;
50 import org.apache.thrift.server.TServer;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 
54 import java.io.IOException;
55 import java.util.ArrayList;
56 import java.util.HashMap;
57 import java.util.List;
58 import java.util.Map;
59 
64 public class CalciteServerHandler implements CalciteServer.Iface {
65  final static Logger MAPDLOGGER = LoggerFactory.getLogger(CalciteServerHandler.class);
66  private TServer server;
67 
68  private final int mapdPort;
69 
70  private volatile long callCount;
71 
72  private final GenericObjectPool parserPool;
73 
75 
76  private final String extSigsJson;
77 
78  private final String udfSigsJson;
79 
80  private String udfRTSigsJson = "";
81  Map<String, ExtensionFunction> udfRTSigs = null;
82 
83  Map<String, ExtensionFunction> udtfSigs = null;
84 
86  private Map<String, ExtensionFunction> extSigs = null;
87  private String dataDir;
88 
89  // TODO MAT we need to merge this into common code base for these functions with
90  // CalciteDirect since we are not deprecating this stuff yet
92  String dataDir,
93  String extensionFunctionsAstFile,
95  String udfAstFile) {
96  this.mapdPort = mapdPort;
97  this.dataDir = dataDir;
98 
99  Map<String, ExtensionFunction> udfSigs = null;
100 
101  try {
102  extSigs = ExtensionFunctionSignatureParser.parse(extensionFunctionsAstFile);
103  } catch (IOException ex) {
104  MAPDLOGGER.error(
105  "Could not load extension function signatures: " + ex.getMessage(), ex);
106  }
107  extSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(extSigs);
108 
109  try {
110  if (!udfAstFile.isEmpty()) {
111  udfSigs = ExtensionFunctionSignatureParser.parseUdfAst(udfAstFile);
112  }
113  } catch (IOException ex) {
114  MAPDLOGGER.error("Could not load udf function signatures: " + ex.getMessage(), ex);
115  }
116  udfSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(udfSigs);
117 
118  // Put all the udf functions signatures in extSigs so Calcite has a view of
119  // extension functions and udf functions
120  if (!udfAstFile.isEmpty()) {
121  extSigs.putAll(udfSigs);
122  }
123 
124  calciteParserFactory = new CalciteParserFactory(dataDir, extSigs, mapdPort, skT);
125 
126  // GenericObjectPool::setFactory is deprecated
127  this.parserPool = new GenericObjectPool(calciteParserFactory);
128  }
129 
130  @Override
131  public void ping() throws TException {
132  MAPDLOGGER.debug("Ping hit");
133  }
134 
135  @Override
136  public TPlanResult process(String user,
137  String session,
138  String catalog,
139  String queryText,
140  java.util.List<TFilterPushDownInfo> thriftFilterPushDownInfo,
141  boolean legacySyntax,
142  boolean isExplain,
143  boolean isViewOptimize,
144  TRestriction restriction) throws InvalidParseRequest, TException {
145  long timer = System.currentTimeMillis();
146  callCount++;
147 
149  try {
150  parser = (MapDParser) parserPool.borrowObject();
151  parser.clearMemo();
152  } catch (Exception ex) {
153  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
154  MAPDLOGGER.error(msg, ex);
155  throw new InvalidParseRequest(-1, msg);
156  }
157  Restriction rest = null;
158  if (restriction != null && !restriction.column.isEmpty()) {
159  rest = new Restriction(restriction.column, restriction.values);
160  }
161  MapDUser mapDUser = new MapDUser(user, session, catalog, mapdPort, rest);
162  MAPDLOGGER.debug("process was called User: " + user + " Catalog: " + catalog
163  + " sql: " + queryText);
164  parser.setUser(mapDUser);
165  CURRENT_PARSER.set(parser);
166 
167  // need to trim the sql string as it seems it is not trimed prior to here
168  boolean isRAQuery = false;
169 
170  if (queryText.startsWith("execute calcite")) {
171  queryText = queryText.replaceFirst("execute calcite", "");
172  isRAQuery = true;
173  }
174 
175  queryText = queryText.trim();
176  // remove last charcter if it is a ;
177  if (queryText.length() > 0 && queryText.charAt(queryText.length() - 1) == ';') {
178  queryText = queryText.substring(0, queryText.length() - 1);
179  }
180  String jsonResult;
181  SqlIdentifierCapturer capturer;
182  TAccessedQueryObjects primaryAccessedObjects = new TAccessedQueryObjects();
183  TAccessedQueryObjects resolvedAccessedObjects = new TAccessedQueryObjects();
184  try {
185  final List<MapDParserOptions.FilterPushDownInfo> filterPushDownInfo =
186  new ArrayList<>();
187  for (final TFilterPushDownInfo req : thriftFilterPushDownInfo) {
188  filterPushDownInfo.add(new MapDParserOptions.FilterPushDownInfo(
189  req.input_prev, req.input_start, req.input_next));
190  }
191  MapDParserOptions parserOptions = new MapDParserOptions(
192  filterPushDownInfo, legacySyntax, isExplain, isViewOptimize);
193 
194  if (!isRAQuery) {
195  Pair<String, SqlIdentifierCapturer> res;
196  SqlNode node;
197 
198  res = parser.process(queryText, parserOptions);
199  jsonResult = res.left;
200  capturer = res.right;
201 
202  primaryAccessedObjects.tables_selected_from = new ArrayList<>(capturer.selects);
203  primaryAccessedObjects.tables_inserted_into = new ArrayList<>(capturer.inserts);
204  primaryAccessedObjects.tables_updated_in = new ArrayList<>(capturer.updates);
205  primaryAccessedObjects.tables_deleted_from = new ArrayList<>(capturer.deletes);
206 
207  // also resolve all the views in the select part
208  // resolution of the other parts is not
209  // necessary as these cannot be views
210  resolvedAccessedObjects.tables_selected_from =
211  new ArrayList<>(parser.resolveSelectIdentifiers(capturer));
212  resolvedAccessedObjects.tables_inserted_into = new ArrayList<>(capturer.inserts);
213  resolvedAccessedObjects.tables_updated_in = new ArrayList<>(capturer.updates);
214  resolvedAccessedObjects.tables_deleted_from = new ArrayList<>(capturer.deletes);
215 
216  } else {
217  jsonResult = parser.optimizeRAQuery(queryText, parserOptions);
218  }
219  } catch (SqlParseException ex) {
220  String msg = "SQL Error: " + ex.getMessage();
221  MAPDLOGGER.error(msg);
222  throw new InvalidParseRequest(-2, msg);
223  } catch (org.apache.calcite.tools.ValidationException ex) {
224  String msg = "SQL Error: " + ex.getMessage();
225  if (ex.getCause() != null
226  && (ex.getCause().getClass() == CalciteContextException.class)) {
227  msg = "SQL Error: " + ex.getCause().getMessage();
228  }
229  MAPDLOGGER.error(msg);
230  throw new InvalidParseRequest(-3, msg);
231  } catch (CalciteContextException ex) {
232  String msg = ex.getMessage();
233  MAPDLOGGER.error(msg);
234  throw new InvalidParseRequest(-6, msg);
235  } catch (RelConversionException ex) {
236  String msg = "Failed to generate relational algebra for query " + ex.getMessage();
237  MAPDLOGGER.error(msg, ex);
238  throw new InvalidParseRequest(-5, msg);
239  } catch (Throwable ex) {
240  MAPDLOGGER.error(ex.getClass().toString());
241  String msg = ex.getMessage();
242  MAPDLOGGER.error(msg, ex);
243  throw new InvalidParseRequest(-4, msg);
244  } finally {
245  CURRENT_PARSER.set(null);
246  try {
247  // put parser object back in pool for others to use
248  parserPool.returnObject(parser);
249  } catch (Exception ex) {
250  String msg = "Could not return parse object: " + ex.getMessage();
251  MAPDLOGGER.error(msg, ex);
252  throw new InvalidParseRequest(-7, msg);
253  }
254  }
255 
256  TPlanResult result = new TPlanResult();
257  result.primary_accessed_objects = primaryAccessedObjects;
258  result.resolved_accessed_objects = resolvedAccessedObjects;
259  result.plan_result = jsonResult;
260  result.execution_time_ms = System.currentTimeMillis() - timer;
261 
262  return result;
263  }
264 
265  @Override
266  public void shutdown() throws TException {
267  // received request to shutdown
268  MAPDLOGGER.debug("Shutdown calcite java server");
269  server.stop();
270  }
271 
272  @Override
274  return this.extSigsJson;
275  }
276 
277  @Override
279  return this.udfSigsJson;
280  }
281 
282  @Override
284  return this.udfRTSigsJson;
285  }
286 
287  void setServer(TServer s) {
288  server = s;
289  }
290 
291  @Override
292  public void updateMetadata(String catalog, String table) throws TException {
293  MAPDLOGGER.debug("Received invalidation from server for " + catalog + " : " + table);
294  long timer = System.currentTimeMillis();
295  callCount++;
297  try {
298  parser = (MapDParser) parserPool.borrowObject();
299  } catch (Exception ex) {
300  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
301  MAPDLOGGER.error(msg, ex);
302  return;
303  }
304  CURRENT_PARSER.set(parser);
305  try {
306  parser.updateMetaData(catalog, table);
307  } finally {
308  CURRENT_PARSER.set(null);
309  try {
310  // put parser object back in pool for others to use
311  MAPDLOGGER.debug("Returning object to pool");
312  parserPool.returnObject(parser);
313  } catch (Exception ex) {
314  String msg = "Could not return parse object: " + ex.getMessage();
315  MAPDLOGGER.error(msg, ex);
316  }
317  }
318  }
319 
320  @Override
321  public List<TCompletionHint> getCompletionHints(String user,
322  String session,
323  String catalog,
324  List<String> visible_tables,
325  String sql,
326  int cursor) throws TException {
327  callCount++;
329  try {
330  parser = (MapDParser) parserPool.borrowObject();
331  } catch (Exception ex) {
332  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
333  MAPDLOGGER.error(msg, ex);
334  throw new TException(msg);
335  }
336  MapDUser mapDUser = new MapDUser(user, session, catalog, mapdPort, null);
337  MAPDLOGGER.debug("getCompletionHints was called User: " + user
338  + " Catalog: " + catalog + " sql: " + sql);
339  parser.setUser(mapDUser);
340  CURRENT_PARSER.set(parser);
341 
342  MapDPlanner.CompletionResult completion_result;
343  try {
344  completion_result = parser.getCompletionHints(sql, cursor, visible_tables);
345  } catch (Exception ex) {
346  String msg = "Could not retrieve completion hints: " + ex.getMessage();
347  MAPDLOGGER.error(msg, ex);
348  return new ArrayList<>();
349  } finally {
350  CURRENT_PARSER.set(null);
351  try {
352  // put parser object back in pool for others to use
353  parserPool.returnObject(parser);
354  } catch (Exception ex) {
355  String msg = "Could not return parse object: " + ex.getMessage();
356  MAPDLOGGER.error(msg, ex);
357  throw new InvalidParseRequest(-4, msg);
358  }
359  }
360  List<TCompletionHint> result = new ArrayList<>();
361  for (final SqlMoniker hint : completion_result.hints) {
362  result.add(new TCompletionHint(hintTypeToThrift(hint.getType()),
363  hint.getFullyQualifiedNames(),
364  completion_result.replaced));
365  }
366  return result;
367  }
368 
369  @Override
370  public void setRuntimeExtensionFunctions(List<TUserDefinedFunction> udfs,
371  List<TUserDefinedTableFunction> udtfs,
372  boolean isruntime) {
373  if (isruntime) {
374  // Clean up previously defined Runtime UDFs
375  if (udfRTSigs != null) {
376  for (String name : udfRTSigs.keySet()) extSigs.remove(name);
377  udfRTSigsJson = "";
378  udfRTSigs.clear();
379  } else {
380  udfRTSigs = new HashMap<String, ExtensionFunction>();
381  }
382 
383  for (TUserDefinedFunction udf : udfs) {
384  udfRTSigs.put(udf.name, toExtensionFunction(udf));
385  }
386 
387  for (TUserDefinedTableFunction udtf : udtfs) {
388  udfRTSigs.put(udtf.name, toExtensionFunction(udtf));
389  }
390 
391  // Avoid overwritting compiled and Loadtime UDFs:
392  for (String name : udfRTSigs.keySet()) {
393  if (extSigs.containsKey(name)) {
394  MAPDLOGGER.error("Extension function `" + name
395  + "` exists. Skipping runtime extenension function with the same name.");
396  udfRTSigs.remove(name);
397  }
398  }
399  // udfRTSigsJson will contain only the signatures of UDFs:
400  udfRTSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(udfRTSigs);
401  // Expose RT UDFs to Calcite server:
402  extSigs.putAll(udfRTSigs);
403  } else {
404  // currently only LoadTime UDTFs can be registered via calcite thrift interface
405  if (udtfSigs == null) {
406  udtfSigs = new HashMap<String, ExtensionFunction>();
407  }
408 
409  for (TUserDefinedTableFunction udtf : udtfs) {
410  udtfSigs.put(udtf.name, toExtensionFunction(udtf));
411  }
412 
413  extSigs.putAll(udtfSigs);
414  }
415 
416  calciteParserFactory.updateOperatorTable();
417  }
418 
419  private static ExtensionFunction toExtensionFunction(TUserDefinedFunction udf) {
420  List<ExtensionFunction.ExtArgumentType> args =
421  new ArrayList<ExtensionFunction.ExtArgumentType>();
422  for (TExtArgumentType atype : udf.argTypes) {
423  final ExtensionFunction.ExtArgumentType arg_type = toExtArgumentType(atype);
424  if (arg_type != ExtensionFunction.ExtArgumentType.Void) {
425  args.add(arg_type);
426  }
427  }
428  return new ExtensionFunction(args, toExtArgumentType(udf.retType));
429  }
430 
431  private static ExtensionFunction toExtensionFunction(TUserDefinedTableFunction udtf) {
432  int index = 0;
433  int out_index = 0;
434  List<String> names = new ArrayList<String>();
435  List<ExtensionFunction.ExtArgumentType> args =
436  new ArrayList<ExtensionFunction.ExtArgumentType>();
437  for (TExtArgumentType atype : udtf.sqlArgTypes) {
438  args.add(toExtArgumentType(atype));
439  Map<String, String> annot = udtf.annotations.get(index);
440  String name = annot.getOrDefault("name", "inp" + index);
441  if (atype == TExtArgumentType.Cursor && annot.containsKey("fields")) {
442  name = name + annot.get("fields");
443  }
444  names.add(name);
445  index++;
446  }
447  List<ExtensionFunction.ExtArgumentType> outs =
448  new ArrayList<ExtensionFunction.ExtArgumentType>();
449  for (TExtArgumentType otype : udtf.outputArgTypes) {
450  outs.add(toExtArgumentType(otype));
451  Map<String, String> annot = udtf.annotations.get(index);
452  names.add(annot.getOrDefault("name", "out" + out_index));
453  index++;
454  out_index++;
455  }
456  return new ExtensionFunction(
457  args, outs, names, udtf.annotations.get(udtf.annotations.size() - 1));
458  }
459 
460  private static ExtensionFunction.ExtArgumentType toExtArgumentType(
461  TExtArgumentType type) {
462  switch (type) {
463  case Int8:
464  return ExtensionFunction.ExtArgumentType.Int8;
465  case Int16:
466  return ExtensionFunction.ExtArgumentType.Int16;
467  case Int32:
468  return ExtensionFunction.ExtArgumentType.Int32;
469  case Int64:
470  return ExtensionFunction.ExtArgumentType.Int64;
471  case Float:
472  return ExtensionFunction.ExtArgumentType.Float;
473  case Double:
475  case Void:
476  return ExtensionFunction.ExtArgumentType.Void;
477  case PInt8:
478  return ExtensionFunction.ExtArgumentType.PInt8;
479  case PInt16:
480  return ExtensionFunction.ExtArgumentType.PInt16;
481  case PInt32:
482  return ExtensionFunction.ExtArgumentType.PInt32;
483  case PInt64:
484  return ExtensionFunction.ExtArgumentType.PInt64;
485  case PFloat:
486  return ExtensionFunction.ExtArgumentType.PFloat;
487  case PDouble:
488  return ExtensionFunction.ExtArgumentType.PDouble;
489  case PBool:
490  return ExtensionFunction.ExtArgumentType.PBool;
491  case Bool:
492  return ExtensionFunction.ExtArgumentType.Bool;
493  case ArrayInt8:
494  return ExtensionFunction.ExtArgumentType.ArrayInt8;
495  case ArrayInt16:
496  return ExtensionFunction.ExtArgumentType.ArrayInt16;
497  case ArrayInt32:
498  return ExtensionFunction.ExtArgumentType.ArrayInt32;
499  case ArrayInt64:
500  return ExtensionFunction.ExtArgumentType.ArrayInt64;
501  case ArrayFloat:
502  return ExtensionFunction.ExtArgumentType.ArrayFloat;
503  case ArrayDouble:
504  return ExtensionFunction.ExtArgumentType.ArrayDouble;
505  case ArrayBool:
506  return ExtensionFunction.ExtArgumentType.ArrayBool;
507  case ColumnInt8:
508  return ExtensionFunction.ExtArgumentType.ColumnInt8;
509  case ColumnInt16:
510  return ExtensionFunction.ExtArgumentType.ColumnInt16;
511  case ColumnInt32:
512  return ExtensionFunction.ExtArgumentType.ColumnInt32;
513  case ColumnInt64:
514  return ExtensionFunction.ExtArgumentType.ColumnInt64;
515  case ColumnFloat:
516  return ExtensionFunction.ExtArgumentType.ColumnFloat;
517  case ColumnDouble:
518  return ExtensionFunction.ExtArgumentType.ColumnDouble;
519  case ColumnBool:
520  return ExtensionFunction.ExtArgumentType.ColumnBool;
522  return ExtensionFunction.ExtArgumentType.ColumnTextEncodingDict;
523  case GeoPoint:
524  return ExtensionFunction.ExtArgumentType.GeoPoint;
525  case GeoLineString:
526  return ExtensionFunction.ExtArgumentType.GeoLineString;
527  case Cursor:
528  return ExtensionFunction.ExtArgumentType.Cursor;
529  case GeoPolygon:
530  return ExtensionFunction.ExtArgumentType.GeoPolygon;
531  case GeoMultiPolygon:
532  return ExtensionFunction.ExtArgumentType.GeoMultiPolygon;
533  case TextEncodingNone:
534  return ExtensionFunction.ExtArgumentType.TextEncodingNone;
535  case TextEncodingDict:
536  return ExtensionFunction.ExtArgumentType.TextEncodingDict;
537  case ColumnListInt8:
538  return ExtensionFunction.ExtArgumentType.ColumnListInt8;
539  case ColumnListInt16:
540  return ExtensionFunction.ExtArgumentType.ColumnListInt16;
541  case ColumnListInt32:
542  return ExtensionFunction.ExtArgumentType.ColumnListInt32;
543  case ColumnListInt64:
544  return ExtensionFunction.ExtArgumentType.ColumnListInt64;
545  case ColumnListFloat:
546  return ExtensionFunction.ExtArgumentType.ColumnListFloat;
547  case ColumnListDouble:
548  return ExtensionFunction.ExtArgumentType.ColumnListDouble;
549  case ColumnListBool:
550  return ExtensionFunction.ExtArgumentType.ColumnListBool;
552  return ExtensionFunction.ExtArgumentType.ColumnListTextEncodingDict;
553  default:
554  MAPDLOGGER.error("toExtArgumentType: unknown type " + type);
555  return null;
556  }
557  }
558 
559  private static TCompletionHintType hintTypeToThrift(final SqlMonikerType type) {
560  switch (type) {
561  case COLUMN:
563  case TABLE:
564  return TCompletionHintType.TABLE;
565  case VIEW:
567  case SCHEMA:
569  case CATALOG:
570  return TCompletionHintType.CATALOG;
571  case REPOSITORY:
572  return TCompletionHintType.REPOSITORY;
573  case FUNCTION:
574  return TCompletionHintType.FUNCTION;
575  case KEYWORD:
576  return TCompletionHintType.KEYWORD;
577  default:
578  return null;
579  }
580  }
581 }
std::string toString(const ExtArgumentType &sig_type)
Simplified core of GeoJSON Polygon coordinates definition.
Definition: OmniSciTypes.h:149
string name
Definition: setup.in.py:72
TPlanResult process(String user, String session, String catalog, String queryText, java.util.List< TFilterPushDownInfo > thriftFilterPushDownInfo, boolean legacySyntax, boolean isExplain, boolean isViewOptimize, TRestriction restriction)
#define VIEW
void setRuntimeExtensionFunctions(List< TUserDefinedFunction > udfs, List< TUserDefinedTableFunction > udtfs, boolean isruntime)
Simplified core of GeoJSON MultiPolygon coordinates definition.
Definition: OmniSciTypes.h:170
static ExtensionFunction.ExtArgumentType toExtArgumentType(TExtArgumentType type)
void updateMetadata(String catalog, String table)
#define COLUMN
Map< String, ExtensionFunction > udfRTSigs
#define SCHEMA
static TCompletionHintType hintTypeToThrift(final SqlMonikerType type)
static ExtensionFunction toExtensionFunction(TUserDefinedTableFunction udtf)
static ExtensionFunction toExtensionFunction(TUserDefinedFunction udf)
#define TABLE
CalciteServerHandler(int mapdPort, String dataDir, String extensionFunctionsAstFile, SockTransportProperties skT, String udfAstFile)
List< TCompletionHint > getCompletionHints(String user, String session, String catalog, List< String > visible_tables, String sql, int cursor)