drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [26/45] drill git commit: DRILL-3987: (MOVE) Extract RPC, memory-base and memory-impl as separate modules.
Date Fri, 13 Nov 2015 02:37:56 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
deleted file mode 100644
index 61922a1..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.MessageToMessageDecoder;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-import io.netty.util.concurrent.GenericFutureListener;
-
-import java.io.Closeable;
-import java.net.SocketAddress;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.drill.common.SerializedExecutor;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
-import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.protobuf.Internal.EnumLite;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
-/**
- * The Rpc Bus deals with incoming and outgoing communication and is used on both the server and the client side of a
- * system.
- *
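- * @param <T> the enum type identifying the RPC message types sent over this bus
- * @param <C> the type of remote connection this bus communicates over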
- */
-public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> implements Closeable {
-  final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
-
-  private static final OutboundRpcMessage PONG = new OutboundRpcMessage(RpcMode.PONG, 0, 0, Acks.OK);
-
-  protected final CoordinationQueue queue = new CoordinationQueue(16, 16);
-
-  protected abstract MessageLite getResponseDefaultInstance(int rpcType) throws RpcException;
-
-  protected void handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender sender) throws RpcException{
-    sender.send(handle(connection, rpcType, pBody, dBody));
-  }
-
-  protected abstract Response handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException;
-
-  public abstract boolean isClient();
-
-  protected final RpcConfig rpcConfig;
-
-  protected volatile SocketAddress local;
-  protected volatile SocketAddress remote;
-
-
-  /**
-   * Creates a bus governed by the given configuration.
-   *
-   * @param rpcConfig configuration consulted for message-type checks and for the executor that runs
-   *                  inbound events
-   */
-  public RpcBus(final RpcConfig rpcConfig) {
-    this.rpcConfig = rpcConfig;
-  }
-
-  /**
-   * Records both endpoints of the connection; the stored values are used when reporting that the
-   * channel has closed. Note the argument order: remote first, then local.
-   */
-  protected void setAddresses(final SocketAddress remote, final SocketAddress local) {
-    this.local = local;
-    this.remote = remote;
-  }
-
-  <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType,
-      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
-    DrillRpcFutureImpl<RECEIVE> rpcFuture = new DrillRpcFutureImpl<RECEIVE>();
-    this.send(rpcFuture, connection, rpcType, protobufBody, clazz, dataBodies);
-    return rpcFuture;
-  }
-
-  /**
-   * Sends a request and reports its outcome to {@code listener}. Equivalent to the seven-argument
-   * overload with {@code allowInEventLoop} set to {@code false}.
-   */
-  public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType,
-      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
-    final boolean allowInEventLoop = false;
-    send(listener, connection, rpcType, protobufBody, clazz, allowInEventLoop, dataBodies);
-  }
-
-  /**
-   * Sends a request over {@code connection} and reports the outcome to {@code listener}.
-   *
-   * <p>Any failure raised while preparing or writing the message is wrapped in an {@link RpcException}
-   * and delivered through {@code listener.failed(...)}. Whenever the write was not handed to the
-   * channel, the supplied data buffers are released here.
-   *
-   * @param listener         receives the outcome of the call
-   * @param connection       connection whose channel the message is written to
-   * @param rpcType          type of the outgoing message
-   * @param protobufBody     protobuf body of the message; must not be null
-   * @param clazz            expected response class
-   * @param allowInEventLoop when false, the call must not be made from the connection's event loop and
-   *                         first waits via {@code connection.blockOnNotWritable(listener)}
-   * @param dataBodies       optional data buffers sent along with the protobuf body
-   * @throws IllegalArgumentException if called from the event loop while {@code allowInEventLoop} is false
-   */
-  public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType,
-      SEND protobufBody, Class<RECEIVE> clazz, boolean allowInEventLoop, ByteBuf... dataBodies) {
-
-    Preconditions
-        .checkArgument(
-            allowInEventLoop || !connection.inEventLoop(),
-            "You attempted to send while inside the rpc event thread.  This isn't allowed because sending will block if the channel is backed up.");
-
-    boolean completed = false;
-
-    try {
-
-      if (!allowInEventLoop && !connection.blockOnNotWritable(listener)) {
-        // If we're not in the event loop and we're interrupted while blocking, skip sending this message.
-        return;
-      }
-
-      // Validate the body before the checks below dereference it.
-      Preconditions.checkNotNull(protobufBody);
-      assert !Arrays.asList(dataBodies).contains(null);
-      assert rpcConfig.checkSend(rpcType, protobufBody.getClass(), clazz);
-
-      ChannelListenerWithCoordinationId futureListener = queue.get(listener, clazz, connection);
-      OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, futureListener.getCoordinationId(), protobufBody, dataBodies);
-      ChannelFuture channelFuture = connection.getChannel().writeAndFlush(m);
-      channelFuture.addListener(futureListener);
-      channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
-      completed = true;
-    } catch (Exception | AssertionError e) {
-      listener.failed(new RpcException("Failure sending message.", e));
-    } finally {
-      if (!completed && dataBodies != null) {
-        for (ByteBuf b : dataBodies) {
-          // Assertions may be disabled, so a null element can reach this point. Skip it rather than
-          // throwing from finally, which would leak the remaining buffers.
-          if (b != null) {
-            b.release();
-          }
-        }
-      }
-    }
-  }
-
-  public abstract C initRemoteConnection(SocketChannel channel);
-
-  /**
-   * Future listener that reports a closed channel and then closes the associated connection.
-   *
-   * <p>On a client bus the closure is only logged (and only when a local address was recorded via
-   * {@code setAddresses}). On a non-client bus the coordination queue is notified with a
-   * {@link ChannelClosedException}. In both cases the connection is closed afterwards.
-   */
-  public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture> {
-
-    final C clientConnection;
-    private final Channel channel;
-
-    public ChannelClosedHandler(C clientConnection, Channel channel) {
-      this.channel = channel;
-      this.clientConnection = clientConnection;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      // Prefer the addresses recorded on the bus; otherwise fall back to those of the channel.
-      String msg;
-      if(local!=null) {
-        msg = String.format("Channel closed %s <--> %s.", local, remote);
-      }else{
-        msg = String.format("Channel closed %s <--> %s.", future.channel().localAddress(), future.channel().remoteAddress());
-      }
-
-      if (RpcBus.this.isClient()) {
-        if(local != null) {
-          // msg is already fully formatted. Running it through String.format again would treat any
-          // '%' in an address (for example an IPv6 zone id) as a format specifier and throw.
-          logger.info(msg);
-        }
-      } else {
-        queue.channelClosed(new ChannelClosedException(msg));
-      }
-
-      clientConnection.close();
-    }
-
-  }
-
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel channel, C clientConnection) {
-    return new ChannelClosedHandler(clientConnection, channel);
-  }
-
-  /** Something that can be handed back to its pool once it is no longer in use. */
-  private interface Recyclable {
-    /** Returns this object to the pool it came from. */
-    void recycle();
-  }
-
-  /**
-   * Single-use sender for the response (or failure) of one inbound request.
-   *
-   * <p>An instance is re-armed through {@link #set(RemoteConnection, int)} and may then be used exactly
-   * once, by either {@link #send(Response)} or {@link #sendFailure(UserRpcException)}. After that one
-   * use the owning {@link Recyclable} is recycled. A second use is rejected with an
-   * {@link IllegalStateException} and does not recycle the owner again.
-   */
-  private class ResponseSenderImpl implements ResponseSender {
-
-    private RemoteConnection connection;
-    private int coordinationId;
-    private final AtomicBoolean sent = new AtomicBoolean(false);
-    private final Recyclable recyclable;
-
-    public ResponseSenderImpl(Recyclable recyclable) {
-      this.recyclable = recyclable;
-    }
-
-    /** Re-arms this sender for a new request on the given connection. */
-    void set(RemoteConnection connection, int coordinationId){
-      this.connection = connection;
-      this.coordinationId = coordinationId;
-      sent.set(false);
-    }
-
-    /**
-     * Writes {@code r} to the connection's channel as the response to the current request.
-     *
-     * @throws IllegalStateException if this sender has already been used
-     */
-    @Override
-    public void send(Response r) {
-      // Claim the sender before entering the try block. A rejected duplicate use must not reach the
-      // finally clause, otherwise the owning event would be recycled a second time.
-      sendOnce();
-      try {
-        assert rpcConfig.checkResponseSend(r.rpcType, r.pBody.getClass());
-        OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType, coordinationId,
-            r.pBody, r.dBodies);
-        if (RpcConstants.EXTRA_DEBUGGING) {
-          logger.debug("Adding message to outbound buffer. {}", outMessage);
-        }
-        logger.debug("Sending response with Sender {}", System.identityHashCode(this));
-        connection.getChannel().writeAndFlush(outMessage);
-      } finally {
-        recyclable.recycle();
-      }
-    }
-
-    /**
-     * Ensures that each sender is only used once.
-     */
-    private void sendOnce() {
-      if (!sent.compareAndSet(false, true)) {
-        throw new IllegalStateException("Attempted to utilize a sender multiple times.");
-      }
-    }
-
-    /**
-     * Writes a failure response built from {@code e} to the connection's channel.
-     *
-     * @throws IllegalStateException if this sender has already been used
-     */
-    void sendFailure(UserRpcException e){
-      // Same ownership rule as send(): only the call that claims the sender may recycle the owner.
-      sendOnce();
-      try {
-        UserException uex = UserException.systemError(e)
-            .addIdentity(e.getEndpoint())
-            .build(logger);
-
-        logger.error("Unexpected Error while handling request message", e);
-
-        OutboundRpcMessage outMessage = new OutboundRpcMessage(
-            RpcMode.RESPONSE_FAILURE,
-            0,
-            coordinationId,
-            uex.getOrCreatePBError(false)
-            );
-
-        if (RpcConstants.EXTRA_DEBUGGING) {
-          logger.debug("Adding message to outbound buffer. {}", outMessage);
-        }
-        connection.getChannel().writeAndFlush(outMessage);
-      } finally {
-        recyclable.recycle();
-      }
-    }
-
-  }
-
-
-  /**
-   * Pipeline stage that dispatches each {@link InboundRpcMessage} according to its mode.
-   *
-   * <p>Requests and responses are wrapped in recycled events and queued on the bus's
-   * {@link RpcEventHandler}. Failure responses update the coordination queue directly, a PING is
-   * answered with a PONG, and a PONG is ignored. Every message is released once dispatch finishes, and
-   * a warning is logged when dispatch took longer than the configured threshold.
-   */
-  protected class InboundHandler extends MessageToMessageDecoder<InboundRpcMessage> {
-
-    private final RpcEventHandler exec;
-    private final C connection;
-
-    public InboundHandler(C connection) {
-      super();
-      this.connection = connection;
-      this.exec = new RpcEventHandler(rpcConfig.getExecutor());
-    }
-
-    @Override
-    protected void decode(final ChannelHandlerContext ctx, final InboundRpcMessage msg, final List<Object> output) throws Exception {
-      if (!ctx.channel().isOpen()) {
-        return;
-      }
-      if (RpcConstants.EXTRA_DEBUGGING) {
-        logger.debug("Received message {}", msg);
-      }
-      final Channel channel = connection.getChannel();
-      final Stopwatch watch = new Stopwatch().start();
-
-      try{
-
-        switch (msg.mode) {
-        case REQUEST:
-          RequestEvent reqEvent = requestRecycler.get();
-          reqEvent.set(msg.coordinationId, connection, msg.rpcType, msg.pBody, msg.dBody);
-          exec.execute(reqEvent);
-          break;
-
-        case RESPONSE:
-          ResponseEvent respEvent = responseRecycler.get();
-          respEvent.set(msg.rpcType, msg.coordinationId, msg.pBody, msg.dBody);
-          exec.execute(respEvent);
-          break;
-
-        case RESPONSE_FAILURE:
-          DrillPBError failure = DrillPBError.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
-          queue.updateFailedFuture(msg.coordinationId, failure);
-          if (RpcConstants.EXTRA_DEBUGGING) {
-            // Two placeholders for two arguments, so the failure is actually logged.
-            logger.debug("Updated rpc future with coordinationId {} with failure {}", msg.coordinationId, failure);
-          }
-          break;
-
-        case PING:
-          channel.writeAndFlush(PONG);
-          break;
-
-        case PONG:
-          // noop.
-          break;
-
-        default:
-          throw new UnsupportedOperationException();
-        }
-      } finally {
-        try {
-          final long time = watch.elapsed(TimeUnit.MILLISECONDS);
-          final long delayThreshold = delayWarningThresholdMillis();
-          if (time > delayThreshold) {
-            logger.warn(String.format(
-                "Message of mode %s of rpc type %d took longer than %dms.  Actual duration was %dms.",
-                msg.mode, msg.rpcType, delayThreshold, time));
-          }
-        } finally {
-          // Release unconditionally: a problem in the timing diagnostics must never leak the message.
-          msg.release();
-        }
-      }
-    }
-
-    /**
-     * Reads the slow-dispatch warning threshold (milliseconds) from the
-     * {@code drill.exec.rpcDelayWarning} system property, defaulting to 500.
-     */
-    private long delayWarningThresholdMillis() {
-      final String configured = System.getProperty("drill.exec.rpcDelayWarning", "500");
-      try {
-        return Integer.parseInt(configured);
-      } catch (NumberFormatException ignored) {
-        // A malformed property must not break message handling; fall back to the default silently,
-        // since logging here would repeat for every message.
-        return 500;
-      }
-    }
-  }
-
-  /**
-   * Parses the contents of {@code pBody} with the supplied protobuf parser.
-   *
-   * @param pBody  buffer holding the serialized message
-   * @param parser parser used to decode the message
-   * @return the decoded message
-   * @throws RpcException if the bytes are not a valid message for {@code parser}
-   */
-  public static <T> T get(ByteBuf pBody, Parser<T> parser) throws RpcException {
-    final ByteBufInputStream bodyStream = new ByteBufInputStream(pBody);
-    try {
-      return parser.parseFrom(bodyStream);
-    } catch (InvalidProtocolBufferException e) {
-      final String parserName = parser.getClass().getCanonicalName();
-      throw new RpcException(String.format("Failure while decoding message with parser of type. %s", parserName), e);
-    }
-  }
-
-  /**
-   * Serialized executor for inbound RPC events. It is named after the bus configuration and logs any
-   * failure raised by a command instead of propagating it.
-   */
-  class RpcEventHandler extends SerializedExecutor {
-
-    public RpcEventHandler(final Executor underlyingExecutor) {
-      super(rpcConfig.getName() + "-rpc-event-queue", underlyingExecutor);
-    }
-
-    /** Logs the failure of {@code command}; the throwable is not rethrown. */
-    @Override
-    protected void runException(final Runnable command, final Throwable t) {
-      logger.error("Failure while running rpc command.", t);
-    }
-
-  }
-
-  /** Pool of reusable {@link RequestEvent} instances; each new instance keeps its recycler handle. */
-  private final Recycler<RequestEvent> requestRecycler = new Recycler<RequestEvent>() {
-    @Override
-    protected RequestEvent newObject(final Handle handle) {
-      final RequestEvent fresh = new RequestEvent(handle);
-      return fresh;
-    }
-  };
-
-  /**
-   * Reusable task that runs one inbound request through {@code handle(...)}.
-   *
-   * <p>{@link #set} retains the message buffers so they stay valid until {@link #run()} has finished,
-   * at which point they are released again. The event returns itself to the pool only when its
-   * response sender calls {@link #recycle()}.
-   */
-  private class RequestEvent implements Runnable, Recyclable {
-    private final ResponseSenderImpl sender;
-    private final Handle handle;
-    private C connection;
-    private int rpcType;
-    private ByteBuf pBody;
-    private ByteBuf dBody;
-
-    RequestEvent(Handle handle){
-      this.handle = handle;
-      this.sender = new ResponseSenderImpl(this);
-    }
-
-    /** Loads the event with the details of a new request and retains both buffers. */
-    public void set(int coordinationId, C connection, int rpcType, ByteBuf pBody, ByteBuf dBody) {
-      this.connection = connection;
-      this.rpcType = rpcType;
-      this.pBody = pBody;
-      this.dBody = dBody;
-      sender.set(connection, coordinationId);
-
-      retainIfPresent(pBody);
-      retainIfPresent(dBody);
-    }
-
-    @Override
-    public void run() {
-      try {
-        handle(connection, rpcType, pBody, dBody, sender);
-      } catch (UserRpcException e) {
-        // Failures meant for the caller are sent back as a failure response.
-        sender.sendFailure(e);
-      } catch (Exception e) {
-        logger.error("Failure while handling message.", e);
-      } finally {
-        releaseIfPresent(pBody);
-        releaseIfPresent(dBody);
-      }
-    }
-
-    @Override
-    public void recycle() {
-      // We must defer recycling until the sender has been used.
-      requestRecycler.recycle(this, handle);
-    }
-
-    /** Retains {@code buf} unless it is null. */
-    private void retainIfPresent(ByteBuf buf) {
-      if (buf != null) {
-        buf.retain();
-      }
-    }
-
-    /** Releases {@code buf} unless it is null. */
-    private void releaseIfPresent(ByteBuf buf) {
-      if (buf != null) {
-        buf.release();
-      }
-    }
-
-  }
-
-  /** Pool of reusable {@link ResponseEvent} instances; each new instance keeps its recycler handle. */
-  private final Recycler<ResponseEvent> responseRecycler = new Recycler<ResponseEvent>() {
-    @Override
-    protected ResponseEvent newObject(final Handle handle) {
-      final ResponseEvent fresh = new ResponseEvent(handle);
-      return fresh;
-    }
-  };
-
-  /**
-   * Reusable task that completes the pending outcome matching an inbound response.
-   *
-   * <p>{@link #set} retains the message buffers; {@link #run()} parses the protobuf body, hands the
-   * value and data buffer to the outcome registered under the coordination id, then releases the
-   * buffers and returns this event to the pool. Failures are logged, not propagated.
-   */
-  private class ResponseEvent implements Runnable {
-    private final Handle handle;
-
-    private int rpcType;
-    private int coordinationId;
-    private ByteBuf pBody;
-    private ByteBuf dBody;
-
-    public ResponseEvent(Handle handle){
-      this.handle = handle;
-    }
-
-    /** Loads the event with the details of a new response and retains both buffers. */
-    public void set(int rpcType, int coordinationId, ByteBuf pBody, ByteBuf dBody) {
-      this.rpcType = rpcType;
-      this.coordinationId = coordinationId;
-      this.pBody = pBody;
-      this.dBody = dBody;
-
-      retainIfPresent(pBody);
-      retainIfPresent(dBody);
-    }
-
-    @Override
-    public void run(){
-      try {
-        final MessageLite prototype = getResponseDefaultInstance(rpcType);
-        assert rpcConfig.checkReceive(rpcType, prototype.getClass());
-        final RpcOutcome<?> outcome = queue.getFuture(rpcType, coordinationId, prototype.getClass());
-        final Object value = prototype.getParserForType()
-            .parseFrom(new ByteBufInputStream(pBody, pBody.readableBytes()));
-        outcome.set(value, dBody);
-        if (RpcConstants.EXTRA_DEBUGGING) {
-          logger.debug("Updated rpc future {} with value {}", outcome, value);
-        }
-      } catch (Exception ex) {
-        logger.error("Failure while handling response.", ex);
-      } finally {
-        releaseIfPresent(pBody);
-        releaseIfPresent(dBody);
-
-        responseRecycler.recycle(this, handle);
-      }
-
-    }
-
-    /** Retains {@code buf} unless it is null. */
-    private void retainIfPresent(ByteBuf buf) {
-      if (buf != null) {
-        buf.retain();
-      }
-    }
-
-    /** Releases {@code buf} unless it is null. */
-    private void releaseIfPresent(ByteBuf buf) {
-      if (buf != null) {
-        buf.release();
-      }
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
deleted file mode 100644
index 11b07ad..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-
-import com.google.common.util.concurrent.AbstractCheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-
-/**
- * Checked future for RPC results that maps failures to {@link RpcException} and can carry the data
- * buffer that accompanied a response.
- */
-public class RpcCheckedFuture<T> extends AbstractCheckedFuture<T, RpcException> implements DrillRpcFuture<T>{
-
-  /** Data buffer recorded by {@link #set(Object, ByteBuf)}; null until then. */
-  volatile ByteBuf buffer;
-
-  public RpcCheckedFuture(ListenableFuture<T> delegate) {
-    super(delegate);
-  }
-
-  /**
-   * Records the data buffer that accompanied the result. {@code obj} is not stored here.
-   * NOTE(review): presumably the value itself is supplied through the delegate future - confirm.
-   */
-  public void set(T obj, ByteBuf buffer){
-    this.buffer = buffer;
-  }
-
-  @Override
-  protected RpcException mapException(Exception e) {
-    return RpcException.mapException(e);
-  }
-
-  /**
-   * Returns the buffer recorded by {@link #set(Object, ByteBuf)}, or null if none has been recorded.
-   * Previously this always returned null, which made the stored buffer unreachable.
-   * NOTE(review): buffer ownership is not visible here - confirm callers release it as expected.
-   */
-  @Override
-  public ByteBuf getBuffer() {
-    return buffer;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java
deleted file mode 100644
index 93b901c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCommand.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import com.google.protobuf.MessageLite;
-
-/**
- * A connection handler that can additionally be told when a connection is available for use.
- */
-public interface RpcCommand<T extends MessageLite, C extends RemoteConnection> extends RpcConnectionHandler<C>{
-
-  /** Called with a connection that is available to this command. */
-  void connectionAvailable(C connection);
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
deleted file mode 100644
index 22b253a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import java.util.Map;
-import java.util.concurrent.Executor;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.protobuf.Internal.EnumLite;
-import com.google.protobuf.MessageLite;
-
-/**
- * Immutable description of an RPC endpoint: its name, timeout, executor and the message types it may
- * send and receive. Instances are created through {@link #newBuilder()}.
- */
-public class RpcConfig {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConfig.class);
-
-  private final String name;
-  private final int timeout;
-  private final Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap;
-  private final Map<Integer, RpcMessageType<?, ?, ?>> receiveMap;
-  private final Executor executor;
-
-  private RpcConfig(String name, Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap,
-      Map<Integer, RpcMessageType<?, ?, ?>> receiveMap, int timeout, Executor executor) {
-    this.executor = Preconditions.checkNotNull(executor, "Executor must be defined.");
-    this.sendMap = ImmutableMap.copyOf(sendMap);
-    this.receiveMap = ImmutableMap.copyOf(receiveMap);
-    this.name = name;
-    this.timeout = timeout;
-  }
-
-  /** Returns the name given to this configuration. */
-  public String getName() {
-    return name;
-  }
-
-  /** Returns the configured timeout; zero means disabled (see {@link #hasTimeout()}). */
-  public int getTimeout() {
-    return timeout;
-  }
-
-  /** Returns true when the configured timeout is greater than zero. */
-  public boolean hasTimeout() {
-    return timeout > 0;
-  }
-
-  /** Returns the executor supplied when this configuration was built. */
-  public Executor getExecutor() {
-    return executor;
-  }
-
-  /**
-   * Verifies that {@code receiveClass} is the registered return class for the given rpc type number.
-   *
-   * @return always true, so the call can be used inside an {@code assert}
-   * @throws IllegalStateException if the type is unknown or the class does not match
-   */
-  public boolean checkReceive(int rpcType, Class<?> receiveClass) {
-    if (RpcConstants.EXTRA_DEBUGGING) {
-      logger.debug(String.format("Checking reception for rpcType %d and receive class %s.", rpcType, receiveClass));
-    }
-    final RpcMessageType<?, ?, ?> expected = receiveMap.get(rpcType);
-    if (expected == null) {
-      throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc receive type number of %s.", name, rpcType));
-    }
-
-    if (receiveClass == expected.getRet()) {
-      return true;
-    }
-    throw new IllegalStateException(String.format("%s: The definition for receive doesn't match implementation code.  The definition is %s however the current receive for this type was of type %s.", name, expected, receiveClass.getCanonicalName()));
-  }
-
-  /**
-   * Verifies that the send and expected-receive classes match the definition registered for
-   * {@code send}.
-   *
-   * @return always true, so the call can be used inside an {@code assert}
-   * @throws IllegalStateException if the type is unknown or either class does not match
-   */
-  public boolean checkSend(EnumLite send, Class<?> sendClass, Class<?> receiveClass) {
-    if (RpcConstants.EXTRA_DEBUGGING) {
-      logger.debug(String.format("Checking send classes for send RpcType %s.  Send Class is %s and Receive class is %s.", send, sendClass, receiveClass));
-    }
-    final RpcMessageType<?, ?, ?> expected = sendMap.get(send);
-    if (expected == null) {
-      throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc send type of %s.", name, send));
-    }
-
-    final boolean sendMatches = expected.getSend() == sendClass;
-    if (!sendMatches) {
-      throw new IllegalStateException(String.format("%s: The definition for send doesn't match implementation code.  The definition is %s however the current send is trying to send an object of type %s.", name, expected, sendClass.getCanonicalName()));
-    }
-    final boolean receiveMatches = expected.getRet() == receiveClass;
-    if (!receiveMatches) {
-      throw new IllegalStateException(String.format("%s: The definition for send doesn't match implementation code.  The definition is %s however the current send is trying to setup an expected reception of an object of type %s.", name, expected, receiveClass.getCanonicalName()));
-    }
-
-    return true;
-  }
-
-  /**
-   * Verifies that {@code responseClass} is the registered return class for {@code responseType}.
-   *
-   * @return always true, so the call can be used inside an {@code assert}
-   * @throws IllegalStateException if the type is unknown or the class does not match
-   */
-  public boolean checkResponseSend(EnumLite responseType, Class<?> responseClass) {
-    if (RpcConstants.EXTRA_DEBUGGING) {
-      logger.debug(String.format("Checking responce send of type %s with response class of %s.",  responseType, responseClass));
-    }
-    final RpcMessageType<?, ?, ?> expected = receiveMap.get(responseType.getNumber());
-    if (expected == null) {
-      throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc response of type %s.", name, responseType));
-    }
-    if (expected.getRet() == responseClass) {
-      return true;
-    }
-    throw new IllegalStateException(String.format("%s: The definition for the response doesn't match implementation code.  The definition is %s however the current response is trying to response with an object of type %s.", name, expected, responseClass.getCanonicalName()));
-  }
-
-  /**
-   * Pairs a send enum and message class with the enum and message class expected in return.
-   */
-  public static class RpcMessageType<SEND extends MessageLite, RECEIVE extends MessageLite, T extends EnumLite>{
-    private T sendEnum;
-    private Class<SEND> send;
-    private T receiveEnum;
-    private Class<RECEIVE> ret;
-
-    public RpcMessageType(T sendEnum, Class<SEND> send, T receiveEnum, Class<RECEIVE> ret) {
-      this.sendEnum = sendEnum;
-      this.send = send;
-      this.receiveEnum = receiveEnum;
-      this.ret = ret;
-    }
-
-    public Class<SEND> getSend() {
-      return send;
-    }
-
-    public void setSend(Class<SEND> send) {
-      this.send = send;
-    }
-
-    public T getSendEnum() {
-      return sendEnum;
-    }
-
-    public void setSendEnum(T sendEnum) {
-      this.sendEnum = sendEnum;
-    }
-
-    public Class<RECEIVE> getRet() {
-      return ret;
-    }
-
-    public void setRet(Class<RECEIVE> ret) {
-      this.ret = ret;
-    }
-
-    public T getReceiveEnum() {
-      return receiveEnum;
-    }
-
-    public void setReceiveEnum(T receiveEnum) {
-      this.receiveEnum = receiveEnum;
-    }
-
-    @Override
-    public String toString() {
-      return new StringBuilder("RpcMessageType [sendEnum=").append(sendEnum)
-          .append(", send=").append(send)
-          .append(", receiveEnum=").append(receiveEnum)
-          .append(", ret=").append(ret)
-          .append("]")
-          .toString();
-    }
-
-  }
-
-  /** Returns a new, empty builder. */
-  public static RpcConfigBuilder newBuilder() {
-    return new RpcConfigBuilder();
-  }
-
-  /**
-   * Builder for {@link RpcConfig}. A name and a non-negative timeout must be set before
-   * {@link #build()}; the executor is validated when the configuration is constructed.
-   */
-  public static class RpcConfigBuilder {
-    private String name;
-    private int timeout = -1;
-    private Executor executor;
-    private Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap = Maps.newHashMap();
-    private Map<Integer, RpcMessageType<?, ?, ?>> receiveMap = Maps.newHashMap();
-
-    private RpcConfigBuilder() {
-    }
-
-    public RpcConfigBuilder name(String name) {
-      this.name = name;
-      return this;
-    }
-
-    public RpcConfigBuilder timeout(int timeoutInSeconds) {
-      this.timeout = timeoutInSeconds;
-      return this;
-    }
-
-    /**
-     * Registers a message type: indexed by {@code sendEnum} for sending and by the number of
-     * {@code receiveEnum} for receiving.
-     */
-    public <SEND extends MessageLite, RECEIVE extends MessageLite, T extends EnumLite>  RpcConfigBuilder add(T sendEnum, Class<SEND> send, T receiveEnum, Class<RECEIVE> rec) {
-      final RpcMessageType<SEND, RECEIVE, T> messageType =
-          new RpcMessageType<SEND, RECEIVE, T>(sendEnum, send, receiveEnum, rec);
-      sendMap.put(sendEnum, messageType);
-      receiveMap.put(receiveEnum.getNumber(), messageType);
-      return this;
-    }
-
-    public RpcConfigBuilder executor(Executor executor) {
-      this.executor = executor;
-      return this;
-    }
-
-    /**
-     * Creates the configuration.
-     *
-     * @throws IllegalArgumentException if no timeout or no name has been set
-     * @throws NullPointerException if no executor has been set
-     */
-    public RpcConfig build() {
-      final boolean timeoutDefined = timeout > -1;
-      Preconditions.checkArgument(timeoutDefined, "Timeout must be a positive number or zero for disabled.");
-      final boolean nameDefined = name != null;
-      Preconditions.checkArgument(nameDefined, "RpcConfig name must be set.");
-      return new RpcConfig(name, sendMap, receiveMap, timeout, executor);
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
deleted file mode 100644
index 7618231..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-/**
- * Callback notified of the outcome of a connection attempt.
- */
-public interface RpcConnectionHandler<T extends RemoteConnection> {
-  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConnectionHandler.class);
-
-  /** Stage at which a connection attempt failed. */
-  enum FailureType{CONNECTION, HANDSHAKE_COMMUNICATION, HANDSHAKE_VALIDATION}
-
-  /** Called with the connection once the attempt has succeeded. */
-  void connectionSucceeded(T connection);
-
-  /** Called with the failure stage and its cause when the attempt has failed. */
-  void connectionFailed(FailureType type, Throwable t);
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
deleted file mode 100644
index 4be365c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-public class RpcConstants {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConstants.class);
-
-  private RpcConstants(){}
-
-  public static final boolean SOME_DEBUGGING = false;
-  public static final boolean EXTRA_DEBUGGING = false;
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
deleted file mode 100644
index ac48187..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.CorruptedFrameException;
-import io.netty.handler.codec.MessageToMessageDecoder;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.drill.exec.proto.GeneralRPCProtos.RpcHeader;
-
-/**
- * Converts a previously length adjusted buffer into an RpcMessage.
- *
- * <p>The frame read here consists of: the header tag and a delimited {@link RpcHeader}; the protobuf body tag, a
- * varint length and the protobuf body bytes; and, optionally, the raw body tag, a varint length and the raw body
- * bytes, which must fill the remainder of the buffer.
- */
-class RpcDecoder extends MessageToMessageDecoder<ByteBuf> {
-  final org.slf4j.Logger logger;
-
-  // Number of messages decoded so far; only used to enrich the corrupted-frame error message.
-  private final AtomicLong messageCounter = new AtomicLong();
-
-  /**
-   * @param name suffix appended to the logger name so that log lines of different decoders can be told apart
-   */
-  public RpcDecoder(String name) {
-    this.logger = org.slf4j.LoggerFactory.getLogger(RpcDecoder.class.getCanonicalName() + "-" + name);
-  }
-
-
-  /**
-   * Decodes one complete frame into an {@link InboundRpcMessage} and appends it to {@code out}. Nothing is produced
-   * when the channel is no longer open.
-   *
-   * @param ctx    handler context, used to check whether the channel is still open
-   * @param buffer buffer holding exactly one message
-   * @param out    receives the decoded message
-   * @throws CorruptedFrameException if a tag, a varint or the raw body length does not match the expected layout
-   */
-  @Override
-  protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
-    if (!ctx.channel().isOpen()) {
-      return;
-    }
-
-    if (RpcConstants.EXTRA_DEBUGGING) {
-      logger.debug("Inbound rpc message received.");
-    }
-
-    // now, we know the entire message is in the buffer and the buffer is constrained to this message. Additionally,
-    // this process should avoid reading beyond the end of this buffer so we inform the ByteBufInputStream to throw an
-    // exception if we go beyond readable bytes (as opposed to blocking).
-    final ByteBufInputStream is = new ByteBufInputStream(buffer, buffer.readableBytes());
-
-    // read the rpc header, saved in delimited format.
-    checkTag(is, RpcEncoder.HEADER_TAG);
-    final RpcHeader header = RpcHeader.parseDelimitedFrom(is);
-
-    if (RpcConstants.EXTRA_DEBUGGING) {
-      logger.debug(" post header read index {}", buffer.readerIndex());
-    }
-
-    // read the protobuf body into a buffer. The slice is NOT retained yet: the rest of the frame still has to be
-    // validated, and retaining now would leak the reference if that validation throws.
-    checkTag(is, RpcEncoder.PROTOBUF_BODY_TAG);
-    final int pBodyLength = readRawVarint32(is);
-    final ByteBuf pBody = buffer.slice(buffer.readerIndex(), pBodyLength);
-    buffer.skipBytes(pBodyLength);
-    if (RpcConstants.EXTRA_DEBUGGING) {
-      logger.debug("Read protobuf body of length {} into buffer {}.", pBodyLength, pBody);
-    }
-
-    if (RpcConstants.EXTRA_DEBUGGING) {
-      logger.debug("post protobufbody read index {}", buffer.readerIndex());
-    }
-
-    ByteBuf dBody = null;
-    int dBodyLength = 0;
-
-    // read the data body.
-    if (buffer.readableBytes() > 0) {
-
-      if (RpcConstants.EXTRA_DEBUGGING) {
-        logger.debug("Reading raw body, buffer has {} bytes available, is available {}.", buffer.readableBytes(), is.available());
-      }
-      checkTag(is, RpcEncoder.RAW_BODY_TAG);
-      dBodyLength = readRawVarint32(is);
-      if (buffer.readableBytes() != dBodyLength) {
-        throw new CorruptedFrameException(String.format("Expected to receive a raw body of %d bytes but received a buffer with %d bytes.", dBodyLength, buffer.readableBytes()));
-      }
-      dBody = buffer.slice();
-      if (RpcConstants.EXTRA_DEBUGGING) {
-        logger.debug("Read raw body of {}", dBody);
-      }
-
-    } else {
-      if (RpcConstants.EXTRA_DEBUGGING) {
-        logger.debug("No need to read raw body, no readable bytes left.");
-      }
-    }
-
-    // move the reader index forward so the next rpc call won't try to work with it.
-    buffer.skipBytes(dBodyLength);
-
-    // The whole frame has been validated: only now take the references that are handed over to the message.
-    pBody.retain(1);
-    if (dBody != null) {
-      dBody.retain(1);
-    }
-
-    // return the rpc message.
-    InboundRpcMessage m = new InboundRpcMessage(header.getMode(), header.getRpcType(), header.getCoordinationId(),
-        pBody, dBody);
-
-    messageCounter.incrementAndGet();
-    if (RpcConstants.SOME_DEBUGGING) {
-      logger.debug("Inbound Rpc Message Decoded {}.", m);
-    }
-    out.add(m);
-
-  }
-
-  /**
-   * Reads the next varint from the stream and verifies that it equals the expected field tag.
-   *
-   * @throws CorruptedFrameException if a different value was read
-   */
-  private void checkTag(ByteBufInputStream is, int expectedTag) throws IOException {
-    int actualTag = readRawVarint32(is);
-    if (actualTag != expectedTag) {
-      throw new CorruptedFrameException(String.format("Expected to read a tag of %d but actually received a value of %d.  Happened after reading %d message.", expectedTag, actualTag, messageCounter.get()));
-    }
-  }
-
-  /**
-   * Reads a varint of up to 32 significant bits, seven bits per byte, least significant group first.
-   * Taken from CodedInputStream and modified to enable ByteBufInterface.
-   *
-   * <p>If the fifth byte still has its continuation bit set, up to five further bytes are consumed and discarded.
-   *
-   * @param is stream to read from
-   * @return the decoded value
-   * @throws CorruptedFrameException if no terminating byte is found within ten bytes
-   * @throws IOException if the underlying stream fails or ends prematurely
-   */
-  public static int readRawVarint32(ByteBufInputStream is) throws IOException {
-    byte tmp = is.readByte();
-    if (tmp >= 0) {
-      return tmp;
-    }
-    int result = tmp & 0x7f;
-    if ((tmp = is.readByte()) >= 0) {
-      result |= tmp << 7;
-    } else {
-      result |= (tmp & 0x7f) << 7;
-      if ((tmp = is.readByte()) >= 0) {
-        result |= tmp << 14;
-      } else {
-        result |= (tmp & 0x7f) << 14;
-        if ((tmp = is.readByte()) >= 0) {
-          result |= tmp << 21;
-        } else {
-          result |= (tmp & 0x7f) << 21;
-          result |= (tmp = is.readByte()) << 28;
-          if (tmp < 0) {
-            // Discard upper 32 bits.
-            for (int i = 0; i < 5; i++) {
-              if (is.readByte() >= 0) {
-                return result;
-              }
-            }
-            throw new CorruptedFrameException("Encountered a malformed varint.");
-          }
-        }
-      }
-    }
-    return result;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
deleted file mode 100644
index f9da6f1..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.buffer.CompositeByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToMessageEncoder;
-
-import java.io.OutputStream;
-import java.util.List;
-
-import org.apache.drill.exec.proto.GeneralRPCProtos.CompleteRpcMessage;
-import org.apache.drill.exec.proto.GeneralRPCProtos.RpcHeader;
-
-import com.google.protobuf.CodedOutputStream;
-import com.google.protobuf.WireFormat;
-
-/**
- * Converts an RPCMessage into wire format.
- *
- * <p>The emitted frame is: varint total length; header tag, varint header length, {@link RpcHeader}; protobuf body
- * tag, varint length, protobuf body; and, only when the message has a raw body, raw body tag, varint length and the
- * raw body buffers.
- */
-class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{
-  final org.slf4j.Logger logger;
-
-  static final int HEADER_TAG = makeTag(CompleteRpcMessage.HEADER_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
-  static final int PROTOBUF_BODY_TAG = makeTag(CompleteRpcMessage.PROTOBUF_BODY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
-  static final int RAW_BODY_TAG = makeTag(CompleteRpcMessage.RAW_BODY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
-  static final int HEADER_TAG_LENGTH = getRawVarintSize(HEADER_TAG);
-  static final int PROTOBUF_BODY_TAG_LENGTH = getRawVarintSize(PROTOBUF_BODY_TAG);
-  static final int RAW_BODY_TAG_LENGTH = getRawVarintSize(RAW_BODY_TAG);
-
-  /**
-   * @param name suffix appended to the logger name so that log lines of different encoders can be told apart
-   */
-  public RpcEncoder(String name) {
-    this.logger = org.slf4j.LoggerFactory.getLogger(RpcEncoder.class.getCanonicalName() + "-" + name);
-  }
-
-  /**
-   * Serializes {@code msg} and appends the resulting buffer to {@code out}. When the channel is already closed the
-   * message is released and nothing is emitted.
-   *
-   * @param ctx handler context, used for the channel state and the buffer allocator
-   * @param msg message to encode
-   * @param out receives the encoded buffer (a composite buffer when the message carries a raw body)
-   */
-  @Override
-  protected void encode(ChannelHandlerContext ctx, OutboundRpcMessage msg, List<Object> out) throws Exception {
-    if (RpcConstants.EXTRA_DEBUGGING) {
-      logger.debug("Rpc Encoder called with msg {}", msg);
-    }
-
-    if (!ctx.channel().isOpen()) {
-      logger.debug("Channel closed, skipping encode.");
-      msg.release();
-      return;
-    }
-
-    if (RpcConstants.EXTRA_DEBUGGING) {
-      logger.debug("Encoding outbound message {}", msg);
-    }
-    // first we build the RpcHeader
-    final RpcHeader header = RpcHeader.newBuilder() //
-        .setMode(msg.mode) //
-        .setCoordinationId(msg.coordinationId) //
-        .setRpcType(msg.rpcType).build();
-
-    // figure out the full length
-    final int headerLength = header.getSerializedSize();
-    final int protoBodyLength = msg.pBody.getSerializedSize();
-    final int rawBodyLength = msg.getRawBodySize();
-    int fullLength = //
-        HEADER_TAG_LENGTH + getRawVarintSize(headerLength) + headerLength +   //
-        PROTOBUF_BODY_TAG_LENGTH + getRawVarintSize(protoBodyLength) + protoBodyLength; //
-
-    if (rawBodyLength > 0) {
-      fullLength += (RAW_BODY_TAG_LENGTH + getRawVarintSize(rawBodyLength) + rawBodyLength);
-    }
-
-    final ByteBuf buf = ctx.alloc().buffer();
-    // Becomes true once buf (directly or as part of the composite) has been added to out.
-    boolean handedOff = false;
-    try {
-      OutputStream os = new ByteBufOutputStream(buf);
-      CodedOutputStream cos = CodedOutputStream.newInstance(os);
-
-      // write full length first (this is length delimited stream).
-      cos.writeRawVarint32(fullLength);
-
-      // write header
-      cos.writeRawVarint32(HEADER_TAG);
-      cos.writeRawVarint32(headerLength);
-      header.writeTo(cos);
-
-      // write protobuf body length and body
-      cos.writeRawVarint32(PROTOBUF_BODY_TAG);
-      cos.writeRawVarint32(protoBodyLength);
-      msg.pBody.writeTo(cos);
-
-      // if exists, write data body and tag.
-      if (rawBodyLength > 0) {
-        if (RpcConstants.EXTRA_DEBUGGING) {
-          logger.debug("Writing raw body of size {}", rawBodyLength);
-        }
-
-        cos.writeRawVarint32(RAW_BODY_TAG);
-        cos.writeRawVarint32(rawBodyLength);
-        cos.flush(); // need to flush so that dbody goes after if cos is caching.
-
-        // Chain the freshly written prefix and the message's data buffers without copying them.
-        CompositeByteBuf cbb = new CompositeByteBuf(buf.alloc(), true, msg.dBodies.length + 1);
-        cbb.addComponent(buf);
-        int bufLength = buf.readableBytes();
-        for (ByteBuf b : msg.dBodies) {
-          cbb.addComponent(b);
-          bufLength += b.readableBytes();
-        }
-        // addComponent does not advance the writer index, so set it to the total readable size.
-        cbb.writerIndex(bufLength);
-        out.add(cbb);
-      } else {
-        cos.flush();
-        out.add(buf);
-      }
-      handedOff = true;
-    } finally {
-      // The locally allocated buffer would leak if encoding failed before it was handed to out.
-      // msg itself is intentionally not released here: on success its data buffers travel inside the composite.
-      if (!handedOff) {
-        buf.release();
-      }
-    }
-
-    if (RpcConstants.SOME_DEBUGGING) {
-      logger.debug("Wrote message length {}:{} bytes (head:body).  Message: {}", getRawVarintSize(fullLength), fullLength, msg);
-    }
-    if (RpcConstants.EXTRA_DEBUGGING) {
-      logger.debug("Sent message.  Ending writer index was {}.", buf.writerIndex());
-    }
-  }
-
-  /** Makes a tag value given a field number and wire type, copied from WireFormat since it isn't public.  */
-  static int makeTag(final int fieldNumber, final int wireType) {
-    return (fieldNumber << 3) | wireType;
-  }
-
-  /**
-   * Computes how many bytes the varint encoding of {@code value} occupies (seven payload bits per byte).
-   *
-   * @param value value to measure; treated as unsigned, so negative values need five bytes
-   * @return number of bytes, between 1 and 5
-   */
-  public static int getRawVarintSize(int value) {
-    int count = 1;
-    while ((value & ~0x7F) != 0) {
-      count++;
-      value >>>= 7;
-    }
-    return count;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
deleted file mode 100644
index a6d0e8e..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import java.util.concurrent.ExecutionException;
-
-import org.apache.drill.common.exceptions.DrillIOException;
-import org.apache.drill.common.util.DrillStringUtils;
-import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
-
-/**
- * Parent class for all rpc exceptions.
- *
- * <p>Messages handed to the constructors are passed through {@link DrillStringUtils#unescapeJava(String)} before
- * they reach the superclass.
- */
-public class RpcException extends DrillIOException{
-  private static final long serialVersionUID = -5964230316010502319L;
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcException.class);
-
-  public RpcException() {
-    super();
-  }
-
-  public RpcException(String message) {
-    super(unescape(message));
-  }
-
-  public RpcException(String message, Throwable cause) {
-    super(unescape(message), cause);
-  }
-
-  public RpcException(Throwable cause) {
-    super(cause);
-  }
-
-  /**
-   * Turns an arbitrary throwable into an RpcException. Wrapping {@link ExecutionException}s are peeled off first;
-   * if what remains already is an RpcException it is returned as is, otherwise it becomes the cause of a new one.
-   */
-  public static RpcException mapException(Throwable t) {
-    final Throwable unwrapped = unwrap(t);
-    if (unwrapped instanceof RpcException) {
-      return (RpcException) unwrapped;
-    }
-    return new RpcException(unwrapped);
-  }
-
-  /**
-   * Creates a new RpcException with the given message whose cause is {@code t} stripped of any wrapping
-   * {@link ExecutionException}s. Unlike the single-argument variant, a new exception is always created.
-   */
-  public static RpcException mapException(String message, Throwable t) {
-    return new RpcException(message, unwrap(t));
-  }
-
-  /** Always {@code false} in this base class. */
-  public boolean isRemote(){
-    return false;
-  }
-
-  /** Not supported by this base class: always throws {@link UnsupportedOperationException}. */
-  public DrillPBError getRemoteError(){
-    throw new UnsupportedOperationException();
-  }
-
-  // Applies the Java unescaping to the message before it is stored.
-  private static String unescape(String message) {
-    return DrillStringUtils.unescapeJava(message);
-  }
-
-  // Follows the cause chain for as long as the current throwable is an ExecutionException; may end at null.
-  private static Throwable unwrap(Throwable t) {
-    Throwable current = t;
-    while (current instanceof ExecutionException) {
-      current = current.getCause();
-    }
-    return current;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
deleted file mode 100644
index 46b7702..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import org.eclipse.jetty.io.Connection;
-
-/**
- * Channel handler that logs exceptions raised on a connection and closes the channel unless it is already closed
- * or the failure is a "Connection reset by peer".
- *
- * @param <C> type of the connection whose name is included in the log output
- */
-public class RpcExceptionHandler<C extends RemoteConnection> implements ChannelHandler{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcExceptionHandler.class);
-
-  private final C connection;
-
-  /**
-   * @param connection connection this handler reports on
-   */
-  public RpcExceptionHandler(C connection){
-    this.connection = connection;
-  }
-
-  /**
-   * Logs the failure. A closed channel or a peer reset is only logged at warn level; any other exception is logged
-   * as an error and the channel is closed.
-   */
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-    // Throwable.getMessage() may return null, so compare constant-first to avoid a NullPointerException
-    // being thrown from inside the exception handler itself.
-    if (!ctx.channel().isOpen() || "Connection reset by peer".equals(cause.getMessage())) {
-      logger.warn("Exception occurred with closed channel.  Connection: {}", connection.getName(), cause);
-      return;
-    }
-
-    logger.error("Exception in RPC communication.  Connection: {}.  Closing connection.", connection.getName(), cause);
-    ctx.close();
-  }
-
-  /** No action is needed when the handler is added to a pipeline. */
-  @Override
-  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
-  }
-
-
-  /** No action is needed when the handler is removed from a pipeline. */
-  @Override
-  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java
deleted file mode 100644
index 9712c9a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
-
-/**
- * Common base of the RPC message types: carries the mode, the rpc type number and the coordination id, and leaves
- * body handling to the subclasses.
- */
-public abstract class RpcMessage {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcMessage.class);
-
-  /** Mode of this message. */
-  public RpcMode mode;
-
-  /** Numeric rpc type of this message. */
-  public int rpcType;
-
-  /** Coordination id of this message. */
-  public int coordinationId;
-
-  /**
-   * @param mode           mode of the message
-   * @param rpcType        numeric rpc type
-   * @param coordinationId coordination id
-   */
-  public RpcMessage(RpcMode mode, int rpcType, int coordinationId) {
-    this.coordinationId = coordinationId;
-    this.rpcType = rpcType;
-    this.mode = mode;
-  }
-
-  /** Releases the message; what exactly is freed is up to the subclass. */
-  abstract void release();
-
-  /** @return the body size as computed by the subclass */
-  public abstract int getBodySize();
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
deleted file mode 100644
index af9aa01..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-
-/**
- * Receiver for the result of an RPC: either a value together with a buffer, or an exception.
- *
- * @param <T> type reported by {@link #getOutcomeType()}
- */
-public interface RpcOutcome<T> {
-  // Interface fields are implicitly public, static and final.
-  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcOutcome.class);
-
-  /** Supplies the successful outcome. */
-  void set(Object value, ByteBuf buffer);
-
-  /** Supplies the failure. */
-  void setException(Throwable t);
-
-  /** @return the class object of the outcome type */
-  Class<T> getOutcomeType();
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
deleted file mode 100644
index 4485cf9..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.buffer.ByteBuf;
-
-/**
- * Callback interface notified about the outcome of an RPC.
- *
- * @param <V> type of the value delivered on success
- */
-public interface RpcOutcomeListener<V> {
-
-  /**
-   * Called when an error occurred while waiting for the RPC outcome.
-   *
-   * @param ex the error that occurred
-   */
-  void failed(RpcException ex);
-
-  /**
-   * Called with the value and the buffer of a successful outcome.
-   *
-   * @param value  the outcome value
-   * @param buffer the buffer delivered together with the value
-   */
-  void success(V value, ByteBuf buffer);
-
-  /**
-   * Called when the sending thread is interrupted. Possible when the fragment is cancelled due to query
-   * cancellations or failures.
-   *
-   * @param e the interruption that was observed
-   */
-  void interrupted(InterruptedException e);
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java
deleted file mode 100644
index 34da53c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.ServerSocketChannel;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.internal.SystemPropertyUtil;
-
-import java.util.Locale;
-
-import org.apache.drill.exec.ExecConstants;
-
-/**
- * TransportCheck decides whether or not to use the native EPOLL mechanism for communication.
- *
- * <p>EPOLL is selected only when the {@code os.name} system property starts with "linux" and the boolean system
- * property named by {@link ExecConstants#USE_LINUX_EPOLL} is set to true; otherwise the NIO classes are used.
- */
-public class TransportCheck {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TransportCheck.class);
-
-  /** Whether the EPOLL transport is used; decided once, when this class is initialized. */
-  public static final boolean SUPPORTS_EPOLL;
-
-  static {
-    final String osName = SystemPropertyUtil.get("os.name").toLowerCase(Locale.US).trim();
-    // The opt-in property is only consulted on Linux.
-    SUPPORTS_EPOLL = osName.startsWith("linux") && SystemPropertyUtil.getBoolean(ExecConstants.USE_LINUX_EPOLL, false);
-  }
-
-  /** @return the server socket channel class matching the selected transport */
-  public static Class<? extends ServerSocketChannel> getServerSocketChannel(){
-    if (!SUPPORTS_EPOLL) {
-      return NioServerSocketChannel.class;
-    }
-    return EpollServerSocketChannel.class;
-  }
-
-  /** @return the client socket channel class matching the selected transport */
-  public static Class<? extends SocketChannel> getClientSocketChannel(){
-    if (!SUPPORTS_EPOLL) {
-      return NioSocketChannel.class;
-    }
-    return EpollSocketChannel.class;
-  }
-
-  /**
-   * Creates the event loop group matching the selected transport.
-   *
-   * @param nThreads number of threads passed to the event loop group
-   * @param prefix   prefix handed to the {@link NamedThreadFactory} that creates the threads
-   */
-  public static EventLoopGroup createEventLoopGroup(int nThreads, String prefix) {
-    final NamedThreadFactory threadFactory = new NamedThreadFactory(prefix);
-    if (!SUPPORTS_EPOLL) {
-      return new NioEventLoopGroup(nThreads, threadFactory);
-    }
-    return new EpollEventLoopGroup(nThreads, threadFactory);
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserRpcException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserRpcException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserRpcException.java
deleted file mode 100644
index 1d2cc1a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserRpcException.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc;
-
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
-/**
- * RpcException that additionally records an endpoint, a separate message and the original throwable, each of which
- * is exposed through its own getter.
- */
-public class UserRpcException extends RpcException {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserRpcException.class);
-
-  // Message returned by getUserMessage(); it is not passed to the superclass constructor.
-  private final String message;
-  // Endpoint supplied by the creator of the exception.
-  private final DrillbitEndpoint endpoint;
-  // The original throwable; also passed to the superclass as the cause.
-  private final Throwable t;
-
-  /**
-   * @param endpoint endpoint to record with this exception
-   * @param message  message made available through {@link #getUserMessage()}
-   * @param t        original throwable; becomes the cause and is returned by {@link #getUserException()}
-   */
-  public UserRpcException(DrillbitEndpoint endpoint, String message, Throwable t) {
-    // Only the cause is handed to the superclass; the message is kept in this class.
-    super(t);
-    this.message = message;
-    this.endpoint = endpoint;
-    this.t = t;
-  }
-
-  /** @return the message given at construction time */
-  public String getUserMessage() {
-    return message;
-  }
-
-  /** @return the endpoint given at construction time */
-  public DrillbitEndpoint getEndpoint() {
-    return endpoint;
-  }
-
-  /** @return the throwable given at construction time */
-  public Throwable getUserException() {
-    return t;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/AwaitableUserResultsListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/AwaitableUserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/AwaitableUserResultsListener.java
index 74b48b6..347d076 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/AwaitableUserResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/AwaitableUserResultsListener.java
@@ -24,6 +24,7 @@ import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
 
 /**
  * General mechanism for waiting on the query to be executed

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/ConnectionThrottle.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/ConnectionThrottle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/ConnectionThrottle.java
deleted file mode 100644
index e2ffcc0..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/ConnectionThrottle.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc.user;
-
-/**
- * Handle through which the auto-read setting of a connection can be switched on or off.
- */
-public interface ConnectionThrottle {
-
-  /**
-   * @param enableAutoRead {@code true} to enable auto read, {@code false} to disable it
-   */
-  void setAutoRead(boolean enableAutoRead);
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 14c7154..16d957b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
 import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.rpc.RpcBus;
 import org.apache.drill.exec.rpc.RpcException;

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 84c98b4..824e6eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.proto.UserProtos.UserProperties;
 import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
 import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.BasicClientWithConnection;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
 import org.apache.drill.exec.rpc.OutOfMemoryHandler;
 import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
 import org.apache.drill.exec.rpc.Response;

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
index 01a44b8..1a55716 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.rpc.user;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
 
 public interface UserResultsListener {
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
index ddbbe7d..0ca8e74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
@@ -29,6 +29,7 @@ import javax.xml.bind.annotation.XmlRootElement;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.client.DrillClient;
@@ -39,7 +40,7 @@ import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.user.ConnectionThrottle;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.vector.ValueVector;

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 6adbfc0..d609915 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -44,6 +44,7 @@ import org.apache.drill.exec.coord.DistributedSemaphore;
 import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
 import org.apache.drill.exec.exception.OptimizerException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.opt.BasicOptimizer;
@@ -434,7 +435,7 @@ public class Foreman implements Runnable {
       final OptionManager optionManager = queryContext.getOptions();
       final long maxWidthPerNode = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val;
       long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(),
-          queryContext.getConfig().getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC));
+          queryContext.getConfig().getLong(TopLevelAllocator.TOP_LEVEL_MAX_ALLOC));
       maxAllocPerNode = Math.min(maxAllocPerNode,
           optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
       final long maxSortAlloc = maxAllocPerNode / (sortList.size() * maxWidthPerNode);

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 1a95e77..8737c29 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -45,8 +45,8 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.AwaitableUserResultsListener;
-import org.apache.drill.exec.rpc.user.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.rpc.user.UserSession;

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
index 2cd5c95..4b3c008 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
@@ -28,7 +28,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
-import org.apache.drill.exec.rpc.user.ConnectionThrottle;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
index 76bc3bf..26b2a4f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
@@ -28,7 +28,6 @@ import java.util.Properties;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.FileUtils;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -45,18 +44,19 @@ import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.test.DrillTest;
 import org.junit.Test;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 
-public class TestAllocators {
+public class TestAllocators extends DrillTest {
 
   private static final Properties TEST_CONFIGURATIONS = new Properties() {
     {
-      put(ExecConstants.TOP_LEVEL_MAX_ALLOC, "14000000");
-      put(ExecConstants.ENABLE_FRAGMENT_MEMORY_LIMIT, "true");
-      put(ExecConstants.FRAGMENT_MEM_OVERCOMMIT_FACTOR, "1.1");
+      put(TopLevelAllocator.TOP_LEVEL_MAX_ALLOC, "14000000");
+      put(AccountorImpl.ENABLE_FRAGMENT_MEMORY_LIMIT, "true");
+      put(AccountorImpl.FRAGMENT_MEM_OVERCOMMIT_FACTOR, "1.1");
     }
   };
 
@@ -66,8 +66,8 @@ public class TestAllocators {
   public void testTransfer() throws Exception {
     final Properties props = new Properties() {
       {
-        put(ExecConstants.TOP_LEVEL_MAX_ALLOC, "1000000");
-        put(ExecConstants.ERROR_ON_MEMORY_LEAK, "true");
+        put(TopLevelAllocator.TOP_LEVEL_MAX_ALLOC, "1000000");
+        put(TopLevelAllocator.ERROR_ON_MEMORY_LEAK, "true");
       }
     };
     final DrillConfig config = DrillConfig.create(props);

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestEndianess.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
deleted file mode 100644
index bb1e718..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-import static org.junit.Assert.assertEquals;
-import io.netty.buffer.ByteBuf;
-
-import org.apache.drill.common.DrillAutoCloseables;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.ExecTest;
-import org.junit.Test;
-
-
-public class TestEndianess extends ExecTest {
-  @Test
-  public void testLittleEndian() {
-    final DrillConfig drillConfig = DrillConfig.create();
-    final BufferAllocator a = RootAllocatorFactory.newRoot(drillConfig);
-    final ByteBuf b = a.buffer(4);
-    b.setInt(0, 35);
-    assertEquals(b.getByte(0), 35);
-    assertEquals(b.getByte(1), 0);
-    assertEquals(b.getByte(2), 0);
-    assertEquals(b.getByte(3), 0);
-    b.release();
-    DrillAutoCloseables.closeNoChecked(a);
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 986bfcd..ae3c97f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -68,9 +68,9 @@ import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.user.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.store.pojo.PojoRecordReader;

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
index 9f622f5..4976bca 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
@@ -32,8 +32,8 @@ import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.user.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.vector.ValueVector;

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
index e1b03d5..7afc05d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
@@ -30,7 +30,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.user.ConnectionThrottle;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.server.Drillbit;

http://git-wip-us.apache.org/repos/asf/drill/blob/1fde9bb1/exec/memory/base/pom.xml
----------------------------------------------------------------------
diff --git a/exec/memory/base/pom.xml b/exec/memory/base/pom.xml
new file mode 100644
index 0000000..6bfd29b
--- /dev/null
+++ b/exec/memory/base/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+  license agreements. See the NOTICE file distributed with this work for additional 
+  information regarding copyright ownership. The ASF licenses this file to 
+  You under the Apache License, Version 2.0 (the "License"); you may not use 
+  this file except in compliance with the License. You may obtain a copy of 
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+  by applicable law or agreed to in writing, software distributed under the 
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+  OF ANY KIND, either express or implied. See the License for the specific 
+  language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>memory-parent</artifactId>
+    <groupId>org.apache.drill.memory</groupId>
+    <version>1.3.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>drill-memory-base</artifactId>
+  <name>exec/memory/base</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-protocol</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>2.5.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.codahale.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>3.0.1</version>
+    </dependency>
+
+
+  </dependencies>
+
+
+  <build>
+  </build>
+
+
+
+</project>


Mime
View raw message