OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
CalciteServerHandler.java
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.mapd.parser.server;
17 
18 import static com.mapd.calcite.parser.MapDParser.CURRENT_PARSER;
19 
24 import com.mapd.thrift.calciteserver.CalciteServer;
25 import com.mapd.thrift.calciteserver.InvalidParseRequest;
26 import com.mapd.thrift.calciteserver.TAccessedQueryObjects;
27 import com.mapd.thrift.calciteserver.TCompletionHint;
28 import com.mapd.thrift.calciteserver.TCompletionHintType;
29 import com.mapd.thrift.calciteserver.TExtArgumentType;
30 import com.mapd.thrift.calciteserver.TFilterPushDownInfo;
31 import com.mapd.thrift.calciteserver.TPlanResult;
32 import com.mapd.thrift.calciteserver.TUserDefinedFunction;
33 import com.mapd.thrift.calciteserver.TUserDefinedTableFunction;
34 
37 import org.apache.calcite.runtime.CalciteContextException;
38 import org.apache.calcite.sql.parser.SqlParseException;
39 import org.apache.calcite.sql.validate.SqlMoniker;
40 import org.apache.calcite.sql.validate.SqlMonikerType;
41 import org.apache.calcite.tools.RelConversionException;
42 import org.apache.calcite.tools.ValidationException;
43 import org.apache.commons.pool.PoolableObjectFactory;
44 import org.apache.commons.pool.impl.GenericObjectPool;
45 import org.apache.thrift.TException;
46 import org.apache.thrift.server.TServer;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 
50 import java.io.IOException;
51 import java.util.ArrayList;
52 import java.util.HashMap;
53 import java.util.List;
54 import java.util.Map;
55 
60 class CalciteServerHandler implements CalciteServer.Iface {
61  final static Logger MAPDLOGGER = LoggerFactory.getLogger(CalciteServerHandler.class);
62  private TServer server;
63 
64  private final int mapdPort;
65 
66  private volatile long callCount;
67 
68  private final GenericObjectPool parserPool;
69 
70  private final String extSigsJson;
71 
72  private final String udfSigsJson;
73 
74  private String udfRTSigsJson = "";
75  Map<String, ExtensionFunction> udfRTSigs = null;
76 
78  private Map<String, ExtensionFunction> extSigs = null;
79  private String dataDir;
80 
81  // TODO MAT we need to merge this into common code base for these functions with
82  // CalciteDirect since we are not deprecating this stuff yet
85  String extensionFunctionsAstFile,
87  String udfAstFile) {
88  this.mapdPort = mapdPort;
89  this.dataDir = dataDir;
90 
91  Map<String, ExtensionFunction> udfSigs = null;
92 
93  try {
94  extSigs = ExtensionFunctionSignatureParser.parse(extensionFunctionsAstFile);
95  } catch (IOException ex) {
96  MAPDLOGGER.error(
97  "Could not load extension function signatures: " + ex.getMessage());
98  }
99  extSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(extSigs);
100 
101  try {
102  if (!udfAstFile.isEmpty()) {
103  udfSigs = ExtensionFunctionSignatureParser.parse(udfAstFile);
104  }
105  } catch (IOException ex) {
106  MAPDLOGGER.error("Could not load udf function signatures: " + ex.getMessage());
107  }
108  udfSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(udfSigs);
109 
110  // Put all the udf functions signatures in extSigs so Calcite has a view of
111  // extension functions and udf functions
112  if (!udfAstFile.isEmpty()) {
113  extSigs.putAll(udfSigs);
114  }
115 
116  PoolableObjectFactory parserFactory =
117  new CalciteParserFactory(dataDir, extSigs, mapdPort, skT);
118  // GenericObjectPool::setFactory is deprecated
119  this.parserPool = new GenericObjectPool(parserFactory);
120  }
121 
122  @Override
123  public void ping() throws TException {
124  MAPDLOGGER.debug("Ping hit");
125  }
126 
127  @Override
128  public TPlanResult process(String user,
129  String session,
130  String catalog,
131  String sqlText,
132  java.util.List<TFilterPushDownInfo> thriftFilterPushDownInfo,
133  boolean legacySyntax,
134  boolean isExplain,
135  boolean isViewOptimize) throws InvalidParseRequest, TException {
136  long timer = System.currentTimeMillis();
137  callCount++;
139  try {
140  parser = (MapDParser) parserPool.borrowObject();
141  } catch (Exception ex) {
142  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
143  MAPDLOGGER.error(msg);
144  throw new InvalidParseRequest(-1, msg);
145  }
146  MapDUser mapDUser = new MapDUser(user, session, catalog, mapdPort);
147  MAPDLOGGER.debug("process was called User: " + user + " Catalog: " + catalog
148  + " sql: " + sqlText);
149  parser.setUser(mapDUser);
150  CURRENT_PARSER.set(parser);
151 
152  // need to trim the sql string as it seems it is not trimed prior to here
153  sqlText = sqlText.trim();
154  // remove last charcter if it is a ;
155  if (sqlText.length() > 0 && sqlText.charAt(sqlText.length() - 1) == ';') {
156  sqlText = sqlText.substring(0, sqlText.length() - 1);
157  }
158  String relAlgebra;
159  SqlIdentifierCapturer capturer;
160  TAccessedQueryObjects primaryAccessedObjects = new TAccessedQueryObjects();
161  TAccessedQueryObjects resolvedAccessedObjects = new TAccessedQueryObjects();
162  try {
163  final List<MapDParserOptions.FilterPushDownInfo> filterPushDownInfo =
164  new ArrayList<>();
165  for (final TFilterPushDownInfo req : thriftFilterPushDownInfo) {
166  filterPushDownInfo.add(new MapDParserOptions.FilterPushDownInfo(
167  req.input_prev, req.input_start, req.input_next));
168  }
169  try {
170  MapDParserOptions parserOptions = new MapDParserOptions(
171  filterPushDownInfo, legacySyntax, isExplain, isViewOptimize);
172  relAlgebra = parser.getRelAlgebra(sqlText, parserOptions, mapDUser);
173  } catch (ValidationException ex) {
174  String msg = "Validation: " + ex.getMessage();
175  MAPDLOGGER.error(msg);
176  throw ex;
177  } catch (RelConversionException ex) {
178  String msg = " RelConversion failed: " + ex.getMessage();
179  MAPDLOGGER.error(msg);
180  throw ex;
181  }
182  capturer = parser.captureIdentifiers(sqlText, legacySyntax);
183 
184  primaryAccessedObjects.tables_selected_from = new ArrayList<>(capturer.selects);
185  primaryAccessedObjects.tables_inserted_into = new ArrayList<>(capturer.inserts);
186  primaryAccessedObjects.tables_updated_in = new ArrayList<>(capturer.updates);
187  primaryAccessedObjects.tables_deleted_from = new ArrayList<>(capturer.deletes);
188 
189  // also resolve all the views in the select part
190  // resolution of the other parts is not
191  // necessary as these cannot be views
192  resolvedAccessedObjects.tables_selected_from =
193  new ArrayList<>(parser.resolveSelectIdentifiers(capturer));
194  resolvedAccessedObjects.tables_inserted_into = new ArrayList<>(capturer.inserts);
195  resolvedAccessedObjects.tables_updated_in = new ArrayList<>(capturer.updates);
196  resolvedAccessedObjects.tables_deleted_from = new ArrayList<>(capturer.deletes);
197 
198  } catch (SqlParseException ex) {
199  String msg = "Parse failed: " + ex.getMessage();
200  MAPDLOGGER.error(msg);
201  throw new InvalidParseRequest(-2, msg);
202  } catch (CalciteContextException ex) {
203  String msg = "Validate failed: " + ex.getMessage();
204  MAPDLOGGER.error(msg);
205  throw new InvalidParseRequest(-3, msg);
206  } catch (Throwable ex) {
207  String msg = "Exception occurred: " + ex.getMessage();
208  MAPDLOGGER.error(msg);
209  throw new InvalidParseRequest(-4, msg);
210  } finally {
211  CURRENT_PARSER.set(null);
212  try {
213  // put parser object back in pool for others to use
214  parserPool.returnObject(parser);
215  } catch (Exception ex) {
216  String msg = "Could not return parse object: " + ex.getMessage();
217  MAPDLOGGER.error(msg);
218  throw new InvalidParseRequest(-4, msg);
219  }
220  }
221 
222  TPlanResult result = new TPlanResult();
223  result.primary_accessed_objects = primaryAccessedObjects;
224  result.resolved_accessed_objects = resolvedAccessedObjects;
225  result.plan_result = relAlgebra;
226  result.execution_time_ms = System.currentTimeMillis() - timer;
227 
228  return result;
229  }
230 
231  @Override
232  public void shutdown() throws TException {
233  // received request to shutdown
234  MAPDLOGGER.debug("Shutdown calcite java server");
235  server.stop();
236  }
237 
238  @Override
240  return this.extSigsJson;
241  }
242 
243  @Override
245  return this.udfSigsJson;
246  }
247 
248  @Override
250  return this.udfRTSigsJson;
251  }
252 
253  void setServer(TServer s) {
254  server = s;
255  }
256 
257  @Override
258  public void updateMetadata(String catalog, String table) throws TException {
259  MAPDLOGGER.debug("Received invalidation from server for " + catalog + " : " + table);
260  long timer = System.currentTimeMillis();
261  callCount++;
263  try {
264  parser = (MapDParser) parserPool.borrowObject();
265  } catch (Exception ex) {
266  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
267  MAPDLOGGER.error(msg);
268  return;
269  }
270  CURRENT_PARSER.set(parser);
271  try {
272  parser.updateMetaData(catalog, table);
273  } finally {
274  CURRENT_PARSER.set(null);
275  try {
276  // put parser object back in pool for others to use
277  MAPDLOGGER.debug("Returning object to pool");
278  parserPool.returnObject(parser);
279  } catch (Exception ex) {
280  String msg = "Could not return parse object: " + ex.getMessage();
281  MAPDLOGGER.error(msg);
282  }
283  }
284  }
285 
286  @Override
287  public List<TCompletionHint> getCompletionHints(String user,
288  String session,
289  String catalog,
290  List<String> visible_tables,
291  String sql,
292  int cursor) throws TException {
293  callCount++;
295  try {
296  parser = (MapDParser) parserPool.borrowObject();
297  } catch (Exception ex) {
298  String msg = "Could not get Parse Item from pool: " + ex.getMessage();
299  MAPDLOGGER.error(msg);
300  throw new TException(msg);
301  }
302  MapDUser mapDUser = new MapDUser(user, session, catalog, mapdPort);
303  MAPDLOGGER.debug("getCompletionHints was called User: " + user
304  + " Catalog: " + catalog + " sql: " + sql);
305  parser.setUser(mapDUser);
306  CURRENT_PARSER.set(parser);
307 
308  MapDPlanner.CompletionResult completion_result;
309  try {
310  completion_result = parser.getCompletionHints(sql, cursor, visible_tables);
311  } catch (Exception ex) {
312  String msg = "Could not retrieve completion hints: " + ex.getMessage();
313  MAPDLOGGER.error(msg);
314  return new ArrayList<>();
315  } finally {
316  CURRENT_PARSER.set(null);
317  try {
318  // put parser object back in pool for others to use
319  parserPool.returnObject(parser);
320  } catch (Exception ex) {
321  String msg = "Could not return parse object: " + ex.getMessage();
322  MAPDLOGGER.error(msg);
323  throw new InvalidParseRequest(-4, msg);
324  }
325  }
326  List<TCompletionHint> result = new ArrayList<>();
327  for (final SqlMoniker hint : completion_result.hints) {
328  result.add(new TCompletionHint(hintTypeToThrift(hint.getType()),
329  hint.getFullyQualifiedNames(),
330  completion_result.replaced));
331  }
332  return result;
333  }
334 
335  @Override
337  List<TUserDefinedFunction> udfs, List<TUserDefinedTableFunction> udtfs) {
338  // Clean up previously defined Runtime UDFs
339  if (udfRTSigs != null) {
340  for (String name : udfRTSigs.keySet()) extSigs.remove(name);
341  udfRTSigsJson = "";
342  udfRTSigs.clear();
343  } else {
344  udfRTSigs = new HashMap<String, ExtensionFunction>();
345  }
346 
347  for (TUserDefinedFunction udf : udfs) {
348  udfRTSigs.put(udf.name, toExtensionFunction(udf));
349  }
350 
351  for (TUserDefinedTableFunction udtf : udtfs) {
352  udfRTSigs.put(udtf.name, toExtensionFunction(udtf));
353  }
354 
355  // Avoid overwritting compiled and Loadtime UDFs:
356  for (String name : udfRTSigs.keySet()) {
357  if (extSigs.containsKey(name)) {
358  MAPDLOGGER.error("Extension function `" + name
359  + "` exists. Skipping runtime extenension function with the same name.");
360  udfRTSigs.remove(name);
361  }
362  }
363 
364  udfRTSigsJson = ExtensionFunctionSignatureParser.signaturesToJson(udfRTSigs);
365  // Expose RT UDFs to Calcite server:
366  extSigs.putAll(udfRTSigs);
367  }
368 
369  private static ExtensionFunction toExtensionFunction(TUserDefinedFunction udf) {
370  List<ExtensionFunction.ExtArgumentType> args =
371  new ArrayList<ExtensionFunction.ExtArgumentType>();
372  for (TExtArgumentType atype : udf.argTypes) {
373  final ExtensionFunction.ExtArgumentType arg_type = toExtArgumentType(atype);
374  if (arg_type != ExtensionFunction.ExtArgumentType.Void) {
375  args.add(arg_type);
376  }
377  }
378  return new ExtensionFunction(args, toExtArgumentType(udf.retType), true);
379  }
380 
381  private static ExtensionFunction toExtensionFunction(TUserDefinedTableFunction udtf) {
382  List<ExtensionFunction.ExtArgumentType> args =
383  new ArrayList<ExtensionFunction.ExtArgumentType>();
384  for (TExtArgumentType atype : udtf.sqlArgTypes) {
385  args.add(toExtArgumentType(atype));
386  }
387  return new ExtensionFunction(args, ExtensionFunction.ExtArgumentType.Void, false);
388  }
389 
390  private static ExtensionFunction.ExtArgumentType toExtArgumentType(
391  TExtArgumentType type) {
392  switch (type) {
393  case Int8:
394  return ExtensionFunction.ExtArgumentType.Int8;
395  case Int16:
396  return ExtensionFunction.ExtArgumentType.Int16;
397  case Int32:
398  return ExtensionFunction.ExtArgumentType.Int32;
399  case Int64:
400  return ExtensionFunction.ExtArgumentType.Int64;
401  case Float:
402  return ExtensionFunction.ExtArgumentType.Float;
403  case Double:
404  return ExtensionFunction.ExtArgumentType.Double;
405  case Void:
406  return ExtensionFunction.ExtArgumentType.Void;
407  case PInt8:
408  return ExtensionFunction.ExtArgumentType.PInt8;
409  case PInt16:
410  return ExtensionFunction.ExtArgumentType.PInt16;
411  case PInt32:
412  return ExtensionFunction.ExtArgumentType.PInt32;
413  case PInt64:
414  return ExtensionFunction.ExtArgumentType.PInt64;
415  case PFloat:
416  return ExtensionFunction.ExtArgumentType.PFloat;
417  case PDouble:
418  return ExtensionFunction.ExtArgumentType.PDouble;
419  case Bool:
420  return ExtensionFunction.ExtArgumentType.Bool;
421  case ArrayInt8:
422  return ExtensionFunction.ExtArgumentType.ArrayInt8;
423  case ArrayInt16:
424  return ExtensionFunction.ExtArgumentType.ArrayInt16;
425  case ArrayInt32:
426  return ExtensionFunction.ExtArgumentType.ArrayInt32;
427  case ArrayInt64:
428  return ExtensionFunction.ExtArgumentType.ArrayInt64;
429  case ArrayFloat:
430  return ExtensionFunction.ExtArgumentType.ArrayFloat;
431  case ArrayDouble:
432  return ExtensionFunction.ExtArgumentType.ArrayDouble;
433  case GeoPoint:
434  return ExtensionFunction.ExtArgumentType.GeoPoint;
435  case Cursor:
436  return ExtensionFunction.ExtArgumentType.Cursor;
437  default:
438  MAPDLOGGER.error("toExtArgumentType: unknown type " + type);
439  return null;
440  }
441  }
442 
443  private static TCompletionHintType hintTypeToThrift(final SqlMonikerType type) {
444  switch (type) {
445  case COLUMN:
446  return TCompletionHintType.COLUMN;
447  case TABLE:
448  return TCompletionHintType.TABLE;
449  case VIEW:
450  return TCompletionHintType.VIEW;
451  case SCHEMA:
452  return TCompletionHintType.SCHEMA;
453  case CATALOG:
454  return TCompletionHintType.CATALOG;
455  case REPOSITORY:
456  return TCompletionHintType.REPOSITORY;
457  case FUNCTION:
458  return TCompletionHintType.FUNCTION;
459  case KEYWORD:
460  return TCompletionHintType.KEYWORD;
461  default:
462  return null;
463  }
464  }
465 }
static ExtensionFunction.ExtArgumentType toExtArgumentType(TExtArgumentType type)
void updateMetadata(String catalog, String table)
Map< String, ExtensionFunction > udfRTSigs
static TCompletionHintType hintTypeToThrift(final SqlMonikerType type)
static ExtensionFunction toExtensionFunction(TUserDefinedTableFunction udtf)
static ExtensionFunction toExtensionFunction(TUserDefinedFunction udf)
TPlanResult process(String user, String session, String catalog, String sqlText, java.util.List< TFilterPushDownInfo > thriftFilterPushDownInfo, boolean legacySyntax, boolean isExplain, boolean isViewOptimize)
CalciteServerHandler(int mapdPort, String dataDir, String extensionFunctionsAstFile, SockTransportProperties skT, String udfAstFile)
void setRuntimeExtensionFunctions(List< TUserDefinedFunction > udfs, List< TUserDefinedTableFunction > udtfs)
List< TCompletionHint > getCompletionHints(String user, String session, String catalog, List< String > visible_tables, String sql, int cursor)