drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ve...@apache.org
Subject [1/3] drill git commit: DRILL-2697: Pauses sites wait indefinitely for a resume signal DrillClient sends a resume signal to UserServer. UserServer triggers a resume call in the correct Foreman. Foreman resumes all pauses related to the query through the
Date Sun, 10 May 2015 20:45:06 GMT
Repository: drill
Updated Branches:
  refs/heads/master 4e596334e -> 3a294abcc


http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index da69e9e..3e4dcb2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -25,12 +25,13 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.math3.util.Pair;
 import org.apache.drill.QueryTestUtil;
 import org.apache.drill.SingleRowListener;
 import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -60,7 +61,9 @@ import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.store.pojo.PojoRecordReader;
 import org.apache.drill.exec.testing.ControlsInjectionUtil;
+import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.foreman.Foreman;
 import org.apache.drill.exec.work.foreman.ForemanException;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
@@ -68,15 +71,14 @@ import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 
 /**
  * Test how resilient drillbits are to throwing exceptions during various phases of query
- * execution by injecting exceptions at various points. The test cases are mentioned in DRILL-2383.
+ * execution by injecting exceptions at various points and to cancellations in various phases.
+ * The test cases are mentioned in DRILL-2383.
  */
-@Ignore
 public class TestDrillbitResilience {
   private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TestDrillbitResilience.class);
 
