OmniSciDB  8a228a1076
CalciteServerHandler.java
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.parser.server;
17 
19 
24 import com.omnisci.thrift.calciteserver.CalciteServer;
25 import com.omnisci.thrift.calciteserver.InvalidParseRequest;
26 import com.omnisci.thrift.calciteserver.TAccessedQueryObjects;
27 import com.omnisci.thrift.calciteserver.TCompletionHint;
28 import com.omnisci.thrift.calciteserver.TCompletionHintType;
29 import com.omnisci.thrift.calciteserver.TExtArgumentType;
30 import com.omnisci.thrift.calciteserver.TFilterPushDownInfo;
31 import com.omnisci.thrift.calciteserver.TPlanResult;
32 import com.omnisci.thrift.calciteserver.TUserDefinedFunction;
33 import com.omnisci.thrift.calciteserver.TUserDefinedTableFunction;
34 
37 import org.apache.calcite.runtime.CalciteContextException;
38 import org.apache.calcite.sql.SqlNode;
39 import org.apache.calcite.sql.parser.SqlParseException;
40 import org.apache.calcite.sql.validate.SqlMoniker;
41 import org.apache.calcite.sql.validate.SqlMonikerType;
42 import org.apache.calcite.tools.RelConversionException;
43 import org.apache.calcite.tools.ValidationException;
44 import org.apache.calcite.util.Pair;
45 import org.apache.commons.pool.PoolableObjectFactory;
46 import org.apache.commons.pool.impl.GenericObjectPool;
47 import org.apache.thrift.TException;
48 import org.apache.thrift.server.TServer;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 
52 import java.io.IOException;
53 import java.util.ArrayList;
54 import java.util.HashMap;
55 import java.util.List;
56 import java.util.Map;
57 
62 public class CalciteServerHandler implements CalciteServer.Iface {
63  final static Logger MAPDLOGGER = LoggerFactory.getLogger(CalciteServerHandler.class);
64  private TServer server;
65 
66  private final int mapdPort;
67 
68  private volatile long callCount;
69 
70  private final GenericObjectPool parserPool;
71 
73 
74  private final String extSigsJson;
75 
76  private final String udfSigsJson;
77 
78  private String udfRTSigsJson = "";
79  Map<String, ExtensionFunction> udfRTSigs = null;
80 
82  private Map<String, ExtensionFunction> extSigs = null;
83  private String dataDir;
84 
85  // TODO MAT we need to merge this into common code base for these functions with
86  // CalciteDirect since we are not deprecating this stuff yet
87  public CalciteServerHandler(int mapdPort,
88  String dataDir,
89  String extensionFunctionsAstFile,
91  String udfAstFile) {
92  this.mapdPort = mapdPort;
93  this.dataDir = dataDir;
94 
95  Map<String, ExtensionFunction> udfSigs = null;
96 
97  try {
98  extSigs = ExtensionFunctionSignatureParser.parse(extensionFunctionsAstFile);
99  } catch (IOException ex) {
100  MAPDLOGGER.error(
101  "Could not load extension function signatures: " + ex.getMessage(), ex);
102  }
103  extSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(extSigs);
104 
105  try {
106  if (!udfAstFile.isEmpty()) {
107  udfSigs = ExtensionFunctionSignatureParser.parseUdfAst(udfAstFile);
108  }
109  } catch (IOException ex) {
110  MAPDLOGGER.error("Could not load udf function signatures: " + ex.getMessage(), ex);
111  }
112  udfSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(udfSigs);
113 
114  // Put all the udf functions signatures in extSigs so Calcite has a view of
115  // extension functions and udf functions
116  if (!udfAstFile.isEmpty()) {
117  extSigs.putAll(udfSigs);
118  }
119 
120  calciteParserFactory = new CalciteParserFactory(dataDir, extSigs, mapdPort, skT);
121 
122  // GenericObjectPool::setFactory is deprecated
123  this.parserPool = new GenericObjectPool(calciteParserFactory);
124  }
125 
126  @Override
127  public void ping() throws TException {
128  MAPDLOGGER.debug("Ping hit");
129  }
130 
131  @Override
132  public TPlanResult process(String user,
133  String session,
134  String catalog,
135  String sqlText,
136  java.util.List<TFilterPushDownInfo> thriftFilterPushDownInfo,
137  boolean legacySyntax,
138  boolean isExplain,
139  boolean isViewOptimize) throws InvalidParseRequest, TException {
140  long timer = System.currentTimeMillis();
141  callCount++;
142 
144  try {
145  parser = (MapDParser) parserPool.borrowObject();
146  parser.clearMemo();
147  } catch (Exception ex) {
148  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
149  MAPDLOGGER.error(msg, ex);
150  throw new InvalidParseRequest(-1, msg);
151  }
152  MapDUser mapDUser = new MapDUser(user, session, catalog, mapdPort);
153  MAPDLOGGER.debug("process was called User: " + user + " Catalog: " + catalog
154  + " sql: " + sqlText);
155  parser.setUser(mapDUser);
156  CURRENT_PARSER.set(parser);
157 
158  // need to trim the sql string as it seems it is not trimed prior to here
159  sqlText = sqlText.trim();
160  // remove last charcter if it is a ;
161  if (sqlText.length() > 0 && sqlText.charAt(sqlText.length() - 1) == ';') {
162  sqlText = sqlText.substring(0, sqlText.length() - 1);
163  }
164  String jsonResult;
165  SqlIdentifierCapturer capturer;
166  TAccessedQueryObjects primaryAccessedObjects = new TAccessedQueryObjects();
167  TAccessedQueryObjects resolvedAccessedObjects = new TAccessedQueryObjects();
168  try {
169  final List<MapDParserOptions.FilterPushDownInfo> filterPushDownInfo =
170  new ArrayList<>();
171  for (final TFilterPushDownInfo req : thriftFilterPushDownInfo) {
172  filterPushDownInfo.add(new MapDParserOptions.FilterPushDownInfo(
173  req.input_prev, req.input_start, req.input_next));
174  }
175  Pair<String, SqlIdentifierCapturer> res;
176  SqlNode node;
177  try {
178  MapDParserOptions parserOptions = new MapDParserOptions(
179  filterPushDownInfo, legacySyntax, isExplain, isViewOptimize);
180  res = parser.process(sqlText, parserOptions);
181  jsonResult = res.left;
182  } catch (ValidationException ex) {
183  String msg = "Validation: " + ex.getMessage();
184  MAPDLOGGER.error(msg, ex);
185  throw ex;
186  } catch (RelConversionException ex) {
187  String msg = " RelConversion failed: " + ex.getMessage();
188  MAPDLOGGER.error(msg, ex);
189  throw ex;
190  }
191  capturer = res.right;
192 
193  primaryAccessedObjects.tables_selected_from = new ArrayList<>(capturer.selects);
194  primaryAccessedObjects.tables_inserted_into = new ArrayList<>(capturer.inserts);
195  primaryAccessedObjects.tables_updated_in = new ArrayList<>(capturer.updates);
196  primaryAccessedObjects.tables_deleted_from = new ArrayList<>(capturer.deletes);
197 
198  // also resolve all the views in the select part
199  // resolution of the other parts is not
200  // necessary as these cannot be views
201  resolvedAccessedObjects.tables_selected_from =
202  new ArrayList<>(parser.resolveSelectIdentifiers(capturer));
203  resolvedAccessedObjects.tables_inserted_into = new ArrayList<>(capturer.inserts);
204  resolvedAccessedObjects.tables_updated_in = new ArrayList<>(capturer.updates);
205  resolvedAccessedObjects.tables_deleted_from = new ArrayList<>(capturer.deletes);
206 
207  } catch (SqlParseException ex) {
208  String msg = "Parse failed: " + ex.getMessage();
209  MAPDLOGGER.error(msg, ex);
210  throw new InvalidParseRequest(-2, msg);
211  } catch (CalciteContextException ex) {
212  String msg = "Validate failed: " + ex.getMessage();
213  MAPDLOGGER.error(msg, ex);
214  throw new InvalidParseRequest(-3, msg);
215  } catch (Throwable ex) {
216  String msg = "Exception occurred: " + ex.getMessage();
217  MAPDLOGGER.error(msg, ex);
218  throw new InvalidParseRequest(-4, msg);
219  } finally {
220  CURRENT_PARSER.set(null);
221  try {
222  // put parser object back in pool for others to use
223  parserPool.returnObject(parser);
224  } catch (Exception ex) {
225  String msg = "Could not return parse object: " + ex.getMessage();
226  MAPDLOGGER.error(msg, ex);
227  throw new InvalidParseRequest(-4, msg);
228  }
229  }
230 
231  TPlanResult result = new TPlanResult();
232  result.primary_accessed_objects = primaryAccessedObjects;
233  result.resolved_accessed_objects = resolvedAccessedObjects;
234  result.plan_result = jsonResult;
235  result.execution_time_ms = System.currentTimeMillis() - timer;
236 
237  return result;
238  }
239 
240  @Override
241  public void shutdown() throws TException {
242  // received request to shutdown
243  MAPDLOGGER.debug("Shutdown calcite java server");
244  server.stop();
245  }
246 
247  @Override
249  return this.extSigsJson;
250  }
251 
252  @Override
254  return this.udfSigsJson;
255  }
256 
257  @Override
259  return this.udfRTSigsJson;
260  }
261 
262  void setServer(TServer s) {
263  server = s;
264  }
265 
266  @Override
267  public void updateMetadata(String catalog, String table) throws TException {
268  MAPDLOGGER.debug("Received invalidation from server for " + catalog + " : " + table);
269  long timer = System.currentTimeMillis();
270  callCount++;
272  try {
273  parser = (MapDParser) parserPool.borrowObject();
274  } catch (Exception ex) {
275  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
276  MAPDLOGGER.error(msg, ex);
277  return;
278  }
279  CURRENT_PARSER.set(parser);
280  try {
281  parser.updateMetaData(catalog, table);
282  } finally {
283  CURRENT_PARSER.set(null);
284  try {
285  // put parser object back in pool for others to use
286  MAPDLOGGER.debug("Returning object to pool");
287  parserPool.returnObject(parser);
288  } catch (Exception ex) {
289  String msg = "Could not return parse object: " + ex.getMessage();
290  MAPDLOGGER.error(msg, ex);
291  }
292  }
293  }
294 
295  @Override
296  public List<TCompletionHint> getCompletionHints(String user,
297  String session,
298  String catalog,
299  List<String> visible_tables,
300  String sql,
301  int cursor) throws TException {
302  callCount++;
304  try {
305  parser = (MapDParser) parserPool.borrowObject();
306  } catch (Exception ex) {
307  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
308  MAPDLOGGER.error(msg, ex);
309  throw new TException(msg);
310  }
311  MapDUser mapDUser = new MapDUser(user, session, catalog, mapdPort);
312  MAPDLOGGER.debug("getCompletionHints was called User: " + user
313  + " Catalog: " + catalog + " sql: " + sql);
314  parser.setUser(mapDUser);
315  CURRENT_PARSER.set(parser);
316 
317  MapDPlanner.CompletionResult completion_result;
318  try {
319  completion_result = parser.getCompletionHints(sql, cursor, visible_tables);
320  } catch (Exception ex) {
321  String msg = "Could not retrieve completion hints: " + ex.getMessage();
322  MAPDLOGGER.error(msg, ex);
323  return new ArrayList<>();
324  } finally {
325  CURRENT_PARSER.set(null);
326  try {
327  // put parser object back in pool for others to use
328  parserPool.returnObject(parser);
329  } catch (Exception ex) {
330  String msg = "Could not return parse object: " + ex.getMessage();
331  MAPDLOGGER.error(msg, ex);
332  throw new InvalidParseRequest(-4, msg);
333  }
334  }
335  List<TCompletionHint> result = new ArrayList<>();
336  for (final SqlMoniker hint : completion_result.hints) {
337  result.add(new TCompletionHint(hintTypeToThrift(hint.getType()),
338  hint.getFullyQualifiedNames(),
339  completion_result.replaced));
340  }
341  return result;
342  }
343 
344  @Override
346  List<TUserDefinedFunction> udfs, List<TUserDefinedTableFunction> udtfs) {
347  // Clean up previously defined Runtime UDFs
348  if (udfRTSigs != null) {
349  for (String name : udfRTSigs.keySet()) extSigs.remove(name);
350  udfRTSigsJson = "";
351  udfRTSigs.clear();
352  } else {
353  udfRTSigs = new HashMap<String, ExtensionFunction>();
354  }
355 
356  for (TUserDefinedFunction udf : udfs) {
357  udfRTSigs.put(udf.name, toExtensionFunction(udf));
358  }
359 
360  for (TUserDefinedTableFunction udtf : udtfs) {
361  udfRTSigs.put(udtf.name, toExtensionFunction(udtf));
362  }
363 
364  // Avoid overwritting compiled and Loadtime UDFs:
365  for (String name : udfRTSigs.keySet()) {
366  if (extSigs.containsKey(name)) {
367  MAPDLOGGER.error("Extension function `" + name
368  + "` exists. Skipping runtime extenension function with the same name.");
369  udfRTSigs.remove(name);
370  }
371  }
372 
373  // udfRTSigsJson will contain only the signatures of UDFs:
374  udfRTSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(udfRTSigs);
375  // Expose RT UDFs to Calcite server:
376  extSigs.putAll(udfRTSigs);
377 
378  calciteParserFactory.updateOperatorTable();
379  }
380 
381  private static ExtensionFunction toExtensionFunction(TUserDefinedFunction udf) {
383  new ArrayList<ExtensionFunction.ExtArgumentType>();
384  for (TExtArgumentType atype : udf.argTypes) {
385  final ExtensionFunction.ExtArgumentType arg_type = toExtArgumentType(atype);
386  if (arg_type != ExtensionFunction.ExtArgumentType.Void) {
387  args.add(arg_type);
388  }
389  }
390  return new ExtensionFunction(args, toExtArgumentType(udf.retType));
391  }
392 
393  private static ExtensionFunction toExtensionFunction(TUserDefinedTableFunction udtf) {
395  new ArrayList<ExtensionFunction.ExtArgumentType>();
396  for (TExtArgumentType atype : udtf.sqlArgTypes) {
397  args.add(toExtArgumentType(atype));
398  }
400  new ArrayList<ExtensionFunction.ExtArgumentType>();
401  for (TExtArgumentType otype : udtf.outputArgTypes) {
402  outs.add(toExtArgumentType(otype));
403  }
404  return new ExtensionFunction(args, outs);
405  }
406 
408  TExtArgumentType type) {
409  switch (type) {
410  case Int8:
412  case Int16:
414  case Int32:
416  case Int64:
418  case Float:
420  case Double:
422  case Void:
424  case PInt8:
426  case PInt16:
428  case PInt32:
430  case PInt64:
432  case PFloat:
434  case PDouble:
436  case PBool:
438  case Bool:
440  case ArrayInt8:
442  case ArrayInt16:
444  case ArrayInt32:
446  case ArrayInt64:
448  case ArrayFloat:
450  case ArrayDouble:
452  case ArrayBool:
454  case ColumnInt8:
456  case ColumnInt16:
458  case ColumnInt32:
460  case ColumnInt64:
462  case ColumnFloat:
464  case ColumnDouble:
466  case ColumnBool:
468  case GeoPoint:
470  case GeoLineString:
472  case Cursor:
474  case GeoPolygon:
476  case GeoMultiPolygon:
478  default:
479  MAPDLOGGER.error("toExtArgumentType: unknown type " + type);
480  return null;
481  }
482  }
483 
484  private static TCompletionHintType hintTypeToThrift(final SqlMonikerType type) {
485  switch (type) {
486  case COLUMN:
487  return TCompletionHintType.COLUMN;
488  case TABLE:
489  return TCompletionHintType.TABLE;
490  case VIEW:
491  return TCompletionHintType.VIEW;
492  case SCHEMA:
493  return TCompletionHintType.SCHEMA;
494  case CATALOG:
495  return TCompletionHintType.CATALOG;
496  case REPOSITORY:
497  return TCompletionHintType.REPOSITORY;
498  case FUNCTION:
499  return TCompletionHintType.FUNCTION;
500  case KEYWORD:
501  return TCompletionHintType.KEYWORD;
502  default:
503  return null;
504  }
505  }
506 }
Pair< String, SqlIdentifierCapturer > process(String sql, final MapDParserOptions parserOptions)
void updateMetaData(String schema, String table)
static ExtensionFunction.ExtArgumentType toExtArgumentType(TExtArgumentType type)
void updateMetadata(String catalog, String table)
static Map< String, ExtensionFunction > parse(final String file_path)
Set< String > resolveSelectIdentifiers(SqlIdentifierCapturer capturer)
Map< String, ExtensionFunction > udfRTSigs
void setUser(MapDUser mapdUser)
static TCompletionHintType hintTypeToThrift(final SqlMonikerType type)
static String signaturesToJson(final Map< String, ExtensionFunction > sigs)
static ExtensionFunction toExtensionFunction(TUserDefinedTableFunction udtf)
static final ThreadLocal< MapDParser > CURRENT_PARSER
static ExtensionFunction toExtensionFunction(TUserDefinedFunction udf)
static Map< String, ExtensionFunction > parseUdfAst(final String file_path)
MapDPlanner.CompletionResult getCompletionHints(String sql, int cursor, List< String > visible_tables)
TPlanResult process(String user, String session, String catalog, String sqlText, java.util.List< TFilterPushDownInfo > thriftFilterPushDownInfo, boolean legacySyntax, boolean isExplain, boolean isViewOptimize)
CalciteServerHandler(int mapdPort, String dataDir, String extensionFunctionsAstFile, SockTransportProperties skT, String udfAstFile)
void setRuntimeExtensionFunctions(List< TUserDefinedFunction > udfs, List< TUserDefinedTableFunction > udtfs)
List< TCompletionHint > getCompletionHints(String user, String session, String catalog, List< String > visible_tables, String sql, int cursor)