drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject [4/6] drill git commit: DRILL-2755: (part2) Use and handle InterruptedException during query processing
Date Wed, 13 May 2015 02:42:53 GMT
DRILL-2755: (part2) Use and handle InterruptedException during query processing


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/36163105
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/36163105
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/36163105

Branch: refs/heads/master
Commit: 361631056ad00ab5f7df1eeed50d95f654f642a3
Parents: ed200e2
Author: vkorukanti <venki.korukanti@gmail.com>
Authored: Tue May 12 10:23:38 2015 -0700
Committer: Steven Phillips <smp@apache.org>
Committed: Tue May 12 18:21:10 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |  9 ++++
 .../drill/exec/ops/AccountingDataTunnel.java    | 11 ++++
 .../org/apache/drill/exec/ops/Consumer.java     |  5 +-
 .../apache/drill/exec/ops/FragmentContext.java  |  8 +++
 .../apache/drill/exec/ops/StatusHandler.java    |  6 +++
 .../exec/physical/impl/SingleSenderCreator.java |  8 ++-
 .../impl/mergereceiver/MergingRecordBatch.java  | 10 ++++
 .../partitionsender/PartitionerDecorator.java   |  1 +
 .../UnorderedReceiverBatch.java                 | 10 ++++
 .../drill/exec/rpc/BaseRpcOutcomeListener.java  |  7 ++-
 .../org/apache/drill/exec/rpc/BasicClient.java  | 53 ++++++++++++++++----
 .../drill/exec/rpc/DrillRpcFutureImpl.java      | 17 +++----
 .../apache/drill/exec/rpc/FutureBitCommand.java |  6 +++
 .../apache/drill/exec/rpc/ListeningCommand.java |  4 ++
 .../drill/exec/rpc/ReconnectingConnection.java  | 51 ++++++++++++-------
 .../apache/drill/exec/rpc/RemoteConnection.java |  3 +-
 .../drill/exec/rpc/RpcOutcomeListener.java      | 14 +++++-
 .../apache/drill/exec/rpc/data/DataTunnel.java  | 42 +++++++++++++++-
 .../drill/exec/rpc/user/QueryResultHandler.java | 14 +++++-
 .../exec/testing/NoOpControlsInjector.java      |  5 ++
 .../apache/drill/exec/work/foreman/Foreman.java | 40 +++++++++++++--
 .../drill/exec/work/foreman/QueryManager.java   |  6 +++
 .../exec/work/fragment/FragmentExecutor.java    |  3 ++
 .../src/main/resources/drill-module.conf        |  5 +-
 .../org/apache/drill/exec/ZookeeperHelper.java  |  7 ++-
 .../apache/drill/exec/server/TestBitRpc.java    |  4 ++
 .../exec/server/TestDrillbitResilience.java     |  8 +++
 27 files changed, 303 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 1a10aa2..7bbb815 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -86,6 +86,15 @@ public interface ExecConstants {
   public static final String USER_AUTHENTICATOR_IMPL = "drill.exec.security.user.auth.impl";
   public static final String PAM_AUTHENTICATOR_PROFILES = "drill.exec.security.user.auth.pam_profiles";
   public static final String ERROR_ON_MEMORY_LEAK = "drill.exec.debug.error_on_leak";
+
+  /**
+   * Currently if a query is cancelled, but one of the fragments reports the status as FAILED
instead of CANCELLED or
+   * FINISHED we report the query result as CANCELLED by swallowing the failures occurred
in fragments. This BOOT
+   * setting allows the user to see the query status as failure. Useful for developers/testers.
+   */
+  public static final String RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS =
+      "drill.exec.debug.return_error_for_failure_in_cancelled_fragments";
+
   /** Fragment memory planning */
   public static final String ENABLE_FRAGMENT_MEMORY_LIMIT = "drill.exec.memory.enable_frag_limit";
   public static final String FRAGMENT_MEM_OVERCOMMIT_FACTOR = "drill.exec.memory.frag_mem_overcommit_factor";

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
index 2bcfdbc..2659464 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
@@ -21,6 +21,9 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
+import org.slf4j.Logger;
 
 /**
  * Wrapper around a {@link org.apache.drill.exec.rpc.data.DataTunnel} that tracks the status
of batches sent to
@@ -41,4 +44,12 @@ public class AccountingDataTunnel {
     sendingAccountor.increment();
     tunnel.sendRecordBatch(statusHandler, batch);
   }
+
+  /**
+   * See {@link DataTunnel#setTestInjectionControls(ExecutionControlsInjector, ExecutionControls,
Logger)}.
+   */
+  public void setTestInjectionControls(final ExecutionControlsInjector testInjector,
+      final ExecutionControls testControls, final org.slf4j.Logger testLogger) {
+    tunnel.setTestInjectionControls(testInjector, testControls, testLogger);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
index 9b8ba38..0a1a397 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
@@ -19,5 +19,8 @@ package org.apache.drill.exec.ops;
 
 //TODO replace this when we switch to JDK8, which includes this
 public interface Consumer<T> {
-  public void accept(T t);
+
+  void accept(T t);
+
+  void interrupt(final InterruptedException e);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index cf4e9bb..1cbe886 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -90,6 +90,14 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     public void accept(final RpcException e) {
       fail(e);
     }
+
+    @Override
+    public void interrupt(final InterruptedException e) {
+      if (shouldContinue()) {
+        logger.error("Received an unexpected interrupt while waiting for the data send to
complete.", e);
+        fail(e);
+      }
+    }
   };
 
   private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer,
sendingAccountor);

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
index 79fc0b0..66e35b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
@@ -53,4 +53,10 @@ public class StatusHandler implements RpcOutcomeListener<Ack> {
     // if we didn't get ack ok, we'll need to kill the query.
     consumer.accept(new RpcException("Data not accepted downstream."));
   }
+
+  @Override
+  public void interrupted(final InterruptedException e) {
+    sendingAccountor.decrement();
+    consumer.interrupt(e);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index fe6239e..1f6767c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 
 public class SingleSenderCreator implements RootCreator<SingleSender>{
 
@@ -41,8 +42,10 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     return new SingleSenderRootExec(context, children.iterator().next(), config);
   }
 
-  private static class SingleSenderRootExec extends BaseRootExec {
-    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class);
+  public static class SingleSenderRootExec extends BaseRootExec {
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class);
+    private static final ExecutionControlsInjector injector =
+        ExecutionControlsInjector.getInjector(SingleSenderRootExec.class);
 
     private final FragmentHandle oppositeHandle;
 
@@ -76,6 +79,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
           .setMinorFragmentId(config.getOppositeMinorFragmentId())
           .build();
       tunnel = context.getDataTunnel(config.getDestination());
+      tunnel.setTestInjectionControls(injector, context.getExecutionControls(), logger);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 611052b..6da132b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -534,6 +534,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     }
   }
 
+  // TODO: Code duplication. UnorderedReceiverBatch has the same implementation.
   private class OutcomeListener implements RpcOutcomeListener<Ack> {
 
     @Override
@@ -545,6 +546,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     public void success(final Ack value, final ByteBuf buffer) {
       // Do nothing
     }
+
+    @Override
+    public void interrupted(final InterruptedException e) {
+      if (context.shouldContinue()) {
+        final String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished
message";
+        logger.error(errMsg, e);
+        context.fail(new RpcException(errMsg, e));
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
index c355070..e210514 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
@@ -178,6 +178,7 @@ public class PartitionerDecorator {
         } catch (final InterruptedException e) {
           // If the fragment state says we shouldn't continue, cancel or interrupt partitioner
threads
           if (!context.shouldContinue()) {
+            logger.debug("Interrupting partioner threads. Fragment thread {}", tName);
             for(Future f : taskFutures) {
               f.cancel(true);
             }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index e40fe54..1498441 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -240,6 +240,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
     }
   }
 
+  // TODO: Code duplication. MergingRecordBatch has the same implementation.
   private class OutcomeListener implements RpcOutcomeListener<Ack> {
 
     @Override
@@ -251,6 +252,15 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
     public void success(final Ack value, final ByteBuf buffer) {
       // Do nothing
     }
+
+    @Override
+    public void interrupted(final InterruptedException e) {
+      if (context.shouldContinue()) {
+        final String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished
message";
+        logger.error(errMsg, e);
+        context.fail(new RpcException(errMsg, e));
+      }
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
index 9b071ad..dfc62e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
@@ -32,5 +32,10 @@ public class BaseRpcOutcomeListener<T> implements RpcOutcomeListener<T>
{
   public void success(T value, ByteBuf buffer) {
   }
 
-
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void interrupted(final InterruptedException ex) {
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index d551173..34758ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -194,19 +194,45 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
 
       @Override
       public void operationComplete(ChannelFuture future) throws Exception {
+        boolean isInterrupted = false;
+
+        final long timeoutMills = 30000;
+
+        // We want to wait for at least 30 secs when interrupts occur. Establishing a connection
fails/succeeds quickly,
+        // So there is no point propagating the interruption as failure immediately.
+        final long targetMillis = System.currentTimeMillis() + timeoutMills;
+
         // logger.debug("Connection operation finished.  Success: {}", future.isSuccess());
-        try {
-          future.get();
-          if (future.isSuccess()) {
-            // send a handshake on the current thread. This is the only time we will send
from within the event thread.
-            // We can do this because the connection will not be backed up.
-            send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass,
true);
-          } else {
-            l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection
failure."));
+        while(true) {
+          try {
+            future.get(timeoutMills, TimeUnit.MILLISECONDS);
+            if (future.isSuccess()) {
+              // send a handshake on the current thread. This is the only time we will send
from within the event thread.
+              // We can do this because the connection will not be backed up.
+              send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass,
true);
+            } else {
+              l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection
failure."));
+            }
+            // logger.debug("Handshake queued for send.");
+            break;
+          } catch (final InterruptedException interruptEx) {
+            // Ignore the interrupt and continue to wait until targetMillis has elapsed.
+            isInterrupted = true;
+            final long wait = targetMillis - System.currentTimeMillis();
+            if (wait < 1) {
+              l.connectionFailed(FailureType.CONNECTION, interruptEx);
+              break;
+            }
+          } catch (final Exception ex) {
+            l.connectionFailed(FailureType.CONNECTION, ex);
+            break;
           }
-          // logger.debug("Handshake queued for send.");
-        } catch (Exception ex) {
-          l.connectionFailed(FailureType.CONNECTION, ex);
+        }
+
+        if (isInterrupted) {
+          // Preserve evidence that the interruption occurred so that code higher up on the
call stack can learn of the
+          // interruption and respond to it if it wants to.
+          Thread.currentThread().interrupt();
         }
       }
     }
@@ -235,6 +261,11 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
         }
       }
 
+      @Override
+      public void interrupted(final InterruptedException ex) {
+        logger.warn("Interrupted while waiting for handshake response", ex);
+        l.connectionFailed(FailureType.HANDSHAKE_COMMUNICATION, ex);
+      }
     }
 
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
index 19d9c30..cbe63c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
@@ -31,17 +31,6 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException>
imple
     super(new InnerFuture<V>());
   }
 
-  /**
-   * Drill doesn't currently support rpc cancellations since nearly all requests should be
either instance of
-   * asynchronous. Business level cancellation is managed a separate call (e.g. canceling
a query.). Calling this method
-   * will result in an UnsupportedOperationException.
-   */
-  @Override
-  public boolean cancel(boolean mayInterruptIfRunning) {
-    throw new UnsupportedOperationException(
-        "Drill doesn't currently support rpc cancellations. See javadocs for more detail.");
-  }
-
   @Override
   protected RpcException mapException(Exception ex) {
     return RpcException.mapException(ex);
@@ -72,6 +61,12 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException>
imple
   }
 
   @Override
+  public void interrupted(final InterruptedException ex) {
+    // Propagate the interrupt to inner future
+    ( (InnerFuture<V>)delegate()).cancel(true);
+  }
+
+  @Override
   public ByteBuf getBuffer() {
     return buffer;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
index 6c7bf3e..53d0772 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
@@ -58,6 +58,12 @@ public abstract class FutureBitCommand<T extends MessageLite, C extends
RemoteCo
       settableFuture.set(value);
     }
 
+    @Override
+    public void interrupted(final InterruptedException e) {
+      // If we are interrupted while performing the command, consider as failure.
+      logger.warn("Interrupted while running the command", e);
+      failed(new RpcException(e));
+    }
   }
 
   public DrillRpcFuture<T> getFuture() {

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
index e32ca8a..92022b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
@@ -55,6 +55,10 @@ public abstract class ListeningCommand<T extends MessageLite, C extends
RemoteCo
       listener.success(value, buf);
     }
 
+    @Override
+    public void interrupted(final InterruptedException e) {
+      listener.interrupted(e);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
index f0787a5..12e0063 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
@@ -22,6 +22,8 @@ import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.Closeable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Preconditions;
@@ -100,28 +102,44 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends
RemoteConne
      * Called by
      */
     public void waitAndRun() {
-      try {
-//        logger.debug("Waiting for connection.");
-        CONNECTION_TYPE connection = this.get();
-
-        if (connection == null) {
-//          logger.debug("Connection failed.");
-          return;
-        } else {
-//          logger.debug("Connection received. {}", connection);
-          cmd.connectionSucceeded(connection);
-//          logger.debug("Finished connection succeeded activity.");
+      boolean isInterrupted = false;
+
+      final long timeoutMills = 30000;
+      // We want to wait for at least 30 secs when interrupts occur. Establishing a connection
fails/succeeds quickly,
+      // So there is no point propagating the interruption as failure immediately.
+      final long targetMillis = System.currentTimeMillis() + timeoutMills;
+
+      while(true) {
+        try {
+          //        logger.debug("Waiting for connection.");
+          CONNECTION_TYPE connection = this.get(timeoutMills, TimeUnit.MILLISECONDS);
+
+          if (connection == null) {
+            //          logger.debug("Connection failed.");
+          } else {
+            //          logger.debug("Connection received. {}", connection);
+            cmd.connectionSucceeded(connection);
+            //          logger.debug("Finished connection succeeded activity.");
+          }
+          break;
+        } catch (final InterruptedException interruptEx) {
+          // Ignore the interrupt and continue to wait until targetMillis has elapsed.
+          isInterrupted = true;
+          final long wait = targetMillis - System.currentTimeMillis();
+          if (wait < 1) {
+            cmd.connectionFailed(FailureType.CONNECTION, interruptEx);
+            break;
+          }
+        } catch (final ExecutionException | TimeoutException ex) {
+          cmd.connectionFailed(FailureType.CONNECTION, ex);
         }
-      } catch (final InterruptedException e) {
-        cmd.connectionFailed(FailureType.CONNECTION, e);
+      }
 
+      if (isInterrupted) {
         // Preserve evidence that the interruption occurred so that code higher up on the
call stack can learn of the
         // interruption and respond to it if it wants to.
         Thread.currentThread().interrupt();
-      } catch (ExecutionException e) {
-        throw new IllegalStateException();
       }
-
     }
 
     @Override
@@ -243,7 +261,6 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends
RemoteConne
     public void connectionFailed(org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType
type, Throwable t) {
       delegate.connectionFailed(type, t);
     }
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index 2ee9263..199569c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -71,7 +71,7 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
       writeManager.waitForWritable();
       return true;
     }catch(final InterruptedException e){
-      listener.failed(new RpcException(e));
+      listener.interrupted(e);
 
       // Preserve evidence that the interruption occurred so that code higher up on the call
stack can learn of the
       // interruption and respond to it if it wants to.
@@ -135,7 +135,6 @@ public abstract class RemoteConnection implements ConnectionThrottle,
AutoClosea
       if (channel.isActive()) {
         channel.close().get();
       }
-      channel.close().get();
     } catch (final InterruptedException | ExecutionException e) {
       logger.warn("Caught exception while closing channel.", e);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
index 7d7c860..4485cf9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
@@ -21,9 +21,19 @@ import io.netty.buffer.ByteBuf;
 
 public interface RpcOutcomeListener<V> {
 
-  public void failed(RpcException ex);
-  public void success(V value, ByteBuf buffer);
+  /**
+   * Called when an error occurred while waiting for the RPC outcome.
+   * @param ex
+   */
+  void failed(RpcException ex);
 
 
+  void success(V value, ByteBuf buffer);
+
+  /**
+   * Called when the sending thread is interrupted. Possible when the fragment is cancelled
due to query cancellations or
+   * failures.
+   */
+  void interrupted(final InterruptedException e);
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index ed31bed..143020f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -31,6 +31,8 @@ import org.apache.drill.exec.rpc.ListeningCommand;
 import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 
 
 public class DataTunnel {
@@ -39,17 +41,49 @@ public class DataTunnel {
   private final DataConnectionManager manager;
   private final Semaphore sendingSemaphore = new Semaphore(3);
 
+  // Needed for injecting a test pause
+  private boolean isInjectionControlSet;
+  private ExecutionControlsInjector testInjector;
+  private ExecutionControls testControls;
+  private org.slf4j.Logger testLogger;
+
+
   public DataTunnel(DataConnectionManager manager) {
     this.manager = manager;
   }
 
+  /**
+   * Once a DataTunnel is created, clients of DataTunnel can pass injection controls to enable
setting injections at
+   * pre-defined places. Currently following injection sites are available.
+   *
+   * 1. In method {@link #sendRecordBatch(RpcOutcomeListener, FragmentWritableBatch)}, an
interruptible pause injection
+   *    is available before acquiring the sending slot. Site name is: "data-tunnel-send-batch-wait-for-interrupt"
+   *
+   * @param testInjector
+   * @param testControls
+   * @param testLogger
+   */
+  public void setTestInjectionControls(final ExecutionControlsInjector testInjector,
+      final ExecutionControls testControls, final org.slf4j.Logger testLogger) {
+    isInjectionControlSet = true;
+    this.testInjector = testInjector;
+    this.testControls = testControls;
+    this.testLogger = testLogger;
+  }
+
   public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentWritableBatch
batch) {
     SendBatchAsyncListen b = new SendBatchAsyncListen(outcomeListener, batch);
     try{
+      if (isInjectionControlSet) {
+        // Wait for interruption if set. Used to simulate the fragment interruption while
the fragment is waiting for
+        // semaphore acquire. We expect the
+        testInjector.injectInterruptiblePause(testControls, "data-tunnel-send-batch-wait-for-interrupt",
testLogger);
+      }
+
       sendingSemaphore.acquire();
       manager.runCommand(b);
     }catch(final InterruptedException e){
-      outcomeListener.failed(new RpcException("Interrupted while trying to get sending semaphore.",
e));
+      outcomeListener.interrupted(e);
 
       // Preserve evidence that the interruption occurred so that code higher up on the call
stack can learn of the
       // interruption and respond to it if it wants to.
@@ -57,6 +91,7 @@ public class DataTunnel {
     }
   }
 
+  // TODO: This is not used anywhere. Can we remove this method and SendBatchAsyncFuture?
   public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch
batch) {
     SendBatchAsyncFuture b = new SendBatchAsyncFuture(batch, context);
     try{
@@ -93,6 +128,11 @@ public class DataTunnel {
       inner.success(value, buffer);
     }
 
+    @Override
+    public void interrupted(InterruptedException e) {
+      sendingSemaphore.release();
+      inner.interrupted(e);
+    }
   }
 
   private class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection>
{

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 143d104..8443948 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -349,9 +349,21 @@ public class QueryResultHandler {
           throw new IllegalStateException("Trying to replace a non-buffering User Results
listener.");
         }
       }
-
     }
 
+    @Override
+    public void interrupted(final InterruptedException ex) {
+      logger.warn("Interrupted while waiting for query results from Drillbit", ex);
+
+      if (!isTerminal.compareAndSet(false, true)) {
+        return;
+      }
+
+      closeFuture.removeListener(closeListener);
+
+      // Throw an interrupted UserException?
+      resultsListener.submissionFailed(UserException.systemError(ex).build());
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
index bb13d1f..bf4221e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
@@ -42,6 +42,11 @@ public final class NoOpControlsInjector extends ExecutionControlsInjector
{
                                                final Logger logger) {
   }
 
+  @Override
+  public void injectInterruptiblePause(ExecutionControls executionControls, String desc,
Logger logger)
+      throws InterruptedException {
+  }
+
   /**
    * When assertions are not enabled, this count down latch that does nothing is injected.
    */

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index bf62ccb..af3309f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -610,6 +610,18 @@ public class Foreman implements Runnable {
     }
 
     /**
+     * Ignore the current status and force the given failure as current status.
+     * NOTE: Used only for testing purposes. Shouldn't be used in production.
+     */
+    public void setForceFailure(final Exception exception) {
+      Preconditions.checkArgument(exception != null);
+      Preconditions.checkState(!isClosed);
+
+      resultState = QueryState.FAILED;
+      resultException = exception;
+    }
+
+    /**
      * Add an exception to the result. All exceptions after the first become suppressed
      * exceptions hanging off the first.
      *
@@ -837,6 +849,14 @@ public class Foreman implements Runnable {
         if ((newState == QueryState.CANCELED)
             || (newState == QueryState.COMPLETED)
             || (newState == QueryState.FAILED)) {
+
+          if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS))
{
+            if (newState == QueryState.FAILED) {
+              assert exception != null;
+              recordNewState(QueryState.FAILED);
+              foremanResult.setForceFailure(exception);
+            }
+          }
           /*
            * These amount to a completion of the cancellation requests' cleanup;
            * now we can clean up and send the result.
@@ -1109,6 +1129,15 @@ public class Foreman implements Runnable {
         stateListener.moveToState(QueryState.FAILED, ex);
       }
     }
+
+    @Override
+    public void interrupted(final InterruptedException e) {
+      // Foreman shouldn't get interrupted while waiting for the RPC outcome of fragment
submission.
+      // Consider the interrupt as failure.
+      final String errMsg = "Interrupted while waiting for the RPC outcome of fragment submission.";
+      logger.error(errMsg, e);
+      failed(new RpcException(errMsg, e));
+    }
   }
 
   /**
@@ -1139,10 +1168,15 @@ public class Foreman implements Runnable {
   private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> {
     @Override
     public void failed(final RpcException ex) {
-      logger.info(
-          "Failure while trying communicate query result to initating client. This would
happen if a client is disconnected before response notice can be sent.",
-          ex);
+      logger.info("Failure while trying communicate query result to initiating client. "
+
+              "This would happen if a client is disconnected before response notice can be
sent.", ex);
       stateListener.moveToState(QueryState.FAILED, ex);
     }
+
+    @Override
+    public void interrupted(final InterruptedException e) {
+      logger.warn("Interrupted while waiting for RPC outcome of sending final query result
to initiating client.");
+      stateListener.moveToState(QueryState.FAILED, e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 84a38a6..eed4e17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -267,6 +267,12 @@ public class QueryManager {
           ack);
       }
     }
+
+    @Override
+    public void interrupted(final InterruptedException ex) {
+      logger.error("Interrupted while waiting for RPC outcome of action fragment {}. " +
+          "Endpoint {}, Fragment handle {}", signal, endpoint, value, ex);
+    }
   }
 
   QueryState updateEphemeralState(final QueryState queryState) {

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 2712fe7..6b44ae3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -145,6 +145,7 @@ public class FragmentExecutor implements Runnable {
        */
       final Thread myThread = myThreadRef.get();
       if (myThread != null) {
+        logger.debug("Interrupting fragment thread {}", myThread.getName());
         myThread.interrupt();
       }
     }
@@ -343,6 +344,8 @@ public class FragmentExecutor implements Runnable {
     case FINISHED:
       if(current == FragmentState.CANCELLATION_REQUESTED){
         target = FragmentState.CANCELLED;
+      } else if (current == FragmentState.FAILED) {
+        target = FragmentState.FAILED;
       }
       // fall-through
     case FAILED:

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index d98b97a..b1e3fde 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -159,5 +159,8 @@ drill.exec: {
       initial: 20000000
     }
   },
-  debug.error_on_leak: true
+  debug: {
+    error_on_leak: true,
+    return_error_for_failure_in_cancelled_fragments: false
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
index 7fcf4cb..a5db81d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Throwables.propagate;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Properties;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.util.MiniZooKeeperCluster;
@@ -45,7 +46,11 @@ public class ZookeeperHelper {
    * <p>Will create a "test-data" directory for Zookeeper's use if one doesn't already
exist.
    */
   public ZookeeperHelper() {
-    config = DrillConfig.create();
+    final Properties overrideProps = new Properties();
+    // Forced to disable this, because currently we leak memory which is a known issue for
query cancellations.
+    // Setting this causes unittests to fail.
+    // overrideProps.setProperty(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS,
"true");
+    config = DrillConfig.create(overrideProps);
     zkUrl = config.getString(ExecConstants.ZK_CONNECTION);
 
     if (!testDir.exists()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
index 3749716..72e6c44 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
@@ -148,6 +148,10 @@ public class TestBitRpc extends ExecTest {
       }
     }
 
+    @Override
+    public void interrupted(final InterruptedException e) {
+      // TODO(We don't have any interrupts in test code)
+    }
   }
 
   private class BitComTestHandler implements DataResponseHandler {

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index d72d498..3efb629 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.physical.impl.ScreenCreator;
+import org.apache.drill.exec.physical.impl.SingleSenderCreator.SingleSenderRootExec;
 import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
 import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
 import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
@@ -780,4 +781,11 @@ public class TestDrillbitResilience {
           Long.toString(PARTITION_SENDER_SET_THREADS.getDefault().num_val));
     }
   }
+
+  @Test
+  public void testInterruptingWhileFragmentIsBlockedInAcquiringSendingTicket() throws Exception
{
+    final String control =
+        createPauseInjection(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt",
1);
+    assertCancelled(control, TEST_QUERY, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+  }
 }


Mime
View raw message