@@ -90,7 +92,6 @@ public class TestDrillbitResilience {
    * counting sys.drillbits.
    */
   private static final String TEST_QUERY = "select * from sys.memory";
-  private static final long PAUSE_TIME_MILLIS = 3000L;
 
   private static void startDrillbit(final String name, final RemoteServiceSet remoteServiceSet) {
     if (drillbits.containsKey(name)) {
@@ -184,18 +185,17 @@ public class TestDrillbitResilience {
   }
 
   /**
-   * Clear all exceptions.
+   * Clear all injections.
    */
   private static void clearAllInjections() {
-    assertTrue(drillClient != null);
+    Preconditions.checkNotNull(drillClient);
     ControlsInjectionUtil.clearControls(drillClient);
   }
 
   /**
    * Check that all the drillbits are ok.
    * <p/>
-   * <p>The current implementation does this by counting the number of drillbits using a
-   * query.
+   * <p>The current implementation does this by counting the number of drillbits using a query.
    */
   private static void assertDrillbitsOk() {
       final SingleRowListener listener = new SingleRowListener() {
@@ -245,13 +245,14 @@ public class TestDrillbitResilience {
     try {
       QueryTestUtil.testWithListener(drillClient, QueryType.SQL, "select count(*) from sys.memory", listener);
       listener.waitForCompletion();
-      assertTrue(listener.getQueryState() == QueryState.COMPLETED);
+      final QueryState state = listener.getQueryState();
+      assertTrue(String.format("QueryState should be COMPLETED (and not %s).", state), state == QueryState.COMPLETED);
     } catch (final Exception e) {
       throw new RuntimeException("Couldn't query active drillbits", e);
     }
 
     final List<DrillPBError> errorList = listener.getErrorList();
-    assertTrue(errorList.isEmpty());
+    assertTrue("There should not be any errors when checking if Drillbits are OK.", errorList.isEmpty());
   }
 
   @SuppressWarnings("static-method")
@@ -262,10 +263,10 @@ public class TestDrillbitResilience {
   }
 
   /**
-   * Set the given exceptions.
+   * Set the given controls.
    */
-  private static void setExceptions(final String controlsString) {
-    ControlsInjectionUtil.setControls(drillClient, controlsString);
+  private static void setControls(final String controls) {
+    ControlsInjectionUtil.setControls(drillClient, controls);
   }
 
   /**
@@ -332,16 +333,16 @@ public class TestDrillbitResilience {
    */
   private static void assertExceptionInjected(final Throwable throwable,
                                               final Class<? extends Throwable> exceptionClass, final String desc) {
-    assertTrue(throwable instanceof UserException);
+    assertTrue("Throwable was not of UserException type.", throwable instanceof UserException);
     final ExceptionWrapper cause = ((UserException) throwable).getOrCreatePBError(false).getException();
-    assertEquals(exceptionClass.getName(), cause.getExceptionClass());
-    assertEquals(desc, cause.getMessage());
+    assertEquals("Exception class names should match.", exceptionClass.getName(), cause.getExceptionClass());
+    assertEquals("Exception sites should match.", desc, cause.getMessage());
   }
 
   @Test
   public void settingNoopInjectionsAndQuery() {
     final String controls = createSingleExceptionOnBit(getClass(), "noop", RuntimeException.class, DRILLBIT_BETA);
-    setExceptions(controls);
+    setControls(controls);
     try {
       QueryTestUtil.test(drillClient, TEST_QUERY);
     } catch (final Exception e) {
@@ -357,7 +358,7 @@ public class TestDrillbitResilience {
    */
   private static void testForeman(final String desc) {
     final String controls = createSingleException(Foreman.class, desc, ForemanException.class);
-    setExceptions(controls);
+    setControls(controls);
     try {
       QueryTestUtil.test(drillClient, TEST_QUERY);
       fail();
@@ -372,32 +373,39 @@ public class TestDrillbitResilience {
     testForeman("run-try-beginning");
   }
 
-  /*
-   * TODO I'm beginning to think that Foreman needs to gate output to its client in a similar way
-   * that it gates input via stateListener. That could be tricky, since some results could be
-   * queued up before Foreman has gotten through it's run(), and they would all have to be sent
-   * before the gate is opened. There's also the question of what to do in case we detect failure
-   * there after some data has been sent. Right now, this test doesn't work because that's
-   * exactly what happens, and the client believes that the query succeeded, even though an exception
-   * was thrown after setup completed, but data was asynchronously sent to the client before that.
-   * This test also revealed that the QueryState never seems to make it to the client, so we can't
-   * detect the failure that way (see SingleRowListener's getQueryState(), which I originally tried
-   * to use here to detect query completion).
-   */
   @SuppressWarnings("static-method")
   @Test
   public void foreman_runTryEnd() {
     testForeman("run-try-end");
   }
 
+  /**
+   * Tests can use this listener to block until the submitted query completes or fails, by
+   * calling {@link #waitForCompletion()}.
+   */
   private static class WaitUntilCompleteListener implements UserResultsListener {
-    protected final CountDownLatch latch;
+    private final ExtendedLatch latch = new ExtendedLatch(1); // to signal completion
     protected QueryId queryId = null;
-    protected Exception ex = null;
-    protected QueryState state = null;
+    protected volatile Pointer<Exception> ex = new Pointer<>();
+    protected volatile QueryState state = null;
+
+    /**
+     * Method that sets the exception if the condition is not met.
+     */
+    protected final void check(final boolean condition, final String format, final Object... args) {
+      if (!condition) {
+        ex.value = new IllegalStateException(String.format(format, args));
+      }
+    }
 
-    public WaitUntilCompleteListener(final int count) {
-      latch = new CountDownLatch(count);
+    /**
+     * Method that cancels and resumes the query, in order.
+     */
+    protected final void cancelAndResume() {
+      Preconditions.checkNotNull(queryId);
+      final ExtendedLatch trigger = new ExtendedLatch(1);
+      (new CancellingThread(queryId, ex, trigger)).start();
+      (new ResumingThread(queryId, ex, trigger)).start();
     }
 
     @Override
@@ -407,7 +415,7 @@ public class TestDrillbitResilience {
 
     @Override
     public void submissionFailed(final UserException ex) {
-      this.ex = ex;
+      this.ex.value = ex;
       state = QueryState.FAILED;
       latch.countDown();
     }
@@ -424,21 +432,23 @@ public class TestDrillbitResilience {
     }
 
     public final Pair<QueryState, Exception> waitForCompletion() {
-      try {
-        latch.await();
-      } catch (final InterruptedException e) {
-        return new Pair<QueryState, Exception>(state, e);
-      }
-      return new Pair<>(state, ex);
+      latch.awaitUninterruptibly();
+      return new Pair<>(state, ex.value);
     }
   }
 
+  /**
+   * Thread that cancels the given query id. The latch is counted down once the cancel is acknowledged or fails.
+   */
   private static class CancellingThread extends Thread {
-
     private final QueryId queryId;
+    private final Pointer<Exception> ex;
+    private final ExtendedLatch latch;
 
-    public CancellingThread(final QueryId queryId) {
+    public CancellingThread(final QueryId queryId, final Pointer<Exception> ex, final ExtendedLatch latch) {
       this.queryId = queryId;
+      this.ex = ex;
+      this.latch = latch;
     }
 
     @Override
@@ -446,139 +456,178 @@ public class TestDrillbitResilience {
       final DrillRpcFuture<Ack> cancelAck = drillClient.cancelQuery(queryId);
       try {
         cancelAck.checkedGet();
-      } catch (final RpcException e) {
-        fail(e.getMessage()); // currently this failure does not fail the test
+      } catch (final RpcException ex) {
+        this.ex.value = ex;
       }
+      latch.countDown();
+    }
+  }
+
+  /**
+   * Thread that resumes the given query id. The thread waits uninterruptibly until the latch is counted down,
+   * and only then sends the resume signal.
+   */
+  private static class ResumingThread extends Thread {
+    private final QueryId queryId;
+    private final Pointer<Exception> ex;
+    private final ExtendedLatch latch;
+
+    public ResumingThread(final QueryId queryId, final Pointer<Exception> ex, final ExtendedLatch latch) {
+      this.queryId = queryId;
+      this.ex = ex;
+      this.latch = latch;
+    }
+
+    @Override
+    public void run() {
+      latch.awaitUninterruptibly();
+      final DrillRpcFuture<Ack> resumeAck = drillClient.resumeQuery(queryId);
+      try {
+        resumeAck.checkedGet();
+      } catch (final RpcException ex) {
+        this.ex.value = ex;
+      }
+    }
+  }
+
+  /**
+   * Given the result of {@link WaitUntilCompleteListener#waitForCompletion}, this method fails if the state is not
+   * as expected or if an exception was recorded.
+   */
+  private static void assertCompleteState(final Pair<QueryState, Exception> result, final QueryState expectedState) {
+    final QueryState actualState = result.getFirst();
+    final Exception exception = result.getSecond();
+    if (actualState != expectedState || exception != null) {
+      fail(String.format("Query state is incorrect (expected: %s, actual: %s) AND/OR \nException thrown: %s",
+        expectedState, actualState, exception == null ? "none." : exception));
     }
   }
 
   /**
    * Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
    */
-  private static void assertCancelled(final String controls, final WaitUntilCompleteListener listener) {
-    ControlsInjectionUtil.setControls(drillClient, controls);
+  private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) {
+    setControls(controls);
 
     QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
-    assertTrue(String.format("Expected Query Outcome of CANCELED but had Outcome of %s", result.getFirst()),
-        result.getFirst() == QueryState.CANCELED);
-    assertTrue(String.format("Expected no Exception but had Exception %s", result.getSecond()),
-        result.getSecond() == null);
+    assertCompleteState(result, QueryState.CANCELED);
   }
 
-  @Test // Cancellation TC 1
-  public void cancelBeforeAnyResultsArrive() {
-    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+  private static String createPauseInjection(final Class siteClass, final String siteDesc, final int nSkip) {
+    return "{\"injections\" : [{"
+      + "\"type\" : \"pause\"," +
+      "\"siteClass\" : \"" + siteClass.getName() + "\","
+      + "\"desc\" : \"" + siteDesc + "\","
+      + "\"nSkip\" : " + nSkip
+      + "}]}";
+  }
+
+  private static String createPauseInjection(final Class siteClass, final String siteDesc) {
+    return createPauseInjection(siteClass, siteDesc, 0);
+  }
 
+  @Test // To test pause and resume. Test hangs if resume did not happen.
+  public void passThrough() {
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
       @Override
       public void queryIdArrived(final QueryId queryId) {
-        (new CancellingThread(queryId)).start();
+        super.queryIdArrived(queryId);
+        final ExtendedLatch trigger = new ExtendedLatch(1);
+        (new ResumingThread(queryId, ex, trigger)).start();
+        trigger.countDown();
       }
     };
 
-    final String controls = "{\"injections\":[{"
-      + "\"type\":\"pause\"," +
-      "\"siteClass\":\"" + Foreman.class.getName() + "\","
-      + "\"desc\":\"pause-run-plan\","
-      + "\"millis\":" + PAUSE_TIME_MILLIS + ","
-      + "\"nSkip\":0,"
-      + "\"nFire\":1"
-      + "}]}";
+    final String controls = createPauseInjection(PojoRecordReader.class, "read-next");
+    setControls(controls);
+
+    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+    final Pair<QueryState, Exception> result = listener.waitForCompletion();
+    assertCompleteState(result, QueryState.COMPLETED);
+  }
+
+  @Test // Cancellation TC 1: cancel before any result set is returned
+  public void cancelBeforeAnyResultsArrive() {
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
 
-    assertCancelled(controls, listener);
+      @Override
+      public void queryIdArrived(final QueryId queryId) {
+        super.queryIdArrived(queryId);
+        cancelAndResume();
+      }
+    };
+
+    final String controls = createPauseInjection(Foreman.class, "foreman-ready");
+    assertCancelledWithoutException(controls, listener);
   }
 
-  @Test // Cancellation TC 2
+  @Test // Cancellation TC 2: cancel in the middle of fetching result set
   public void cancelInMiddleOfFetchingResults() {
-    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
       private boolean cancelRequested = false;
 
       @Override
-      public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
-        if (! cancelRequested) {
-          assertTrue(queryId != null);
-          (new CancellingThread(queryId)).start();
+      public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+        if (!cancelRequested) {
+          check(queryId != null, "Query id should not be null, since we have waited long enough.");
+          cancelAndResume();
           cancelRequested = true;
         }
         result.release();
       }
     };
 
-    final String controls = "{\"injections\":[{"
-      + "\"type\":\"pause\"," +
-      "\"siteClass\":\"" + ScreenCreator.class.getName() + "\","
-      + "\"desc\":\"sending-data\","
-      + "\"millis\":" + PAUSE_TIME_MILLIS + ","
-      + "\"nSkip\":0,"
-      + "\"nFire\":1"
-      + "}]}";
-
-    assertCancelled(controls, listener);
+    // skip once i.e. wait for one batch, so that #dataArrived above triggers #cancelAndResume
+    final String controls = createPauseInjection(ScreenCreator.class, "sending-data", 1);
+    assertCancelledWithoutException(controls, listener);
   }
 
 
-  @Test // Cancellation TC 3
+  @Test // Cancellation TC 3: cancel after all result sets are produced but not all are fetched
   public void cancelAfterAllResultsProduced() {
-    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
       private int count = 0;
 
       @Override
-      public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+      public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
         if (++count == drillbits.size()) {
-          assertTrue(queryId != null);
-          (new CancellingThread(queryId)).start();
+          check(queryId != null, "Query id should not be null, since we have waited long enough.");
+          cancelAndResume();
         }
         result.release();
       }
     };
 
-    final String controls = "{\"injections\":[{"
-      + "\"type\":\"pause\"," +
-      "\"siteClass\":\"" + ScreenCreator.class.getName() + "\","
-      + "\"desc\":\"send-complete\","
-      + "\"millis\":" + PAUSE_TIME_MILLIS + ","
-      + "\"nSkip\":0,"
-      + "\"nFire\":1"
-      + "}]}";
-
-    assertCancelled(controls, listener);
+    final String controls = createPauseInjection(ScreenCreator.class, "send-complete");
+    assertCancelledWithoutException(controls, listener);
   }
 
-  @Test // Cancellation TC 4
+  @Test // Cancellation TC 4: cancel after everything is completed and fetched
   public void cancelAfterEverythingIsCompleted() {
-    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
       private int count = 0;
 
       @Override
-      public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+      public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
         if (++count == drillbits.size()) {
-          assertTrue(queryId != null);
-          (new CancellingThread(queryId)).start();
+          check(queryId != null, "Query id should not be null, since we have waited long enough.");
+          cancelAndResume();
         }
         result.release();
       }
     };
 
-    final String controls = "{\"injections\":[{"
-      + "\"type\":\"pause\"," +
-      "\"siteClass\":\"" + Foreman.class.getName() + "\","
-      + "\"desc\":\"foreman-cleanup\","
-      + "\"millis\":" + PAUSE_TIME_MILLIS + ","
-      + "\"nSkip\":0,"
-      + "\"nFire\":1"
-      + "}]}";
-
-    assertCancelled(controls, listener);
+    final String controls = createPauseInjection(Foreman.class, "foreman-cleanup");
+    assertCancelledWithoutException(controls, listener);
   }
 
-  @Test // Completion TC 1
+  @Test // Completion TC 1: success
   public void successfullyCompletes() {
-    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1);
-    QueryTestUtil.testWithListener(
-      drillClient, QueryType.SQL, TEST_QUERY, listener);
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
+    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
-    assertTrue(result.getFirst() == QueryState.COMPLETED);
-    assertTrue(result.getSecond() == null);
+    assertCompleteState(result, QueryState.COMPLETED);
   }
 
   /**
@@ -586,16 +635,16 @@ public class TestDrillbitResilience {
    */
   private static void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
                                                final String exceptionDesc) {
-    setExceptions(controls);
-    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1);
-    QueryTestUtil.testWithListener(drillClient, QueryType.SQL,  TEST_QUERY, listener);
+    setControls(controls);
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
+    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
-    assertTrue(result.getFirst() == QueryState.FAILED);
-    final Exception e = result.getSecond();
-    assertExceptionInjected(e, exceptionClass, exceptionDesc);
+    final QueryState state = result.getFirst();
+    assertTrue(String.format("Query state should be FAILED (and not %s).", state), state == QueryState.FAILED);
+    assertExceptionInjected(result.getSecond(), exceptionClass, exceptionDesc);
   }
 
-  @Test // Completion TC 2
+  @Test // Completion TC 2: failed query - before query is executed - while sql parsing
   public void failsWhenParsing() {
     final String exceptionDesc = "sql-parsing";
     final Class<? extends Throwable> exceptionClass = ForemanSetupException.class;
@@ -603,7 +652,7 @@ public class TestDrillbitResilience {
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
   }
 
-  @Test // Completion TC 3
+  @Test // Completion TC 3: failed query - before query is executed - while sending fragments to other drillbits
   public void failsWhenSendingFragments() {
     final String exceptionDesc = "send-fragments";
     final Class<? extends Throwable> exceptionClass = ForemanException.class;
@@ -611,7 +660,7 @@ public class TestDrillbitResilience {
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
   }
 
-  @Test // Completion TC 4
+  @Test // Completion TC 4: failed query - during query execution
   public void failsDuringExecution() {
     final String exceptionDesc = "fragment-execution";
     final Class<? extends Throwable> exceptionClass = IOException.class;

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
new file mode 100644
index 0000000..c98f54c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.concurrent.ExtendedLatch;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
+import org.apache.drill.exec.proto.UserProtos.UserProperties;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.util.Pointer;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestCountDownLatchInjection extends BaseTestQuery {
+
+  private static final UserSession session = UserSession.Builder.newBuilder()
+    .withCredentials(UserCredentials.newBuilder()
+      .setUserName("foo")
+      .build())
+    .withUserProperties(UserProperties.getDefaultInstance())
+    .withOptionManager(bits[0].getContext().getOptionManager())
+    .build();
+
+  /**
+   * Class in whose methods we want to simulate count down latches at run-time for testing
+   * purposes. The class must have access to {@link org.apache.drill.exec.ops.QueryContext} or
+   * {@link org.apache.drill.exec.ops.FragmentContext}.
+   */
+  private static class DummyClass {
+    private static final ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(DummyClass.class);
+
+    private final QueryContext context;
+    private final CountDownLatch latch;
+    private final int count;
+
+    public DummyClass(final QueryContext context, final CountDownLatch latch, final int count) {
+      this.context = context;
+      this.latch = latch;
+      this.count = count;
+    }
+
+    public static final String LATCH_NAME = "<<latch>>";
+
+    /**
+     * Method that initializes the latch and waits for "count" count downs (one from each of that many threads)
+     */
+    public long initAndWait() throws InterruptedException {
+      // ... code ...
+
+      injector.getLatch(context.getExecutionControls(), LATCH_NAME).initialize(count);
+
+      // ... code ...
+      latch.countDown(); // trigger threads spawn
+
+      final long startTime = System.currentTimeMillis();
+      // simulated wait for "count" threads to count down on the same latch
+      injector.getLatch(context.getExecutionControls(), LATCH_NAME).await();
+      final long endTime = System.currentTimeMillis();
+      // ... code ...
+      return (endTime - startTime);
+    }
+
+    public void countDown() {
+      // ... code ...
+      injector.getLatch(context.getExecutionControls(), LATCH_NAME).countDown();
+      // ... code ...
+    }
+  }
+
+  private static class ThreadCreator extends Thread {
+
+    private final DummyClass dummyClass;
+    private final ExtendedLatch latch;
+    private final int count;
+    private final Pointer<Long> countingDownTime;
+
+    public ThreadCreator(final DummyClass dummyClass, final ExtendedLatch latch, final int count,
+                         final Pointer<Long> countingDownTime) {
+      this.dummyClass = dummyClass;
+      this.latch = latch;
+      this.count = count;
+      this.countingDownTime = countingDownTime;
+    }
+
+    @Override
+    public void run() {
+      latch.awaitUninterruptibly();
+      final long startTime = System.currentTimeMillis();
+      for (int i = 0; i < count; i++) {
+        (new Thread() {
+          @Override
+          public void run() {
+            dummyClass.countDown();
+          }
+        }).start();
+      }
+      final long endTime = System.currentTimeMillis();
+      countingDownTime.value = (endTime - startTime);
+    }
+  }
+
+  @Test // test would hang if the correct init, wait and countdowns did not happen, and the test timeout mechanism will
+  // catch that case
+  public void latchInjected() {
+    final int threads = 10;
+    final ExtendedLatch trigger = new ExtendedLatch(1);
+    final Pointer<Long> countingDownTime = new Pointer<>();
+
+    final String jsonString = "{\"injections\":[{"
+      + "\"type\":\"latch\"," +
+      "\"siteClass\":\"org.apache.drill.exec.testing.TestCountDownLatchInjection$DummyClass\","
+      + "\"desc\":\"" + DummyClass.LATCH_NAME + "\""
+      + "}]}";
+
+    ControlsInjectionUtil.setControls(session, jsonString);
+
+    final QueryContext queryContext = new QueryContext(session, bits[0].getContext());
+
+    final DummyClass dummyClass = new DummyClass(queryContext, trigger, threads);
+    (new ThreadCreator(dummyClass, trigger, threads, countingDownTime)).start();
+    final long timeSpentWaiting;
+    try {
+      timeSpentWaiting = dummyClass.initAndWait();
+    } catch (final InterruptedException e) {
+      fail("Thread should not be interrupted; there is no deliberate attempt.");
+      return;
+    }
+    assertTrue(timeSpentWaiting >= countingDownTime.value);
+    try {
+      queryContext.close();
+    } catch (final Exception e) {
+      fail("Failed to close query context: " + e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
index 5fa2b3f..ba29c58 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
@@ -18,20 +18,33 @@
 package org.apache.drill.exec.testing;
 
 import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.concurrent.ExtendedLatch;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ZookeeperHelper;
+import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.proto.UserProtos.UserProperties;
 import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.util.Pointer;
 import org.junit.Test;
 import org.slf4j.Logger;
 
+import java.util.concurrent.CountDownLatch;
+
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class TestPauseInjection extends BaseTestQuery {
 
   private static final UserSession session = UserSession.Builder.newBuilder()
-      .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
+      .withCredentials(UserCredentials.newBuilder()
+        .setUserName("foo")
+        .build())
       .withUserProperties(UserProperties.getDefaultInstance())
       .withOptionManager(bits[0].getContext().getOptionManager())
       .build();
@@ -46,9 +59,11 @@ public class TestPauseInjection extends BaseTestQuery {
     private static final ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(DummyClass.class);
 
     private final QueryContext context;
+    private final CountDownLatch latch;
 
-    public DummyClass(final QueryContext context) {
+    public DummyClass(final QueryContext context, final CountDownLatch latch) {
       this.context = context;
+      this.latch = latch;
     }
 
     public static final String PAUSES = "<<pauses>>";
@@ -61,6 +76,7 @@ public class TestPauseInjection extends BaseTestQuery {
     public long pauses() {
       // ... code ...
 
+      latch.countDown();
       final long startTime = System.currentTimeMillis();
       // simulated pause
       injector.injectPause(context.getExecutionControls(), PAUSES, logger);
@@ -71,30 +87,136 @@ public class TestPauseInjection extends BaseTestQuery {
     }
   }
 
+  private static class ResumingThread extends Thread {
+
+    private final QueryContext context;
+    private final ExtendedLatch latch;
+    private final Pointer<Exception> ex;
+    private final long millis;
+
+    public ResumingThread(final QueryContext context, final ExtendedLatch latch, final Pointer<Exception> ex,
+                          final long millis) {
+      this.context = context;
+      this.latch = latch;
+      this.ex = ex;
+      this.millis = millis;
+    }
+
+    @Override
+    public void run() {
+      latch.awaitUninterruptibly();
+      try {
+        Thread.sleep(millis);
+      } catch (final InterruptedException ex) {
+        this.ex.value = ex;
+      }
+      context.getExecutionControls().unpauseAll();
+    }
+  }
+
   @Test
   public void pauseInjected() {
-    final long pauseMillis = 1000L;
+    final long expectedDuration = 1000L;
+    final ExtendedLatch trigger = new ExtendedLatch(1);
+    final Pointer<Exception> ex = new Pointer<>();
+
     final String jsonString = "{\"injections\":[{"
       + "\"type\":\"pause\"," +
       "\"siteClass\":\"org.apache.drill.exec.testing.TestPauseInjection$DummyClass\","
       + "\"desc\":\"" + DummyClass.PAUSES + "\","
-      + "\"millis\":" + pauseMillis + ","
-      + "\"nSkip\":0,"
-      + "\"nFire\":1"
+      + "\"nSkip\":0"
       + "}]}";
 
     ControlsInjectionUtil.setControls(session, jsonString);
 
     final QueryContext queryContext = new QueryContext(session, bits[0].getContext());
 
+    (new ResumingThread(queryContext, trigger, ex, expectedDuration)).start();
+
     // test that the pause happens
-    final DummyClass dummyClass = new DummyClass(queryContext);
-    final long time = dummyClass.pauses();
-    assertTrue((time >= pauseMillis));
+    final DummyClass dummyClass = new DummyClass(queryContext, trigger);
+    final long actualDuration = dummyClass.pauses();
+    assertTrue(String.format("Test should stop for at least %d milliseconds.", expectedDuration),
+      expectedDuration <= actualDuration);
+    assertTrue("No exception should be thrown.", ex.value == null);
     try {
       queryContext.close();
-    } catch (Exception e) {
-      fail();
+    } catch (final Exception e) {
+      fail("Failed to close query context: " + e);
+    }
+  }
+
+  @Test
+  public void pauseOnSpecificBit() {
+    final RemoteServiceSet remoteServiceSet = RemoteServiceSet.getLocalServiceSet();
+    final ZookeeperHelper zkHelper = new ZookeeperHelper();
+    zkHelper.startZookeeper(1);
+
+    // Creating two drillbits
+    final Drillbit drillbit1, drillbit2;
+    final DrillConfig drillConfig = zkHelper.getConfig();
+    try {
+      drillbit1 = Drillbit.start(drillConfig, remoteServiceSet);
+      drillbit2 = Drillbit.start(drillConfig, remoteServiceSet);
+    } catch (final DrillbitStartupException e) {
+      throw new RuntimeException("Failed to start two drillbits.", e);
+    }
+
+    final DrillbitContext drillbitContext1 = drillbit1.getContext();
+    final DrillbitContext drillbitContext2 = drillbit2.getContext();
+
+    final UserSession session = UserSession.Builder.newBuilder()
+      .withCredentials(UserCredentials.newBuilder()
+        .setUserName("foo")
+        .build())
+      .withUserProperties(UserProperties.getDefaultInstance())
+      .withOptionManager(drillbitContext1.getOptionManager())
+      .build();
+
+    final DrillbitEndpoint drillbitEndpoint1 = drillbitContext1.getEndpoint();
+    final String jsonString = "{\"injections\":[{"
+      + "\"type\" : \"pause\"," +
+      "\"siteClass\" : \"org.apache.drill.exec.testing.TestPauseInjection$DummyClass\","
+      + "\"desc\" : \"" + DummyClass.PAUSES + "\","
+      + "\"nSkip\" : 0, "
+      + "\"address\" : \"" + drillbitEndpoint1.getAddress() + "\","
+      + "\"port\" : " + drillbitEndpoint1.getUserPort()
+      + "}]}";
+
+    ControlsInjectionUtil.setControls(session, jsonString);
+
+    {
+      final long expectedDuration = 1000L;
+      final ExtendedLatch trigger = new ExtendedLatch(1);
+      final Pointer<Exception> ex = new Pointer<>();
+      final QueryContext queryContext = new QueryContext(session, drillbitContext1);
+      (new ResumingThread(queryContext, trigger, ex, expectedDuration)).start();
+
+      // test that the pause happens
+      final DummyClass dummyClass = new DummyClass(queryContext, trigger);
+      final long actualDuration = dummyClass.pauses();
+      assertTrue(String.format("Test should stop for at least %d milliseconds.", expectedDuration),
+        expectedDuration <= actualDuration);
+      assertTrue("No exception should be thrown.", ex.value == null);
+      try {
+        queryContext.close();
+      } catch (final Exception e) {
+        fail("Failed to close query context: " + e);
+      }
+    }
+
+    {
+      final ExtendedLatch trigger = new ExtendedLatch(1);
+      final QueryContext queryContext = new QueryContext(session, drillbitContext2);
+
+      // if the resume did not happen, the test would hang
+      final DummyClass dummyClass = new DummyClass(queryContext, trigger);
+      dummyClass.pauses();
+      try {
+        queryContext.close();
+      } catch (final Exception e) {
+        fail("Failed to close query context: " + e);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
index 470e976..b428337 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
@@ -47,13 +47,13 @@ public final class BitControl {
      */
     GOODBYE(2, 2),
     /**
-     * <code>REQ_INIATILIZE_FRAGMENTS = 3;</code>
+     * <code>REQ_INITIALIZE_FRAGMENTS = 3;</code>
      *
      * <pre>
      * bit requests
      * </pre>
      */
-    REQ_INIATILIZE_FRAGMENTS(3, 3),
+    REQ_INITIALIZE_FRAGMENTS(3, 3),
     /**
      * <code>REQ_CANCEL_FRAGMENT = 6;</code>
      *
@@ -91,25 +91,33 @@ public final class BitControl {
      */
     REQ_QUERY_CANCEL(9, 15),
     /**
+     * <code>REQ_UNPAUSE_FRAGMENT = 16;</code>
+     *
+     * <pre>
+     * send a resume message for a fragment, returns Ack
+     * </pre>
+     */
+    REQ_UNPAUSE_FRAGMENT(10, 16),
+    /**
      * <code>RESP_FRAGMENT_HANDLE = 11;</code>
      *
      * <pre>
      * bit responses
      * </pre>
      */
-    RESP_FRAGMENT_HANDLE(10, 11),
+    RESP_FRAGMENT_HANDLE(11, 11),
     /**
      * <code>RESP_FRAGMENT_STATUS = 12;</code>
      */
-    RESP_FRAGMENT_STATUS(11, 12),
+    RESP_FRAGMENT_STATUS(12, 12),
     /**
      * <code>RESP_BIT_STATUS = 13;</code>
      */
-    RESP_BIT_STATUS(12, 13),
+    RESP_BIT_STATUS(13, 13),
     /**
      * <code>RESP_QUERY_STATUS = 14;</code>
      */
-    RESP_QUERY_STATUS(13, 14),
+    RESP_QUERY_STATUS(14, 14),
     ;
 
     /**
@@ -125,13 +133,13 @@ public final class BitControl {
      */
     public static final int GOODBYE_VALUE = 2;
     /**
-     * <code>REQ_INIATILIZE_FRAGMENTS = 3;</code>
+     * <code>REQ_INITIALIZE_FRAGMENTS = 3;</code>
      *
      * <pre>
      * bit requests
      * </pre>
      */
-    public static final int REQ_INIATILIZE_FRAGMENTS_VALUE = 3;
+    public static final int REQ_INITIALIZE_FRAGMENTS_VALUE = 3;
     /**
      * <code>REQ_CANCEL_FRAGMENT = 6;</code>
      *
@@ -169,6 +177,14 @@ public final class BitControl {
      */
     public static final int REQ_QUERY_CANCEL_VALUE = 15;
     /**
+     * <code>REQ_UNPAUSE_FRAGMENT = 16;</code>
+     *
+     * <pre>
+     * send a resume message for a fragment, returns Ack
+     * </pre>
+     */
+    public static final int REQ_UNPAUSE_FRAGMENT_VALUE = 16;
+    /**
      * <code>RESP_FRAGMENT_HANDLE = 11;</code>
      *
      * <pre>
@@ -197,13 +213,14 @@ public final class BitControl {
         case 0: return HANDSHAKE;
         case 1: return ACK;
         case 2: return GOODBYE;
-        case 3: return REQ_INIATILIZE_FRAGMENTS;
+        case 3: return REQ_INITIALIZE_FRAGMENTS;
         case 6: return REQ_CANCEL_FRAGMENT;
         case 7: return REQ_RECEIVER_FINISHED;
         case 8: return REQ_FRAGMENT_STATUS;
         case 9: return REQ_BIT_STATUS;
         case 10: return REQ_QUERY_STATUS;
         case 15: return REQ_QUERY_CANCEL;
+        case 16: return REQ_UNPAUSE_FRAGMENT;
         case 11: return RESP_FRAGMENT_HANDLE;
         case 12: return RESP_FRAGMENT_STATUS;
         case 13: return RESP_BIT_STATUS;
@@ -7395,16 +7412,17 @@ public final class BitControl {
       "oint\022\024\n\014queue_length\030\002 \001(\005\022\023\n\013report_tim" +
       "e\030\003 \001(\003\"h\n\020FinishedReceiver\022*\n\010receiver\030",
       "\001 \001(\0132\030.exec.bit.FragmentHandle\022(\n\006sende" +
-      "r\030\002 \001(\0132\030.exec.bit.FragmentHandle*\271\002\n\007Rp" +
+      "r\030\002 \001(\0132\030.exec.bit.FragmentHandle*\323\002\n\007Rp" +
       "cType\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE" +
-      "\020\002\022\034\n\030REQ_INIATILIZE_FRAGMENTS\020\003\022\027\n\023REQ_" +
+      "\020\002\022\034\n\030REQ_INITIALIZE_FRAGMENTS\020\003\022\027\n\023REQ_" +
       "CANCEL_FRAGMENT\020\006\022\031\n\025REQ_RECEIVER_FINISH" +
       "ED\020\007\022\027\n\023REQ_FRAGMENT_STATUS\020\010\022\022\n\016REQ_BIT" +
       "_STATUS\020\t\022\024\n\020REQ_QUERY_STATUS\020\n\022\024\n\020REQ_Q" +
-      "UERY_CANCEL\020\017\022\030\n\024RESP_FRAGMENT_HANDLE\020\013\022" +
-      "\030\n\024RESP_FRAGMENT_STATUS\020\014\022\023\n\017RESP_BIT_ST" +
-      "ATUS\020\r\022\025\n\021RESP_QUERY_STATUS\020\016B+\n\033org.apa",
-      "che.drill.exec.protoB\nBitControlH\001"
+      "UERY_CANCEL\020\017\022\030\n\024REQ_UNPAUSE_FRAGMENT\020\020\022" +
+      "\030\n\024RESP_FRAGMENT_HANDLE\020\013\022\030\n\024RESP_FRAGME" +
+      "NT_STATUS\020\014\022\023\n\017RESP_BIT_STATUS\020\r\022\025\n\021RESP",
+      "_QUERY_STATUS\020\016B+\n\033org.apache.drill.exec" +
+      ".protoB\nBitControlH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
index c3ff58b..afe8bfe 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
@@ -67,25 +67,33 @@ public final class UserProtos {
      */
     REQUEST_RESULTS(5, 5),
     /**
+     * <code>RESUME_PAUSED_QUERY = 11;</code>
+     *
+     * <pre>
+     * user is sending a query resume request to the drillbit
+     * </pre>
+     */
+    RESUME_PAUSED_QUERY(6, 11),
+    /**
      * <code>QUERY_DATA = 6;</code>
      *
      * <pre>
      * bit to user
      * </pre>
      */
-    QUERY_DATA(6, 6),
+    QUERY_DATA(7, 6),
     /**
      * <code>QUERY_HANDLE = 7;</code>
      */
-    QUERY_HANDLE(7, 7),
+    QUERY_HANDLE(8, 7),
     /**
      * <code>REQ_META_FUNCTIONS = 8;</code>
      */
-    REQ_META_FUNCTIONS(8, 8),
+    REQ_META_FUNCTIONS(9, 8),
     /**
      * <code>RESP_FUNCTION_LIST = 9;</code>
      */
-    RESP_FUNCTION_LIST(9, 9),
+    RESP_FUNCTION_LIST(10, 9),
     /**
      * <code>QUERY_RESULT = 10;</code>
      *
@@ -93,7 +101,7 @@ public final class UserProtos {
      * drillbit is reporting a query status change, most likely a terminal message, to the user
      * </pre>
      */
-    QUERY_RESULT(10, 10),
+    QUERY_RESULT(11, 10),
     ;
 
     /**
@@ -129,6 +137,14 @@ public final class UserProtos {
      */
     public static final int REQUEST_RESULTS_VALUE = 5;
     /**
+     * <code>RESUME_PAUSED_QUERY = 11;</code>
+     *
+     * <pre>
+     * user is sending a query resume request to the drillbit
+     * </pre>
+     */
+    public static final int RESUME_PAUSED_QUERY_VALUE = 11;
+    /**
      * <code>QUERY_DATA = 6;</code>
      *
      * <pre>
@@ -168,6 +184,7 @@ public final class UserProtos {
         case 3: return RUN_QUERY;
         case 4: return CANCEL_QUERY;
         case 5: return REQUEST_RESULTS;
+        case 11: return RESUME_PAUSED_QUERY;
         case 6: return QUERY_DATA;
         case 7: return QUERY_HANDLE;
         case 8: return REQ_META_FUNCTIONS;
@@ -4986,16 +5003,17 @@ public final class UserProtos {
       "n\030\003 \001(\t\"|\n\022BitToUserHandshake\022\023\n\013rpc_ver" +
       "sion\030\002 \001(\005\022*\n\006status\030\003 \001(\0162\032.exec.user.H" +
       "andshakeStatus\022\017\n\007errorId\030\004 \001(\t\022\024\n\014error" +
-      "Message\030\005 \001(\t*\310\001\n\007RpcType\022\r\n\tHANDSHAKE\020\000",
+      "Message\030\005 \001(\t*\341\001\n\007RpcType\022\r\n\tHANDSHAKE\020\000",
       "\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\r\n\tRUN_QUERY\020\003\022\020\n" +
-      "\014CANCEL_QUERY\020\004\022\023\n\017REQUEST_RESULTS\020\005\022\016\n\n" +
-      "QUERY_DATA\020\006\022\020\n\014QUERY_HANDLE\020\007\022\026\n\022REQ_ME" +
-      "TA_FUNCTIONS\020\010\022\026\n\022RESP_FUNCTION_LIST\020\t\022\020" +
-      "\n\014QUERY_RESULT\020\n*#\n\020QueryResultsMode\022\017\n\013" +
-      "STREAM_FULL\020\001*^\n\017HandshakeStatus\022\013\n\007SUCC" +
-      "ESS\020\001\022\030\n\024RPC_VERSION_MISMATCH\020\002\022\017\n\013AUTH_" +
-      "FAILED\020\003\022\023\n\017UNKNOWN_FAILURE\020\004B+\n\033org.apa" +
-      "che.drill.exec.protoB\nUserProtosH\001"
+      "\014CANCEL_QUERY\020\004\022\023\n\017REQUEST_RESULTS\020\005\022\027\n\023" +
+      "RESUME_PAUSED_QUERY\020\013\022\016\n\nQUERY_DATA\020\006\022\020\n" +
+      "\014QUERY_HANDLE\020\007\022\026\n\022REQ_META_FUNCTIONS\020\010\022" +
+      "\026\n\022RESP_FUNCTION_LIST\020\t\022\020\n\014QUERY_RESULT\020" +
+      "\n*#\n\020QueryResultsMode\022\017\n\013STREAM_FULL\020\001*^" +
+      "\n\017HandshakeStatus\022\013\n\007SUCCESS\020\001\022\030\n\024RPC_VE" +
+      "RSION_MISMATCH\020\002\022\017\n\013AUTH_FAILED\020\003\022\023\n\017UNK" +
+      "NOWN_FAILURE\020\004B+\n\033org.apache.drill.exec." +
+      "protoB\nUserProtosH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcType.java
index 4d03073..6687a86 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcType.java
@@ -28,6 +28,7 @@ public enum RpcType implements com.dyuproject.protostuff.EnumLite<RpcType>
     RUN_QUERY(3),
     CANCEL_QUERY(4),
     REQUEST_RESULTS(5),
+    RESUME_PAUSED_QUERY(11),
     QUERY_DATA(6),
     QUERY_HANDLE(7),
     REQ_META_FUNCTIONS(8),
@@ -61,6 +62,7 @@ public enum RpcType implements com.dyuproject.protostuff.EnumLite<RpcType>
             case 8: return REQ_META_FUNCTIONS;
             case 9: return RESP_FUNCTION_LIST;
             case 10: return QUERY_RESULT;
+            case 11: return RESUME_PAUSED_QUERY;
             default: return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/protocol/src/main/protobuf/BitControl.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/BitControl.proto b/protocol/src/main/protobuf/BitControl.proto
index 93bc33c..c9295f0 100644
--- a/protocol/src/main/protobuf/BitControl.proto
+++ b/protocol/src/main/protobuf/BitControl.proto
@@ -14,25 +14,25 @@ enum RpcType {
   HANDSHAKE = 0;
   ACK = 1;
   GOODBYE = 2;
-    
+
   // bit requests
-  REQ_INIATILIZE_FRAGMENTS = 3; // Returns Handle
-    
+  REQ_INITIALIZE_FRAGMENTS = 3; // Returns Handle
+
   REQ_CANCEL_FRAGMENT = 6; // send a cancellation message for a fragment, returns Ack
   REQ_RECEIVER_FINISHED = 7;
   REQ_FRAGMENT_STATUS = 8; // send a fragment status, return Ack
   REQ_BIT_STATUS = 9; // get bit status.
   REQ_QUERY_STATUS = 10;
   REQ_QUERY_CANCEL = 15;
-      
-    // bit responses
+  REQ_UNPAUSE_FRAGMENT = 16; // send a resume message for a fragment, returns Ack
+
+  // bit responses
   RESP_FRAGMENT_HANDLE = 11;
   RESP_FRAGMENT_STATUS = 12;
   RESP_BIT_STATUS = 13;
   RESP_QUERY_STATUS = 14;
 }
 
-
 message BitControlHandshake{
   optional int32 rpc_version = 1;
   optional exec.shared.RpcChannel channel = 2 [default = BIT_CONTROL];

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/protocol/src/main/protobuf/User.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/User.proto b/protocol/src/main/protobuf/User.proto
index 185a646..ceed3d8 100644
--- a/protocol/src/main/protobuf/User.proto
+++ b/protocol/src/main/protobuf/User.proto
@@ -17,6 +17,7 @@ enum RpcType {
   RUN_QUERY = 3; // user is submitting a new query to the drillbit
   CANCEL_QUERY = 4; // user is sending a query cancellation request to the drillbit
   REQUEST_RESULTS = 5;
+  RESUME_PAUSED_QUERY = 11; // user is sending a query resume request to the drillbit
 
   // bit to user
   QUERY_DATA = 6; // drillbit is sending a query result data batch to the user


Mime
View raw message