drill-commits mailing list archives

From: s..@apache.org
Subject: [1/3] drill git commit: DRILL-2718: Move counting and tracking of sent batches to FragmentContext
Date: Sat, 11 Apr 2015 08:51:25 GMT
Repository: drill
Updated Branches:
  refs/heads/master 08236fa59 -> 4993bd2bb


DRILL-2718: Move counting and tracking of sent batches to FragmentContext

Creates wrapper classes AccountingDataTunnel and AccountingUserConnection, which wrap
the DataTunnel and UserClientConnection, respectively. This lets DataTunnels and
UserClientConnections be drawn from a global pool while pending batches and send status
are tracked at the FragmentContext level.

Consolidates the separate status listener implementations previously defined by the
individual senders into a single shared StatusHandler implementation.
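
For orientation, here is a minimal sketch of the resulting send path, using only methods introduced or modified in this patch (the sender's context, destination endpoint, and batch are assumed to be in scope):

  // In a sender operator: the FragmentContext hands out an AccountingDataTunnel that wraps
  // the pooled DataTunnel together with the fragment-wide SendingAccountor and StatusHandler.
  AccountingDataTunnel tunnel = context.getDataTunnel(destination);
  tunnel.sendRecordBatch(batch);          // increments the pending-send count, then sends

  // In the root exec's shutdown path: BaseRootExec.stop() (below) now waits until every
  // outstanding batch has been acknowledged before operator resources are released.
  context.waitForSendComplete();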


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/712b57b9
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/712b57b9
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/712b57b9

Branch: refs/heads/master
Commit: 712b57b981ec2ace8b16b1c34654ad91e1c2cf31
Parents: 08236fa
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Tue Apr 7 16:59:52 2015 -0700
Committer: Steven Phillips <sphillips@maprtech.com>
Committed: Fri Apr 10 23:46:34 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/ops/AccountingDataTunnel.java    | 44 +++++++++++++
 .../exec/ops/AccountingUserConnection.java      | 44 +++++++++++++
 .../org/apache/drill/exec/ops/Consumer.java     | 23 +++++++
 .../apache/drill/exec/ops/FragmentContext.java  | 44 ++++++++++---
 .../apache/drill/exec/ops/SendingAccountor.java | 54 ++++++++++++++++
 .../apache/drill/exec/ops/StatusHandler.java    | 56 +++++++++++++++++
 .../drill/exec/physical/impl/BaseRootExec.java  | 29 +++++++--
 .../drill/exec/physical/impl/ScreenCreator.java | 63 +++----------------
 .../exec/physical/impl/SendingAccountor.java    | 54 ----------------
 .../exec/physical/impl/SingleSenderCreator.java | 44 ++-----------
 .../BroadcastSenderRootExec.java                | 53 +++-------------
 .../PartitionSenderRootExec.java                | 20 ++----
 .../impl/partitionsender/Partitioner.java       |  5 +-
 .../partitionsender/PartitionerTemplate.java    | 28 +++------
 .../impl/partitionsender/StatusHandler.java     | 65 --------------------
 .../apache/drill/exec/rpc/user/UserClient.java  |  6 +-
 16 files changed, 317 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/712b57b9/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
new file mode 100644
index 0000000..2bcfdbc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.record.FragmentWritableBatch;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+
+/**
+ * Wrapper around a {@link org.apache.drill.exec.rpc.data.DataTunnel} that tracks the status of batches sent
+ * to other Drillbits.
+ */
+public class AccountingDataTunnel {
+  private final DataTunnel tunnel;
+  private final SendingAccountor sendingAccountor;
+  private final RpcOutcomeListener<Ack> statusHandler;
+
+  public AccountingDataTunnel(DataTunnel tunnel, SendingAccountor sendingAccountor, RpcOutcomeListener<Ack> statusHandler) {
+    this.tunnel = tunnel;
+    this.sendingAccountor = sendingAccountor;
+    this.statusHandler = statusHandler;
+  }
+
+  public void sendRecordBatch(FragmentWritableBatch batch) {
+    sendingAccountor.increment();
+    tunnel.sendRecordBatch(statusHandler, batch);
+  }
+}
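
For reference (not part of the patch), this wrapper is constructed by FragmentContext.getDataTunnel() later in this commit, roughly as follows:

  AccountingDataTunnel tunnel = new AccountingDataTunnel(
      context.getDataConnectionsPool().getTunnel(endpoint),  // raw tunnel from the global connection pool
      sendingAccountor,                                       // fragment-level pending-send counter
      statusHandler);                                          // shared RpcOutcomeListener<Ack>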

http://git-wip-us.apache.org/repos/asf/drill/blob/712b57b9/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java
new file mode 100644
index 0000000..e3add13
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+
+/**
+ * Wrapper around a {@link org.apache.drill.exec.rpc.user.UserServer.UserClientConnection} that tracks the status of
+ * batches sent to the user client.
+ */
+public class AccountingUserConnection {
+  private final UserClientConnection connection;
+  private final SendingAccountor sendingAccountor;
+  private final RpcOutcomeListener<Ack> statusHandler;
+
+  public AccountingUserConnection(UserClientConnection connection, SendingAccountor sendingAccountor, RpcOutcomeListener<Ack> statusHandler) {
+    this.connection = connection;
+    this.sendingAccountor = sendingAccountor;
+    this.statusHandler = statusHandler;
+  }
+
+  public void sendData(QueryWritableBatch batch) {
+    sendingAccountor.increment();
+    connection.sendData(statusHandler, batch);
+  }
+}
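
The root (Screen) fragment uses this user-facing wrapper in the same way; as the ScreenCreator changes later in this diff show, the per-sender listener and manual accounting reduce to:

  AccountingUserConnection userConnection = context.getUserDataTunnel();
  userConnection.sendData(batch);   // batch is a QueryWritableBatch; accounting happens inside the wrapper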

http://git-wip-us.apache.org/repos/asf/drill/blob/712b57b9/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
new file mode 100644
index 0000000..9b8ba38
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+//TODO replace this when we switch to JDK8, which includes this
+public interface Consumer<T> {
+  public void accept(T t);
+}
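
Once Drill moves to JDK 8 this interface can presumably be replaced by java.util.function.Consumer<T>, whose accept(T) method has the same shape; the anonymous exception consumer added to FragmentContext below would then shrink to a lambda, for example:

  // Hypothetical JDK 8 form of the exceptionConsumer defined in FragmentContext below:
  java.util.function.Consumer<RpcException> exceptionConsumer = e -> fail(e);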

http://git-wip-us.apache.org/repos/asf/drill/blob/712b57b9/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 18b93e9..da2229c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.ops;
 
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
@@ -35,10 +37,16 @@ import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.record.FragmentWritableBatch;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.control.ControlTunnel;
 import org.apache.drill.exec.rpc.data.DataTunnel;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
@@ -57,7 +65,7 @@ import com.google.common.collect.Maps;
 public class FragmentContext implements AutoCloseable, UdfUtilities {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
 
-  private final Map<DrillbitEndpoint, DataTunnel> tunnels = Maps.newHashMap();
+  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
   private final DrillbitContext context;
   private final UserClientConnection connection;
   private final FragmentStats stats;
@@ -71,6 +79,16 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
 
   private final DeferredException deferredException = new DeferredException();
   private volatile FragmentContextState state = FragmentContextState.OK;
+  private final SendingAccountor sendingAccountor = new SendingAccountor();
+  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
+
+    @Override
+    public void accept(RpcException e) {
+      fail(e);
+    }
+  };
+  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
+  private final AccountingUserConnection accountingUserConnection;
 
   /*
    * TODO we need a state that indicates that cancellation has been requested and
@@ -88,6 +106,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     throws ExecutionSetupException {
     this.context = dbContext;
     this.connection = connection;
+    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
     this.fragment = fragment;
     this.funcRegistry = funcRegistry;
     queryDateTimeInfo = new QueryDateTimeInfo(fragment.getQueryStartTime(), fragment.getTimeZone());
@@ -240,22 +259,19 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return context.getCompiler().getImplementationClass(cg, instanceCount);
   }
 
-  /**
-   * Get the user connection associated with this fragment.  This return null unless this is a root fragment.
-   * @return The RPC connection to the query submitter.
-   */
-  public UserClientConnection getConnection() {
-    return connection;
+  public AccountingUserConnection getUserDataTunnel() {
+    Preconditions.checkState(connection != null, "Only Root fragment can get UserDataTunnel");
+    return accountingUserConnection;
   }
 
   public ControlTunnel getControlTunnel(final DrillbitEndpoint endpoint) {
     return context.getController().getTunnel(endpoint);
   }
 
-  public DataTunnel getDataTunnel(DrillbitEndpoint endpoint) {
-    DataTunnel tunnel = tunnels.get(endpoint);
+  public AccountingDataTunnel getDataTunnel(DrillbitEndpoint endpoint) {
+    AccountingDataTunnel tunnel = tunnels.get(endpoint);
     if (tunnel == null) {
-      tunnel = context.getDataConnectionsPool().getTunnel(endpoint);
+      tunnel = new AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), sendingAccountor, statusHandler);
       tunnels.put(endpoint, tunnel);
     }
     return tunnel;
@@ -325,4 +341,12 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
         "in functions that can be evaluated at planning time. Make sure that the %s configuration " +
         "option is set to true.", PlannerSettings.CONSTANT_FOLDING.getOptionName()));
   }
+
+  /**
+   * Waits until an ack has been received for every outgoing batch.
+   */
+  public void waitForSendComplete() {
+    sendingAccountor.waitForSendComplete();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/712b57b9/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
new file mode 100644
index 0000000..3d0c90e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Accounts for whether all messages sent have completed. A task must wait on this before finishing so that
+ * buffers still in flight are not mistaken for hanging buffers; they will be released once the sends complete.
+ *
+ * TODO: Need to update to use long for number of pending messages.
+ */
+class SendingAccountor {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SendingAccountor.class);
+
+  private final AtomicInteger batchesSent = new AtomicInteger(0);
+  private final Semaphore wait = new Semaphore(0);
+
+  void increment() {
+    batchesSent.incrementAndGet();
+  }
+
+  void decrement() {
+    wait.release();
+  }
+
+  public synchronized void waitForSendComplete() {
+    try {
+      int waitForBatches;
+      while((waitForBatches = batchesSent.getAndSet(0)) != 0) {
+        wait.acquire(waitForBatches);
+      }
+    } catch (InterruptedException e) {
+      logger.warn("Failure while waiting for send complete.", e);
+      // TODO InterruptedException
+    }
+  }
+}
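
The contract implied by this class (restated here, not part of the patch): every increment() issued before a send must be matched by exactly one decrement() from the status listener, whether the send succeeds or fails, and waitForSendComplete() loops because additional sends may have been started while it was blocked on the semaphore. In outline:

  accountor.increment();                        // before handing the batch to the RPC layer
  tunnel.sendRecordBatch(statusHandler, batch);
  // ... later, in the RpcOutcomeListener for that batch (success or failure):
  accountor.decrement();
  // ... and in the fragment's shutdown path:
  accountor.waitForSendComplete();              // blocks until increments and decrements balance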

http://git-wip-us.apache.org/repos/asf/drill/blob/712b57b9/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
new file mode 100644
index 0000000..79fc0b0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+/**
+ * Listener that keeps track of the status of batches sent, and updates the SendingAccountor when a status is
+ * received for each batch.
+ */
+public class StatusHandler implements RpcOutcomeListener<Ack> {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatusHandler.class);
+  private final SendingAccountor sendingAccountor;
+  private final Consumer<RpcException> consumer;
+
+  public StatusHandler(Consumer<RpcException> consumer, SendingAccountor sendingAccountor) {
+    this.consumer = consumer;
+    this.sendingAccountor = sendingAccountor;
+  }
+
+  @Override
+  public void failed(RpcException ex) {
+    sendingAccountor.decrement();
+    consumer.accept(ex);
+  }
+
+  @Override
+  public void success(Ack value, ByteBuf buffer) {
+    sendingAccountor.decrement();
+    if (value.getOk()) {
+      return;
+    }
+
+    logger.error("Data not accepted downstream. Stopping future sends.");
+    // if we didn't get ack ok, we'll need to kill the query.
+    consumer.accept(new RpcException("Data not accepted downstream."));
+  }
+}
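
FragmentContext (earlier in this patch) creates a single StatusHandler per fragment and shares it across every tunnel; the wiring, condensed from the FragmentContext hunk above:

  Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
    @Override
    public void accept(RpcException e) {
      fail(e);   // route RPC failures into the fragment's failure handling
    }
  };
  RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);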

http://git-wip-us.apache.org/repos/asf/drill/blob/712b57b9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index a00df9d..5b7ca66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -32,27 +32,33 @@ public abstract class BaseRootExec implements RootExec {
 
   protected OperatorStats stats = null;
   protected OperatorContext oContext = null;
+  protected FragmentContext fragmentContext = null;
 
-  public BaseRootExec(FragmentContext context, PhysicalOperator config) throws OutOfMemoryException {
-    this.oContext = new OperatorContext(config, context, stats, true);
+  public BaseRootExec(FragmentContext fragmentContext, PhysicalOperator config) throws OutOfMemoryException {
+    this.oContext = new OperatorContext(config, fragmentContext, stats, true);
     stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
         config.getOperatorType(), OperatorContext.getChildCount(config)),
         oContext.getAllocator());
-    context.getStats().addOperatorStats(this.stats);
+    fragmentContext.getStats().addOperatorStats(this.stats);
+    this.fragmentContext = fragmentContext;
   }
 
-  public BaseRootExec(FragmentContext context, OperatorContext oContext, PhysicalOperator config) throws OutOfMemoryException {
+  public BaseRootExec(FragmentContext fragmentContext, OperatorContext oContext, PhysicalOperator config) throws OutOfMemoryException {
     this.oContext = oContext;
     stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
       config.getOperatorType(), OperatorContext.getChildCount(config)),
       oContext.getAllocator());
-    context.getStats().addOperatorStats(this.stats);
+    fragmentContext.getStats().addOperatorStats(this.stats);
+    this.fragmentContext = fragmentContext;
   }
 
   @Override
   public final boolean next() {
     // Stats should have been initialized
     assert stats != null;
+    if (fragmentContext.isFailed()) {
+      return false;
+    }
     try {
       stats.startProcessing();
       return innerNext();
@@ -87,4 +93,17 @@ public abstract class BaseRootExec implements RootExec {
   public void receivingFragmentFinished(FragmentHandle handle) {
     logger.warn("Currently not handling FinishedFragment message");
   }
+
+  @Override
+  public void stop() {
+    // We want to account for the time spent waiting here as Wait time in the operator profile
+    try {
+      stats.startProcessing();
+      stats.startWait();
+      fragmentContext.waitForSendComplete();
+    } finally {
+      stats.stopWait();
+      stats.stopProcessing();
+    }
+  }
 }
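
With the wait now handled in the base class, each concrete sender's stop() in the rest of this diff follows the same pattern:

  @Override
  public void stop() {
    super.stop();        // waits for all pending sends; the time is charged to the operator's wait time
    oContext.close();    // then release operator resources
    incoming.cleanup();
  }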

http://git-wip-us.apache.org/repos/asf/drill/blob/712b57b9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 8038527..6b3caf4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -17,28 +17,21 @@
  */
 package org.apache.drill.exec.physical.impl;
 
-import io.netty.buffer.ByteBuf;
-
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.AccountingUserConnection;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
 import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
-import org.apache.drill.exec.work.ErrorHelper;
 
 import com.google.common.base.Preconditions;
 
@@ -57,13 +50,9 @@ public class ScreenCreator implements RootCreator<Screen>{
 
   static class ScreenRoot extends BaseRootExec {
 //    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
-    volatile boolean ok = true;
-
-    private final SendingAccountor sendCount = new SendingAccountor();
-
-    final RecordBatch incoming;
-    final FragmentContext context;
-    final UserClientConnection connection;
+    private final RecordBatch incoming;
+    private final FragmentContext context;
+    private final AccountingUserConnection userConnection;
     private RecordMaterializer materializer;
 
     private boolean firstBatch = true;
@@ -79,21 +68,13 @@ public class ScreenCreator implements RootCreator<Screen>{
 
     public ScreenRoot(FragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException {
       super(context, config);
-      // TODO  Edit:  That "as such" doesn't make sense.
-      assert context.getConnection() != null : "A screen root should only be run on the driving node which is connected directly to the client.  As such, this should always be true.";
       this.context = context;
       this.incoming = incoming;
-      this.connection = context.getConnection();
+      this.userConnection = context.getUserDataTunnel();
     }
 
     @Override
     public boolean innerNext() {
-      if (!ok) {
-        stop();
-        context.fail(this.listener.ex);
-        return false;
-      }
-
       IterOutcome outcome = next(incoming);
       logger.trace("Screen Outcome {}", outcome);
       switch (outcome) {
@@ -114,12 +95,11 @@ public class ScreenCreator implements RootCreator<Screen>{
 
           stats.startWait();
           try {
-            connection.sendData(listener, batch);
+            userConnection.sendData(batch);
           } finally {
             stats.stopWait();
           }
           firstBatch = false; // we don't really need to set this. But who knows!
-          sendCount.increment();
         }
 
         return false;
@@ -131,12 +111,11 @@ public class ScreenCreator implements RootCreator<Screen>{
         updateStats(batch);
         stats.startWait();
         try {
-          connection.sendData(listener, batch);
+          userConnection.sendData(batch);
         } finally {
           stats.stopWait();
         }
         firstBatch = false;
-        sendCount.increment();
 
         return true;
       default:
@@ -150,42 +129,16 @@ public class ScreenCreator implements RootCreator<Screen>{
 
 
     private void internalStop(){
-      sendCount.waitForSendComplete();
       oContext.close();
       incoming.cleanup();
     }
 
     @Override
     public void stop() {
+      super.stop();
       if (!oContext.isClosed()) {
         internalStop();
       }
-      sendCount.waitForSendComplete();
-    }
-
-    private SendListener listener = new SendListener();
-
-    private class SendListener extends BaseRpcOutcomeListener<Ack>{
-      volatile RpcException ex;
-
-
-      @Override
-      public void success(Ack value, ByteBuf buffer) {
-        super.success(value, buffer);
-        sendCount.decrement();
-      }
-
-      @Override
-      public void failed(RpcException ex) {
-        sendCount.decrement();
-        logger.error("Failure while sending data to user.", ex);
-        boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
-        ErrorHelper.logAndConvertMessageError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger,
-          verbose);
-        ok = false;
-        this.ex = ex;
-      }
-
     }
 
     RecordBatch getIncoming() {

http://git-wip-us.apache.org/repos/asf/drill/blob/712b57b9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
deleted file mode 100644
index 21fc800..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl;
-
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Account for whether all messages sent have been completed. Necessary before finishing a task so we don't think
- * buffers are hanging when they will be released.
- *
- * TODO: Need to update to use long for number of pending messages.
- */
-public class SendingAccountor {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SendingAccountor.class);
-
-  private final AtomicInteger batchesSent = new AtomicInteger(0);
-  private final Semaphore wait = new Semaphore(0);
-
-  public void increment() {
-    batchesSent.incrementAndGet();
-  }
-
-  public void decrement() {
-    wait.release();
-  }
-
-  public synchronized void waitForSendComplete() {
-    try {
-      int waitForBatches;
-      while((waitForBatches = batchesSent.getAndSet(0)) != 0) {
-        wait.acquire(waitForBatches);
-      }
-    } catch (InterruptedException e) {
-      logger.warn("Failure while waiting for send complete.", e);
-      // TODO InterruptedException
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/712b57b9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 1ef7bbd..29d032d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
@@ -34,7 +35,6 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.data.DataTunnel;
 
 public class SingleSenderCreator implements RootCreator<SingleSender>{
 
@@ -48,15 +48,13 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
   private static class SingleSenderRootExec extends BaseRootExec {
     static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class);
 
-    private final SendingAccountor sendCount = new SendingAccountor();
     private final FragmentHandle oppositeHandle;
 
     private RecordBatch incoming;
-    private DataTunnel tunnel;
+    private AccountingDataTunnel tunnel;
     private FragmentHandle handle;
     private SingleSender config;
     private int recMajor;
-    private FragmentContext context;
     private volatile boolean ok = true;
     private volatile boolean done = false;
 
@@ -71,7 +69,6 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
 
     public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException {
       super(context, new OperatorContext(config, context, null, false), config);
-      //super(context, config);
       this.incoming = batch;
       assert(incoming != null);
       this.handle = context.getHandle();
@@ -83,7 +80,6 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
           .setMinorFragmentId(config.getOppositeMinorFragmentId())
           .build();
       tunnel = context.getDataTunnel(config.getDestination());
-      this.context = context;
     }
 
     @Override
@@ -108,10 +104,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
         FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLastWithSchema(handle.getQueryId(),
             handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, oppositeHandle.getMinorFragmentId(),
             incoming.getSchema());
-        sendCount.increment();
         stats.startWait();
         try {
-          tunnel.sendRecordBatch(new RecordSendFailure(), b2);
+          tunnel.sendRecordBatch(b2);
         } finally {
           stats.stopWait();
         }
@@ -122,10 +117,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
         FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(),
                 handle.getMinorFragmentId(), recMajor, oppositeHandle.getMinorFragmentId(), incoming.getWritableBatch());
         updateStats(batch);
-        sendCount.increment();
         stats.startWait();
         try {
-          tunnel.sendRecordBatch(new RecordSendFailure(), batch);
+          tunnel.sendRecordBatch(batch);
         } finally {
           stats.stopWait();
         }
@@ -143,8 +137,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
 
     @Override
     public void stop() {
-      ok = false;
-      sendCount.waitForSendComplete();
+      super.stop();
       oContext.close();
       incoming.cleanup();
     }
@@ -153,33 +146,6 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     public void receivingFragmentFinished(FragmentHandle handle) {
       done = true;
     }
-
-    private class RecordSendFailure extends BaseRpcOutcomeListener<Ack>{
-
-      @Override
-      public void failed(RpcException ex) {
-        sendCount.decrement();
-        if (!context.isCancelled() && !context.isFailed()) {
-          context.fail(ex);
-        }
-        done = true;
-      }
-
-      @Override
-      public void success(Ack value, ByteBuf buf) {
-        sendCount.decrement();
-        if (value.getOk()) {
-          return;
-        }
-
-        logger.error("Downstream fragment was not accepted.  Stopping future sends.");
-        // if we didn't get ack ok, we'll need to kill the query.
-        context.fail(new RpcException("A downstream fragment batch wasn't accepted.  This fragment thus fails."));
-        done = true;
-      }
-
-    }
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/712b57b9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index d17fdd4..b3a6a8f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -17,29 +17,21 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl.broadcastsender;
 
-import io.netty.buffer.ByteBuf;
-
 import java.util.List;
 
 import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.BroadcastSender;
 import org.apache.drill.exec.physical.impl.BaseRootExec;
-import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos;
-import org.apache.drill.exec.proto.GeneralRPCProtos;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.data.DataTunnel;
-import org.apache.drill.exec.work.ErrorHelper;
 
 import com.google.common.collect.ArrayListMultimap;
 
@@ -50,11 +42,10 @@ import com.google.common.collect.ArrayListMultimap;
  */
 public class BroadcastSenderRootExec extends BaseRootExec {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BroadcastSenderRootExec.class);
-  private final StatusHandler statusHandler = new StatusHandler();
   private final FragmentContext context;
   private final BroadcastSender config;
   private final int[][] receivingMinorFragments;
-  private final DataTunnel[] tunnels;
+  private final AccountingDataTunnel[] tunnels;
   private final ExecProtos.FragmentHandle handle;
   private volatile boolean ok;
   private final RecordBatch incoming;
@@ -87,7 +78,7 @@ public class BroadcastSenderRootExec extends BaseRootExec {
     int destCount = dests.keySet().size();
     int i = 0;
 
-    this.tunnels = new DataTunnel[destCount];
+    this.tunnels = new AccountingDataTunnel[destCount];
     this.receivingMinorFragments = new int[destCount][];
     for(DrillbitEndpoint ep : dests.keySet()){
       List<Integer> minorsList= dests.get(ep);
@@ -104,11 +95,6 @@ public class BroadcastSenderRootExec extends BaseRootExec {
 
   @Override
   public boolean innerNext() {
-    if(!ok) {
-      context.fail(statusHandler.ex);
-      return false;
-    }
-
     RecordBatch.IterOutcome out = next(incoming);
     logger.debug("Outcome of sender next {}", out);
     switch(out){
@@ -123,8 +109,7 @@ public class BroadcastSenderRootExec extends BaseRootExec {
               receivingMinorFragments[i]);
           stats.startWait();
           try {
-            tunnels[i].sendRecordBatch(this.statusHandler, b2);
-            statusHandler.sendCount.increment();
+            tunnels[i].sendRecordBatch(b2);
           } finally {
             stats.stopWait();
           }
@@ -149,8 +134,7 @@ public class BroadcastSenderRootExec extends BaseRootExec {
           updateStats(batch);
           stats.startWait();
           try {
-            tunnels[i].sendRecordBatch(this.statusHandler, batch);
-            statusHandler.sendCount.increment();
+            tunnels[i].sendRecordBatch(batch);
           } finally {
             stats.stopWait();
           }
@@ -171,29 +155,8 @@ public class BroadcastSenderRootExec extends BaseRootExec {
 
   @Override
   public void stop() {
-      ok = false;
-      statusHandler.sendCount.waitForSendComplete();
-      oContext.close();
-      incoming.cleanup();
-  }
-
-  private class StatusHandler extends BaseRpcOutcomeListener<GeneralRPCProtos.Ack> {
-    volatile RpcException ex;
-    private final SendingAccountor sendCount = new SendingAccountor();
-
-    @Override
-    public void success(Ack value, ByteBuf buffer) {
-      sendCount.decrement();
-      super.success(value, buffer);
-    }
-
-    @Override
-    public void failed(RpcException ex) {
-      sendCount.decrement();
-      logger.error("Failure while sending data to user.", ex);
-      ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger);
-      ok = false;
-      this.ex = ex;
-    }
+    super.stop();
+    oContext.close();
+    incoming.cleanup();
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/712b57b9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 6a73cdd..8965bab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
@@ -39,7 +40,6 @@ import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.BaseRootExec;
-import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.BatchSchema;
@@ -48,7 +48,6 @@ import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.data.DataTunnel;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.vector.CopyUtil;
 
@@ -66,10 +65,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
 
   private FragmentContext context;
   private boolean ok = true;
-  private final SendingAccountor sendCount = new SendingAccountor();
   private final int outGoingBatchCount;
   private final HashPartitionSender popConfig;
-  private final StatusHandler statusHandler;
   private final double cost;
 
   private final AtomicIntegerArray remainingReceivers;
@@ -107,7 +104,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
     this.context = context;
     this.outGoingBatchCount = operator.getDestinations().size();
     this.popConfig = operator;
-    this.statusHandler = new StatusHandler(sendCount, context);
     this.remainingReceivers = new AtomicIntegerArray(outGoingBatchCount);
     this.remaingReceiverCount = new AtomicInteger(outGoingBatchCount);
     stats.setLongStat(Metric.N_RECEIVERS, outGoingBatchCount);
@@ -239,7 +235,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
         endIndex++;
       }
       final OperatorStats partitionStats = new OperatorStats(stats, true);
-      subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, sendCount, oContext, statusHandler,
+      subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext,
         startIndex, endIndex);
     }
     partitioner = new PartitionerDecorator(subPartitioners, stats, context);
@@ -310,16 +306,12 @@ public class PartitionSenderRootExec extends BaseRootExec {
 
   public void stop() {
     logger.debug("Partition sender stopping.");
+    super.stop();
     ok = false;
     if (partitioner != null) {
       updateAggregateStats();
       partitioner.clear();
     }
-    sendCount.waitForSendComplete();
-
-    if (!statusHandler.isOk()) {
-      context.fail(statusHandler.getException());
-    }
 
     oContext.close();
     incoming.cleanup();
@@ -334,9 +326,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
     }
 
     FragmentHandle handle = context.getHandle();
-    StatusHandler statusHandler = new StatusHandler(sendCount, context);
     for (MinorFragmentEndpoint destination : popConfig.getDestinations()) {
-      DataTunnel tunnel = context.getDataTunnel(destination.getEndpoint());
+      AccountingDataTunnel tunnel = context.getDataTunnel(destination.getEndpoint());
       FragmentWritableBatch writableBatch = FragmentWritableBatch.getEmptyBatchWithSchema(
           isLast,
           handle.getQueryId(),
@@ -347,11 +338,10 @@ public class PartitionSenderRootExec extends BaseRootExec {
           schema);
       stats.startWait();
       try {
-        tunnel.sendRecordBatch(statusHandler, writableBatch);
+        tunnel.sendRecordBatch(writableBatch);
       } finally {
         stats.stopWait();
       }
-      sendCount.increment();
     }
     stats.addLongStat(Metric.BATCHES_SENT, 1);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/712b57b9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 9d6e98f..95a4813 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -26,7 +26,6 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
-import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.record.RecordBatch;
 
 public interface Partitioner {
@@ -35,9 +34,7 @@ public interface Partitioner {
                           RecordBatch incoming,
                           HashPartitionSender popConfig,
                           OperatorStats stats,
-                          SendingAccountor sendingAccountor,
                           OperatorContext oContext,
-                          StatusHandler statusHandler,
                           int start, int count) throws SchemaChangeException;
 
   public abstract void partitionBatch(RecordBatch incoming) throws IOException;
@@ -47,7 +44,7 @@ public interface Partitioner {
   public abstract List<? extends PartitionOutgoingBatch> getOutgoingBatches();
   /**
    * Method to get PartitionOutgoingBatch based on the fact that there can be > 1 Partitioner
-   * @param minorFragmentIndex
+   * @param index
    * @return PartitionOutgoingBatch that matches index within Partitioner. This method can
    * return null if index does not fall within boundary of this Partitioner
    */

http://git-wip-us.apache.org/repos/asf/drill/blob/712b57b9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 33d6f95..440af59 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -28,12 +28,12 @@ import org.apache.drill.exec.compile.sig.RuntimeOverridden;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
-import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.Metric;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.BatchSchema;
@@ -47,7 +47,6 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.rpc.data.DataTunnel;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.Lists;
@@ -89,9 +88,7 @@ public abstract class PartitionerTemplate implements Partitioner {
                           RecordBatch incoming,
                           HashPartitionSender popConfig,
                           OperatorStats stats,
-                          SendingAccountor sendingAccountor,
                           OperatorContext oContext,
-                          StatusHandler statusHandler,
                           int start, int end) throws SchemaChangeException {
 
     this.incoming = incoming;
@@ -112,8 +109,8 @@ public abstract class PartitionerTemplate implements Partitioner {
       // create outgoingBatches only for subset of Destination Points
       if ( fieldId >= start && fieldId < end ) {
         logger.debug("start: {}, count: {}, fieldId: {}", start, end, fieldId);
-        outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig,
-          context.getDataTunnel(destination.getEndpoint()), context, oContext.getAllocator(), destination.getId(), statusHandler));
+        outgoingBatches.add(new OutgoingRecordBatch(stats, popConfig,
+          context.getDataTunnel(destination.getEndpoint()), context, oContext.getAllocator(), destination.getId()));
       }
       fieldId++;
     }
@@ -202,7 +199,6 @@ public abstract class PartitionerTemplate implements Partitioner {
   /**
    * Helper method to copy data based on partition
    * @param svIndex
-   * @param incoming
    * @throws IOException
    */
   private void doCopy(int svIndex) throws IOException {
@@ -225,14 +221,12 @@ public abstract class PartitionerTemplate implements Partitioner {
 
   public class OutgoingRecordBatch implements PartitionOutgoingBatch, VectorAccessible {
 
-    private final DataTunnel tunnel;
+    private final AccountingDataTunnel tunnel;
     private final HashPartitionSender operator;
     private final FragmentContext context;
     private final BufferAllocator allocator;
     private final VectorContainer vectorContainer = new VectorContainer();
-    private final SendingAccountor sendCount;
     private final int oppositeMinorFragmentId;
-    private final StatusHandler statusHandler;
     private final OperatorStats stats;
 
     private boolean isLast = false;
@@ -241,17 +235,14 @@ public abstract class PartitionerTemplate implements Partitioner {
     private int recordCount;
     private int totalRecords;
 
-    public OutgoingRecordBatch(OperatorStats stats, SendingAccountor sendCount, HashPartitionSender operator, DataTunnel tunnel,
-                               FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId,
-                               StatusHandler statusHandler) {
+    public OutgoingRecordBatch(OperatorStats stats, HashPartitionSender operator, AccountingDataTunnel tunnel,
+                               FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
       this.context = context;
       this.allocator = allocator;
       this.operator = operator;
       this.tunnel = tunnel;
-      this.sendCount = sendCount;
       this.stats = stats;
       this.oppositeMinorFragmentId = oppositeMinorFragmentId;
-      this.statusHandler = statusHandler;
     }
 
     protected void copy(int inIndex) throws IOException {
@@ -308,11 +299,10 @@ public abstract class PartitionerTemplate implements Partitioner {
       updateStats(writableBatch);
       stats.startWait();
       try {
-        tunnel.sendRecordBatch(statusHandler, writableBatch);
+        tunnel.sendRecordBatch(writableBatch);
       } finally {
         stats.stopWait();
       }
-      sendCount.increment();
 
       // If the current batch is the last batch, then set a flag to ignore any requests to flush the data
       // This is possible when the receiver is terminated, but we still get data from input operator
@@ -329,10 +319,6 @@ public abstract class PartitionerTemplate implements Partitioner {
         vectorContainer.zeroVectors();
         allocateOutgoingRecordBatch();
       }
-
-      if (!statusHandler.isOk()) {
-        throw new IOException(statusHandler.getException());
-      }
     }
 
     private void allocateOutgoingRecordBatch() {

http://git-wip-us.apache.org/repos/asf/drill/blob/712b57b9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java
deleted file mode 100644
index 5e21878..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.partitionsender;
-
-
-import io.netty.buffer.ByteBuf;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.SendingAccountor;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.work.ErrorHelper;
-
-public class StatusHandler extends BaseRpcOutcomeListener<Ack> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatusHandler.class);
-  RpcException ex;
-  SendingAccountor sendCount;
-  FragmentContext context;
-  boolean ok = true;
-
-  public StatusHandler(SendingAccountor sendCount, FragmentContext context) {
-    this.sendCount = sendCount;
-    this.context = context;
-  }
-
-  @Override
-  public void success(Ack value, ByteBuf buffer) {
-    sendCount.decrement();
-    super.success(value, buffer);
-  }
-
-  @Override
-  public void failed(RpcException ex) {
-    sendCount.decrement();
-    logger.error("Failure while sending data to user.", ex);
-    ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger);
-    ok = false;
-    this.ex = ex;
-  }
-
-  public boolean isOk() {
-    return ok;
-  }
-
-  public RpcException getException() {
-    return ex;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/712b57b9/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 8022c95..a8bad78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -23,6 +23,7 @@ import io.netty.channel.EventLoopGroup;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack.Builder;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
@@ -33,6 +34,7 @@ import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.UserProtos.UserProperties;
 import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
+import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.BasicClientWithConnection;
 import org.apache.drill.exec.rpc.OutOfMemoryHandler;
 import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
@@ -95,10 +97,10 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
     switch (rpcType) {
     case RpcType.QUERY_DATA_VALUE:
       queryResultHandler.batchArrived(throttle, pBody, dBody);
-      return new Response(RpcType.ACK, Ack.getDefaultInstance());
+      return new Response(RpcType.ACK, Acks.OK);
     case RpcType.QUERY_RESULT_VALUE:
       queryResultHandler.resultArrived(pBody);
-      return new Response(RpcType.ACK, Ack.getDefaultInstance());
+      return new Response(RpcType.ACK, Acks.OK);
     default:
       throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType));
     }

