drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject drill git commit: DRILL-2936: Use SpoolingRawBatchBuffer for HashToMergeExchange In order to avoid deadlocks
Date Wed, 13 May 2015 23:46:54 GMT
Repository: drill
Updated Branches:
  refs/heads/master c04a8f9fe -> 814f553f2


DRILL-2936: Use SpoolingRawBatchBuffer for HashToMergeExchange
In order to avoid deadlocks

Refactored common code in UnlimitedRawBatchBuffer and SpoolingRawBatchBuffer
 into BaseRawBatchBuffer

Removed reflection-based construction of RawBatchBuffer. Now choose the implementation
 based on the plan

Updated SpoolingRawBatchBuffer to use a separate thread for spooling


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/814f553f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/814f553f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/814f553f

Branch: refs/heads/master
Commit: 814f553f2fe58c6d2d2095fbf71158fcda424658
Parents: c04a8f9
Author: Steven Phillips <smp@apache.org>
Authored: Wed May 6 20:01:35 2015 -0700
Committer: Steven Phillips <smp@apache.org>
Committed: Wed May 13 15:15:42 2015 -0700

----------------------------------------------------------------------
 .../src/main/java/io/netty/buffer/DrillBuf.java |   2 +-
 .../exec/physical/base/AbstractReceiver.java    |   9 +-
 .../drill/exec/physical/base/Receiver.java      |   3 +
 .../exec/physical/config/BroadcastExchange.java |   2 +-
 .../physical/config/HashToMergeExchange.java    |   2 +-
 .../physical/config/HashToRandomExchange.java   |   8 +-
 .../physical/config/MergingReceiverPOP.java     |   5 +-
 .../config/OrderedPartitionExchange.java        |   2 +-
 .../physical/config/SingleMergeExchange.java    |   2 +-
 .../exec/physical/config/UnionExchange.java     |   2 +-
 .../physical/config/UnorderedDeMuxExchange.java |   2 +-
 .../physical/config/UnorderedMuxExchange.java   |   2 +-
 .../exec/physical/config/UnorderedReceiver.java |   5 +-
 .../drill/exec/record/RawFragmentBatch.java     |  20 +-
 .../exec/store/LocalSyncableFileSystem.java     |  20 +-
 .../exec/work/batch/AbstractDataCollector.java  |  17 +-
 .../exec/work/batch/BaseRawBatchBuffer.java     | 249 ++++++++++
 .../drill/exec/work/batch/RawBatchBuffer.java   |   5 -
 .../exec/work/batch/SpoolingRawBatchBuffer.java | 450 +++++++++++++------
 .../work/batch/UnlimitedRawBatchBuffer.java     | 200 ++-------
 .../src/main/resources/drill-module.conf        |   3 +-
 .../exec/work/batch/TestSpoolingBuffer.java     |   4 +-
 .../work/batch/TestUnlimitedBatchBuffer.java    | 166 -------
 23 files changed, 672 insertions(+), 508 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
index 7f80f7a..6f61f30 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
@@ -755,7 +755,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   }
 
   @Override
