drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ve...@apache.org
Subject [3/8] drill git commit: DRILL-2187: Single Broadcast Sender
Date Mon, 09 Mar 2015 08:23:50 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
index 3d06806..7d157fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
@@ -38,32 +38,41 @@ public class FragmentWritableBatch{
   private final FragmentRecordBatch header;
 
   public FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId,
int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId, WritableBatch
batch){
-    this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
receiveMinorFragmentId, batch.getDef(), batch.getBuffers());
+    this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
new int[]{receiveMinorFragmentId}, batch.getDef(), batch.getBuffers());
   }
 
-  private FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId,
int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId, RecordBatchDef
def, ByteBuf... buffers){
+  public FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId,
int sendMinorFragmentId, int receiveMajorFragmentId, int[] receiveMinorFragmentIds, WritableBatch
batch){
+    this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
receiveMinorFragmentIds, batch.getDef(), batch.getBuffers());
+  }
+
+  private FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId,
int sendMinorFragmentId, int receiveMajorFragmentId, int[] receiveMinorFragmentId, RecordBatchDef
def, ByteBuf... buffers){
     this.buffers = buffers;
-    FragmentHandle handle = FragmentHandle //
-        .newBuilder() //
-        .setMajorFragmentId(receiveMajorFragmentId) //
-        .setMinorFragmentId(receiveMinorFragmentId) //
-        .setQueryId(queryId) //
-        .build();
-    this.header = FragmentRecordBatch //
+    FragmentRecordBatch.Builder builder = FragmentRecordBatch //
         .newBuilder() //
         .setIsLastBatch(isLast) //
         .setDef(def) //
-        .setHandle(handle) //
+        .setQueryId(queryId)
+        .setReceivingMajorFragmentId(receiveMajorFragmentId) //
         .setSendingMajorFragmentId(sendMajorFragmentId) //
-        .setSendingMinorFragmentId(sendMinorFragmentId) //
-        .build();
+        .setSendingMinorFragmentId(sendMinorFragmentId);
+
+    for(int i : receiveMinorFragmentId){
+      builder.addReceivingMinorFragmentId(i);
+    }
+
+    this.header = builder.build();
   }
 
 
   public static FragmentWritableBatch getEmptyLast(QueryId queryId, int sendMajorFragmentId,
int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId){
-    return new FragmentWritableBatch(true, queryId, sendMajorFragmentId, sendMinorFragmentId,
receiveMajorFragmentId, receiveMinorFragmentId, EMPTY_DEF);
+    return getEmptyLast(queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
new int[]{receiveMinorFragmentId});
+  }
+
+  public static FragmentWritableBatch getEmptyLast(QueryId queryId, int sendMajorFragmentId,
int sendMinorFragmentId, int receiveMajorFragmentId, int[] receiveMinorFragmentIds){
+    return new FragmentWritableBatch(true, queryId, sendMajorFragmentId, sendMinorFragmentId,
receiveMajorFragmentId, receiveMinorFragmentIds, EMPTY_DEF);
   }
 
