drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-4729) Add support for prepared statement implementation on server side
Date Mon, 08 Aug 2016 20:11:20 GMT

    [ https://issues.apache.org/jira/browse/DRILL-4729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15412374#comment-15412374 ]

ASF GitHub Bot commented on DRILL-4729:
---------------------------------------

Github user vkorukanti commented on a diff in the pull request:

    https://github.com/apache/drill/pull/530#discussion_r73944776
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java ---
    @@ -0,0 +1,427 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.work.prepare;
    +
    +import static org.apache.drill.exec.ExecConstants.CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS;
    +import static org.apache.drill.exec.proto.UserProtos.RequestStatus.FAILED;
    +import static org.apache.drill.exec.proto.UserProtos.RequestStatus.OK;
    +import static org.apache.drill.exec.proto.UserProtos.RequestStatus.TIMEOUT;
    +
    +import org.apache.drill.common.exceptions.ErrorHelper;
    +import org.apache.drill.common.types.TypeProtos.DataMode;
    +import org.apache.drill.common.types.TypeProtos.MajorType;
    +import org.apache.drill.common.types.TypeProtos.MinorType;
    +import org.apache.drill.common.types.Types;
    +import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
    +import org.apache.drill.exec.proto.ExecProtos.ServerPreparedStatementState;
    +import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    +import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
    +import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
    +import org.apache.drill.exec.proto.UserBitShared.QueryId;
    +import org.apache.drill.exec.proto.UserBitShared.QueryResult;
    +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
    +import org.apache.drill.exec.proto.UserBitShared.QueryType;
    +import org.apache.drill.exec.proto.UserBitShared.SerializedField;
    +import org.apache.drill.exec.proto.UserProtos.ColumnSearchability;
    +import org.apache.drill.exec.proto.UserProtos.ColumnUpdatability;
    +import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq;
    +import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp;
    +import org.apache.drill.exec.proto.UserProtos.PreparedStatement;
    +import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
    +import org.apache.drill.exec.proto.UserProtos.RequestStatus;
    +import org.apache.drill.exec.proto.UserProtos.ResultColumnMetadata;
    +import org.apache.drill.exec.proto.UserProtos.RpcType;
    +import org.apache.drill.exec.proto.UserProtos.RunQuery;
    +import org.apache.drill.exec.rpc.Acks;
    +import org.apache.drill.exec.rpc.Response;
    +import org.apache.drill.exec.rpc.ResponseSender;
    +import org.apache.drill.exec.rpc.RpcOutcomeListener;
    +import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
    +import org.apache.drill.exec.rpc.user.UserSession;
    +import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
    +import org.apache.drill.exec.work.user.UserWorker;
    +import org.joda.time.Period;
    +
    +import com.google.common.collect.ImmutableMap;
    +
    +import io.netty.buffer.ByteBuf;
    +import io.netty.channel.ChannelFuture;
    +import java.math.BigDecimal;
    +import java.net.SocketAddress;
    +import java.sql.Date;
    +import java.sql.ResultSetMetaData;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Contains worker {@link Runnable} for creating a prepared statement and helper methods.
    + */
    +public class PreparedStatementProvider {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PreparedStatementProvider.class);
    +
    +  /**
    +   * Static list of mappings from {@link MinorType} to JDBC ResultSet class name (to be returned through
    +   * {@link ResultSetMetaData#getColumnClassName(int)}.
    +   */
    +  private static final Map<MinorType, String> DRILL_TYPE_TO_JDBC_CLASSNAME = ImmutableMap.<MinorType, String>builder()
    +      .put(MinorType.INT, Integer.class.getName())
    +      .put(MinorType.BIGINT, Long.class.getName())
    +      .put(MinorType.FLOAT4, Float.class.getName())
    +      .put(MinorType.FLOAT8, Double.class.getName())
    +      .put(MinorType.VARCHAR, String.class.getName())
    +      .put(MinorType.BIT, Boolean.class.getName())
    +      .put(MinorType.DATE, Date.class.getName())
    +      .put(MinorType.DECIMAL9, BigDecimal.class.getName())
    +      .put(MinorType.DECIMAL18, BigDecimal.class.getName())
    +      .put(MinorType.DECIMAL28SPARSE, BigDecimal.class.getName())
    +      .put(MinorType.DECIMAL38SPARSE, BigDecimal.class.getName())
    +      .put(MinorType.TIME, Time.class.getName())
    +      .put(MinorType.TIMESTAMP, Timestamp.class.getName())
    +      .put(MinorType.VARBINARY, byte[].class.getName())
    +      .put(MinorType.INTERVALYEAR, Period.class.getName())
    +      .put(MinorType.INTERVALDAY, Period.class.getName())
    +      .put(MinorType.MAP, Object.class.getName())
    +      .put(MinorType.LIST, Object.class.getName())
    +      .put(MinorType.UNION, Object.class.getName())
    +      .build();
    +
    +  /**
    +   * Runnable that creates a prepared statement for given {@link CreatePreparedStatementReq} and
    +   * sends the response at the end.
    +   */
    +  public static class PreparedStatementWorker implements Runnable {
    +    private final UserClientConnection connection;
    +    private final UserWorker userWorker;
    +    private final ResponseSender responseSender;
    +    private final CreatePreparedStatementReq req;
    +
    +    public PreparedStatementWorker(final UserClientConnection connection, final UserWorker userWorker,
    +        final ResponseSender responseSender, final CreatePreparedStatementReq req) {
    +      this.connection = connection;
    +      this.userWorker = userWorker;
    +      this.responseSender = responseSender;
    +      this.req = req;
    +    }
    +
    +    @Override
    +    public void run() {
    +      final CreatePreparedStatementResp.Builder respBuilder = CreatePreparedStatementResp.newBuilder();
    +      try {
    +        UserClientConnectionWrapper wrapper = new UserClientConnectionWrapper(connection);
    +
    +        final RunQuery limit0Query =
    +            RunQuery.newBuilder()
    +                .setType(QueryType.SQL)
    +                .setPlan(String.format("SELECT * FROM (%s) LIMIT 0", req.getSqlQuery()))
    +                .build();
    +
    +        final QueryId limit0QueryId = userWorker.submitWork(wrapper, limit0Query);
    +
    +        final long timeout_millis =
    +            userWorker.getSystemOptions().getOption(CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS).num_val;
    +
    +        try {
    +          if (!wrapper.await(timeout_millis)) {
    +            logger.error("LIMIT 0 query (QueryId: {}) for prepared statement took longer than {} ms. Cancelling.",
    +                limit0QueryId, timeout_millis);
    +            userWorker.cancelQuery(limit0QueryId);
    +            final String errorMsg = String.format(
    +                "LIMIT 0 query (QueryId: %s) for prepared statement took longer than %d ms. " +
    +                    "Query cancellation requested.\n" +
    +                    "Retry after changing the option '%s' to a higher value.",
    +                limit0QueryId, timeout_millis, CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS);
    +            setErrorHelper(respBuilder, TIMEOUT, null, errorMsg, ErrorType.SYSTEM);
    +            return;
    +          }
    +        } catch (InterruptedException ex) {
    +          setErrorHelper(respBuilder, FAILED, ex, "Prepared statement creation interrupted.", ErrorType.SYSTEM);
    +          return;
    +        }
    +
    +        if (wrapper.getError() != null) {
    +          setErrorHelper(respBuilder, wrapper.getError(), "Failed to get result set schema for prepare statement.");
    +          return;
    +        }
    +
    +        final PreparedStatement.Builder prepStmtBuilder = PreparedStatement.newBuilder();
    +
    +        for (SerializedField field : wrapper.getFields()) {
    +          prepStmtBuilder.addColumns(serializeColumn(field));
    +        }
    +
    +        prepStmtBuilder.setServerHandle(
    +            PreparedStatementHandle.newBuilder()
    +                .setServerInfo(
    +                    ServerPreparedStatementState.newBuilder()
    +                        .setSqlQuery(req.getSqlQuery())
    +                        .build().toByteString()
    +                )
    +        );
    +
    +        respBuilder.setStatus(OK);
    +        respBuilder.setPreparedStatement(prepStmtBuilder.build());
    +      } catch (Throwable e) {
    +        setErrorHelper(respBuilder, FAILED, e, "Failed to create prepared statement.", ErrorType.SYSTEM);
    +      } finally {
    +        responseSender.send(new Response(RpcType.PREPARED_STATEMENT, respBuilder.build()));
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Helper method to create {@link DrillPBError} and set it in <code>respBuilder</code>
    +   */
    +  private static void setErrorHelper(final CreatePreparedStatementResp.Builder respBuilder, final RequestStatus status,
    +      final Throwable ex, final String message, final ErrorType errorType) {
    +    respBuilder.setStatus(status);
    +    final String errorId = UUID.randomUUID().toString();
    +    if (ex != null) {
    +      logger.error("{} ErrorId: {}", message, errorId, ex);
    +    } else {
    +      logger.error("{} ErrorId: {}", message, errorId);
    +    }
    +
    +    final DrillPBError.Builder builder = DrillPBError.newBuilder();
    +    builder.setErrorType(errorType);
    +    builder.setErrorId(errorId);
    +    builder.setMessage(message);
    +
    +    if (ex != null) {
    +      builder.setException(ErrorHelper.getWrapper(ex));
    +    }
    +
    +    respBuilder.setError(builder.build());
    +  }
    +
    +  /**
    +   * Helper method to log error and set given {@link DrillPBError} in <code>respBuilder</code>
    +   */
    +  private static void setErrorHelper(final CreatePreparedStatementResp.Builder respBuilder, final DrillPBError error,
    +      final String message) {
    +    respBuilder.setStatus(FAILED);
    +    final String errorId = UUID.randomUUID().toString();
    +    logger.error("{} ErrorId: {}", message, errorId);
    +
    +    respBuilder.setError(error);
    +  }
    +
    +  /**
    +   * Decorator around {@link UserClientConnection} to tap the query results for LIMIT 0 query.
    +   */
    +  private static class UserClientConnectionWrapper implements UserClientConnection {
    +    private final UserClientConnection inner;
    +    private final CountDownLatch latch = new CountDownLatch(1);
    +
    +    private DrillPBError error;
    +    private List<SerializedField> fields;
    --- End diff --
    
    thanks for catching this.


> Add support for prepared statement implementation on server side
> ----------------------------------------------------------------
>
>                 Key: DRILL-4729
>                 URL: https://issues.apache.org/jira/browse/DRILL-4729
>             Project: Apache Drill
>          Issue Type: Sub-task
>          Components: Metadata
>            Reporter: Venki Korukanti
>            Assignee: Venki Korukanti
>             Fix For: 1.8.0
>
>
> Currently the Drill JDBC/ODBC driver implements its own prepared statement logic, which issues a LIMIT 0 query to get the metadata and then executes the actual query. As a result the query is planned twice (once for the metadata fetch and once for the actual execution). The proposal is to move that logic to the server, where we can make optimizations without disrupting or updating the JDBC/ODBC drivers.
> *  {{PreparedStatement createPreparedStatement(String query)}}. The {{PreparedStatement}} object contains the following:
> ** {{ResultSetMetadata getResultSetMetadata()}}
> *** {{ResultSetMetadata}} contains methods to fetch info about the output columns of the query. What info these methods provide is given in this [spreadsheet|https://docs.google.com/spreadsheets/d/1A6nqUQo5xJaZDQlDTittpVrK7t4Kylycs3P32Yn_O5k/edit?usp=sharing]. It lists the ODBC/JDBC requirements and what Drill will provide through the {{ResultSetMetadata}} object.
> *** The server can put more info here, which is opaque to the client, and use it when the client sends an execute prepared statement query request.
> Overload the current submit query API to take the {{PreparedStatement}} returned above.

> In the initial implementation, the server side of the {{createPreparedStatement}} API works as follows:
> * Runs the query with {{LIMIT 0}} and gets the schema.
> * Converts the query into a binary blob and sets it as the opaque object in {{PreparedStatement}}.
> When the {{PreparedStatement}} is submitted for execution, the server reconstructs the query from the binary blob in the opaque component of {{PreparedStatement}} and executes it from scratch.
> The opaque component of the {{PreparedStatement}} is where we can save more information which we can use for optimizations/speedups.
> NOTE: We are not going to worry about parameters in the prepared query in the initial implementation. We can provide the functionality later if there is sufficient demand from the Drill community.
> Changes in this patch are going to include protobuf messages, server side messages and Java client APIs. Native client changes are going to be tracked in a separate JIRA.
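
The flow described above (wrap the query in LIMIT 0 for schema discovery, stash the original SQL in an opaque handle, reconstruct it at execution time) can be sketched roughly as follows. This is a minimal illustration only, not the code in the pull request: the class PreparedStatementSketch, its Handle type and its method names are hypothetical, and plain UTF-8 bytes stand in for the serialized ServerPreparedStatementState protobuf message that the patch stores in the handle.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch; none of these names are Drill APIs. */
final class PreparedStatementSketch {

  /** Opaque server-side state. Assumption: just the UTF-8 bytes of the SQL text. */
  static final class Handle {
    private final byte[] serverInfo;

    Handle(byte[] serverInfo) {
      this.serverInfo = serverInfo.clone();
    }

    byte[] serverInfo() {
      return serverInfo.clone();
    }
  }

  /** Wraps the user query so that it yields the result schema but no rows. */
  static String toLimit0Query(String sql) {
    return String.format("SELECT * FROM (%s) LIMIT 0", sql);
  }

  /** Converts the query into a binary blob that the client treats as opaque. */
  static Handle createHandle(String sql) {
    return new Handle(sql.getBytes(StandardCharsets.UTF_8));
  }

  /** Rebuilds the original query from the opaque blob at execution time. */
  static String reconstructQuery(Handle handle) {
    return new String(handle.serverInfo(), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    String sql = "SELECT a, b FROM t";
    // Query the server would run to discover the result set schema.
    System.out.println(toLimit0Query(sql));
    // Handle returned to the client and later sent back for execution.
    Handle handle = createHandle(sql);
    System.out.println(reconstructQuery(handle));
  }
}
```

Because the client never interprets the handle contents, the server is free to add more state to it later (for example, cached planning information) without changing the client-facing API.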



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