-  public void close() throws Exception {
+  public void close() {
     release();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
index f01d025..6bb0760 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
@@ -33,15 +33,17 @@ public abstract class AbstractReceiver extends AbstractBase implements Receiver{
 
   private final int oppositeMajorFragmentId;
   private final List<MinorFragmentEndpoint> senders;
+  private final boolean spooling;
 
   /**
    * @param oppositeMajorFragmentId MajorFragmentId of fragments that are sending data to this receiver.
    * @param senders List of sender MinorFragmentEndpoints each containing sender MinorFragmentId and Drillbit endpoint
    *                where it is running.
    */
-  public AbstractReceiver(int oppositeMajorFragmentId, List<MinorFragmentEndpoint> senders){
+  public AbstractReceiver(int oppositeMajorFragmentId, List<MinorFragmentEndpoint> senders, boolean spooling){
     this.oppositeMajorFragmentId = oppositeMajorFragmentId;
     this.senders = ImmutableList.copyOf(senders);
+    this.spooling = spooling;
   }
 
   @Override
@@ -75,5 +77,10 @@ public abstract class AbstractReceiver extends AbstractBase implements Receiver{
   public int getNumSenders() {
     return senders.size();
   }
+
+  @Override
+  public boolean isSpooling() {
+    return spooling;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java
index 04d6d7e..4b34205 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java
@@ -49,4 +49,7 @@ public interface Receiver extends FragmentLeaf {
 
   @JsonProperty("sender-major-fragment")
   public int getOppositeMajorFragmentId();
+
+  @JsonProperty("spooling")
+  public boolean isSpooling();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
index a37f638..89bc343 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
@@ -49,6 +49,6 @@ public class BroadcastExchange extends AbstractExchange {
 
   @Override
   public Receiver getReceiver(int minorFragmentId) {
-    return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations));
+    return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), false);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
index f45ace9..f004118 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
@@ -55,7 +55,7 @@ public class HashToMergeExchange extends AbstractExchange{
 
   @Override
   public Receiver getReceiver(int minorFragmentId) {
-    return new MergingReceiverPOP(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), orderExprs);
+    return new MergingReceiverPOP(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), orderExprs, true);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
index 52d79c2..fb2f9d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
@@ -34,6 +34,12 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 public class HashToRandomExchange extends AbstractExchange{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashToRandomExchange.class);
 
+  private static final boolean HASH_EXCHANGE_SPOOLING;
+
+  static {
+    HASH_EXCHANGE_SPOOLING = "true".equals(System.getProperty("drill.hash_exchange_spooling", "false"));
+  }
+
   private final LogicalExpression expr;
 
   @JsonCreator
@@ -50,7 +56,7 @@ public class HashToRandomExchange extends AbstractExchange{
 
   @Override
   public Receiver getReceiver(int minorFragmentId) {
-    return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations));
+    return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), HASH_EXCHANGE_SPOOLING);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
index 9416814..a6bab64 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
@@ -42,8 +42,9 @@ public class MergingReceiverPOP extends AbstractReceiver{
   @JsonCreator
   public MergingReceiverPOP(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId,
                             @JsonProperty("senders") List<MinorFragmentEndpoint> senders,
-                            @JsonProperty("orderings") List<Ordering> orderings) {
-    super(oppositeMajorFragmentId, senders);
+                            @JsonProperty("orderings") List<Ordering> orderings,
+                            @JsonProperty("spooling") boolean spooling) {
+    super(oppositeMajorFragmentId, senders, spooling);
     this.orderings = orderings;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
index c8dbc22..2463bc7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
@@ -73,7 +73,7 @@ public class OrderedPartitionExchange extends AbstractExchange {
 
   @Override
   public Receiver getReceiver(int minorFragmentId) {
-    return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations));
+    return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), false);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
index c812325..5da3900 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
@@ -72,7 +72,7 @@ public class SingleMergeExchange extends AbstractExchange {
 
   @Override
   public Receiver getReceiver(int minorFragmentId) {
-    return new MergingReceiverPOP(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), orderExpr);
+    return new MergingReceiverPOP(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), orderExpr, false);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
index b7b7835..318e6b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
@@ -69,7 +69,7 @@ public class UnionExchange extends AbstractExchange{
 
   @Override
   public Receiver getReceiver(int minorFragmentId) {
-    return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations));
+    return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), false);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java
index 0bc6678..700a21b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java
@@ -46,7 +46,7 @@ public class UnorderedDeMuxExchange extends AbstractDeMuxExchange {
       throw new IllegalStateException(String.format("Failed to find sender for receiver [%d]", minorFragmentId));
     }
 
-    return new UnorderedReceiver(this.senderMajorFragmentId, Collections.singletonList(sender));
+    return new UnorderedReceiver(this.senderMajorFragmentId, Collections.singletonList(sender), false);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
index 3028ee3..46f1fd7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
@@ -45,7 +45,7 @@ public class UnorderedMuxExchange extends AbstractMuxExchange {
       throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
     }
 
-    return new UnorderedReceiver(senderMajorFragmentId, senders);
+    return new UnorderedReceiver(senderMajorFragmentId, senders, false);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
index e741dd4..77d718e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
@@ -34,8 +34,9 @@ public class UnorderedReceiver extends AbstractReceiver{
 
   @JsonCreator
   public UnorderedReceiver(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId,
-                           @JsonProperty("senders") List<MinorFragmentEndpoint> senders) {
-    super(oppositeMajorFragmentId, senders);
+                           @JsonProperty("senders") List<MinorFragmentEndpoint> senders,
+                           @JsonProperty("spooling") boolean spooling) {
+    super(oppositeMajorFragmentId, senders, spooling);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
index edd79ac..f2f9450 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
@@ -19,15 +19,19 @@ package org.apache.drill.exec.record;
 
 import io.netty.buffer.DrillBuf;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.rpc.data.AckSender;
 
 public class RawFragmentBatch {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RawFragmentBatch.class);
 
-  final FragmentRecordBatch header;
-  final DrillBuf body;
-  final AckSender sender;
+  private final FragmentRecordBatch header;
+  private final DrillBuf body;
+  private final AckSender sender;
+
+  private AtomicBoolean ackSent = new AtomicBoolean(false);
 
   public RawFragmentBatch(FragmentRecordBatch header, DrillBuf body, AckSender sender) {
     super();
@@ -63,12 +67,18 @@ public class RawFragmentBatch {
     return sender;
   }
 
-  public void sendOk() {
-    sender.sendOk();
+  public synchronized void sendOk() {
+    if (sender != null && ackSent.compareAndSet(false, true)) {
+      sender.sendOk();
+    }
   }
 
   public long getByteCount() {
     return body == null ? 0 : body.readableBytes();
   }
 
+  public boolean isAckSent() {
+    return ackSent.get();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
index b88cc28..58db550 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
@@ -26,9 +26,11 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.RandomAccessFile;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
+import java.util.RandomAccess;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.ByteBufferReadable;
@@ -120,7 +122,8 @@ public class LocalSyncableFileSystem extends FileSystem {
 
   @Override
   public FileStatus getFileStatus(Path path) throws IOException {
-    return null;
+    File file = new File(Path.getPathWithoutSchemeAndAuthority(path).toString());
+    return new FileStatus(file.length(), file.isDirectory(), 1, 0, file.lastModified(), path);
   }
 
   public class LocalSyncableOutputStream extends OutputStream implements Syncable {
@@ -166,9 +169,12 @@ public class LocalSyncableFileSystem extends FileSystem {
   public class LocalInputStream extends InputStream implements Seekable, PositionedReadable, ByteBufferReadable {
 
     private BufferedInputStream input;
+    private String path;
+    private long position = 0;
 
     public LocalInputStream(Path path)  throws IOException {
-      input = new BufferedInputStream(new FileInputStream(path.toString()), 1024*1024);
+      this.path = path.toString();
+      input = new BufferedInputStream(new FileInputStream(new RandomAccessFile(this.path, "r").getFD()), 1024*1024);
     }
 
     @Override
@@ -188,13 +194,16 @@ public class LocalSyncableFileSystem extends FileSystem {
 
     @Override
     public void seek(long l) throws IOException {
-      input.reset();
-      input.skip(l);
+      input.close();
+      RandomAccessFile raf = new RandomAccessFile(path, "r");
+      raf.seek(l);
+      input = new BufferedInputStream(new FileInputStream(raf.getFD()), 1024*1024);
+      position = l;
     }
 
     @Override
     public long getPos() throws IOException {
-      throw new IOException("getPos not supported");
+      return position;
     }
 
     @Override
@@ -236,6 +245,7 @@ public class LocalSyncableFileSystem extends FileSystem {
     public int read() throws IOException {
       byte[] b = new byte[1];
       input.read(b);
+      position++;
       return (int) b[0] & 0xFF;
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index ed16314..6f16976 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.base.Receiver;
@@ -73,15 +74,19 @@ public abstract class AbstractDataCollector implements DataCollector{
     buffers = new RawBatchBuffer[numBuffers];
     remainingRequired = new AtomicInteger(numBuffers);
 
+    final boolean spooling = receiver.isSpooling();
+
     try {
-      String bufferClassName = context.getConfig().getString(ExecConstants.INCOMING_BUFFER_IMPL);
-      Constructor<?> bufferConstructor = Class.forName(bufferClassName).getConstructor(FragmentContext.class, int.class);
 
-      for(int i=0; i<numBuffers; i++) {
-        buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context, bufferCapacity);
+      for (int i = 0; i < numBuffers; i++) {
+        if (spooling) {
+          buffers[i] = new SpoolingRawBatchBuffer(context, bufferCapacity, receiver.getOppositeMajorFragmentId(), i);
+        } else {
+          buffers[i] = new UnlimitedRawBatchBuffer(context, bufferCapacity, receiver.getOppositeMajorFragmentId());
+        }
       }
-    } catch (InstantiationException | IllegalAccessException | InvocationTargetException |
-            NoSuchMethodException | ClassNotFoundException e) {
+    } catch (IOException | OutOfMemoryException e) {
+      logger.error("Exception", e);
       context.fail(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
new file mode 100644
index 0000000..5192e46
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.batch;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RawFragmentBatch;
+
+public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRawBatchBuffer.class);
+
+  private static enum BufferState {
+    INIT,
+    STREAMS_FINISHED,
+    KILLED
+  }
+
+  protected interface BufferQueue<T> {
+    public void addOomBatch(RawFragmentBatch batch);
+    public RawFragmentBatch poll() throws IOException;
+    public RawFragmentBatch take() throws IOException, InterruptedException;
+    public boolean checkForOutOfMemory();
+    public int size();
+    public boolean isEmpty();
+    public void add(T obj);
+  }
+
+  protected BufferQueue<T> bufferQueue;
+  private volatile BufferState state = BufferState.INIT;
+  protected final int bufferSizePerSocket;
+  protected final AtomicBoolean outOfMemory = new AtomicBoolean(false);
+  private int streamCounter;
+  private final int fragmentCount;
+  protected final FragmentContext context;
+
+  public BaseRawBatchBuffer(final FragmentContext context, final int fragmentCount) {
+    bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE);
+
+    this.fragmentCount = fragmentCount;
+    this.streamCounter = fragmentCount;
+    this.context = context;
+  }
+
+  @Override
+  public void enqueue(final RawFragmentBatch batch) throws IOException {
+
+    // if this fragment is already canceled or failed, we shouldn't need any or more stuff. We do the null check to
+    // ensure that tests run.
+    if (context != null && !context.shouldContinue()) {
+      this.kill(context);
+    }
+
+    if (isTerminated()) {
+      if (state == BufferState.KILLED) {
+        // do not even enqueue just release and send ack back
+        batch.release();
+        batch.sendOk();
+        return;
+      } else {
+        throw new IOException("Attempted to enqueue batch after finished");
+      }
+    }
+    if (batch.getHeader().getIsOutOfMemory()) {
+      handleOutOfMemory(batch);
+      return;
+    }
+    enqueueInner(batch);
+  }
+
+  /**
+   * handle the out of memory case
+   *
+   * @param batch
+   */
+  protected void handleOutOfMemory(final RawFragmentBatch batch) {
+    if (!bufferQueue.checkForOutOfMemory()) {
+      logger.debug("Adding OOM message to front of queue. Current queue size: {}", bufferQueue.size());
+      bufferQueue.addOomBatch(batch);
+    } else {
+      logger.debug("ignoring duplicate OOM message");
+    }
+  }
+
+  /**
+   * implementation specific method to enqueue batch
+   *
+   * @param batch
+   * @throws IOException
+   */
+  protected abstract void enqueueInner(final RawFragmentBatch batch) throws IOException;
+
+//  ## Add assertion that all acks have been sent. TODO
+  @Override
+  public void cleanup() {
+    if (!isTerminated() && context.shouldContinue()) {
+      final String msg = String.format("Cleanup before finished. %d out of %d strams have finished", completedStreams(), fragmentCount);
+      final IllegalStateException e = new IllegalStateException(msg);
+      throw e;
+    }
+
+    if (!bufferQueue.isEmpty()) {
+      if (context.shouldContinue()) {
+        context.fail(new IllegalStateException("Batches still in queue during cleanup"));
+        logger.error("{} Batches in queue.", bufferQueue.size());
+      }
+      clearBufferWithBody();
+    }
+  }
+
+  @Override
+  public void kill(final FragmentContext context) {
+    state = BufferState.KILLED;
+    clearBufferWithBody();
+  }
+
+  /**
+   * Helper method to clear buffer with request bodies release also flushes ack queue - in case there are still
+   * responses pending
+   */
+  private void clearBufferWithBody() {
+    while (!bufferQueue.isEmpty()) {
+      final RawFragmentBatch batch;
+      try {
+        batch = bufferQueue.poll();
+        assertAckSent(batch);
+      } catch (IOException e) {
+        context.fail(e);
+        continue;
+      }
+      if (batch.getBody() != null) {
+        batch.getBody().release();
+      }
+    }
+  }
+
+  private void allStreamsFinished() {
+    if (state != BufferState.KILLED) {
+      state = BufferState.STREAMS_FINISHED;
+    }
+
+    if (!bufferQueue.isEmpty()) {
+      throw new IllegalStateException("buffer not empty when finished");
+    }
+  }
+
+  @Override
+  public RawFragmentBatch getNext() throws IOException {
+
+    if (outOfMemory.get()) {
+      if (bufferQueue.size() < 10) {
+        outOfMemory.set(false);
+      }
+    }
+
+    RawFragmentBatch b;
+    try {
+      b = bufferQueue.poll();
+
+      // if we didn't get a batch, block on waiting for queue.
+      if (b == null && (!isTerminated() || !bufferQueue.isEmpty())) {
+        b = bufferQueue.take();
+      }
+    } catch (final InterruptedException e) {
+
+      // We expect that the interrupt means the fragment is canceled or failed, so we should kill this buffer
+      if (!context.shouldContinue()) {
+        kill(context);
+      } else {
+        throw new DrillRuntimeException("Interrupted but context.shouldContinue() is true", e);
+      }
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+
+      return null;
+    }
+
+    if (b != null) {
+      if (b.getHeader().getIsOutOfMemory()) {
+        outOfMemory.set(true);
+        return b;
+      }
+
+      upkeep(b);
+
+      if (b.getHeader().getIsLastBatch()) {
+        logger.debug("Got last batch from {}:{}", b.getHeader().getSendingMajorFragmentId(), b.getHeader()
+            .getSendingMinorFragmentId());
+        final int remainingStreams = decrementStreamCounter();
+        if (remainingStreams == 0) {
+          logger.debug("Streams finished");
+          allStreamsFinished();
+        }
+      }
+    } else {
+      if (!bufferQueue.isEmpty()) {
+        throw new IllegalStateException("Returning null when there are batches left in queue");
+      }
+      if (!isTerminated()) {
+        throw new IllegalStateException("Returning null when not finished");
+      }
+    }
+
+    assertAckSent(b);
+    return b;
+
+  }
+
+  private void assertAckSent(RawFragmentBatch batch) {
+    assert batch == null || batch.isAckSent() || batch.getHeader().getIsOutOfMemory() : "Ack not sent for batch";
+  }
+
+  private int decrementStreamCounter() {
+    streamCounter--;
+    return streamCounter;
+  }
+
+  private int completedStreams() {
+    return fragmentCount - streamCounter;
+  }
+
+  /**
+   * Handle miscellaneous tasks after batch retrieval
+   */
+  protected abstract void upkeep(RawFragmentBatch batch);
+
+  protected boolean isTerminated() {
+    return (state == BufferState.KILLED || state == BufferState.STREAMS_FINISHED);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
index 8646a72..0441eca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
@@ -38,9 +38,4 @@ public interface RawBatchBuffer extends RawFragmentBatchProvider {
    * @returns Whether response should be returned.
    */
   public void enqueue(RawFragmentBatch batch) throws IOException;
-
-  /**
-   * Inform the buffer that no more batches are expected.
-   */
-  public void finished();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index 07a3505..1634982 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -20,31 +20,32 @@ package org.apache.drill.exec.work.batch;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.store.LocalSyncableFileSystem;
-import org.apache.drill.exec.work.fragment.FragmentManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Queues;
@@ -53,7 +54,7 @@ import com.google.common.collect.Queues;
  * This implementation of RawBatchBuffer starts writing incoming batches to disk once the buffer size reaches a threshold.
  * The order of the incoming buffers is maintained.
  */
-public class SpoolingRawBatchBuffer implements RawBatchBuffer {
+public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchBuffer.RawFragmentBatchWrapper> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SpoolingRawBatchBuffer.class);
 
   private static String DRILL_LOCAL_IMPL_STRING = "fs.drill-local.impl";
@@ -61,183 +62,305 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
   public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
   public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
 
-  private final LinkedBlockingDeque<RawFragmentBatchWrapper> buffer = Queues.newLinkedBlockingDeque();
-  private volatile boolean finished = false;
-  private volatile long queueSize = 0;
-  private long threshold;
-  private FragmentContext context;
-  private BufferAllocator allocator;
-  private volatile AtomicBoolean spooling = new AtomicBoolean(false);
+  /**
+   * Spooling mode of this buffer. All transitions go through setSpoolingState().
+   */
+  private enum SpoolingState {
+    // not assigned anywhere in the code shown here: setSpoolingState() ignores requests for it
+    NOT_SPOOLING,
+    // set by startSpooling(); while in this state incoming batches are handed to the Spooler
+    SPOOLING,
+    // set by pauseSpooling(), which upkeep() calls once the in-memory size drops below the stop fraction
+    PAUSE_SPOOLING,
+    // set by stopSpooling(); setSpoolingState() refuses any further change after this
+    STOP_SPOOLING
+  }
+
+  private final BufferAllocator allocator;
+  private final long threshold;
+  private final int oppositeId;
+  private final int bufferIndex;
+
+  private volatile SpoolingState spoolingState;
+  private volatile long currentSizeInMemory = 0;
+  private volatile Spooler spooler;
+
   private FileSystem fs;
   private Path path;
   private FSDataOutputStream outputStream;
-  private FSDataInputStream inputStream;
-  private boolean outOfMemory = false;
-  private boolean closed = false;
-  private FragmentManager fragmentManager;
 
-  public SpoolingRawBatchBuffer(FragmentContext context, int fragmentCount) throws IOException, OutOfMemoryException {
-    this.context = context;
+  public SpoolingRawBatchBuffer(FragmentContext context, int fragmentCount, int oppositeId, int bufferIndex) throws IOException, OutOfMemoryException {
+    super(context, fragmentCount);
     this.allocator = context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION, true);
     this.threshold = context.getConfig().getLong(ExecConstants.SPOOLING_BUFFER_MEMORY);
-    Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, context.getConfig().getString(ExecConstants.TEMP_FILESYSTEM));
-    conf.set(DRILL_LOCAL_IMPL_STRING, LocalSyncableFileSystem.class.getName());
-    this.fs = FileSystem.get(conf);
-    this.path = new Path(getDir(), getFileName());
+    this.oppositeId = oppositeId;
+    this.bufferIndex = bufferIndex;
+    this.bufferQueue = new SpoolingBufferQueue();
   }
 
-  public static List<String> DIRS = DrillConfig.create().getStringList(ExecConstants.TEMP_DIRECTORIES);
+  private class SpoolingBufferQueue implements BufferQueue<RawFragmentBatchWrapper> {
 
-  public static String getDir() {
-    Random random = new Random();
-    return DIRS.get(random.nextInt(DIRS.size()));
-  }
+    private final LinkedBlockingDeque<RawFragmentBatchWrapper> buffer = Queues.newLinkedBlockingDeque();
 
-  @Override
-  public synchronized void enqueue(RawFragmentBatch batch) throws IOException {
-    if (batch.getHeader().getIsOutOfMemory()) {
-      if (fragmentManager == null) {
-        throw new UnsupportedOperationException("Need to fix.");
-//        fragmentManager = ((BitServerConnection) batch.getConnection()).getFragmentManager();
-      }
-//      fragmentManager.setAutoRead(false);
-//      logger.debug("Setting autoRead false");
-      if (!outOfMemory && !buffer.peekFirst().isOutOfMemory()) {
-        logger.debug("Adding OOM message to front of queue. Current queue size: {}", buffer.size());
-        buffer.addFirst(new RawFragmentBatchWrapper(batch, true));
-      } else {
-        logger.debug("ignoring duplicate OOM message");
-      }
-      batch.sendOk();
-      return;
+    /**
+     * Wraps the given batch (created as available, then flagged out-of-memory) and inserts it at the head of the
+     * deque, so it is the next element that poll() or take() retrieves.
+     */
+    @Override
+    public void addOomBatch(RawFragmentBatch batch) {
+      RawFragmentBatchWrapper batchWrapper = new RawFragmentBatchWrapper(batch, true);
+      batchWrapper.setOutOfMemory(true);
+      buffer.addFirst(batchWrapper);
     }
-    RawFragmentBatchWrapper wrapper;
-    boolean spool = spooling.get();
-    wrapper = new RawFragmentBatchWrapper(batch, !spool);
-    queueSize += wrapper.getBodySize();
-    if (spool) {
-      if (outputStream == null) {
-        outputStream = fs.create(path);
+
+    /**
+     * Removes the wrapper at the head of the queue, if any, and returns its batch. The queue itself is not waited
+     * on, but get() may block until a batch that is being spooled has been written.
+     *
+     * @return the next batch, or null if the queue is empty or the thread was interrupted while waiting in get()
+     * @throws IOException if reading a spooled batch back fails
+     */
+    @Override
+    public RawFragmentBatch poll() throws IOException {
+      RawFragmentBatchWrapper batchWrapper = buffer.poll();
+      if (batchWrapper != null) {
+        try {
+          return batchWrapper.get();
+        } catch (InterruptedException e) {
+          // Restore the interrupt status so that callers can still observe the interruption instead of
+          // having it silently swallowed here.
+          Thread.currentThread().interrupt();
+          return null;
+        }
       }
-      wrapper.writeToStream(outputStream);
+      return null;
+    }
+
+    @Override
+    public RawFragmentBatch take() throws IOException, InterruptedException {
+      return buffer.take().get();
+    }
+
+    /**
+     * @return true if the wrapper at the head of the queue is flagged out-of-memory; false if it is not, or if the
+     *         queue is empty
+     */
+    @Override
+    public boolean checkForOutOfMemory() {
+      // peek() returns null on an empty deque; guard against it instead of throwing a NullPointerException
+      final RawFragmentBatchWrapper head = buffer.peek();
+      return head != null && head.isOutOfMemory();
+    }
+
+    @Override
+    public int size() {
+      return buffer.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return buffer.size() == 0;
     }
-    buffer.add(wrapper);
-    if (!spool && queueSize > threshold) {
-      logger.debug("Buffer size {} greater than threshold {}. Start spooling to disk", queueSize, threshold);
-      spooling.set(true);
+
+    public void add(RawFragmentBatchWrapper batchWrapper) {
+      buffer.add(batchWrapper);
     }
   }
 
-  @Override
-  public void kill(FragmentContext context) {
-    allocator.close();
+  /**
+   * Moves this buffer to the requested spooling state. The request is dropped when it asks for NOT_SPOOLING or
+   * when the buffer has already reached STOP_SPOOLING.
+   */
+  private synchronized void setSpoolingState(SpoolingState newState) {
+    final boolean alreadyStopped = spoolingState == SpoolingState.STOP_SPOOLING;
+    if (!alreadyStopped && newState != SpoolingState.NOT_SPOOLING) {
+      spoolingState = newState;
+    }
   }
 
+  private boolean isCurrentlySpooling() {
+    return spoolingState == SpoolingState.SPOOLING;
+  }
 
-  @Override
-  public void finished() {
-    finished = true;
+  private void startSpooling() {
+    setSpoolingState(SpoolingState.SPOOLING);
   }
 
-  @Override
-  public RawFragmentBatch getNext() throws IOException, InterruptedException {
-    if (outOfMemory && buffer.size() < 10) {
-      outOfMemory = false;
-      fragmentManager.setAutoRead(true);
-      logger.debug("Setting autoRead true");
+  private void pauseSpooling() {
+    setSpoolingState(SpoolingState.PAUSE_SPOOLING);
+  }
+
+  private boolean isSpoolingStopped() {
+    return spoolingState == SpoolingState.STOP_SPOOLING;
+  }
+
+  private void stopSpooling() {
+    setSpoolingState(SpoolingState.STOP_SPOOLING);
+  }
+
+  public String getDir() {
+    List<String> dirs = context.getConfig().getStringList(ExecConstants.TEMP_DIRECTORIES);
+    return dirs.get(ThreadLocalRandom.current().nextInt(dirs.size()));
+  }
+
+  private synchronized void initSpooler() throws IOException {
+    if (spooler != null) {
+      return;
     }
-    boolean spool = spooling.get();
-    RawFragmentBatchWrapper w = buffer.poll();
-    RawFragmentBatch batch;
-    if(w == null && !finished){
-      try {
-        w = buffer.take();
-        batch = w.get();
-        if (batch.getHeader().getIsOutOfMemory()) {
-          outOfMemory = true;
-          return batch;
-        }
-        queueSize -= w.getBodySize();
-        return batch;
-      } catch (final InterruptedException e) {
-        // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
-        // interruption and respond to it if it wants to.
-        Thread.currentThread().interrupt();
 
-        return null;
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, context.getConfig().getString(ExecConstants.TEMP_FILESYSTEM));
+    conf.set(DRILL_LOCAL_IMPL_STRING, LocalSyncableFileSystem.class.getName());
+    fs = FileSystem.get(conf);
+    path = getPath();
+    outputStream = fs.create(path);
+    final String spoolingThreadName = QueryIdHelper.getExecutorThreadName(context.getHandle()).concat(
+        ":Spooler-" + oppositeId + "-" + bufferIndex);
+    spooler = new Spooler(spoolingThreadName);
+    spooler.start();
+  }
+
+  @Override
+  protected void enqueueInner(RawFragmentBatch batch) throws IOException {
+    assert batch.getHeader().getSendingMajorFragmentId() == oppositeId;
+
+    logger.debug("Enqueue batch. Current buffer size: {}. Last batch: {}. Sending fragment: {}", bufferQueue.size(), batch.getHeader().getIsLastBatch(), batch.getHeader().getSendingMajorFragmentId());
+    RawFragmentBatchWrapper wrapper;
+
+    boolean spoolCurrentBatch = isCurrentlySpooling();
+    wrapper = new RawFragmentBatchWrapper(batch, !spoolCurrentBatch);
+    currentSizeInMemory += wrapper.getBodySize();
+    if (spoolCurrentBatch) {
+      if (spooler == null) {
+        initSpooler();
       }
+      spooler.addBatchForSpooling(wrapper);
     }
-    if (w == null) {
-      return null;
+    bufferQueue.add(wrapper);
+    if (!spoolCurrentBatch && currentSizeInMemory > threshold) {
+      logger.debug("Buffer size {} greater than threshold {}. Start spooling to disk", currentSizeInMemory, threshold);
+      startSpooling();
     }
+  }
 
-    batch = w.get();
-    if (batch.getHeader().getIsOutOfMemory()) {
-      outOfMemory = true;
-      return batch;
+  /**
+   * Closes this buffer's allocator and, if a spooler thread was created, asks it to terminate. The call does not
+   * wait for the spooler thread to exit.
+   * NOTE(review): cleanup() closes the same allocator again — confirm close() is safe to call twice.
+   */
+  @Override
+  public void kill(FragmentContext context) {
+    allocator.close();
+    if (spooler != null) {
+      spooler.terminate();
+    }
+  }
+
+  @Override
+  protected void upkeep(RawFragmentBatch batch) {
+    FragmentRecordBatch header = batch.getHeader();
+    if (header.getIsOutOfMemory()) {
+      outOfMemory.set(true);
+      return;
+    }
+    DrillBuf body = batch.getBody();
+    if (body != null) {
+      currentSizeInMemory -= body.capacity();
     }
-    queueSize -= w.getBodySize();
-//    assert queueSize >= 0;
-    if (spool && queueSize < threshold * STOP_SPOOLING_FRACTION) {
-      logger.debug("buffer size {} less than {}x threshold. Stop spooling.", queueSize, STOP_SPOOLING_FRACTION);
-      spooling.set(false);
+    if (isCurrentlySpooling() && currentSizeInMemory < threshold * STOP_SPOOLING_FRACTION) {
+      logger.debug("buffer size {} less than {}x threshold. Stop spooling.", currentSizeInMemory, STOP_SPOOLING_FRACTION);
+      pauseSpooling();
     }
-    return batch;
+    logger.debug("Got batch. Current buffer size: {}", bufferQueue.size());
   }
 
   public void cleanup() {
-    if (closed) {
-      logger.warn("Tried cleanup twice");
-      return;
+    if (spooler != null) {
+      spooler.terminate();
+      while (spooler.isAlive()) {
+        try {
+          spooler.join();
+        } catch (InterruptedException e) {
+          logger.warn("Interrupted while waiting for spooling thread to exit");
+          continue;
+        }
+      }
     }
-    closed = true;
     allocator.close();
     try {
       if (outputStream != null) {
         outputStream.close();
       }
-      if (inputStream != null) {
-        inputStream.close();
-      }
     } catch (IOException e) {
       logger.warn("Failed to cleanup I/O streams", e);
     }
     if (context.getConfig().getBoolean(ExecConstants.SPOOLING_BUFFER_DELETE)) {
       try {
-        fs.delete(path,false);
+        if (fs != null) {
+          fs.delete(path, false);
+          logger.debug("Deleted file {}", path.toString());
+        }
       } catch (IOException e) {
         logger.warn("Failed to delete temporary files", e);
       }
-      logger.debug("Deleted file {}", path.toString());
     }
+    super.cleanup();
   }
 
-  private class RawFragmentBatchWrapper {
+  /**
+   * Daemon thread that takes batch wrappers off an internal queue and writes them to the spool file's output
+   * stream.
+   */
+  private class Spooler extends Thread {
+
+    private final LinkedBlockingDeque<RawFragmentBatchWrapper> spoolingQueue;
+    private volatile boolean shouldContinue = true;
+
+    public Spooler(String name) {
+      setDaemon(true);
+      setName(name);
+      spoolingQueue = Queues.newLinkedBlockingDeque();
+    }
+
+    /**
+     * Writes queued batches until terminate() clears shouldContinue. I/O errors and any unexpected throwable are
+     * reported through context.fail().
+     */
+    @Override
+    public void run() {
+      try {
+        while (shouldContinue) {
+          RawFragmentBatchWrapper batch;
+          try {
+            batch = spoolingQueue.take();
+          } catch (InterruptedException e) {
+            // an interrupt only ends the loop when terminate() has also cleared shouldContinue
+            if (shouldContinue) {
+              continue;
+            } else {
+              break;
+            }
+          }
+          try {
+            batch.writeToStream(outputStream);
+          } catch (IOException e) {
+            context.fail(e);
+          }
+        }
+      } catch (Throwable e) {
+        context.fail(e);
+      } finally {
+        logger.info("Spooler thread exiting");
+      }
+    }
+
+    /**
+     * Queues the batch for writing if the buffer is currently spooling. Otherwise the batch stays in memory: it is
+     * marked available, acknowledged, and its latch is released right away.
+     */
+    public void addBatchForSpooling(RawFragmentBatchWrapper batchWrapper) {
+      // Was isSpoolingStopped(), which is inverted: it queued batches only after spooling had stopped (when
+      // nothing drains the queue any more) and never spooled while the buffer was actually in SPOOLING state.
+      if (isCurrentlySpooling()) {
+        spoolingQueue.add(batchWrapper);
+      } else {
+        // will not spill this batch
+        batchWrapper.available = true;
+        batchWrapper.batch.sendOk();
+        batchWrapper.latch.countDown();
+      }
+    }
+
+    /**
+     * Stops spooling for good and interrupts this thread if it is running, so that a blocked take() wakes up and
+     * the run loop can exit.
+     */
+    public void terminate() {
+      stopSpooling();
+      shouldContinue = false;
+      // The Spooler is the thread itself. The former spoolingThread field was never assigned, so calling
+      // isAlive() on it always threw a NullPointerException.
+      if (isAlive()) {
+        interrupt();
+      }
+    }
+  }
+
+  class RawFragmentBatchWrapper {
     private RawFragmentBatch batch;
-    private boolean available;
-    private CountDownLatch latch = new CountDownLatch(1);
-    private int bodyLength;
-    private boolean outOfMemory = false;
+    private volatile boolean available;
+    private CountDownLatch latch;
+    private volatile int bodyLength;
+    private volatile boolean outOfMemory = false;
+    private long start = -1;
+    private long check;
 
     public RawFragmentBatchWrapper(RawFragmentBatch batch, boolean available) {
       Preconditions.checkNotNull(batch);
       this.batch = batch;
       this.available = available;
+      this.latch = new CountDownLatch(available ? 0 : 1);
+      if (available) {
+        batch.sendOk();
+      }
     }
 
     public boolean isNull() {
       return batch == null;
     }
 
-    public RawFragmentBatch get() throws IOException {
+    public RawFragmentBatch get() throws InterruptedException, IOException {
       if (available) {
+        assert batch.getHeader() != null : "batch header null";
         return batch;
       } else {
-        if (inputStream == null) {
-          inputStream = fs.open(path);
-        }
-        readFromStream(inputStream);
+        latch.await();
+        readFromStream();
         available = true;
         return batch;
       }
@@ -255,32 +378,81 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
       Stopwatch watch = new Stopwatch();
       watch.start();
       available = false;
+      check = ThreadLocalRandom.current().nextLong();
+      start = stream.getPos();
+      logger.debug("Writing check value {} at position {}", check, start);
+      stream.writeLong(check);
       batch.getHeader().writeDelimitedTo(stream);
       ByteBuf buf = batch.getBody();
-      if (buf == null) {
+      if (buf != null) {
+        bodyLength = buf.capacity();
+      } else {
         bodyLength = 0;
-        return;
       }
-      bodyLength = buf.readableBytes();
-      buf.getBytes(0, stream, bodyLength);
-      stream.sync();
+      if (bodyLength > 0) {
+        buf.getBytes(0, stream, bodyLength);
+      }
+      stream.hsync();
+      FileStatus status = fs.getFileStatus(path);
+      long len = status.getLen();
+      logger.debug("After spooling batch, stream at position {}. File length {}", stream.getPos(), len);
+      batch.sendOk();
+      latch.countDown();
       long t = watch.elapsed(TimeUnit.MICROSECONDS);
       logger.debug("Took {} us to spool {} to disk. Rate {} mb/s", t, bodyLength, bodyLength / t);
-      buf.release();
+      if (buf != null) {
+        buf.release();
+      }
     }
 
-    public void readFromStream(FSDataInputStream stream) throws IOException {
-      Stopwatch watch = new Stopwatch();
-      watch.start();
-      BitData.FragmentRecordBatch header = BitData.FragmentRecordBatch.parseDelimitedFrom(stream);
-      DrillBuf buf = allocator.buffer(bodyLength);
-      buf.writeBytes(stream, bodyLength);
-      batch = new RawFragmentBatch(header, buf, null);
-      buf.release();
-      available = true;
-      latch.countDown();
-      long t = watch.elapsed(TimeUnit.MICROSECONDS);
-      logger.debug("Took {} us to read {} from disk. Rate {} mb/s", t, bodyLength, bodyLength / t);
+    public void readFromStream() throws IOException, InterruptedException {
+      long pos = start;
+      boolean tryAgain = true;
+      int duration = 0;
+
+      while (tryAgain) {
+
+        // Sometimes, the file isn't quite done writing when we attempt to read it. As such, we need to wait and retry.
+        Thread.sleep(duration);
+
+        try(final FSDataInputStream stream = fs.open(path);
+            final DrillBuf buf = allocator.buffer(bodyLength)) {
+          stream.seek(start);
+          final long currentPos = stream.getPos();
+          final long check = stream.readLong();
+          pos = stream.getPos();
+          assert check == this.check : String.format("Check values don't match: %d %d, Position %d", this.check, check, currentPos);
+          Stopwatch watch = new Stopwatch();
+          watch.start();
+          BitData.FragmentRecordBatch header = BitData.FragmentRecordBatch.parseDelimitedFrom(stream);
+          pos = stream.getPos();
+          assert header != null : "header null after parsing from stream";
+          buf.writeBytes(stream, bodyLength);
+          pos = stream.getPos();
+          batch = new RawFragmentBatch(header, buf, null);
+          available = true;
+          latch.countDown();
+          long t = watch.elapsed(TimeUnit.MICROSECONDS);
+          logger.debug("Took {} us to read {} from disk. Rate {} mb/s", t, bodyLength, bodyLength / t);
+          tryAgain = false;
+        } catch (EOFException e) {
+          FileStatus status = fs.getFileStatus(path);
+          logger.warn("EOF reading from file {} at pos {}. Current file size: {}", path, pos, status.getLen());
+          duration = Math.max(1, duration * 2);
+          if (duration < 60000) {
+            continue;
+          } else {
+            throw e;
+          }
+        } finally {
+          if (tryAgain) {
+            // we had a premature exit, release batch memory so we don't leak it.
+            if (batch != null) {
+              batch.getBody().release();
+            }
+          }
+        }
+      }
     }
 
     private boolean isOutOfMemory() {
@@ -292,7 +464,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
     }
   }
 
-  private String getFileName() {
+  private Path getPath() {
     ExecProtos.FragmentHandle handle = context.getHandle();
 
     String qid = QueryIdHelper.getQueryId(handle.getQueryId());
@@ -300,8 +472,8 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
     int majorFragmentId = handle.getMajorFragmentId();
     int minorFragmentId = handle.getMinorFragmentId();
 
-    String fileName = String.format("%s_%s_%s", qid, majorFragmentId, minorFragmentId);
+    String fileName = Joiner.on(Path.SEPARATOR).join(getDir(), qid, majorFragmentId, minorFragmentId, oppositeId, bufferIndex);
 
-    return fileName;
+    return new Path(fileName);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 4750666..ef06ea8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -19,206 +19,78 @@ package org.apache.drill.exec.work.batch;
 
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.record.RawFragmentBatch;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 
-public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
+public class UnlimitedRawBatchBuffer extends BaseRawBatchBuffer<RawFragmentBatch> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnlimitedRawBatchBuffer.class);
 
-  private static enum BufferState {
-    INIT,
-    FINISHED,
-    KILLED
-  }
-
-  private final LinkedBlockingDeque<RawFragmentBatch> buffer;
-  private volatile BufferState state = BufferState.INIT;
   private final int softlimit;
   private final int startlimit;
-  private final int bufferSizePerSocket;
-  private final AtomicBoolean overlimit = new AtomicBoolean(false);
-  private final AtomicBoolean outOfMemory = new AtomicBoolean(false);
-  private final ResponseSenderQueue readController = new ResponseSenderQueue();
-  private int streamCounter;
-  private final int fragmentCount;
-  private final FragmentContext context;
-
-  public UnlimitedRawBatchBuffer(final FragmentContext context, final int fragmentCount) {
-    bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE);
 
+  public UnlimitedRawBatchBuffer(FragmentContext context, int fragmentCount, int oppositeId) {
+    super(context, fragmentCount);
     this.softlimit = bufferSizePerSocket * fragmentCount;
     this.startlimit = Math.max(softlimit/2, 1);
     logger.trace("softLimit: {}, startLimit: {}", softlimit, startlimit);
-    this.buffer = Queues.newLinkedBlockingDeque();
-    this.fragmentCount = fragmentCount;
-    this.streamCounter = fragmentCount;
-    this.context = context;
+    this.bufferQueue = new UnlimitedBufferQueue();
   }
 
-  @Override
-  public void enqueue(final RawFragmentBatch batch) throws IOException {
+  private class UnlimitedBufferQueue implements BufferQueue<RawFragmentBatch> {
+    private final LinkedBlockingDeque<RawFragmentBatch> buffer = Queues.newLinkedBlockingDeque();;
 
-    // if this fragment is already canceled or failed, we shouldn't need any or more stuff. We do the null check to
-    // ensure that tests run.
-    if (context != null && !context.shouldContinue()) {
-      this.kill(context);
+    @Override
+    public void addOomBatch(RawFragmentBatch batch) {
+      buffer.addFirst(batch);
     }
 
-    if (isFinished()) {
-      if (state == BufferState.KILLED) {
-        // do not even enqueue just release and send ack back
-        batch.release();
+    @Override
+    public RawFragmentBatch poll() throws IOException {
+      RawFragmentBatch batch = buffer.poll();
+      if (batch != null) {
         batch.sendOk();
-        return;
-      } else {
-        throw new IOException("Attempted to enqueue batch after finished");
-      }
-    }
-    if (batch.getHeader().getIsOutOfMemory()) {
-      logger.trace("Setting autoread false");
-      final RawFragmentBatch firstBatch = buffer.peekFirst();
-      final FragmentRecordBatch header = firstBatch == null ? null :firstBatch.getHeader();
-      if (!outOfMemory.get() && !(header == null) && header.getIsOutOfMemory()) {
-        buffer.addFirst(batch);
       }
-      outOfMemory.set(true);
-      return;
+      return batch;
     }
-    buffer.add(batch);
-    if (buffer.size() >= softlimit) {
-      logger.trace("buffer.size: {}", buffer.size());
-      overlimit.set(true);
-      readController.enqueueResponse(batch.getSender());
-    } else {
+
+    @Override
+    public RawFragmentBatch take() throws IOException, InterruptedException {
+      RawFragmentBatch batch = buffer.take();
       batch.sendOk();
+      return batch;
     }
-  }
 
-  @Override
-  public void cleanup() {
-    if (!isFinished() && context.shouldContinue()) {
-      final String msg = String.format("Cleanup before finished. " + (fragmentCount - streamCounter) + " out of " + fragmentCount + " streams have finished.");
-      final IllegalStateException e = new IllegalStateException(msg);
-      throw e;
+    /**
+     * @return true if the first batch in the queue has the out-of-memory header flag set; false otherwise,
+     *         including when the queue is empty
+     */
+    @Override
+    public boolean checkForOutOfMemory() {
+      // peekFirst() returns null on an empty deque, so guard before touching the header
+      final RawFragmentBatch head = buffer.peekFirst();
+      return head != null && head.getHeader().getIsOutOfMemory();
     }
 
-    if (!buffer.isEmpty()) {
-      if (context.shouldContinue()) {
-        context.fail(new IllegalStateException("Batches still in queue during cleanup"));
-        logger.error("{} Batches in queue.", buffer.size());
-      }
-      clearBufferWithBody();
+    @Override
+    public int size() {
+      return buffer.size();
     }
-  }
 
-  @Override
-  public void kill(final FragmentContext context) {
-    state = BufferState.KILLED;
-    clearBufferWithBody();
-  }
-
-  /**
-   * Helper method to clear buffer with request bodies release
-   * also flushes ack queue - in case there are still responses pending
-   */
-  private void clearBufferWithBody() {
-    while (!buffer.isEmpty()) {
-      final RawFragmentBatch batch = buffer.poll();
-      if (batch.getBody() != null) {
-        batch.getBody().release();
-      }
+    @Override
+    public boolean isEmpty() {
+      return buffer.size() == 0;
     }
-    readController.flushResponses();
-  }
 
-  @Override
-  public void finished() {
-    if (state != BufferState.KILLED) {
-      state = BufferState.FINISHED;
-    }
-    if (!buffer.isEmpty()) {
-      throw new IllegalStateException("buffer not empty when finished");
+    @Override
+    public void add(RawFragmentBatch batch) {
+      buffer.add(batch);
     }
   }
 
-  @Override
-  public RawFragmentBatch getNext() throws IOException, InterruptedException {
-
-    if (outOfMemory.get() && buffer.size() < 10) {
-      logger.trace("Setting autoread true");
-      outOfMemory.set(false);
-      readController.flushResponses();
-    }
-
-    RawFragmentBatch b = null;
-
-    b = buffer.poll();
-
-    // if we didn't get a buffer, block on waiting for buffer.
-    if (b == null && (!isFinished() || !buffer.isEmpty())) {
-      try {
-        b = buffer.take();
-      } catch (final InterruptedException e) {
-        logger.debug("Interrupted while waiting for incoming data.", e);
-        throw e;
-      }
-    }
-
-    if (b != null && b.getHeader().getIsOutOfMemory()) {
-      outOfMemory.set(true);
-      return b;
-    }
-
-
-    // try to flush the difference between softlimit and queue size, so every flush we are reducing backlog
-    // when queue size is lower then softlimit - the bigger the difference the more we can flush
-    if (!isFinished() && overlimit.get()) {
-      final int flushCount = softlimit - buffer.size();
-      if ( flushCount > 0 ) {
-        final int flushed = readController.flushResponses(flushCount);
-        logger.trace("flush {} entries, flushed {} entries ", flushCount, flushed);
-        if ( flushed == 0 ) {
-          // queue is empty - nothing to do for now
-          overlimit.set(false);
-        }
-      }
-    }
-
-    if (b != null && b.getHeader().getIsLastBatch()) {
-      streamCounter--;
-      if (streamCounter == 0) {
-        finished();
-      }
-    }
-
-    if (b == null && buffer.size() > 0) {
-      throw new IllegalStateException("Returning null when there are batches left in queue");
-    }
-    if (b == null && !isFinished()) {
-      throw new IllegalStateException("Returning null when not finished");
+  protected void enqueueInner(final RawFragmentBatch batch) throws IOException {
+    if (bufferQueue.size() < softlimit) {
+      batch.sendOk();
     }
-    return b;
-
-  }
-
-  private boolean isFinished() {
-    return (state == BufferState.KILLED || state == BufferState.FINISHED);
-  }
-
-  @VisibleForTesting
-  ResponseSenderQueue getReadController() {
-    return readController;
+    bufferQueue.add(batch);
   }
 
-  @VisibleForTesting
-  boolean isBufferEmpty() {
-    return buffer.isEmpty();
+  protected void upkeep(RawFragmentBatch batch) {
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 7630938..6fb9340 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -122,10 +122,9 @@ drill.exec: {
     filesystem: "drill-local:///"
   },
   buffer:{
-    impl: "org.apache.drill.exec.work.batch.UnlimitedRawBatchBuffer",
     size: "6",
     spooling: {
-      delete: false,
+      delete: true,
       size: 100000000
     }
   },

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
index dcea9bb..271f29f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.List;
 
+import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.ExecTest;
@@ -33,7 +34,7 @@ import org.junit.Test;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 
-public class TestSpoolingBuffer extends ExecTest {
+public class TestSpoolingBuffer extends BaseTestQuery {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSpoolingBuffer.class);
 
   @Test
@@ -59,5 +60,4 @@ public class TestSpoolingBuffer extends ExecTest {
       assertEquals(500024, count);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
deleted file mode 100644
index b8336e9..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.work.batch;
-
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ExecTest;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
-import org.apache.drill.exec.record.RawFragmentBatch;
-import org.apache.drill.exec.rpc.data.AckSender;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * Test case to test whether backpressure is applied when the
- * size of the queue of RawBatchBuffers exceeds the specified softLimit.
- * It tests that acknowledgments are queued and sent according to the
- * correct schedule.
- * If the algorithm for releasing acks is changed in the future,
- * this test will need to be changed.
- * It does not test whether Senders receive acknowledgments and act accordingly.
- */
-public class TestUnlimitedBatchBuffer extends ExecTest {
-
-  private static int FRAGMENT_COUNT = 5;
-  private DrillConfig dc = DrillConfig.create();
-  private MyAckSender myAckSender;
-  private UnlimitedRawBatchBuffer rawBuffer;
-  private RawFragmentBatch batch;
-  private FragmentContext context;
-  private int softLimit;
-
-  private static class MyAckSender extends AckSender {
-
-    private int sendCount = 0;
-
-    public MyAckSender() {
-      super(null);
-    }
-
-    @Override
-    public void sendOk() {
-      sendCount++;
-    }
-
-    public int getSendCount() {
-      return sendCount;
-    }
-
-    public void resetSender() {
-      sendCount = 0;
-    }
-  }
-
-  @Before
-  public void setUp() {
-    myAckSender = new MyAckSender();
-    context = Mockito.mock(FragmentContext.class);
-
-    Mockito.when(context.getConfig()).thenReturn(dc);
-    Mockito.when(context.shouldContinue()).thenReturn(true);
-
-    rawBuffer = new UnlimitedRawBatchBuffer(context, FRAGMENT_COUNT);
-
-    batch = Mockito.mock(RawFragmentBatch.class);
-
-    Mockito.when(batch.getSender()).thenReturn(myAckSender);
-    Mockito.doAnswer(new Answer<Void>() {
-      public Void answer(InvocationOnMock ignore) throws Throwable {
-        myAckSender.sendOk();
-        return null;
-      }
-    }).when(batch).sendOk();
-
-    FragmentRecordBatch header = FragmentRecordBatch.newBuilder().setIsOutOfMemory(false).setIsLastBatch(false).build();
-    Mockito.when(batch.getHeader()).thenReturn(header);
-
-    /// start the real test
-    int incomingBufferSize = dc.getInt(ExecConstants.INCOMING_BUFFER_SIZE);
-    softLimit = incomingBufferSize * FRAGMENT_COUNT;
-  }
-
-  @Test
-  public void testBackPressure() throws Exception {
-    // No back pressure should be kicked in
-    for ( int i = 0; i < softLimit-1; i++) {
-      rawBuffer.enqueue(batch);
-    }
-
-    // number of responses sent == number of enqueued elements
-    assertEquals(softLimit - 1, myAckSender.getSendCount());
-    rawBuffer.getNext();
-
-    // set senderCount to 0
-    myAckSender.resetSender();
-
-    // test back pressure
-    // number of elements in the queue = softLimit -2
-    // enqueue softlimit elements more
-    for ( int i = 0; i < softLimit; i++) {
-      rawBuffer.enqueue(batch);
-    }
-    // we are exceeding softlimit, so senderCount should not increase
-    assertEquals(1, myAckSender.getSendCount());
-
-    // other responses should be saved in the responsequeue
-    for (int i = 0; i < softLimit-2; i++ ) {
-      rawBuffer.getNext();
-    }
-
-    // still should not send responses, as queue.size should be higher than softLimit
-    assertEquals(1, myAckSender.getSendCount());
-
-    // size of the queue == softLimit now
-    for (int i = softLimit; i > 0 ; i-- ) {
-      int senderCount = myAckSender.getSendCount();
-      rawBuffer.getNext();
-      int expectedCountNumber = softLimit - i + senderCount+1;
-      assertEquals((expectedCountNumber < softLimit ? expectedCountNumber : softLimit), myAckSender.getSendCount());
-    }
-  }
-
-  @Test
-  public void testAcksWithKill() throws Exception {
-    // Back pressure should be kicked in
-    for ( int i = 0; i < 2*softLimit; i++) {
-      rawBuffer.enqueue(batch);
-    }
-    assertEquals(softLimit - 1, myAckSender.getSendCount());
-    assertTrue(!rawBuffer.getReadController().isEmpty());
-
-    rawBuffer.kill(context);
-
-    // UnlimitedBatchBuffer queue should be cleared
-    assertTrue(rawBuffer.isBufferEmpty());
-
-    // acks queue should be cleared as well
-    assertTrue(rawBuffer.getReadController().isEmpty());
-
-    // all acks should be sent
-    assertEquals(2*softLimit, myAckSender.getSendCount());
-  }
-}


Mime
View raw message