drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From par...@apache.org
Subject [3/3] drill git commit: DRILL-1859 Issue with killing/stopping operator processing - limit is one of the users of this logic
Date Tue, 13 Jan 2015 22:30:14 GMT
DRILL-1859 Issue with killing/stopping operator processing - limit is one of the users of this
logic


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/69db15eb
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/69db15eb
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/69db15eb

Branch: refs/heads/master
Commit: 69db15ebbdc3a8f4a038e6f47a0675c32d14cdf4
Parents: 2a41077
Author: Yuliya Feldman <yfeldman@maprtech.com>
Authored: Mon Jan 12 14:11:45 2015 -0800
Committer: Parth Chandra <pchandra@maprtech.com>
Committed: Tue Jan 13 14:29:28 2015 -0800

----------------------------------------------------------------------
 .../exec/work/batch/ResponseSenderQueue.java    |  5 +++
 .../work/batch/UnlimitedRawBatchBuffer.java     | 36 ++++++++++------
 .../work/batch/TestUnlimitedBatchBuffer.java    | 44 ++++++++++++++++----
 3 files changed, 65 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/69db15eb/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
index a7535c3..141c434 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
@@ -22,6 +22,7 @@ import java.util.Queue;
 import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.rpc.data.DataRpcConfig;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 
 public class ResponseSenderQueue {
@@ -55,4 +56,8 @@ public class ResponseSenderQueue {
     return i;
   }
 
+  @VisibleForTesting
+  boolean isEmpty() {
+    return q.isEmpty();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/69db15eb/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 35aec93..895918c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.record.RawFragmentBatch;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 
 public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
@@ -63,12 +64,12 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
 
   @Override
   public void enqueue(RawFragmentBatch batch) throws IOException {
-    if (state == BufferState.KILLED) {
-      batch.release();
-    }
     if (isFinished()) {
       if (state == BufferState.KILLED) {
+        // do not even enqueue just release and send ack back
         batch.release();
+        batch.sendOk();
+        return;
       } else {
         throw new IOException("Attempted to enqueue batch after finished");
       }
@@ -107,29 +108,29 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
       if (!context.isFailed() && !context.isCancelled()) {
         context.fail(new IllegalStateException("Batches still in queue during cleanup"));
         logger.error("{} Batches in queue.", buffer.size());
-        RawFragmentBatch batch;
-        while ((batch = buffer.poll()) != null) {
-          logger.error("Batch left in queue: {}", batch);
-        }
-      }
-      RawFragmentBatch batch;
-      while ((batch = buffer.poll()) != null) {
-        if (batch.getBody() != null) {
-          batch.getBody().release();
-        }
       }
+      clearBufferWithBody();
     }
   }
 
   @Override
   public void kill(FragmentContext context) {
     state = BufferState.KILLED;
+    clearBufferWithBody();
+  }
+
+  /**
+   * Helper method to clear buffer with request bodies release
+   * also flushes ack queue - in case there are still responses pending
+   */
+  private void clearBufferWithBody() {
     while (!buffer.isEmpty()) {
       RawFragmentBatch batch = buffer.poll();
       if (batch.getBody() != null) {
         batch.getBody().release();
       }
     }
+    readController.flushResponses();
   }
 
   @Override
@@ -205,4 +206,13 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
     return (state == BufferState.KILLED || state == BufferState.FINISHED);
   }
 
+  @VisibleForTesting
+  ResponseSenderQueue getReadController() {
+    return readController;
+  }
+
+  @VisibleForTesting
+  boolean isBufferEmpty() {
+    return buffer.isEmpty();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/69db15eb/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
index 15ee3f3..a710d21 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.work.batch;
 
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
@@ -29,6 +30,7 @@ import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.rpc.data.DataRpcConfig;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -47,6 +49,11 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
 
   private static int FRAGMENT_COUNT = 5;
   private DrillConfig dc = DrillConfig.create();
+  private MySender mySender;
+  private UnlimitedRawBatchBuffer rawBuffer;
+  private RawFragmentBatch batch;
+  private FragmentContext context;
+  private int softLimit;
 
   private static class MySender implements ResponseSender {
 
@@ -66,17 +73,17 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
       sendCount = 0;
     }
   }
-  @Test
-  public void testBackPressure() throws Exception {
 
-    final MySender mySender = new MySender();
-    FragmentContext context = Mockito.mock(FragmentContext.class);
+  @Before
+  public void setUp() {
+    mySender = new MySender();
+    context = Mockito.mock(FragmentContext.class);
 
     Mockito.when(context.getConfig()).thenReturn(dc);
 
-    UnlimitedRawBatchBuffer rawBuffer = new UnlimitedRawBatchBuffer(context, FRAGMENT_COUNT);
+    rawBuffer = new UnlimitedRawBatchBuffer(context, FRAGMENT_COUNT);
 
-    RawFragmentBatch batch = Mockito.mock(RawFragmentBatch.class);
+    batch = Mockito.mock(RawFragmentBatch.class);
 
     Mockito.when(batch.getSender()).thenReturn(mySender);
     Mockito.doAnswer(new Answer<Void>() {
@@ -91,8 +98,11 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
 
     /// start the real test
     int incomingBufferSize = dc.getInt(ExecConstants.INCOMING_BUFFER_SIZE);
-    int softLimit = incomingBufferSize * FRAGMENT_COUNT;
+    softLimit = incomingBufferSize * FRAGMENT_COUNT;
+  }
 
+  @Test
+  public void testBackPressure() throws Exception {
     // No back pressure should be kicked in
     for ( int i = 0; i < softLimit-1; i++) {
       rawBuffer.enqueue(batch);
@@ -131,4 +141,24 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
     }
   }
 
+  @Test
+  public void testAcksWithKill() throws Exception {
+    // Back pressure should be kicked in
+    for ( int i = 0; i < 2*softLimit; i++) {
+      rawBuffer.enqueue(batch);
+    }
+    assertEquals(softLimit - 1, mySender.getSendCount());
+    assertTrue(!rawBuffer.getReadController().isEmpty());
+
+    rawBuffer.kill(context);
+
+    // UnlimitedBatchBuffer queue should be cleared
+    assertTrue(rawBuffer.isBufferEmpty());
+
+    // acks queue should be cleared as well
+    assertTrue(rawBuffer.getReadController().isEmpty());
+
+    // all acks should be sent
+    assertEquals(2*softLimit, mySender.getSendCount());
+  }
 }


Mime
View raw message