drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sudhe...@apache.org
Subject [19/29] drill git commit: DRILL-4280: CORE (revert DRILL-3242)
Date Sat, 25 Feb 2017 07:18:12 GMT
DRILL-4280: CORE (revert DRILL-3242)

+ DRILL-3242 aimed to offload request handling to a secondary thread, but this feature
is disabled by default due to concurrency issues.

+ One of the implications of the feature was that exceptions that were not of UserRpcException
type were ignored. But exceptions must not be ignored; they should be handled properly, especially
in the context of security.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0501a673
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0501a673
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0501a673

Branch: refs/heads/master
Commit: 0501a673697f0a968bea96fc1d4de6e18b8c2ca4
Parents: d068b3e
Author: Sudheesh Katkam <sudheesh@apache.org>
Authored: Fri Feb 24 17:18:27 2017 -0800
Committer: Sudheesh Katkam <sudheesh@apache.org>
Committed: Fri Feb 24 19:01:42 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/drill/exec/rpc/RpcBus.java  | 195 +++++--------------
 1 file changed, 54 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0501a673/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 38cba49..aa713f8 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -31,11 +31,9 @@ import java.io.Closeable;
 import java.net.SocketAddress;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.drill.common.SerializedExecutor;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
@@ -58,7 +56,6 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection>
imp
   final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
 
   private static final OutboundRpcMessage PONG = new OutboundRpcMessage(RpcMode.PONG, 0,
0, Acks.OK);
-  private static final boolean ENABLE_SEPARATE_THREADS = "true".equals(System.getProperty("drill.enable_rpc_offload"));
 
   protected abstract MessageLite getResponseDefaultInstance(int rpcType) throws RpcException;
 
