drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [5/5] drill git commit: DRILL-1735: Have closing of JDBC connection free embedded-server resources.
Date Fri, 20 Mar 2015 05:15:22 GMT
DRILL-1735: Have closing of JDBC connection free embedded-server resources.

Hooked up closing of JDBC connection to shut down embedded Drillbit, and then
fixed chain of bugs exposed by that:
 1. Added test org.apache.drill.jdbc.test.Bug1735ConnectionCloseTest.
 2. Hooked up connection handler in Driver to actually close JDBC connection.
 3. Released a QueryResultsBatch in DrillCursor.
 4. Reset DrillMetrics in BootStrapContext.close() (so stopping local DrillBit
    and starting new DrillBit doesn't yield "duplicate metric" error.)
 5. Checked cursor/row state before trying to retrieve value in DrillResultSet's
    column accessor methods.
    - Added org.apache.drill.jdbc.JdbcApiSqlException (for JDBC-level errors).
    - Added org.apache.drill.jdbc.InvalidCursorStateSqlException.
    [SqlAccessor, AvaticaDrillSqlAccessor, DrillConnectionImpl, DrillCursor,
    DrillResultSet, InvalidCursorStateSqlException, JdbcApiSqlException,
    DrillResultSetTest]
 6. Released vectors in DrillResultSet.cleanup().
    Added org.apache.drill.jdbc.test.Bug1735ResultSetCloseReleasesBuffersTest.
 7. Delayed last-chunk batch until COMPLETED batch in QueryResultHandler.
 8. Added nextUntilEnd(...) workarounds for fragment cancelation race condition
    to TestView and other JDBC module/subproject tests.
 9. Tracked open statements in order to close at connection close (DrillStatementRegistry, etc.)
10. Commented out nextUntilEnd(...) workarounds for fragment cancelation race
    condition.
11. Miscellaneous:
   - Added some toString() methods
   - Adjusted some logging (e.g., "// log.debug(...)" -> "log.trace(...)").
   - Cleaned up a bit.  [DrillCursor, DrillResultSet, QueryResultHandler]
   - Added a few documentation comments.
   - Added various TODO comments.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9c9ee8c4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9c9ee8c4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9c9ee8c4

Branch: refs/heads/master
Commit: 9c9ee8c435c19c90636ce770fef9d05b5d3ae12e
Parents: 48c9c01
Author: dbarclay <dbarclay@maprtech.com>
Authored: Tue Dec 16 14:05:11 2014 -0800
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Thu Mar 19 22:14:57 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/client/DrillClient.java   |  31 +-
 .../drill/exec/physical/impl/ScreenCreator.java |  10 +-
 .../drill/exec/rpc/user/QueryResultHandler.java | 274 ++++++++++----
 .../drill/exec/server/BootStrapContext.java     |   4 +-
 .../org/apache/drill/exec/server/Drillbit.java  |  15 +-
 .../drill/exec/vector/accessor/SqlAccessor.java |  11 +
 .../apache/drill/exec/work/foreman/Foreman.java |   3 +-
 .../exec/work/fragment/FragmentExecutor.java    |  57 ++-
 .../java/org/apache/drill/exec/ExecTest.java    |   2 +
 .../drill/jdbc/AvaticaDrillSqlAccessor.java     |  62 ++--
 .../apache/drill/jdbc/DrillAccessorList.java    |   2 +
 .../drill/jdbc/DrillConnectionConfig.java       |   2 +
 .../apache/drill/jdbc/DrillConnectionImpl.java  |  40 +-
 .../java/org/apache/drill/jdbc/DrillCursor.java |  79 +++-
 .../drill/jdbc/DrillPreparedStatement.java      |   6 +-
 .../org/apache/drill/jdbc/DrillResultSet.java   |  38 +-
 .../org/apache/drill/jdbc/DrillStatement.java   |   6 +-
 .../drill/jdbc/DrillStatementRegistry.java      |  57 ++-
 .../main/java/org/apache/drill/jdbc/Driver.java |   4 +-
 .../jdbc/InvalidCursorStateSqlException.java    |  97 +++++
 .../apache/drill/jdbc/JdbcApiSqlException.java  | 155 ++++++++
 .../apache/drill/jdbc/DrillResultSetTest.java   | 159 ++++++++
 .../java/org/apache/drill/jdbc/DriverTest.java  | 371 +++++++++++++++++++
 .../java/org/apache/drill/jdbc/JdbcTest.java    |  21 +-
 .../jdbc/test/Bug1735ConnectionCloseTest.java   | 102 +++++
 ...ug1735ResultSetCloseReleasesBuffersTest.java |  89 +++++
 .../drill/jdbc/test/TestJdbcDistQuery.java      |  23 +-
 .../apache/drill/jdbc/test/TestJdbcQuery.java   |  15 +
 .../apache/drill/jdbc/test/TestMetadataDDL.java |  41 +-
 .../org/apache/drill/jdbc/test/TestViews.java   | 127 +++++--
 30 files changed, 1680 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index c3a873c..6d4c86c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.UserProtos;
 import org.apache.drill.exec.proto.UserProtos.Property;
@@ -61,9 +62,10 @@ import com.google.common.util.concurrent.AbstractCheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
 /**
- * Thin wrapper around a UserClient that handles connect/close and transforms String into ByteBuf
+ * Thin wrapper around a UserClient that handles connect/close and transforms
+ * String into ByteBuf.
  */
-public class DrillClient implements Closeable, ConnectionThrottle{
+public class DrillClient implements Closeable, ConnectionThrottle {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class);
 
   DrillConfig config;
@@ -223,7 +225,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
     if (this.ownsAllocator && allocator != null) {
       allocator.close();
     }
-    if(ownsZkConnection) {
+    if (ownsZkConnection) {
       try {
         this.clusterCoordinator.close();
       } catch (IOException e) {
@@ -234,6 +236,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
       eventLoopGroup.shutdownGracefully();
     }
 
+    // TODO:  Did DRILL-1735 changes cover this TODO?:
     // TODO: fix tests that fail when this is called.
     //allocator.close();
     connected = false;
@@ -247,9 +250,9 @@ public class DrillClient implements Closeable, ConnectionThrottle{
    * @throws RpcException
    */
   public List<QueryResultBatch> runQuery(QueryType type, String plan) throws RpcException {
-    UserProtos.RunQuery query = newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build() ;
+    UserProtos.RunQuery query = newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build();
     ListHoldingResultsListener listener = new ListHoldingResultsListener(query);
-    client.submitQuery(listener,query);
+    client.submitQuery(listener, query);
     return listener.getResults();
   }
 
@@ -282,7 +285,9 @@ public class DrillClient implements Closeable, ConnectionThrottle{
   /**
    * Submits a Logical plan for direct execution (bypasses parsing)
    *
-   * @param plan the plan to execute
+   * @param  plan  the plan to execute
+   * @return a handle for the query result
+   * @throws RpcException
    */
   public void runQuery(QueryType type, String plan, UserResultsListener resultsListener) {
     client.submitQuery(resultsListener, newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build());
@@ -294,6 +299,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
     private UserProtos.RunQuery query ;
 
     public ListHoldingResultsListener(UserProtos.RunQuery query) {
+      logger.debug( "Listener created for query \"\"\"{}\"\"\"", query );
       this.query = query;
     }
 
@@ -323,11 +329,20 @@ public class DrillClient implements Closeable, ConnectionThrottle{
 
     @Override
     public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
-//      logger.debug("Result arrived.  Is Last Chunk: {}.  Full Result: {}", result.getHeader().getIsLastChunk(), result);
+      logger.debug(
+          "Result arrived:  Query state: {}.  Is last chunk: {}.  Result: {}",
+          result.getHeader().getQueryState(),
+          result.getHeader().getIsLastChunk(),
+          result );
       results.add(result);
       if (result.getHeader().getIsLastChunk()) {
         future.set(results);
       }
+      else {
+        assert QueryState.PENDING == result.getHeader().getQueryState()
+            : "For non-last chunk, expected query state of PENDING but got "
+              + result.getHeader().getQueryState();
+      }
     }
 
     public List<QueryResultBatch> getResults() throws RpcException{
@@ -340,7 +355,9 @@ public class DrillClient implements Closeable, ConnectionThrottle{
 
     @Override
     public void queryIdArrived(QueryId queryId) {
+      logger.debug( "Query ID arrived: {}", queryId );
     }
+
   }
 
   private class FutureHandler extends AbstractCheckedFuture<Void, RpcException> implements RpcConnectionHandler<ServerConnection>, DrillRpcFuture<Void>{

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 2d1a136..404c453 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -78,6 +78,7 @@ public class ScreenCreator implements RootCreator<Screen>{
 
     public ScreenRoot(FragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException {
       super(context, config);
+      // TODO  Edit:  That "as such" doesn't make sense.
       assert context.getConnection() != null : "A screen root should only be run on the driving node which is connected directly to the client.  As such, this should always be true.";
       this.context = context;
       this.incoming = incoming;
@@ -86,15 +87,15 @@ public class ScreenCreator implements RootCreator<Screen>{
 
     @Override
     public boolean innerNext() {
-      if(!ok){
+      if (!ok) {
         stop();
         context.fail(this.listener.ex);
         return false;
       }
 
       IterOutcome outcome = next(incoming);
-//      logger.debug("Screen Outcome {}", outcome);
-      switch(outcome){
+      logger.trace("Screen Outcome {}", outcome);
+      switch (outcome) {
       case STOP: {
         this.internalStop();
         boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
@@ -177,7 +178,7 @@ public class ScreenCreator implements RootCreator<Screen>{
 
     @Override
     public void stop() {
-      if(!oContext.isClosed()){
+      if (!oContext.isClosed()) {
         internalStop();
       }
       sendCount.waitForSendComplete();
@@ -213,7 +214,6 @@ public class ScreenCreator implements RootCreator<Screen>{
     }
 
 
-
   }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index b079428..c05b127 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -20,9 +20,14 @@ package org.apache.drill.exec.rpc.user;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 
+import javax.annotation.Nullable;
+
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
@@ -36,69 +41,203 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 
 /**
- * Encapsulates the future management of query submissions. This entails a potential race condition. Normal ordering is:
- * 1. Submit query to be executed. 2. Receive QueryHandle for buffer management 3. Start receiving results batches for
- * query.
- *
- * However, 3 could potentially occur before 2. As such, we need to handle this case and then do a switcheroo.
- *
+ * Encapsulates the future management of query submissions.  This entails a
+ * potential race condition.  Normal ordering is:
+ * <ul>
+ *   <li>1.  Submit query to be executed. </li>
+ *   <li>2.  Receive QueryHandle for buffer management. </li>
+ *   <li>3.  Start receiving results batches for query. </li>
+ * </ul>
+ * However, 3 could potentially occur before 2.   Because of that, we need to
+ * handle this case and then do a switcheroo.
  */
 public class QueryResultHandler {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryResultHandler.class);
-
-  private ConcurrentMap<QueryId, UserResultsListener> resultsListener = Maps.newConcurrentMap();
-
-
-  public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener listener) {
-    return new SubmissionListener(listener);
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(QueryResultHandler.class);
+
+  /**
+   * Current listener for results, for each active query.
+   * <p>
+   *   Concurrency:  Access by SubmissionLister for query-ID message vs.
+   *   access by batchArrived is not otherwise synchronized.
+   * </p>
+   */
+  private final ConcurrentMap<QueryId, UserResultsListener> queryIdToResultsListenersMap =
+      Maps.newConcurrentMap();
+
+  /**
+   * Any is-last-chunk batch being deferred until the next batch
+   * (normally one with COMPLETED) arrives, per active query.
+   * <ul>
+   *   <li>Last-chunk batch is added (and not passed on) when it arrives.</li>
+   *   <li>Last-chunk batch is removed (and passed on) when next batch arrives
+   *       and has state {@link QueryState.COMPLETED}.</li>
+   *   <li>Last-chunk batch is removed (and not passed on) when next batch
+   *       arrives and has state {@link QueryState.CANCELED} or
+   *       {@link QueryState.FAILED}.</li>
+   * </ul>
+   */
+  private final Map<QueryId, QueryResultBatch> queryIdToDeferredLastChunkBatchesMap =
+      new ConcurrentHashMap<>();
+
+
+  public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener resultsListener) {
+    return new SubmissionListener(resultsListener);
   }
 
-  public void batchArrived(ConnectionThrottle throttle, ByteBuf pBody, ByteBuf dBody) throws RpcException {
-    final QueryResult result = RpcBus.get(pBody, QueryResult.PARSER);
-    final QueryResultBatch batch = new QueryResultBatch(result, (DrillBuf) dBody);
-    final boolean failed = (batch.getHeader().getQueryState() == QueryState.FAILED);
-
-    assert failed || batch.getHeader().getErrorCount() == 0 : "Error count for the query batch is non-zero but QueryState != FAILED";
-
-    UserResultsListener l = resultsListener.get(result.getQueryId());
-    // logger.debug("For QueryId [{}], retrieved result listener {}", result.getQueryId(), l);
-    if (l == null) {
-      BufferingListener bl = new BufferingListener();
-      l = resultsListener.putIfAbsent(result.getQueryId(), bl);
-      // if we had a successful insert, use that reference.  Otherwise, just throw away the new bufering listener.
-      if (l == null) {
-        l = bl;
+  /**
+   * Maps internal low-level API protocol to {@link UserResultsListener}-level
+   * API protocol, deferring sending is-last-chunk batches until (internal)
+   * COMPLETED batch.
+   */
+  public void batchArrived( ConnectionThrottle throttle,
+                            ByteBuf pBody, ByteBuf dBody ) throws RpcException {
+    final QueryResult queryResult = RpcBus.get( pBody, QueryResult.PARSER );
+    // Current batch coming in.  (Not necessarily passed along now or ever.)
+    final QueryResultBatch inputBatch = new QueryResultBatch( queryResult,
+                                                              (DrillBuf) dBody );
+
+    final QueryId queryId = queryResult.getQueryId();
+    final QueryState queryState = inputBatch.getHeader().getQueryState();
+
+    logger.debug( "batchArrived: isLastChunk: {}, queryState: {}, queryId = {}",
+                  inputBatch.getHeader().getIsLastChunk(), queryState, queryId );
+    logger.trace( "batchArrived: currentBatch = {}", inputBatch );
+
+    final boolean isFailureBatch    = QueryState.FAILED    == queryState;
+    final boolean isCompletionBatch = QueryState.COMPLETED == queryState;
+    final boolean isLastChunkBatchToDelay =
+        inputBatch.getHeader().getIsLastChunk() && QueryState.PENDING == queryState;
+    final boolean isTerminalBatch;
+    switch ( queryState ) {
+      case PENDING:
+         isTerminalBatch = false;
+         break;
+      case FAILED:
+      case CANCELED:
+      case COMPLETED:
+        isTerminalBatch = true;
+        break;
+      default:
+        logger.error( "Unexpected/unhandled QueryState " + queryState
+                      + " (for query " + queryId +  ")" );
+        isTerminalBatch = false;
+        break;
+    }
+    assert isFailureBatch || inputBatch.getHeader().getErrorCount() == 0
+        : "Error count for the query batch is non-zero but QueryState != FAILED";
+
+    UserResultsListener resultsListener = queryIdToResultsListenersMap.get( queryId );
+    logger.trace( "For QueryId [{}], retrieved results listener {}", queryId,
+                  resultsListener );
+    if ( null == resultsListener ) {
+      // WHO?? didn't get query ID response and set submission listener yet,
+      // so install a buffering listener for now
+
+      BufferingResultsListener bl = new BufferingResultsListener();
+      resultsListener = queryIdToResultsListenersMap.putIfAbsent( queryId, bl );
+      // If we had a successful insertion, use that reference.  Otherwise, just
+      // throw away the new buffering listener.
+      if ( null == resultsListener ) {
+        resultsListener = bl;
       }
-      if (result.getQueryId().toString().equals("")) {
+      // TODO:  Is there a more direct way to detect a Query ID in whatever
+      // state this string comparison detects?
+      if ( queryId.toString().equals( "" ) ) {
         failAll();
       }
     }
 
-    if(failed) {
-      String message = buildErrorMessage(batch);
-      l.submissionFailed(new RpcException(message));
-      resultsListener.remove(result.getQueryId(), l);
-    }else{
-      try {
-        l.resultArrived(batch, throttle);
-      } catch (Exception e) {
-        batch.release();
-        l.submissionFailed(new RpcException(e));
+    try {
+      if (isFailureBatch) {
+        // Failure case--pass on via submissionFailed(...).
+
+        try {
+          String message = buildErrorMessage(inputBatch);
+          resultsListener.submissionFailed(new RpcException(message));
+        }
+        finally {
+          inputBatch.release();
+        }
+        // Note: Listener and any delayed batch are removed in finally below.
+      } else {
+        // A successful (data, completion, or cancelation) case--pass on via
+        // resultArrived, delaying any last-chunk batches until following
+        // COMPLETED batch and omitting COMPLETED batch.
+
+        // If is last-chunk batch, save until next batch for query (normally a
+        // COMPLETED batch) comes in:
+        if ( isLastChunkBatchToDelay ) {
+          // We have a (non-failure) is-last-chunk batch--defer it until we get
+          // the query's COMPLETED batch.
+
+          QueryResultBatch expectNone;
+          assert null == ( expectNone =
+                           queryIdToDeferredLastChunkBatchesMap.get( queryId ) )
+              : "Already have pending last-batch QueryResultBatch " + expectNone
+                + " (at receiving last-batch QueryResultBatch " + inputBatch
+                + ") for query " + queryId;
+          queryIdToDeferredLastChunkBatchesMap.put( queryId, inputBatch );
+          // Can't release batch now; will release at terminal batch in
+          // finally below.
+        } else {
+          // We have a batch triggering sending out a batch (maybe same one,
+          // maybe deferred one.
+
+          // Batch to send out in response to current batch.
+          final QueryResultBatch outputBatch;
+          if ( isCompletionBatch ) {
+            // We have a COMPLETED batch--we should have a saved is-last-chunk
+            // batch, and we must pass that on now (that we've seen COMPLETED).
+
+            outputBatch = queryIdToDeferredLastChunkBatchesMap.get( queryId );
+            assert null != outputBatch
+                : "No pending last-batch QueryResultsBatch saved, at COMPLETED"
+                + " QueryResultsBatch " + inputBatch + " for query " + queryId;
+          } else {
+            // We have a non--last-chunk PENDING batch or a CANCELED
+            // batch--pass it on.
+            outputBatch = inputBatch;
+          }
+          // Note to release input batch if it's not the batch we're sending out.
+          final boolean releaseInputBatch = outputBatch != inputBatch;
+
+          try {
+            resultsListener.resultArrived( outputBatch, throttle );
+            // That releases outputBatch if successful.
+          } catch ( Exception e ) {
+            outputBatch.release();
+            resultsListener.submissionFailed(new RpcException(e));
+          }
+          finally {
+            if ( releaseInputBatch ) {
+              inputBatch.release();
+            }
+          }
+        }
       }
-    }
+    } finally {
+      if ( isTerminalBatch ) {
+        // Remove and release any deferred is-last-chunk batch:
+        QueryResultBatch anyUnsentLastChunkBatch =
+             queryIdToDeferredLastChunkBatchesMap.remove( queryId );
+        if ( null != anyUnsentLastChunkBatch ) {
+          anyUnsentLastChunkBatch.release();
+        }
 
-    if (
-        (failed || result.getIsLastChunk())
-        &&
-        (!(l instanceof BufferingListener) || ((BufferingListener)l).output != null)
-        ) {
-      resultsListener.remove(result.getQueryId(), l);
+       // TODO:  What exactly are we checking for?  How should we really check
+        // for it?
+        if ( (! ( resultsListener instanceof BufferingResultsListener )
+             || ((BufferingResultsListener) resultsListener).output != null ) ) {
+          queryIdToResultsListenersMap.remove( queryId, resultsListener );
+        }
+      }
     }
   }
 
   protected String buildErrorMessage(QueryResultBatch batch) {
     StringBuilder sb = new StringBuilder();
-    for (UserBitShared.DrillPBError error:batch.getHeader().getErrorList()) {
+    for (UserBitShared.DrillPBError error : batch.getHeader().getErrorList()) {
       sb.append(error.getMessage());
       sb.append("\n");
     }
@@ -106,12 +245,12 @@ public class QueryResultHandler {
   }
 
   private void failAll() {
-    for (UserResultsListener l : resultsListener.values()) {
+    for (UserResultsListener l : queryIdToResultsListenersMap.values()) {
       l.submissionFailed(new RpcException("Received result without QueryId"));
     }
   }
 
-  private class BufferingListener implements UserResultsListener {
+  private static class BufferingResultsListener implements UserResultsListener {
 
     private ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
     private volatile boolean finished = false;
@@ -174,39 +313,42 @@ public class QueryResultHandler {
   }
 
   private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> {
-    private UserResultsListener listener;
+    private UserResultsListener resultsListener;
 
-    public SubmissionListener(UserResultsListener listener) {
+    public SubmissionListener(UserResultsListener resultsListener) {
       super();
-      this.listener = listener;
+      this.resultsListener = resultsListener;
     }
 
     @Override
     public void failed(RpcException ex) {
-      listener.submissionFailed(ex);
+      resultsListener.submissionFailed(ex);
     }
 
     @Override
     public void success(QueryId queryId, ByteBuf buf) {
-      listener.queryIdArrived(queryId);
-      logger.debug("Received QueryId {} succesfully.  Adding listener {}", queryId, listener);
-      UserResultsListener oldListener = resultsListener.putIfAbsent(queryId, listener);
-
-      // we need to deal with the situation where we already received results by the time we got the query id back. In
-      // that case, we'll need to transfer the buffering listener over, grabbing a lock against reception of additional
-      // results during the transition
+      resultsListener.queryIdArrived(queryId);
+      logger.debug("Received QueryId {} successfully.  Adding results listener {}.",
+                   queryId, resultsListener);
+      UserResultsListener oldListener =
+          queryIdToResultsListenersMap.putIfAbsent(queryId, resultsListener);
+
+      // We need to deal with the situation where we already received results by
+      // the time we got the query id back.  In that case, we'll need to
+      // transfer the buffering listener over, grabbing a lock against reception
+      // of additional results during the transition.
       if (oldListener != null) {
         logger.debug("Unable to place user results listener, buffering listener was already in place.");
-        if (oldListener instanceof BufferingListener) {
-          resultsListener.remove(oldListener);
-          boolean all = ((BufferingListener) oldListener).transferTo(this.listener);
+        if (oldListener instanceof BufferingResultsListener) {
+          queryIdToResultsListenersMap.remove(oldListener);
+          boolean all = ((BufferingResultsListener) oldListener).transferTo(this.resultsListener);
           // simply remove the buffering listener if we already have the last response.
           if (all) {
-            resultsListener.remove(oldListener);
+            queryIdToResultsListenersMap.remove(oldListener);
           } else {
-            boolean replaced = resultsListener.replace(queryId, oldListener, listener);
+            boolean replaced = queryIdToResultsListenersMap.replace(queryId, oldListener, resultsListener);
             if (!replaced) {
-              throw new IllegalStateException();
+              throw new IllegalStateException(); // TODO: Say what the problem is!
             }
           }
         } else {

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
index 3da2ea9..d0a998e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.rpc.TransportCheck;
 
 import com.codahale.metrics.MetricRegistry;
 
+// TODO:  Doc.  What kind of context?  (For what aspects, RPC?  What kind of data?)
 public class BootStrapContext implements Closeable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BootStrapContext.class);
 
@@ -68,7 +69,8 @@ public class BootStrapContext implements Closeable{
     return allocator;
   }
 
-  public void close(){
+  public void close() {
+    DrillMetrics.resetMetrics();
     loop.shutdownGracefully();
     allocator.close();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index b606707..0d8c892 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -237,7 +237,7 @@ public class Drillbit implements AutoCloseable {
     registrationHandle = coord.register(md);
     startJetty();
 
-    Runtime.getRuntime().addShutdownHook(new ShutdownThread());
+    Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
   }
 
   @Override
@@ -269,15 +269,20 @@ public class Drillbit implements AutoCloseable {
     logger.info("Shutdown completed.");
   }
 
-  private class ShutdownThread extends Thread {
-    ShutdownThread() {
-      setName("Drillbit-ShutdownHook");
+  private static class ShutdownThread extends Thread {
+    private static int idCounter = 0;
+    private final Drillbit drillbit;
+
+    ShutdownThread( Drillbit drillbit ) {
+      this.drillbit = drillbit;
+      idCounter++;
+      setName("Drillbit-ShutdownHook#" + idCounter );
     }
 
     @Override
     public void run() {
       logger.info("Received shutdown request.");
-      close();
+      drillbit.close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java
index b8480b4..b69ae54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java
@@ -26,10 +26,18 @@ import java.sql.Timestamp;
 
 import org.apache.drill.exec.vector.accessor.AbstractSqlAccessor.InvalidAccessException;
 
+// TODO:  Doc.
 public interface SqlAccessor {
 
+  // TODO:  Document (renamed) index.
+  // TODO:  Rename ambiguous "index" (JDBC (1-based) column index? other index?)
+  // TODO:  Doc./Spec.:  What happens if index is invalid?
+
   public abstract boolean isNull(int index);
 
+  // TODO:  Clean:  This interface refers to type InvalidAccessException
+  // defined in class implementing this interface.
+
   public abstract BigDecimal getBigDecimal(int index) throws InvalidAccessException;
 
   public abstract boolean getBoolean(int index) throws InvalidAccessException;
@@ -56,6 +64,9 @@ public interface SqlAccessor {
 
   public abstract Reader getReader(int index) throws InvalidAccessException;
 
+  // TODO: Doc./Spec.:  What should happen if called on non-string type?  (Most
+  // are convertible to string.  Does that result in error or conversion?)
+  // Similar question for many other methods.
   public abstract String getString(int index) throws InvalidAccessException;
 
   public abstract Time getTime(int index) throws InvalidAccessException;

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 9650ee5..bfb6de8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -803,7 +803,8 @@ public class Foreman implements Runnable {
 
     // record all fragments for status purposes.
     for (PlanFragment planFragment : fragments) {
-//      logger.debug("Tracking intermediate remote node {} with data {}", f.getAssignment(), f.getFragmentJson());
+      logger.trace("Tracking intermediate remote node {} with data {}",
+                   planFragment.getAssignment(), planFragment.getFragmentJson());
       queryManager.addFragmentStatusTracker(planFragment, false);
       if (planFragment.getLeafFragment()) {
         leafFragmentMap.put(planFragment.getAssignment(), planFragment);

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index b6176db..5592707 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -41,6 +41,8 @@ import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
 public class FragmentExecutor implements Runnable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
 
+  // TODO:  REVIEW:  Can't this be AtomicReference<FragmentState> (so that
+  // debugging and logging don't show just integer values--and for type safety)?
   private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE);
   private final FragmentRoot rootOperator;
   private final FragmentContext fragmentContext;
@@ -48,13 +50,26 @@ public class FragmentExecutor implements Runnable {
   private volatile boolean closed;
   private RootExec root;
 
+
   public FragmentExecutor(final FragmentContext context, final FragmentRoot rootOperator,
-      final StatusReporter listener) {
+                          final StatusReporter listener) {
     this.fragmentContext = context;
     this.rootOperator = rootOperator;
     this.listener = listener;
   }
 
+  @Override
+  public String toString() {
+    return
+        super.toString()
+        + "[closed = " + closed
+        + ", state = " + state
+        + ", rootOperator = " + rootOperator
+        + ", fragmentContext = " + fragmentContext
+        + ", listener = " + listener
+        + "]";
+  }
+
   public FragmentStatus getStatus() {
     /*
      * If the query is not in a running state, the operator tree is still being constructed and
@@ -73,10 +88,15 @@ public class FragmentExecutor implements Runnable {
   }
 
   public void cancel() {
+    logger.debug("Cancelling fragment {}", fragmentContext.getHandle());
+
     // Note this will be called outside of run(), from another thread
+    // Change state checked by main loop to terminate it (if not already done):
     updateState(FragmentState.CANCELLED);
-    logger.debug("Cancelled Fragment {}", fragmentContext.getHandle());
+
     fragmentContext.cancel();
+
+    logger.debug("Cancelled fragment {}", fragmentContext.getHandle());
   }
 
   public void receivingFragmentFinished(FragmentHandle handle) {
@@ -106,18 +126,20 @@ public class FragmentExecutor implements Runnable {
       logger.debug("Starting fragment runner. {}:{}",
           fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
       if (!updateStateOrFail(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING)) {
-        logger.warn("Unable to set fragment state to RUNNING. Cancelled or failed?");
+        logger.warn("Unable to set fragment state to RUNNING.  Cancelled or failed?");
         return;
       }
 
       /*
-       * Run the query until root.next returns false.
-       * Note that we closeOutResources() here if we're done. That's because this can also throw
-       * exceptions that we want to treat as failures of the request, even if the request did fine
-       * up until this point. Any failures there will be caught in the catch clause below, which
-       * will be reported to the user. If they were to come from the finally clause, the uncaught
-       * exception there will simply terminate this thread without alerting the user -- the
-       * behavior then is to hang.
+       * Run the query until root.next returns false OR cancel() changes the
+       * state.
+       * Note that we closeOutResources() here if we're done.  That's because
+       * this can also throw exceptions that we want to treat as failures of the
+       * request, even if the request did fine up until this point.  Any
+       * failures there will be caught in the catch clause below, which will be
+       * reported to the user.  If they were to come from the finally clause,
+       * the uncaught exception there will simply terminate this thread without
+       * alerting the user--the behavior then is to hang.
        */
       while (state.get() == FragmentState.RUNNING_VALUE) {
         if (!root.next()) {
@@ -191,7 +213,7 @@ public class FragmentExecutor implements Runnable {
   /**
    * Updates the fragment state with the given state
    *
-   * @param to target state
+   * @param  to  target state
    */
   private void updateState(final FragmentState to) {
     state.set(to.getNumber());
@@ -201,8 +223,8 @@ public class FragmentExecutor implements Runnable {
   /**
    * Updates the fragment state only if the current state matches the expected.
    *
-   * @param expected expected current state
-   * @param to target state
+   * @param  expected  expected current state
+   * @param  to  target state
    * @return true only if update succeeds
    */
   private boolean checkAndUpdateState(final FragmentState expected, final FragmentState to) {
@@ -229,8 +251,8 @@ public class FragmentExecutor implements Runnable {
    * Update the state if current state matches expected or fail the fragment if state transition fails even though
    * fragment is not in a terminal state.
    *
-   * @param expected current expected state
-   * @param to target state
+   * @param expected  current expected state
+   * @param to  target state
    * @return true only if update succeeds
    */
   private boolean updateStateOrFail(final FragmentState expected, final FragmentState to) {
@@ -257,8 +279,9 @@ public class FragmentExecutor implements Runnable {
       // if the defunct Drillbit was running our Foreman, then cancel the query
       final DrillbitEndpoint foremanEndpoint = FragmentExecutor.this.fragmentContext.getForemanEndpoint();
       if (unregisteredDrillbits.contains(foremanEndpoint)) {
-        logger.warn("Foreman : {} no longer active. Cancelling fragment {}.",
-            foremanEndpoint.getAddress(), QueryIdHelper.getQueryIdentifier(fragmentContext.getHandle()));
+        logger.warn("Foreman {} no longer active.  Cancelling fragment {}.",
+                    foremanEndpoint.getAddress(),
+                    QueryIdHelper.getQueryIdentifier(fragmentContext.getHandle()));
         FragmentExecutor.this.cancel();
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
index 0272b23..8a1aecb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
@@ -25,6 +25,8 @@ public class ExecTest extends DrillTest {
 
   @After
   public void clear(){
+    // TODO:  (Re DRILL-1735) Check whether still needed now that
+    // BootstrapContext.close() resets the metrics.
     DrillMetrics.resetMetrics();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/jdbc/src/main/java/org/apache/drill/jdbc/AvaticaDrillSqlAccessor.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/AvaticaDrillSqlAccessor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/AvaticaDrillSqlAccessor.java
index cf5829a..3702257 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/AvaticaDrillSqlAccessor.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/AvaticaDrillSqlAccessor.java
@@ -43,67 +43,77 @@ import org.apache.drill.exec.vector.accessor.SqlAccessor;
 public class AvaticaDrillSqlAccessor implements Accessor{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvaticaDrillSqlAccessor.class);
 
-  private SqlAccessor a;
+  private SqlAccessor underlyingAccessor;
   private DrillCursor cursor;
 
   public AvaticaDrillSqlAccessor(SqlAccessor drillSqlAccessor, DrillCursor cursor) {
     super();
-    this.a = drillSqlAccessor;
+    this.underlyingAccessor = drillSqlAccessor;
     this.cursor = cursor;
   }
 
-  private int row(){
-    return cursor.currentRecord;
+  private int getCurrentRecordNumber() throws SQLException {
+    if ( cursor.getResultSet().isBeforeFirst() ) {
+      throw new InvalidCursorStateSqlException(
+          "Result set cursor is positioned before all rows.  Call next() first." );
+    }
+    else if ( cursor.getResultSet().isAfterLast() ) {
+      throw new InvalidCursorStateSqlException(
+          "Result set cursor is already positioned past all rows." );
+    }
+    else {
+      return cursor.getCurrentRecordNumber();
+    }
   }
 
   @Override
-  public boolean wasNull() {
-    return a.isNull(row());
+  public boolean wasNull() throws SQLException {
+    return underlyingAccessor.isNull(getCurrentRecordNumber());
   }
 
   @Override
   public String getString() throws SQLException {
-    return a.getString(row());
+    return underlyingAccessor.getString(getCurrentRecordNumber());
   }
 
   @Override
   public boolean getBoolean() throws SQLException {
-    return a.getBoolean(row());
+    return underlyingAccessor.getBoolean(getCurrentRecordNumber());
   }
 
   @Override
   public byte getByte() throws SQLException {
-    return a.getByte(row());
+    return underlyingAccessor.getByte(getCurrentRecordNumber());
   }
 
   @Override
   public short getShort() throws SQLException {
-    return a.getShort(row());
+    return underlyingAccessor.getShort(getCurrentRecordNumber());
   }
 
   @Override
   public int getInt() throws SQLException {
-    return a.getInt(row());
+    return underlyingAccessor.getInt(getCurrentRecordNumber());
   }
 
   @Override
   public long getLong() throws SQLException {
-    return a.getLong(row());
+    return underlyingAccessor.getLong(getCurrentRecordNumber());
   }
 
   @Override
   public float getFloat() throws SQLException {
-    return a.getFloat(row());
+    return underlyingAccessor.getFloat(getCurrentRecordNumber());
   }
 
   @Override
   public double getDouble() throws SQLException {
-    return a.getDouble(row());
+    return underlyingAccessor.getDouble(getCurrentRecordNumber());
   }
 
   @Override
   public BigDecimal getBigDecimal() throws SQLException {
-    return a.getBigDecimal(row());
+    return underlyingAccessor.getBigDecimal(getCurrentRecordNumber());
   }
 
   @Override
@@ -113,32 +123,32 @@ public class AvaticaDrillSqlAccessor implements Accessor{
 
   @Override
   public byte[] getBytes() throws SQLException {
-    return a.getBytes(row());
+    return underlyingAccessor.getBytes(getCurrentRecordNumber());
   }
 
   @Override
   public InputStream getAsciiStream() throws SQLException {
-    return a.getStream(row());
+    return underlyingAccessor.getStream(getCurrentRecordNumber());
   }
 
   @Override
   public InputStream getUnicodeStream() throws SQLException {
-    return a.getStream(row());
+    return underlyingAccessor.getStream(getCurrentRecordNumber());
   }
 
   @Override
   public InputStream getBinaryStream() throws SQLException {
-    return a.getStream(row());
+    return underlyingAccessor.getStream(getCurrentRecordNumber());
   }
 
   @Override
   public Object getObject() throws SQLException {
-    return a.getObject(row());
+    return underlyingAccessor.getObject(getCurrentRecordNumber());
   }
 
   @Override
   public Reader getCharacterStream() throws SQLException {
-    return a.getReader(row());
+    return underlyingAccessor.getReader(getCurrentRecordNumber());
   }
 
   @Override
@@ -168,17 +178,17 @@ public class AvaticaDrillSqlAccessor implements Accessor{
 
   @Override
   public Date getDate(Calendar calendar) throws SQLException {
-    return a.getDate(row());
+    return underlyingAccessor.getDate(getCurrentRecordNumber());
   }
 
   @Override
   public Time getTime(Calendar calendar) throws SQLException {
-    return a.getTime(row());
+    return underlyingAccessor.getTime(getCurrentRecordNumber());
   }
 
   @Override
   public Timestamp getTimestamp(Calendar calendar) throws SQLException {
-    return a.getTimestamp(row());
+    return underlyingAccessor.getTimestamp(getCurrentRecordNumber());
   }
 
   @Override
@@ -198,12 +208,12 @@ public class AvaticaDrillSqlAccessor implements Accessor{
 
   @Override
   public String getNString() throws SQLException {
-    return a.getString(row());
+    return underlyingAccessor.getString(getCurrentRecordNumber());
   }
 
   @Override
   public Reader getNCharacterStream() throws SQLException {
-    return a.getReader(row());
+    return underlyingAccessor.getReader(getCurrentRecordNumber());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillAccessorList.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillAccessorList.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillAccessorList.java
index 82d51f1..ccf2658 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillAccessorList.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillAccessorList.java
@@ -31,6 +31,8 @@ public class DrillAccessorList extends BasicList<Accessor>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillAccessorList.class);
 
   private Accessor[] accessors = new Accessor[0];
+  // TODO  Rename to lastColumnAccessed and/or document.
+  // TODO  Why 1, rather than, say, -1?
   private int lastColumn = 1;
 
   public void generateAccessors(DrillCursor cursor, RecordBatchLoader currentBatch){

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionConfig.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionConfig.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionConfig.java
index 54e31b1..de08cda 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionConfig.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionConfig.java
@@ -31,9 +31,11 @@ public class DrillConnectionConfig extends ConnectionConfigImpl {
   }
 
   public boolean isLocal(){
+    // TODO  Why doesn't this call getZookeeperConnectionString()?
     return "local".equals(props.getProperty("zk"));
   }
 
+  // TODO: Check: Shouldn't something validate that URL has "zk" parameter?
   public String getZookeeperConnectionString(){
     return props.getProperty("zk");
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
index f19aab0..e590778 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java
@@ -44,17 +44,18 @@ import org.apache.drill.exec.server.RemoteServiceSet;
  * Abstract to allow newer versions of JDBC to add methods.
  * </p>
  */
-abstract class DrillConnectionImpl extends AvaticaConnection implements org.apache.drill.jdbc.DrillConnection {
-  public final DrillStatementRegistry registry = new DrillStatementRegistry();
-  final DrillConnectionConfig config;
-
+abstract class DrillConnectionImpl extends AvaticaConnection implements DrillConnection {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConnection.class);
 
+  final DrillStatementRegistry openStatementsRegistry = new DrillStatementRegistry();
+  final DrillConnectionConfig config;
+
   private final DrillClient client;
   private final BufferAllocator allocator;
   private Drillbit bit;
   private RemoteServiceSet serviceSet;
 
+
   protected DrillConnectionImpl(Driver driver, AvaticaFactory factory, String url, Properties info) throws SQLException {
     super(driver, factory, url, info);
     this.config = new DrillConnectionConfig(info);
@@ -71,7 +72,7 @@ abstract class DrillConnectionImpl extends AvaticaConnection implements org.apac
         this.allocator = new TopLevelAllocator(dConfig);
         RemoteServiceSet set = GlobalServiceSetReference.SETS.get();
         if (set == null) {
-          // we're embedded, start a local drill bit.
+          // We're embedded; start a local drill bit.
           serviceSet = RemoteServiceSet.getLocalServiceSet();
           set = serviceSet;
           try {
@@ -89,6 +90,10 @@ abstract class DrillConnectionImpl extends AvaticaConnection implements org.apac
       } else {
         final DrillConfig dConfig = DrillConfig.forClient();
         this.allocator = new TopLevelAllocator(dConfig);
+        // TODO:  Check:  Why does new DrillClient() create another DrillConfig,
+        // with enableServerConfigs true, and cause scanning for function
+        // implementations (needed by a server, but not by a client-only
+        // process, right?)?  Probably pass dConfig to construction.
         this.client = new DrillClient();
         this.client.connect(config.getZookeeperConnectionString(), info);
       }
@@ -121,21 +126,24 @@ abstract class DrillConnectionImpl extends AvaticaConnection implements org.apac
   }
 
   @Override
-  public DrillStatement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
-      throws SQLException {
-    DrillStatement statement = (DrillStatement) super.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
-    registry.addStatement(statement);
+  public DrillStatement createStatement(int resultSetType, int resultSetConcurrency,
+                                        int resultSetHoldability) throws SQLException {
+    DrillStatement statement =
+        (DrillStatement) super.createStatement(resultSetType, resultSetConcurrency,
+                                               resultSetHoldability);
     return statement;
   }
 
   @Override
-  public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
-      int resultSetHoldability) throws SQLException {
+  public PreparedStatement prepareStatement(String sql, int resultSetType,
+                                            int resultSetConcurrency,
+                                            int resultSetHoldability) throws SQLException {
     try {
       DrillPrepareResult prepareResult = new DrillPrepareResult(sql);
-      DrillPreparedStatement statement = (DrillPreparedStatement) factory.newPreparedStatement(this, prepareResult,
-          resultSetType, resultSetConcurrency, resultSetHoldability);
-      registry.addStatement(statement);
+      DrillPreparedStatement statement =
+          (DrillPreparedStatement) factory.newPreparedStatement(
+              this, prepareResult, resultSetType, resultSetConcurrency,
+              resultSetHoldability);
       return statement;
     } catch (RuntimeException e) {
       throw Helper.INSTANCE.createException("Error while preparing statement [" + sql + "]", e);
@@ -160,6 +168,10 @@ abstract class DrillConnectionImpl extends AvaticaConnection implements org.apac
   }
 
   void cleanup() {
+    // First close any open JDBC Statement objects, to close any open ResultSet
+    // objects and release their buffers/vectors.
+    openStatementsRegistry.close();
+
     client.close();
     allocator.close();
     if (bit != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
index fbe611f..cddd999 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
@@ -31,32 +31,50 @@ import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 
-public class DrillCursor implements Cursor{
+public class DrillCursor implements Cursor {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillCursor.class);
 
   private static final String UNKNOWN = "--UNKNOWN--";
 
+  /** The associated java.sql.ResultSet implementation. */
+  private final DrillResultSet resultSet;
+
+  private final RecordBatchLoader currentBatch;
+  private final DrillResultSet.ResultsListener resultsListener;
+
+  // TODO:  Doc.:  Say what's started (set of rows?  just current result batch?)
   private boolean started = false;
   private boolean finished = false;
-  private final RecordBatchLoader currentBatch;
-  private final DrillResultSet.Listener listener;
+  // TODO:  Doc.: Say what "redoFirstNext" means.
   private boolean redoFirstNext = false;
+  // TODO:  Doc.: First what? (First batch? record? "next" call/operation?)
   private boolean first = true;
 
   private DrillColumnMetaDataList columnMetaDataList;
   private BatchSchema schema;
 
-  final DrillResultSet results;
-  int currentRecord = 0;
+  /** Zero-based index of current record in record batch. */
+  private int currentRecordNumber = -1;
   private long recordBatchCount;
   private final DrillAccessorList accessors = new DrillAccessorList();
 
 
-  public DrillCursor(DrillResultSet results) {
-    super();
-    this.results = results;
-    currentBatch = results.currentBatch;
-    this.listener = results.listener;
+  /**
+   *
+   * @param  resultSet  the associated ResultSet implementation
+   */
+  public DrillCursor(final DrillResultSet resultSet) {
+    this.resultSet = resultSet;
+    currentBatch = resultSet.currentBatch;
+    resultsListener = resultSet.resultslistener;
+  }
+
+  public DrillResultSet getResultSet() {
+    return resultSet;
+  }
+
+  protected int getCurrentRecordNumber() {
+    return currentRecordNumber;
   }
 
   @Override
@@ -65,12 +83,20 @@ public class DrillCursor implements Cursor{
     return accessors;
   }
 
+  // TODO:  Doc.:  Specify what the return value actually means.  (The wording
+  // "Moves to the next row" and "Whether moved" from the documentation of the
+  // implemented interface (net.hydromatic.avatica.Cursor) doesn't address
+  // moving past last row or how to evaluate "whether moved" on the first call.
+  // In particular, document what the return value indicates about whether we're
+  // currently at a valid row (or whether next() can be called again, or
+  // whatever it does indicate), especially the first time this next() called
+  // for a new result.
   @Override
   public boolean next() throws SQLException {
     if (!started) {
       started = true;
       redoFirstNext = true;
-    } else if(redoFirstNext && !finished) {
+    } else if (redoFirstNext && !finished) {
       redoFirstNext = false;
       return true;
     }
@@ -79,27 +105,32 @@ public class DrillCursor implements Cursor{
       return false;
     }
 
-    if (currentRecord+1 < currentBatch.getRecordCount()) {
-      currentRecord++;
+    if (currentRecordNumber + 1 < currentBatch.getRecordCount()) {
+      // Next index is within current batch--just increment to that record.
+      currentRecordNumber++;
       return true;
     } else {
+      // Next index is not in current batch (including initial empty batch)--
+      // (try to) get next batch.
       try {
-        QueryResultBatch qrb = listener.getNext();
+        QueryResultBatch qrb = resultsListener.getNext();
         recordBatchCount++;
         while (qrb != null && qrb.getHeader().getRowCount() == 0 && !first) {
           qrb.release();
-          qrb = listener.getNext();
+          qrb = resultsListener.getNext();
           recordBatchCount++;
         }
 
         first = false;
 
         if (qrb == null) {
+          currentBatch.clear();
           finished = true;
           return false;
         } else {
-          currentRecord = 0;
+          currentRecordNumber = 0;
           boolean changed = currentBatch.load(qrb.getHeader().getDef(), qrb.getData());
+          qrb.release();
           schema = currentBatch.getSchema();
           if (changed) {
             updateColumns();
@@ -119,8 +150,8 @@ public class DrillCursor implements Cursor{
   void updateColumns() {
     accessors.generateAccessors(this, currentBatch);
     columnMetaDataList.updateColumnMetaData(UNKNOWN, UNKNOWN, UNKNOWN, schema);
-    if (results.changeListener != null) {
-      results.changeListener.schemaChanged(schema);
+    if (getResultSet().changeListener != null) {
+      getResultSet().changeListener.schemaChanged(schema);
     }
   }
 
@@ -130,7 +161,17 @@ public class DrillCursor implements Cursor{
 
   @Override
   public void close() {
-    results.cleanup();
+    // currentBatch is owned by resultSet and cleaned up by
+    // DrillResultSet.cleanup()
+
+    // listener is owned by resultSet and cleaned up by
+    // DrillResultSet.cleanup()
+
+    // Clean up result set (to deallocate any buffers).
+    getResultSet().cleanup();
+    // TODO:  CHECK:  Something might need to set statement.openResultSet to
+    // null.  Also, AvaticaResultSet.close() doesn't check whether already
+    // closed and skip calls to cursor.close(), statement.onResultSetClose()
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillPreparedStatement.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillPreparedStatement.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillPreparedStatement.java
index cfcee8c..4397c2f 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillPreparedStatement.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillPreparedStatement.java
@@ -30,11 +30,13 @@ import net.hydromatic.avatica.AvaticaPreparedStatement;
  * {@link net.hydromatic.avatica.AvaticaFactory#newPreparedStatement}.
  * </p>
  */
-abstract class DrillPreparedStatement extends AvaticaPreparedStatement implements DrillRemoteStatement {
+abstract class DrillPreparedStatement extends AvaticaPreparedStatement
+    implements DrillRemoteStatement {
 
   protected DrillPreparedStatement(DrillConnectionImpl connection, AvaticaPrepareResult prepareResult,
       int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
     super(connection, prepareResult, resultSetType, resultSetConcurrency, resultSetHoldability);
+    connection.openStatementsRegistry.addStatement(this);
   }
 
   @Override
@@ -45,6 +47,6 @@ abstract class DrillPreparedStatement extends AvaticaPreparedStatement implement
   @Override
   public void cleanup() {
     final DrillConnectionImpl connection1 = (DrillConnectionImpl) connection;
-    connection1.registry.removeStatement(this);
+    connection1.openStatementsRegistry.removeStatement(this);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java
index 77b2c37..0ce33f4 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java
@@ -19,6 +19,8 @@ package org.apache.drill.jdbc;
 
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.TimeZone;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -28,6 +30,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import net.hydromatic.avatica.AvaticaPrepareResult;
 import net.hydromatic.avatica.AvaticaResultSet;
 import net.hydromatic.avatica.AvaticaStatement;
+import net.hydromatic.avatica.Cursor;
+import net.hydromatic.avatica.Cursor.Accessor;
 
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -46,7 +50,7 @@ public class DrillResultSet extends AvaticaResultSet {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillResultSet.class);
 
   SchemaChangeListener changeListener;
-  final Listener listener = new Listener();
+  final ResultsListener resultslistener = new ResultsListener();
   private volatile QueryId queryId;
   private final DrillClient client;
   final RecordBatchLoader currentBatch;
@@ -70,10 +74,11 @@ public class DrillResultSet extends AvaticaResultSet {
   }
 
   synchronized void cleanup() {
-    if (queryId != null && !listener.completed) {
+    if (queryId != null && ! resultslistener.completed) {
       client.cancelQuery(queryId);
     }
-    listener.close();
+    resultslistener.close();
+    currentBatch.clear();
   }
 
   @Override
@@ -81,29 +86,32 @@ public class DrillResultSet extends AvaticaResultSet {
     // Next may be called after close has been called (for example after a user cancel) which in turn
     // sets the cursor to null. So we must check before we call next.
     // TODO: handle next() after close is called in the Avatica code.
-    if(super.cursor!=null){
+    if (super.cursor != null) {
       return super.next();
-    }else{
+    } else {
       return false;
     }
-
   }
 
-
-  @Override protected DrillResultSet execute() throws SQLException{
+  @Override
+  protected DrillResultSet execute() throws SQLException{
     // Call driver's callback. It is permitted to throw a RuntimeException.
     DrillConnectionImpl connection = (DrillConnectionImpl) statement.getConnection();
 
-    connection.getClient().runQuery(QueryType.SQL, this.prepareResult.getSql(), listener);
+    connection.getClient().runQuery(QueryType.SQL, this.prepareResult.getSql(),
+                                    resultslistener);
     connection.getDriver().handler.onStatementExecute(statement, null);
 
     super.execute();
 
     // don't return with metadata until we've achieved at least one return message.
     try {
-      listener.latch.await();
-      cursor.next();
+      resultslistener.latch.await();
+      boolean notAtEnd = cursor.next();
+      assert notAtEnd;
     } catch (InterruptedException e) {
+     // TODO:  Check:  Should this call Thread.currentThread.interrupt()?   If
+     // not, at least document why this is empty.
     }
 
     return this;
@@ -117,7 +125,7 @@ public class DrillResultSet extends AvaticaResultSet {
     }
   }
 
-  class Listener implements UserResultsListener {
+  class ResultsListener implements UserResultsListener {
     private static final int MAX = 100;
     private volatile RpcException ex;
     volatile boolean completed = false;
@@ -131,6 +139,7 @@ public class DrillResultSet extends AvaticaResultSet {
 
     final LinkedBlockingDeque<QueryResultBatch> queue = Queues.newLinkedBlockingDeque();
 
+    // TODO:  Doc.:  Release what if what is first relative to what?
     private boolean releaseIfFirst() {
       if (receivedMessage.compareAndSet(false, true)) {
         latch.countDown();
@@ -158,14 +167,14 @@ public class DrillResultSet extends AvaticaResultSet {
         return;
       }
 
-      // if we're in a closed state, just release the message.
+      // If we're in a closed state, just release the message.
       if (closed) {
         result.release();
         completed = true;
         return;
       }
 
-      // we're active, let's add to the queue.
+      // We're active; let's add to the queue.
       queue.add(result);
       if (queue.size() >= MAX - 1) {
         throttle.setAutoRead(false);
@@ -185,6 +194,7 @@ public class DrillResultSet extends AvaticaResultSet {
 
     }
 
+    // TODO:  Doc.:  Specify whether result can be null and what that means.
     public QueryResultBatch getNext() throws RpcException, InterruptedException {
       while (true) {
         if (ex != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatement.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatement.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatement.java
index fec126e..d934c7c 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatement.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatement.java
@@ -19,10 +19,12 @@ package org.apache.drill.jdbc;
 
 import net.hydromatic.avatica.AvaticaStatement;
 
-public abstract class DrillStatement extends AvaticaStatement implements DrillRemoteStatement {
+public abstract class DrillStatement extends AvaticaStatement
+   implements DrillRemoteStatement {
 
   DrillStatement(DrillConnectionImpl connection, int resultSetType, int resultSetConcurrency, int resultSetHoldability) {
     super(connection, resultSetType, resultSetConcurrency, resultSetHoldability);
+    connection.openStatementsRegistry.addStatement(this);
   }
 
   @Override
@@ -33,7 +35,7 @@ public abstract class DrillStatement extends AvaticaStatement implements DrillRe
   @Override
   public void cleanup() {
     final DrillConnectionImpl connection1 = (DrillConnectionImpl) connection;
-    connection1.registry.removeStatement(this);
+    connection1.openStatementsRegistry.removeStatement(this);
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatementRegistry.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatementRegistry.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatementRegistry.java
index cc797fa..adbbb64 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatementRegistry.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatementRegistry.java
@@ -17,12 +17,61 @@
  */
 package org.apache.drill.jdbc;
 
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Registry of open statements (for a connection), for closing them when a
+ * connection is closed.
+ * <p>
+ *   Concurrency:  Not thread-safe.  (Creating statements, closing statements,
+ *   and closing connection cannot be concurrent (unless concurrency is
+ *   coordinated elsewhere).)
+ * </p>
+ */
 class DrillStatementRegistry {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillStatementRegistry.class);
 
+  private static final Logger logger = getLogger( DrillStatementRegistry.class );
+
+  /** Set of open statements (an IdentityHashMap used as an identity-based set). */
+  private final Map<Statement, Object> openStatements = new IdentityHashMap<>();
+
+
+  public void addStatement( Statement statement ) {
+    logger.debug( "Adding to open-statements registry: " + statement );
+    openStatements.put( statement, statement );
+  }
+
+  public void removeStatement( Statement statement ) {
+    logger.debug( "Removing from open-statements registry: " + statement );
+    openStatements.remove( statement );
+  }
+
+  public void close() {
+    // Note:  Can't call close() on statement during iteration of map because
+    // close() calls our removeStatement(...), which modifies the map.
 
-  public void addStatement(DrillRemoteStatement statement){}
-  public void removeStatement(DrillRemoteStatement statement){}
+    // Copy set of open statements to other collection before closing:
+    final List<Statement> copiedList = new ArrayList<>( openStatements.keySet() );
 
-  public void close(){}
+    for ( final Statement statement : copiedList ) {
+      try {
+        logger.debug( "Auto-closing (via open-statements registry): " + statement );
+        statement.close();
+      }
+      catch ( SQLException e ) {
+        logger.error( "Error auto-closing statement " + statement + ": " + e, e );
+        // Otherwise ignore the error, so that we can still close whichever
+        // remaining statements we can.
+      }
+    }
+    openStatements.clear();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/jdbc/src/main/java/org/apache/drill/jdbc/Driver.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/Driver.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/Driver.java
index 974e786..55453e8 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/Driver.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/Driver.java
@@ -71,9 +71,11 @@ public class Driver extends UnregisteredDriver {
 
   @Override
   protected Handler createHandler() {
-    return new HandlerImpl();
+    return new DrillHandler();
   }
 
+  // Any reference to class loads class, and loading class instantiates an
+  // instance and has it register itself:
   static {
     new Driver().register();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/jdbc/src/main/java/org/apache/drill/jdbc/InvalidCursorStateSqlException.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/InvalidCursorStateSqlException.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/InvalidCursorStateSqlException.java
new file mode 100644
index 0000000..7b04371
--- /dev/null
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/InvalidCursorStateSqlException.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.jdbc;
+
+import java.sql.ResultSet;
+
+/**
+ * SQLException for invalid-cursor-state conditions, e.g., calling a column
+ * accessor method before calling {@link ResultSet#next()} or after
+ * {@link ResultSet#next()} returns false.
+ *
+ */
+class InvalidCursorStateSqlException extends JdbcApiSqlException {
+
+  private static final long serialVersionUID = 2014_12_09L;
+
+
+  /**
+   * See {@link JdbcApiSqlException#JdbcApiSqlException(String, String, int)}.
+   */
+  public InvalidCursorStateSqlException( String reason,
+                                         String SQLState,
+                                         int vendorCode ) {
+    super( reason, SQLState, vendorCode );
+  }
+
+  /**
+   * See {@link JdbcApiSqlException#JdbcApiSqlException(String, String)}.
+   */
+  public InvalidCursorStateSqlException( String reason, String SQLState ) {
+    super( reason, SQLState );
+  }
+
+  /**
+   * See {@link JdbcApiSqlException#JdbcApiSqlException(String)}.
+   */
+  public InvalidCursorStateSqlException( String reason ) {
+    super( reason );
+  }
+
+  /**
+   * See {@link JdbcApiSqlException#JdbcApiSqlException()}.
+   */
+  public InvalidCursorStateSqlException() {
+    super();
+  }
+
+  /**
+   * See {@link JdbcApiSqlException#JdbcApiSqlException(Throwable cause)}.
+   */
+  public InvalidCursorStateSqlException( Throwable cause ) {
+    super( cause );
+  }
+
+  /**
+   * See {@link JdbcApiSqlException#JdbcApiSqlException(String, Throwable)}.
+   */
+  public InvalidCursorStateSqlException( String reason, Throwable cause ) {
+    super( reason, cause );
+  }
+
+  /**
+   * See
+   * {@link JdbcApiSqlException#JdbcApiSqlException(String, String, Throwable)}.
+   */
+  public InvalidCursorStateSqlException( String reason, String sqlState,
+                                         Throwable cause ) {
+    super( reason, sqlState, cause );
+  }
+
+  /**
+   * See
+   * {@link JdbcApiSqlException#JdbcApiSqlException(String, String, int, Throwable)}.
+   */
+  public InvalidCursorStateSqlException( String reason,
+                                         String sqlState,
+                                         int vendorCode,
+                                         Throwable cause ) {
+    super( reason, sqlState, vendorCode, cause );
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/jdbc/src/main/java/org/apache/drill/jdbc/JdbcApiSqlException.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/JdbcApiSqlException.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/JdbcApiSqlException.java
new file mode 100644
index 0000000..d6b05fb
--- /dev/null
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/JdbcApiSqlException.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.jdbc;
+
+import java.sql.SQLException;
+
+/**
+ * SQLException for JDBC API calling sequence/state problems.
+ *
+ * <p>
+ *   {@code JdbcApiSqlException} is intended for errors in using the JDBC API,
+ *   such as calling {@link ResultSet#getString} before calling
+ *   {@link ResultSet#next}.
+ * </p>
+ * <p>
+ *   ({@code JdbcApiSqlException} is not for errors that are not under direct
+ *   control of the programmer writing JDBC API calls, for example, invalid SQL
+ *   syntax, errors from SQL-vs.-data mismatches, data file format errors,
+ *   resource availability errors, or internal Drill errors.)
+ * </p>
+ * <p>
+ *  TODO:  Consider having a DrillSqlException (in part for reviewing,
+ *  coordinating, and revising the many uses of SQLException in the code).
+ * </p>
+ * <p>
+ *  TODO:  Consider using ANSI-/XOPEN-standard SQL State values.  (See:
+ * </p>
+ * <ul>
+ *   <li>
+ *     <a href="
+ *       http://stackoverflow.com/questions/1399574/what-are-all-the-possible-values-for-sqlexception-getsqlstate
+ *       ">
+ *      http://stackoverflow.com/questions/1399574/what-are-all-the-possible-values-for-sqlexception-getsqlstate
+ *     </a>
+ *   </li>
+ *   <li>
+ *     <a href="
+ *       https://github.com/olamedia/kanon/blob/master/src/mvc-model/storageDrivers/SQLSTATE.txt
+ *     ">
+ *       https://github.com/olamedia/kanon/blob/master/src/mvc-model/storageDrivers/SQLSTATE.txt
+ *     </a>
+ *   </li>
+ *   <li>
+ *     <a href="
+ *       http://kanon-framework.googlecode.com/svn/trunk/src/mvc-model/storageDrivers/SQLSTATE.txt
+ *     ">
+ *       http://kanon-framework.googlecode.com/svn/trunk/src/mvc-model/storageDrivers/SQLSTATE.txt
+ *     </a>
+ *   </li>
+ *   <li>
+ *     <a href="
+ *       http://www-01.ibm.com/support/knowledgecenter/api/content/nl/en-us/SSVHEW_6.2.0/com.ibm.rcp.tools.doc.db2e/adg/sql11.html
+ *     ">
+ *       http://www-01.ibm.com/support/knowledgecenter/api/content/nl/en-us/SSVHEW_6.2.0/com.ibm.rcp.tools.doc.db2e/adg/sql11.html
+ *     </a>
+ *   </li>
+ *   <li>
+ *     <a href="
+ *       ftp://ftp.software.ibm.com/ps/products/db2/info/vr6/htm/db2m0/db2state.htm
+ *     ">
+ *       ftp://ftp.software.ibm.com/ps/products/db2/info/vr6/htm/db2m0/db2state.htm
+ *     </a>
+ *   </li>
+ *   <li>
+ *     <a href="
+ *       https://docs.oracle.com/cd/E15817_01/appdev.111/b31230/ch2.htm
+ *     ">
+ *       https://docs.oracle.com/cd/E15817_01/appdev.111/b31230/ch2.htm
+ *     </a>
+ *   </li>
+ * </ul>
+ * <p>
+ *   etc.)
+ * </p>
+ */
+class JdbcApiSqlException extends SQLException {
+
+  private static final long serialVersionUID = 2014_12_12L;
+
+
+  /**
+   * See {@link SQLException#SQLException(String, String, int)}.
+   */
+  public JdbcApiSqlException( String reason, String SQLState, int vendorCode ) {
+    super( reason, SQLState, vendorCode );
+  }
+
+  /**
+   * See {@link SQLException#SQLException(String, String)}.
+   */
+  public JdbcApiSqlException( String reason, String SQLState ) {
+    super( reason, SQLState );
+  }
+
+  /**
+   * See {@link SQLException#SQLException(String)}.
+   */
+  public JdbcApiSqlException( String reason ) {
+    super( reason );
+  }
+
+  /**
+   * See {@link SQLException#SQLException()}.
+   * */
+  public JdbcApiSqlException() {
+    super();
+  }
+
+  /**
+   * See {@link SQLException#SQLException(Throwable cause)}.
+   */
+  public JdbcApiSqlException( Throwable cause ) {
+    super( cause );
+  }
+
+  /**
+   * See {@link SQLException#SQLException(String, Throwable)}.
+   */
+  public JdbcApiSqlException( String reason, Throwable cause ) {
+    super( reason, cause );
+  }
+
+  /**
+   * See {@link SQLException#SQLException(String, String, Throwable)}.
+   */
+  public JdbcApiSqlException( String reason, String sqlState, Throwable cause ) {
+    super( reason, sqlState, cause );
+  }
+
+  /**
+   * See {@link SQLException#SQLException(String, String, int, Throwable)}.
+   */
+  public JdbcApiSqlException( String reason,
+                              String sqlState,
+                              int vendorCode,
+                              Throwable cause ) {
+    super( reason, sqlState, vendorCode, cause );
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/9c9ee8c4/exec/jdbc/src/test/java/org/apache/drill/jdbc/DrillResultSetTest.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/DrillResultSetTest.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/DrillResultSetTest.java
new file mode 100644
index 0000000..de19615
--- /dev/null
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/DrillResultSetTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.jdbc;
+
+import static org.junit.Assert.*;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import net.hydromatic.linq4j.Ord;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.util.Hook;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.DrillTest;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.core.StringContains.containsString;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.ImmutableSet.Builder;
+
+public class DrillResultSetTest extends DrillTest {
+
+  // TODO: Move Jetty status server disabling to DrillTest.
+  private static final String STATUS_SERVER_PROPERTY_NAME =
+      ExecConstants.HTTP_ENABLE;
+
+  private static final String origStatusServerPropValue =
+      System.getProperty( STATUS_SERVER_PROPERTY_NAME, "true" );
+
+  // Disable Jetty status server so unit tests run (outside Maven setup).
+  // (TODO:  Move this to base test class and/or have Jetty try other ports.)
+  @BeforeClass
+  public static void setUpClass() {
+    System.setProperty( STATUS_SERVER_PROPERTY_NAME, "false" );
+  }
+
+  @AfterClass
+  public static void tearDownClass() {
+    System.setProperty( STATUS_SERVER_PROPERTY_NAME, origStatusServerPropValue );
+  }
+
+
+  @Test
+  public void test_next_blocksFurtherAccessAfterEnd()
+      throws SQLException
+  {
+    Connection connection = new Driver().connect( "jdbc:drill:zk=local", null );
+    Statement statement = connection.createStatement();
+    ResultSet resultSet =
+        statement.executeQuery( "SELECT 1 AS x \n" +
+                                "FROM cp.`donuts.json` \n" +
+                                "LIMIT 2" );
+
+    // Advance to first row; confirm can access data.
+    assertThat( resultSet.next(), is( true ) );
+    assertThat( resultSet.getInt( 1 ), is ( 1 ) );
+
+    // Advance from first to second (last) row, confirming data access.
+    assertThat( resultSet.next(), is( true ) );
+    assertThat( resultSet.getInt( 1 ), is ( 1 ) );
+
+    // Now advance past last row.
+    assertThat( resultSet.next(), is( false ) );
+
+    // Main check:  That row data access methods now throw SQLException.
+    try {
+      resultSet.getInt( 1 );
+      fail( "Did not get expected SQLException." );
+    }
+    catch ( SQLException e ) {
+      // Expect something like current InvalidCursorStateSqlException saying
+      // "Result set cursor is already positioned past all rows."
+      assertThat( e, instanceOf( InvalidCursorStateSqlException.class ) );
+      assertThat( e.toString(), containsString( "past" ) );
+    }
+    // (Any other exception is unexpected result.)
+
+    assertThat( resultSet.next(), is( false ) );
+
+    // TODO:  Ideally, test all other accessor methods.
+  }
+
+  @Test
+  public void test_next_blocksFurtherAccessWhenNoRows()
+    throws Exception
+  {
+    Connection connection = new Driver().connect( "jdbc:drill:zk=local", null );
+    Statement statement = connection.createStatement();
+    ResultSet resultSet =
+        statement.executeQuery( "SELECT 'Hi' AS x \n" +
+                                "FROM cp.`donuts.json` \n" +
+                                "WHERE false" );
+
+    // Do initial next(). (Advance from before results to next possible
+    // position (after the set of zero rows).
+    assertThat( resultSet.next(), is( false ) );
+
+    // Main check:  That row data access methods throw SQLException.
+    try {
+      resultSet.getString( 1 );
+      fail( "Did not get expected SQLException." );
+    }
+    catch ( SQLException e ) {
+      // Expect something like current InvalidCursorStateSqlException saying
+      // "Result set cursor is still before all rows.  Call next() first."
+      assertThat( e, instanceOf( InvalidCursorStateSqlException.class ) );
+      assertThat( e.toString(), containsString( "before" ) );
+    }
+    // (Any non-SQLException exception is unexpected result.)
+
+    assertThat( resultSet.next(), is( false ) );
+
+    // TODO:  Ideally, test all other accessor methods.
+  }
+
+
+  // TODO:  Ideally, test other methods.
+
+}


Mime
View raw message