+
   public static FragmentWritableBatch getEmptyLastWithSchema(QueryId queryId, int sendMajorFragmentId,
int sendMinorFragmentId,
                                                              int receiveMajorFragmentId,
int receiveMinorFragmentId, BatchSchema schema){
     return getEmptyBatchWithSchema(true, queryId, sendMajorFragmentId, sendMinorFragmentId,
receiveMajorFragmentId,
@@ -79,7 +88,7 @@ public class FragmentWritableBatch{
     }
     RecordBatchDef def = RecordBatchDef.newBuilder().addAllField(fields).build();
     return new FragmentWritableBatch(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId,
receiveMajorFragmentId,
-        receiveMinorFragmentId, def);
+        new int[]{receiveMinorFragmentId}, def);
   }
 
   public ByteBuf[] getBuffers(){

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
index e1725e6..edd79ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
@@ -20,23 +20,19 @@ package org.apache.drill.exec.record;
 import io.netty.buffer.DrillBuf;
 
 import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
-import org.apache.drill.exec.rpc.RemoteConnection;
-import org.apache.drill.exec.rpc.ResponseSender;
-import org.apache.drill.exec.rpc.data.DataRpcConfig;
+import org.apache.drill.exec.rpc.data.AckSender;
 
 public class RawFragmentBatch {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RawFragmentBatch.class);
 
-  final RemoteConnection connection;
   final FragmentRecordBatch header;
   final DrillBuf body;
-  final ResponseSender sender;
+  final AckSender sender;
 
-  public RawFragmentBatch(RemoteConnection connection, FragmentRecordBatch header, DrillBuf
body, ResponseSender sender) {
+  public RawFragmentBatch(FragmentRecordBatch header, DrillBuf body, AckSender sender) {
     super();
     this.header = header;
     this.body = body;
-    this.connection = connection;
     this.sender = sender;
     if (body != null) {
       body.retain();
@@ -62,16 +58,13 @@ public class RawFragmentBatch {
     }
   }
 
-  public RemoteConnection getConnection() {
-    return connection;
-  }
 
-  public ResponseSender getSender() {
+  public AckSender getSender() {
     return sender;
   }
 
   public void sendOk() {
-    sender.send(DataRpcConfig.OK);
+    sender.sendOk();
   }
 
   public long getByteCount() {

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
index 880099c..9db1681 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
@@ -19,13 +19,6 @@ package org.apache.drill.exec.record;
 
 import org.apache.drill.common.expression.SchemaPath;
 
-/**
- * Created with IntelliJ IDEA.
- * User: sphillips
- * Date: 9/30/13
- * Time: 1:40 PM
- * To change this template use File | Settings | File Templates.
- */
 public interface VectorAccessible extends Iterable<VectorWrapper<?>> {
   public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds);
   public TypedFieldId getValueVectorId(SchemaPath path);

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 96c9911..5eab16a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -165,6 +165,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection>
imp
       if (RpcConstants.EXTRA_DEBUGGING) {
         logger.debug("Adding message to outbound buffer. {}", outMessage);
       }
+      logger.debug("Sending response with Sender {}", System.identityHashCode(this));
       connection.getChannel().writeAndFlush(outMessage);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/AckSender.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/AckSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/AckSender.java
new file mode 100644
index 0000000..839ebc5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/AckSender.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.data;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.exec.rpc.ResponseSender;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Utility class that allows a group of receivers to confirm reception of a record batch
as a single unit. Response
+ * isn't send upstream until all receivers have successfully consumed data.
+ */
+public class AckSender {
+
+  private AtomicInteger count = new AtomicInteger(0);
+  private ResponseSender sender;
+
+  @VisibleForTesting
+  public AckSender(ResponseSender sender) {
+    this.sender = sender;
+  }
+
+  /**
+   * Add another sender to wait for.
+   */
+  void increment() {
+    count.incrementAndGet();
+  }
+
+  /**
+   * Disable any sending of the ok message.
+   */
+  void clear() {
+    count.set(-100000);
+  }
+
+  /**
+   * Decrement the number of references still holding on to this response. When the number
of references hit zero, send
+   * response upstream.
+   */
+  public void sendOk() {
+    if (0 == count.decrementAndGet()) {
+      sender.send(DataRpcConfig.OK);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
index a9eb66f..8e2507b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
@@ -95,6 +95,6 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection,
BitCl
 
   @Override
   public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) {
-    return new DataProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE);
+    return new DataProtobufLengthDecoder.Client(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
index 197996d..33f0d09 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
@@ -57,8 +57,8 @@ public class DataConnectionCreator implements Closeable {
     return completeEndpoint;
   }
 
-  public DataTunnel getTunnel(DrillbitEndpoint endpoint, FragmentHandle handle) {
-    DataConnectionManager newManager = new DataConnectionManager(handle, endpoint, context);
+  public DataTunnel getTunnel(DrillbitEndpoint endpoint) {
+    DataConnectionManager newManager = new DataConnectionManager(endpoint, context);
     DataConnectionManager oldManager = connectionManager.putIfAbsent(endpoint, newManager);
     if(oldManager != null){
       newManager = oldManager;

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java
index b2ea855..8a947a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java
@@ -31,8 +31,14 @@ public class DataConnectionManager extends ReconnectingConnection<DataClientConn
   private final DrillbitEndpoint endpoint;
   private final BootStrapContext context;
 
-  public DataConnectionManager(FragmentHandle handle, DrillbitEndpoint endpoint, BootStrapContext
context) {
-    super(hs(handle), endpoint.getAddress(), endpoint.getDataPort());
+  private final static BitClientHandshake HANDSHAKE = BitClientHandshake //
+      .newBuilder() //
+      .setRpcVersion(DataRpcConfig.RPC_VERSION) //
+      .setChannel(RpcChannel.BIT_DATA) //
+      .build();
+
+  public DataConnectionManager(DrillbitEndpoint endpoint, BootStrapContext context) {
+    super(HANDSHAKE, endpoint.getAddress(), endpoint.getDataPort());
     this.endpoint = endpoint;
     this.context = context;
   }
@@ -42,12 +48,4 @@ public class DataConnectionManager extends ReconnectingConnection<DataClientConn
     return new DataClient(endpoint, context, new CloseHandlerCreator());
   }
 
-  private static BitClientHandshake hs(FragmentHandle handle){
-    return BitClientHandshake //
-        .newBuilder() //
-        .setRpcVersion(DataRpcConfig.RPC_VERSION) //
-        .setChannel(RpcChannel.BIT_DATA) //
-        .setHandle(handle) //
-        .build();
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java
index 193b050..a74a5a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java
@@ -26,9 +26,10 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.rpc.OutOfMemoryHandler;
 import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
 
-public class DataProtobufLengthDecoder extends ProtobufLengthDecoder{
+public class DataProtobufLengthDecoder{
 
-  public DataProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler)
{
+  public static class Client extends ProtobufLengthDecoder{
+  public Client(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
     super(allocator, outOfMemoryHandler);
 
   }
@@ -37,4 +38,18 @@ public class DataProtobufLengthDecoder extends ProtobufLengthDecoder{
   protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws
Exception {
     super.decode(ctx, in, out);
   }
+  }
+
+  public static class Server extends ProtobufLengthDecoder{
+
+    public Server(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
+      super(allocator, outOfMemoryHandler);
+
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
+      super.decode(ctx, in, out);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
index d4a73c3..721b83e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
@@ -17,8 +17,11 @@
  */
 package org.apache.drill.exec.rpc.data;
 
+import java.io.IOException;
+
 import io.netty.buffer.DrillBuf;
 
+import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.rpc.ResponseSender;
@@ -27,8 +30,8 @@ import org.apache.drill.exec.work.fragment.FragmentManager;
 
 public interface DataResponseHandler {
 
-  public void handle(RemoteConnection connection, FragmentManager manager, FragmentRecordBatch
fragmentBatch,
-      DrillBuf data, ResponseSender responder) throws RpcException;
+  public void handle(FragmentManager manager, FragmentRecordBatch fragmentBatch,
+      DrillBuf data, AckSender sender) throws FragmentSetupException, IOException;
 
   public void informOutOfMemory();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
index 1fcb3e9..e0392fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
@@ -50,27 +50,21 @@ public class DataResponseHandlerImpl implements DataResponseHandler{
   }
 
 
-  public void handle(RemoteConnection connection, FragmentManager manager, FragmentRecordBatch
fragmentBatch, DrillBuf data, ResponseSender sender) throws RpcException {
+  public void handle(FragmentManager manager, FragmentRecordBatch fragmentBatch, DrillBuf
data, AckSender sender) throws FragmentSetupException, IOException {
 //    logger.debug("Fragment Batch received {}", fragmentBatch);
-    try {
-      boolean canRun = manager.handle(new RawFragmentBatch(connection, fragmentBatch, data,
sender));
-      if (canRun) {
+
+    boolean canRun = manager.handle(new RawFragmentBatch(fragmentBatch, data, sender));
+    if (canRun) {
 //        logger.debug("Arriving batch means local batch can run, starting local batch.");
-        // if we've reached the canRun threshold, we'll proceed. This expects handler.handle()
to only return a single
-        // true.
-        bee.startFragmentPendingRemote(manager);
-      }
-      if (fragmentBatch.getIsLastBatch() && !manager.isWaiting()) {
+      // if we've reached the canRun threshold, we'll proceed. This expects handler.handle()
to only return a single
+      // true.
+      bee.startFragmentPendingRemote(manager);
+    }
+    if (fragmentBatch.getIsLastBatch() && !manager.isWaiting()) {
 //        logger.debug("Removing handler.  Is Last Batch {}.  Is Waiting for more {}", fragmentBatch.getIsLastBatch(),
 //            manager.isWaiting());
-        bee.getContext().getWorkBus().removeFragmentManager(manager.getHandle());
-      }
-
-    } catch (FragmentSetupException e) {
-      logger.error("Failure while attempting to setup new fragment.", e);
-      sender.send(new Response(RpcType.ACK, Acks.FAIL));
-    } catch (IOException e) {
-      throw new RpcException(e);
+      bee.getContext().getWorkBus().removeFragmentManager(manager.getHandle());
     }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
index b54841d..807b6c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
@@ -35,7 +35,7 @@ public class DataRpcConfig {
       .add(RpcType.REQ_RECORD_BATCH, FragmentRecordBatch.class, RpcType.ACK, Ack.class)
       .build();
 
-  public static int RPC_VERSION = 3;
+  public static int RPC_VERSION = 4;
 
   public static final Response OK = new Response(RpcType.ACK, Acks.OK);
   public static final Response FAIL = new Response(RpcType.ACK, Acks.FAIL);

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index e88455b..62f1429 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -17,8 +17,11 @@
  */
 package org.apache.drill.exec.rpc.data;
 
+import java.io.IOException;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
+import io.netty.buffer.UnsafeDirectLittleEndian;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.GenericFutureListener;
@@ -41,6 +44,7 @@ import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.fragment.FragmentManager;
 
 import com.google.protobuf.MessageLite;
@@ -100,32 +104,80 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection>
{
 
   private final static FragmentRecordBatch OOM_FRAGMENT = FragmentRecordBatch.newBuilder().setIsOutOfMemory(true).build();
 
+
+  private FragmentHandle getHandle(FragmentRecordBatch batch, int index){
+    return FragmentHandle.newBuilder()
+        .setQueryId(batch.getQueryId())
+        .setMajorFragmentId(batch.getReceivingMajorFragmentId())
+        .setMinorFragmentId(batch.getReceivingMinorFragmentId(index))
+        .build();
+  }
+
+
   @Override
   protected void handle(BitServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf
body, ResponseSender sender) throws RpcException {
     assert rpcType == RpcType.REQ_RECORD_BATCH_VALUE;
 
-    FragmentRecordBatch fragmentBatch = get(pBody, FragmentRecordBatch.PARSER);
-    FragmentHandle handle = fragmentBatch.getHandle();
+    final FragmentRecordBatch fragmentBatch = get(pBody, FragmentRecordBatch.PARSER);
+    final int targetCount = fragmentBatch.getReceivingMinorFragmentIdCount();
 
+    Pointer<DrillBuf> out = new Pointer<DrillBuf>();
+    AckSender ack = new AckSender(sender);
+    // increment so we don't get false returns.
+    ack.increment();
     try {
-      FragmentManager manager = workBus.getFragmentManager(fragmentBatch.getHandle());
-      if (manager == null) {
-        if (body != null) {
-          body.release();
+
+      if(body == null){
+
+        for(int minor = 0; minor < targetCount; minor++){
+          FragmentManager manager = workBus.getFragmentManager(getHandle(fragmentBatch, minor));
+          if(manager != null){
+            ack.increment();
+            dataHandler.handle(manager, fragmentBatch, null, ack);
+          }
         }
+
       }else{
-        BufferAllocator allocator = manager.getFragmentContext().getAllocator();
-        if (body != null && !manager.getFragmentContext().isCancelled()) {
-          if (!allocator.takeOwnership((DrillBuf) body.unwrap())) {
-            dataHandler.handle(connection, manager, OOM_FRAGMENT, null, null);
+
+        for(int minor = 0; minor < targetCount; minor++){
+          FragmentManager manager = workBus.getFragmentManager(getHandle(fragmentBatch, minor));
+          if(manager == null){
+            continue;
+          }
+
+          BufferAllocator allocator = manager.getFragmentContext().getAllocator();
+
+          boolean withinMemoryEnvelope = allocator.takeOwnership((DrillBuf) body, out);
+
+          if(!withinMemoryEnvelope){
+            // if we over reserved, we need to add poison pill before batch.
+            dataHandler.handle(manager, OOM_FRAGMENT, null, null);
           }
+
+          ack.increment();
+          dataHandler.handle(manager, fragmentBatch, out.value, ack);
+
+          // make sure to release the reference count we have to the new buffer.
+          // dataHandler.handle should have taken any ownership it needed.
+          out.value.release();
         }
-        dataHandler.handle(connection, manager, fragmentBatch, (DrillBuf) body, sender);
+        out = null;
       }
 
-    } catch (FragmentSetupException e) {
-      logger.error("Failure while getting fragment manager. {}", QueryIdHelper.getQueryIdentifier(handle),
 e);
+    } catch (IOException | FragmentSetupException e) {
+      logger.error("Failure while getting fragment manager. {}",
+          QueryIdHelper.getQueryIdentifiers(fragmentBatch.getQueryId(),
+              fragmentBatch.getReceivingMajorFragmentId(),
+              fragmentBatch.getReceivingMinorFragmentIdList()));
+      ack.clear();
       sender.send(new Response(RpcType.ACK, Acks.FAIL));
+    } finally {
+
+      // decrement the extra reference we grabbed at the top.
+      ack.sendOk();
+      if(out != null && out.value != null){
+        out.value.release();
+      }
     }
   }
 
@@ -157,7 +209,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection>
{
 
   @Override
   public ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler)
{
-    return new DataProtobufLengthDecoder(allocator, outOfMemoryHandler);
+    return new DataProtobufLengthDecoder.Server(allocator, outOfMemoryHandler);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 6d9fa5b..7ad3f77 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -49,6 +49,7 @@ public class SystemOptionManager implements OptionManager {
       PlannerSettings.MULTIPHASE,
       PlannerSettings.BROADCAST,
       PlannerSettings.BROADCAST_THRESHOLD,
+      PlannerSettings.BROADCAST_FACTOR,
       PlannerSettings.JOIN_ROW_COUNT_ESTIMATE_FACTOR,
       PlannerSettings.PRODUCER_CONSUMER,
       PlannerSettings.PRODUCER_CONSUMER_QUEUE_SIZE,

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
index 141c434..340ccc7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
@@ -21,6 +21,7 @@ import java.util.Queue;
 
 import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.rpc.data.DataRpcConfig;
+import org.apache.drill.exec.rpc.data.AckSender;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
@@ -28,9 +29,9 @@ import com.google.common.collect.Queues;
 public class ResponseSenderQueue {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResponseSenderQueue.class);
 
-  private Queue<ResponseSender> q = Queues.newConcurrentLinkedQueue();
+  private Queue<AckSender> q = Queues.newConcurrentLinkedQueue();
 
-  public void enqueueResponse(ResponseSender sender){
+  public void enqueueResponse(AckSender sender){
     q.add(sender);
   }
 
@@ -44,12 +45,12 @@ public class ResponseSenderQueue {
    * @return
    */
   public int flushResponses(int count){
-    logger.debug("queue.size: {}, count: {}", q.size(), count);
+    logger.trace("queue.size: {}, count: {}", q.size(), count);
     int i = 0;
     while(!q.isEmpty() && i < count){
-      ResponseSender s = q.poll();
+      AckSender s = q.poll();
       if(s != null){
-        s.send(DataRpcConfig.OK);
+        s.sendOk();
       }
       i++;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index 6ee93ab..f0b4983 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -271,7 +271,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
       BitData.FragmentRecordBatch header = BitData.FragmentRecordBatch.parseDelimitedFrom(stream);
       DrillBuf buf = allocator.buffer(bodyLength);
       buf.writeBytes(stream, bodyLength);
-      batch = new RawFragmentBatch(null, header, buf, null);
+      batch = new RawFragmentBatch(header, buf, null);
       buf.release();
       available = true;
       latch.countDown();

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 895918c..3d5b948 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -55,7 +55,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
 
     this.softlimit = bufferSizePerSocket * fragmentCount;
     this.startlimit = Math.max(softlimit/2, 1);
-    logger.debug("softLimit: {}, startLimit: {}", softlimit, startlimit);
+    logger.trace("softLimit: {}, startLimit: {}", softlimit, startlimit);
     this.buffer = Queues.newLinkedBlockingDeque();
     this.fragmentCount = fragmentCount;
     this.streamCounter = fragmentCount;
@@ -75,7 +75,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
       }
     }
     if (batch.getHeader().getIsOutOfMemory()) {
-      logger.debug("Setting autoread false");
+      logger.trace("Setting autoread false");
       RawFragmentBatch firstBatch = buffer.peekFirst();
       FragmentRecordBatch header = firstBatch == null ? null :firstBatch.getHeader();
       if (!outOfMemory.get() && !(header == null) && header.getIsOutOfMemory())
{
@@ -86,7 +86,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
     }
     buffer.add(batch);
     if (buffer.size() >= softlimit) {
-      logger.debug("buffer.size: {}", buffer.size());
+      logger.trace("buffer.size: {}", buffer.size());
       overlimit.set(true);
       readController.enqueueResponse(batch.getSender());
     } else {
@@ -147,7 +147,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   public RawFragmentBatch getNext() {
 
     if (outOfMemory.get() && buffer.size() < 10) {
-      logger.debug("Setting autoread true");
+      logger.trace("Setting autoread true");
       outOfMemory.set(false);
       readController.flushResponses();
     }
@@ -177,7 +177,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
       int flushCount = softlimit - buffer.size();
       if ( flushCount > 0 ) {
         int flushed = readController.flushResponses(flushCount);
-        logger.debug("flush {} entries, flushed {} entries ", flushCount, flushed);
+        logger.trace("flush {} entries, flushed {} entries ", flushCount, flushed);
         if ( flushed == 0 ) {
           // queue is empty - nothing to do for now
           overlimit.set(false);

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/broadcastsender/TestBroadcast.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/broadcastsender/TestBroadcast.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/broadcastsender/TestBroadcast.java
new file mode 100644
index 0000000..f367a69
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/broadcastsender/TestBroadcast.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.broadcastsender;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Test;
+
+public class TestBroadcast extends BaseTestQuery {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBroadcast.class);
+
+  String broadcastQuery = "select * from "
+      + "dfs.`${WORKING_PATH}/src/test/resources/broadcast/sales` s "
+      + "INNER JOIN "
+      + "dfs.`${WORKING_PATH}/src/test/resources/broadcast/customer` c "
+      + "ON s.id = c.id";
+
+  @Test
+  public void plansWithBroadcast() throws Exception {
+    //TODO: actually verify that this plan has a broadcast exchange in it once plan tools
are enabled.
+    setup();
+    test("explain plan for " + broadcastQuery);
+  }
+
+  @Test
+  public void broadcastExecuteWorks() throws Exception {
+    setup();
+    test(broadcastQuery);
+  }
+
+
+  private void setup() throws Exception{
+    testNoResult("alter session set `planner.slice_target` = 1");
+    testNoResult("alter session set `planner.enable_broadcast_join` = true");
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
index 7b771f5..3749716 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -34,6 +35,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecTest;
+import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -50,6 +52,7 @@ import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
+import org.apache.drill.exec.rpc.data.AckSender;
 import org.apache.drill.exec.rpc.data.DataConnectionManager;
 import org.apache.drill.exec.rpc.data.DataResponseHandler;
 import org.apache.drill.exec.rpc.data.DataRpcConfig;
@@ -87,7 +90,7 @@ public class TestBitRpc extends ExecTest {
 
     port = server.bind(port, false);
     DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build();
-    DataConnectionManager manager = new DataConnectionManager(FragmentHandle.getDefaultInstance(),
ep, c2);
+    DataConnectionManager manager = new DataConnectionManager(ep, c2);
     DataTunnel tunnel = new DataTunnel(manager);
     AtomicLong max = new AtomicLong(0);
     for (int i = 0; i < 40; i++) {
@@ -152,8 +155,12 @@ public class TestBitRpc extends ExecTest {
     int v = 0;
 
     @Override
-    public void handle(RemoteConnection connection, FragmentManager manager, FragmentRecordBatch
fragmentBatch, DrillBuf data, ResponseSender sender)
-        throws RpcException {
+    public void informOutOfMemory() {
+    }
+
+    @Override
+    public void handle(FragmentManager manager, FragmentRecordBatch fragmentBatch, DrillBuf
data, AckSender sender)
+        throws FragmentSetupException, IOException {
       // System.out.println("Received.");
       try {
         v++;
@@ -165,11 +172,7 @@ public class TestBitRpc extends ExecTest {
         // TODO Auto-generated catch block
         e.printStackTrace();
       }
-      sender.send(DataRpcConfig.OK);
-    }
-
-    @Override
-    public void informOutOfMemory() {
+      sender.sendOk();
     }
 
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
index a710d21..b2e5740 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.ResponseSender;
+import org.apache.drill.exec.rpc.data.AckSender;
 import org.apache.drill.exec.rpc.data.DataRpcConfig;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,22 +50,25 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
 
   private static int FRAGMENT_COUNT = 5;
   private DrillConfig dc = DrillConfig.create();
-  private MySender mySender;
+  private MyAckSender myAckSender;
   private UnlimitedRawBatchBuffer rawBuffer;
   private RawFragmentBatch batch;
   private FragmentContext context;
   private int softLimit;
 
-  private static class MySender implements ResponseSender {
+  private static class MyAckSender extends AckSender {
 
     private int sendCount = 0;
 
+    public MyAckSender() {
+      super(null);
+    }
+
     @Override
-    public void send(Response r) {
+    public void sendOk() {
       sendCount++;
     }
 
-
     public int getSendCount() {
       return sendCount;
     }
@@ -76,7 +80,7 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
 
   @Before
   public void setUp() {
-    mySender = new MySender();
+    myAckSender = new MyAckSender();
     context = Mockito.mock(FragmentContext.class);
 
     Mockito.when(context.getConfig()).thenReturn(dc);
@@ -85,10 +89,10 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
 
     batch = Mockito.mock(RawFragmentBatch.class);
 
-    Mockito.when(batch.getSender()).thenReturn(mySender);
+    Mockito.when(batch.getSender()).thenReturn(myAckSender);
     Mockito.doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock ignore) throws Throwable {
-        mySender.send(DataRpcConfig.OK);
+        myAckSender.sendOk();
         return null;
       }
     }).when(batch).sendOk();
@@ -109,11 +113,11 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
     }
 
     // number of responses sent == number of enqueued elements
-    assertEquals(softLimit - 1, mySender.getSendCount());
+    assertEquals(softLimit - 1, myAckSender.getSendCount());
     rawBuffer.getNext();
 
     // set senderCount to 0
-    mySender.resetSender();
+    myAckSender.resetSender();
 
     // test back pressure
     // number of elements in the queue = softLimit -2
@@ -122,7 +126,7 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
       rawBuffer.enqueue(batch);
     }
     // we are exceeding softlimit, so senderCount should not increase
-    assertEquals(1, mySender.getSendCount());
+    assertEquals(1, myAckSender.getSendCount());
 
     // other responses should be saved in the responsequeue
     for (int i = 0; i < softLimit-2; i++ ) {
@@ -130,14 +134,14 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
     }
 
     // still should not send responses, as queue.size should higher then softLimit
-    assertEquals(1, mySender.getSendCount());
+    assertEquals(1, myAckSender.getSendCount());
 
     // size of the queue == softLimit now
     for (int i = softLimit; i > 0 ; i-- ) {
-      int senderCount = mySender.getSendCount();
+      int senderCount = myAckSender.getSendCount();
       rawBuffer.getNext();
       int expectedCountNumber = softLimit - i + senderCount+1;
-      assertEquals((expectedCountNumber < softLimit ? expectedCountNumber : softLimit),
mySender.getSendCount());
+      assertEquals((expectedCountNumber < softLimit ? expectedCountNumber : softLimit),
myAckSender.getSendCount());
     }
   }
 
@@ -147,7 +151,7 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
     for ( int i = 0; i < 2*softLimit; i++) {
       rawBuffer.enqueue(batch);
     }
-    assertEquals(softLimit - 1, mySender.getSendCount());
+    assertEquals(softLimit - 1, myAckSender.getSendCount());
     assertTrue(!rawBuffer.getReadController().isEmpty());
 
     rawBuffer.kill(context);
@@ -159,6 +163,6 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
     assertTrue(rawBuffer.getReadController().isEmpty());
 
     // all acks should be sent
-    assertEquals(2*softLimit, mySender.getSendCount());
+    assertEquals(2*softLimit, myAckSender.getSendCount());
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/test/resources/broadcast/customer/cust.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/broadcast/customer/cust.json b/exec/java-exec/src/test/resources/broadcast/customer/cust.json
new file mode 100644
index 0000000..5c3dac7
--- /dev/null
+++ b/exec/java-exec/src/test/resources/broadcast/customer/cust.json
@@ -0,0 +1,4 @@
+{id: 1, name: "John"}
+{id: 2, name: "Jane"}
+{id: 3, name: "Jimmy"}
+{id: 4, name: "Jennifer"}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/test/resources/broadcast/sales/f1.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/broadcast/sales/f1.json b/exec/java-exec/src/test/resources/broadcast/sales/f1.json
new file mode 100644
index 0000000..e5f81ae
--- /dev/null
+++ b/exec/java-exec/src/test/resources/broadcast/sales/f1.json
@@ -0,0 +1,2 @@
+{id:1, val:6}
+{id:2, val:8}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/9fd1430d/exec/java-exec/src/test/resources/broadcast/sales/f2.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/broadcast/sales/f2.json b/exec/java-exec/src/test/resources/broadcast/sales/f2.json
new file mode 100644
index 0000000..e0a1e97
--- /dev/null
+++ b/exec/java-exec/src/test/resources/broadcast/sales/f2.json
@@ -0,0 +1,2 @@
+{id:3, val:7}
+{id:4, val:9}
\ No newline at end of file


Mime
View raw message