@@ -179,17 +176,13 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection>
imp
 
   private class ResponseSenderImpl implements ResponseSender {
 
-    private RemoteConnection connection;
-    private int coordinationId;
+    private final RemoteConnection connection;
+    private final int coordinationId;
     private final AtomicBoolean sent = new AtomicBoolean(false);
 
-    public ResponseSenderImpl() {
-    }
-
-    void set(RemoteConnection connection, int coordinationId){
+    public ResponseSenderImpl(RemoteConnection connection, int coordinationId) {
       this.connection = connection;
       this.coordinationId = coordinationId;
-      sent.set(false);
     }
 
     public void send(Response r) {
@@ -236,30 +229,31 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection>
imp
 
   }
 
-  private class SameExecutor implements Executor {
-
-    @Override
-    public void execute(Runnable command) {
-      command.run();
+  private static void retainByteBuf(ByteBuf buf) {
+    if (buf != null) {
+      buf.retain();
     }
+  }
 
+  private static void releaseByteBuf(ByteBuf buf) {
+    if (buf != null) {
+      buf.release();
+    }
   }
 
   protected class InboundHandler extends MessageToMessageDecoder<InboundRpcMessage>
{
 
-    private final Executor exec;
     private final C connection;
 
     public InboundHandler(C connection) {
       super();
       Preconditions.checkNotNull(connection);
       this.connection = connection;
-      final Executor underlyingExecutor = ENABLE_SEPARATE_THREADS ? rpcConfig.getExecutor()
: new SameExecutor();
-      this.exec = new RpcEventHandler(underlyingExecutor);
     }
 
     @Override
-    protected void decode(final ChannelHandlerContext ctx, final InboundRpcMessage msg, final
List<Object> output) throws Exception {
+    protected void decode(final ChannelHandlerContext ctx, final InboundRpcMessage msg, final
List<Object> output)
+        throws Exception {
       if (!ctx.channel().isOpen()) {
         return;
       }
@@ -269,26 +263,56 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection>
imp
       final Channel channel = connection.getChannel();
       final Stopwatch watch = Stopwatch.createStarted();
 
-      try{
+      try {
 
         switch (msg.mode) {
-        case REQUEST:
-          RequestEvent reqEvent = new RequestEvent(msg.coordinationId, connection, msg.rpcType,
msg.pBody, msg.dBody);
-          exec.execute(reqEvent);
+        case REQUEST: {
+          final ResponseSenderImpl sender = new ResponseSenderImpl(connection, msg.coordinationId);
+          retainByteBuf(msg.pBody);
+          retainByteBuf(msg.dBody);
+          try {
+            handle(connection, msg.rpcType, msg.pBody, msg.dBody, sender);
+          } catch (UserRpcException e) {
+            sender.sendFailure(e);
+          } finally {
+            releaseByteBuf(msg.pBody);
+            releaseByteBuf(msg.dBody);
+          }
           break;
+        }
 
-        case RESPONSE:
-          ResponseEvent respEvent = new ResponseEvent(connection, msg.rpcType, msg.coordinationId,
msg.pBody, msg.dBody);
-          exec.execute(respEvent);
+        case RESPONSE: {
+          retainByteBuf(msg.pBody);
+          retainByteBuf(msg.dBody);
+          try {
+            final MessageLite defaultResponse = getResponseDefaultInstance(msg.rpcType);
+            assert rpcConfig.checkReceive(msg.rpcType, defaultResponse.getClass());
+            final RpcOutcome<?> rpcFuture = connection.getAndRemoveRpcOutcome(msg.rpcType,
msg.coordinationId,
+                defaultResponse.getClass());
+            final Parser<?> parser = defaultResponse.getParserForType();
+            final Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
+            rpcFuture.set(value, msg.dBody);
+            if (RpcConstants.EXTRA_DEBUGGING) {
+              logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
+            }
+          } catch (Exception ex) {
+            logger.error("Failure while handling response.", ex);
+            throw ex;
+          } finally {
+            releaseByteBuf(msg.pBody);
+            releaseByteBuf(msg.dBody);
+          }
           break;
+        }
 
-        case RESPONSE_FAILURE:
+        case RESPONSE_FAILURE: {
           DrillPBError failure = DrillPBError.parseFrom(new ByteBufInputStream(msg.pBody,
msg.pBody.readableBytes()));
           connection.recordRemoteFailure(msg.coordinationId, failure);
           if (RpcConstants.EXTRA_DEBUGGING) {
             logger.debug("Updated rpc future with coordinationId {} with failure ", msg.coordinationId,
failure);
           }
           break;
+        }
 
         case PING:
           channel.writeAndFlush(PONG);
@@ -319,120 +343,9 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection>
imp
       ByteBufInputStream is = new ByteBufInputStream(pBody);
       return parser.parseFrom(is);
     } catch (InvalidProtocolBufferException e) {
-      throw new RpcException(String.format("Failure while decoding message with parser of
type. %s", parser.getClass().getCanonicalName()), e);
-    }
-  }
-
-  class RpcEventHandler extends SerializedExecutor {
-
-    public RpcEventHandler(Executor underlyingExecutor) {
-      super(rpcConfig.getName() + "-rpc-event-queue", underlyingExecutor);
+      throw new RpcException(
+          String.format("Failure while decoding message with parser of type. %s",
+              parser.getClass().getCanonicalName()), e);
     }
-
-    @Override
-    protected void runException(Runnable command, Throwable t) {
-      logger.error("Failure while running rpc command.", t);
-    }
-
-  }
-
-  private class RequestEvent implements Runnable {
-    private final ResponseSenderImpl sender;
-    private final C connection;
-    private final int rpcType;
-    private final ByteBuf pBody;
-    private final ByteBuf dBody;
-
-    RequestEvent(int coordinationId, C connection, int rpcType, ByteBuf pBody, ByteBuf dBody)
{
-      sender = new ResponseSenderImpl();
-      this.connection = connection;
-      this.rpcType = rpcType;
-      this.pBody = pBody;
-      this.dBody = dBody;
-      sender.set(connection, coordinationId);
-
-      if(pBody != null){
-        pBody.retain();
-      }
-
-      if(dBody != null){
-        dBody.retain();
-      }
-    }
-
-    @Override
-    public void run() {
-      try {
-        handle(connection, rpcType, pBody, dBody, sender);
-      } catch (UserRpcException e) {
-        sender.sendFailure(e);
-      } catch (Exception e) {
-        logger.error("Failure while handling message.", e);
-      }finally{
-        if(pBody != null){
-          pBody.release();
-        }
-
-        if(dBody != null){
-          dBody.release();
-        }
-      }
-
-    }
-
-
-  }
-
-
-  private class ResponseEvent implements Runnable {
-
-    private final int rpcType;
-    private final int coordinationId;
-    private final ByteBuf pBody;
-    private final ByteBuf dBody;
-    private final C connection;
-
-    public ResponseEvent(C connection, int rpcType, int coordinationId, ByteBuf pBody, ByteBuf
dBody) {
-      this.rpcType = rpcType;
-      this.coordinationId = coordinationId;
-      this.pBody = pBody;
-      this.dBody = dBody;
-      this.connection = connection;
-
-      if(pBody != null){
-        pBody.retain();
-      }
-
-      if(dBody != null){
-        dBody.retain();
-      }
-    }
-
-    public void run(){
-      try {
-        MessageLite m = getResponseDefaultInstance(rpcType);
-        assert rpcConfig.checkReceive(rpcType, m.getClass());
-        RpcOutcome<?> rpcFuture = connection.getAndRemoveRpcOutcome(rpcType, coordinationId,
m.getClass());
-        Parser<?> parser = m.getParserForType();
-        Object value = parser.parseFrom(new ByteBufInputStream(pBody, pBody.readableBytes()));
-        rpcFuture.set(value, dBody);
-        if (RpcConstants.EXTRA_DEBUGGING) {
-          logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
-        }
-      } catch (Exception ex) {
-        logger.error("Failure while handling response.", ex);
-      }finally{
-        if(pBody != null){
-          pBody.release();
-        }
-
-        if(dBody != null){
-          dBody.release();
-        }
-
-      }
-
-    }
-
   }
 }


Mime
View raw message