drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From laurentgo <...@git.apache.org>
Subject [GitHub] drill pull request #613: DRILL-4730: Update JDBC DatabaseMetaData implementa...
Date Wed, 22 Feb 2017 01:26:12 GMT
Github user laurentgo commented on a diff in the pull request:

    https://github.com/apache/drill/pull/613#discussion_r102363803
  
    --- Diff: exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java ---
    @@ -24,33 +24,260 @@
     import java.util.ArrayList;
     import java.util.Calendar;
     import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.LinkedBlockingDeque;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
     
     import org.apache.calcite.avatica.AvaticaResultSet;
    +import org.apache.calcite.avatica.AvaticaStatement;
     import org.apache.calcite.avatica.ColumnMetaData;
    +import org.apache.calcite.avatica.Meta;
    +import org.apache.calcite.avatica.Meta.Signature;
     import org.apache.calcite.avatica.util.ArrayImpl.Factory;
     import org.apache.calcite.avatica.util.Cursor;
     import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.client.DrillClient;
     import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.proto.UserBitShared.QueryId;
    +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
    +import org.apache.drill.exec.proto.UserBitShared.QueryType;
    +import org.apache.drill.exec.proto.helper.QueryIdHelper;
     import org.apache.drill.exec.record.BatchSchema;
     import org.apache.drill.exec.record.RecordBatchLoader;
    +import org.apache.drill.exec.rpc.ConnectionThrottle;
     import org.apache.drill.exec.rpc.user.QueryDataBatch;
    +import org.apache.drill.exec.rpc.user.UserResultsListener;
     import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
    +import org.apache.drill.jdbc.SchemaChangeListener;
     import org.slf4j.Logger;
     
    +import com.google.common.collect.Queues;
    +
     
     class DrillCursor implements Cursor {
    +
    +  ////////////////////////////////////////
    +  // ResultsListener:
    +  static class ResultsListener implements UserResultsListener {
    +    private static final org.slf4j.Logger logger =
    +        org.slf4j.LoggerFactory.getLogger(ResultsListener.class);
    +
    +    private static volatile int nextInstanceId = 1;
    +
    +    /** (Just for logging.) */
    +    private final int instanceId;
    +
    +    private final int batchQueueThrottlingThreshold;
    +
    +    /** (Just for logging.) */
    +    private volatile QueryId queryId;
    +
    +    /** (Just for logging.) */
    +    private int lastReceivedBatchNumber;
    +    /** (Just for logging.) */
    +    private int lastDequeuedBatchNumber;
    +
    +    private volatile UserException executionFailureException;
    +
    +    // TODO:  Revisit "completed".  Determine and document exactly what it
    +    // means.  Some uses imply that it means that incoming messages indicate
    +    // that the _query_ has _terminated_ (not necessarily _completing_
    +    // normally), while some uses imply that it's some other state of the
    +    // ResultListener.  Some uses seem redundant.)
    +    volatile boolean completed = false;
    +
    +    /** Whether throttling of incoming data is active. */
    +    private final AtomicBoolean throttled = new AtomicBoolean( false );
    +    private volatile ConnectionThrottle throttle;
    +
    +    private volatile boolean closed = false;
    +
    +    private final CountDownLatch firstMessageReceived = new CountDownLatch(1);
    +
    +    final LinkedBlockingDeque<QueryDataBatch> batchQueue =
    +        Queues.newLinkedBlockingDeque();
    +
    +
    +    /**
    +     * ...
    +     * @param  batchQueueThrottlingThreshold
    +     *         queue size threshold for throttling server
    +     */
    +    ResultsListener( int batchQueueThrottlingThreshold ) {
    +      instanceId = nextInstanceId++;
    +      this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
    +      logger.debug( "[#{}] Query listener created.", instanceId );
    +    }
    +
    +    /**
    +     * Starts throttling if not currently throttling.
    +     * @param  throttle  the "throttlable" object to throttle
    +     * @return  true if actually started (wasn't throttling already)
    +     */
    +    private boolean startThrottlingIfNot( ConnectionThrottle throttle ) {
    +      final boolean started = throttled.compareAndSet( false, true );
    +      if ( started ) {
    +        this.throttle = throttle;
    +        throttle.setAutoRead(false);
    +      }
    +      return started;
    +    }
    +
    +    /**
    +     * Stops throttling if currently throttling.
    +     * @return  true if actually stopped (was throttling)
    +     */
    +    private boolean stopThrottlingIfSo() {
    +      final boolean stopped = throttled.compareAndSet( true, false );
    +      if ( stopped ) {
    +        throttle.setAutoRead(true);
    +        throttle = null;
    +      }
    +      return stopped;
    +    }
    +
    +    public void awaitFirstMessage() throws InterruptedException {
    +      firstMessageReceived.await();
    +    }
    +
    +    private void releaseIfFirst() {
    --- End diff --
    
    unfortunately, github doesn't show the complete change, but if the question is about DrillCursor
refactoring, the answer is yes! Since metadata queries resultset are plain avatica resultsets
(and not using drillcursor at all), having some of the drillcursor business logic in DrillResultSetImpl
caused some issues during refactoring.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message