kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From danburk...@apache.org
Subject [3/7] kudu git commit: java: use a netty frame decoder instead of replaying decoder
Date Wed, 08 Feb 2017 22:03:07 GMT
java: use a netty frame decoder instead of replaying decoder

All of our inbound packets are length-prefixed. Netty provides a nice
class for handling length-prefixed messages. This avoids a bunch of more
custom logic we were doing for length checking, etc.

Change-Id: I1ec1e6f9cbbdf694fe85b7668a3c349efc26c08f
Reviewed-on: http://gerrit.cloudera.org:8080/5926
Reviewed-by: Dan Burkert <danburkert@apache.org>
Reviewed-by: Jean-Daniel Cryans <jdcryans@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/24b06245
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/24b06245
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/24b06245

Branch: refs/heads/master
Commit: 24b06245090292589df4093c4080bfedb17f76ca
Parents: 5f4bd61
Author: Todd Lipcon <todd@apache.org>
Authored: Tue Feb 7 01:10:07 2017 -0800
Committer: Todd Lipcon <todd@apache.org>
Committed: Wed Feb 8 21:27:48 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/CallResponse.java    | 23 ++---
 .../org/apache/kudu/client/ConnectionCache.java | 10 ++-
 .../java/org/apache/kudu/client/KuduRpc.java    | 43 +++------
 .../org/apache/kudu/client/SecureRpcHelper.java |  3 +-
 .../org/apache/kudu/client/TabletClient.java    | 91 +++++---------------
 5 files changed, 48 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/24b06245/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
index 71b1a8f..3b93f60 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
@@ -45,20 +45,15 @@ final class CallResponse {
    *
    * Afterwards, this constructs the RpcHeader from the buffer.
    * @param buf Channel buffer which call response reads from.
-   * @throws IllegalArgumentException If either the entire recorded packet
-   * size or recorded response header PB size are not within reasonable
-   * limits as defined by {@link KuduRpc#checkArrayLength(ChannelBuffer, long)}.
-   * @throws IndexOutOfBoundsException if the ChannelBuffer does not contain
-   * the amount of bytes specified by its length prefix.
+   * @throws IndexOutOfBoundsException if any length prefix inside the
+   * response points outside the bounds of the buffer.
    */
   public CallResponse(final ChannelBuffer buf) {
     this.buf = buf;
 
-    this.totalResponseSize = buf.readInt();
-    KuduRpc.checkArrayLength(buf, this.totalResponseSize);
-    TabletClient.ensureReadable(buf, this.totalResponseSize);
-
+    this.totalResponseSize = buf.readableBytes();
     final int headerSize = Bytes.readVarInt32(buf);
+    // No need to bounds-check the size since 'buf' is already sized appropriately.
     final Slice headerSlice = nextBytes(buf, headerSize);
     RpcHeader.ResponseHeader.Builder builder = RpcHeader.ResponseHeader.newBuilder();
     KuduRpc.readProtobuf(headerSlice, builder);
@@ -82,9 +77,6 @@ final class CallResponse {
   /**
    * @return A slice pointing to the section of the packet reserved for the main
    * protobuf message.
-   * @throws IllegalArgumentException If the recorded size for the main message
-   * is not within reasonable limits as defined by
-   * {@link KuduRpc#checkArrayLength(ChannelBuffer, long)}.
    * @throws IllegalStateException If the offset for the main protobuf message
    * is not valid.
    */
@@ -107,9 +99,6 @@ final class CallResponse {
    * header response PB are not valid offsets for the array.
    * @throws IllegalArgumentException If the sidecar with the specified index
    * does not exist.
-   * @throws IllegalArgumentException If the recorded size for the main message
-   * is not within reasonable limits as defined by
-   * {@link KuduRpc#checkArrayLength(ChannelBuffer, long)}.
    */
   public Slice getSidecar(int sidecar) {
     cacheMessage();
@@ -149,14 +138,14 @@ final class CallResponse {
   }
 
   // After checking the length, generates a slice for the next 'length'
-  // bytes of 'buf'.
+  // bytes of 'buf'. Advances the buffer's read index by 'length' bytes.
   private static Slice nextBytes(final ChannelBuffer buf, final int length) {
-    KuduRpc.checkArrayLength(buf, length);
     byte[] payload;
     int offset;
     if (buf.hasArray()) {  // Zero copy.
       payload = buf.array();
       offset = buf.arrayOffset() + buf.readerIndex();
+      buf.skipBytes(length);
     } else {  // We have to copy the entire payload out of the buffer :(
       payload = new byte[length];
       buf.readBytes(payload);

http://git-wip-us.apache.org/repos/asf/kudu/blob/24b06245/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index 1115aed..da2725e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -27,15 +27,18 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.net.HostAndPort;
 import com.stumbleupon.async.Deferred;
+
 import org.jboss.netty.channel.DefaultChannelPipeline;
 import org.jboss.netty.channel.socket.SocketChannel;
 import org.jboss.netty.channel.socket.SocketChannelConfig;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
 import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -245,8 +248,13 @@ class ConnectionCache {
   }
 
   private final class TabletClientPipeline extends DefaultChannelPipeline {
-
     TabletClient init(ServerInfo serverInfo) {
+      super.addFirst("decode-frames", new LengthFieldBasedFrameDecoder(
+          KuduRpc.MAX_RPC_SIZE,
+          0, // length comes at offset 0
+          4, // length prefix is 4 bytes long
+          0, // no "length adjustment"
+          4 /* strip the length prefix */));
       AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient;
       final TabletClient client = new TabletClient(kuduClient, serverInfo);
       if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/24b06245/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 0f50627..ea19f2b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -70,6 +70,16 @@ public abstract class KuduRpc<R> {
   @VisibleForTesting
   public static final int MAX_TRACES_SIZE = 100;
 
+  /**
+   * Upper bound on the size of a byte array we de-serialize.
+   * This is to prevent Kudu from OOM'ing us, should there be a bug or
+   * undetected corruption of an RPC on the network, which would turn
+   * an innocuous RPC into something allocating a ton of memory.
+   * The Hadoop RPC protocol doesn't do any checksumming as they probably
+   * assumed that TCP checksums would be sufficient (they're not).
+   */
+  static final int MAX_RPC_SIZE = 256 * 1024 * 1024; // 256MB
+
   // Service names.
   protected static final String MASTER_SERVICE_NAME = "kudu.master.MasterService";
   protected static final String TABLET_SERVER_SERVICE_NAME = "kudu.tserver.TabletServerService";
@@ -388,37 +398,4 @@ public abstract class KuduRpc<R> {
     chanBuf.writerIndex(buf.length);
     return chanBuf;
   }
-
-  /**
-   * Upper bound on the size of a byte array we de-serialize.
-   * This is to prevent Kudu from OOM'ing us, should there be a bug or
-   * undetected corruption of an RPC on the network, which would turn a
-   * an innocuous RPC into something allocating a ton of memory.
-   * The Hadoop RPC protocol doesn't do any checksumming as they probably
-   * assumed that TCP checksums would be sufficient (they're not).
-   */
-  static final long MAX_BYTE_ARRAY_MASK =
-      0xFFFFFFFFF0000000L;  // => max = 256MB
-
-  /**
-   * Verifies that the given length looks like a reasonable array length.
-   * This method accepts 0 as a valid length.
-   * @param buf The buffer from which the length was read.
-   * @param length The length to validate.
-   * @throws IllegalArgumentException if the length is negative or
-   * suspiciously large.
-   */
-  static void checkArrayLength(final ChannelBuffer buf, final long length) {
-    // 2 checks in 1.  If any of the high bits are set, we know the value is
-    // either too large, or is negative (if the most-significant bit is set).
-    if ((length & MAX_BYTE_ARRAY_MASK) != 0) {
-      if (length < 0) {
-        throw new IllegalArgumentException("Read negative byte array length: " +
-            length + " in buf=" + buf);
-      } else {
-        throw new IllegalArgumentException("Read byte array length that's too" +
-            " large: " + length + " > " + ~MAX_BYTE_ARRAY_MASK + " in buf=" + buf);
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/24b06245/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
index a0cac5a..8c7cdee 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
@@ -110,6 +110,7 @@ public class SecureRpcHelper {
   }
 
   private void sendSaslMessage(Channel channel, RpcHeader.NegotiatePB msg) {
+    Preconditions.checkNotNull(channel);
     RpcHeader.RequestHeader.Builder builder = RpcHeader.RequestHeader.newBuilder();
     builder.setCallId(SASL_CALL_ID);
     RpcHeader.RequestHeader header = builder.build();
@@ -155,7 +156,7 @@ public class SecureRpcHelper {
 
   private void handleNegotiateResponse(Channel chan, RpcHeader.NegotiatePB response) throws
       SaslException {
-
+    Preconditions.checkNotNull(chan);
     // Store the supported features advertised by the server.
     ImmutableSet.Builder<RpcHeader.RpcFeatureFlag> features = ImmutableSet.builder();
     for (RpcHeader.RpcFeatureFlag feature : response.getSupportedFeaturesList()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/24b06245/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index 592aaff..f17e5a1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -32,11 +32,13 @@ import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.security.auth.Subject;
 import javax.security.sasl.SaslException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.stumbleupon.async.Deferred;
+
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
@@ -47,8 +49,8 @@ import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
-import org.jboss.netty.handler.codec.replay.VoidEnum;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.handler.timeout.ReadTimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,15 +82,15 @@ import org.apache.kudu.util.Pair;
  * channel isn't connected.
  */
 @InterfaceAudience.Private
-public class TabletClient extends ReplayingDecoder<VoidEnum> {
+public class TabletClient extends SimpleChannelUpstreamHandler {
 
   public static final Logger LOG = LoggerFactory.getLogger(TabletClient.class);
 
   private ArrayList<KuduRpc<?>> pendingRpcs;
 
   public static final byte RPC_CURRENT_VERSION = 9;
-  /** Initial part of the header for 0.95 and up.  */
-  private static final byte[] RPC_HEADER = new byte[] { 'h', 'r', 'p', 'c',
+  /** Initial header sent by the client upon connection establishment */
+  private static final byte[] CONNECTION_HEADER = new byte[] { 'h', 'r', 'p', 'c',
       RPC_CURRENT_VERSION,     // RPC version.
       0,
       0
@@ -170,6 +172,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
         Status statusNotSupported = Status.NotSupported("the server does not support the"
+
             "APPLICATION_FEATURE_FLAGS RPC feature");
         rpc.errback(new NonRecoverableException(statusNotSupported));
+        // TODO(todd): this should return here. We seem to lack test coverage!
       }
 
       encodedRpcAndId = encode(rpc);
@@ -374,8 +377,14 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
    */
   @Override
   @SuppressWarnings("unchecked")
-  protected Object decode(ChannelHandlerContext ctx, Channel chan, ChannelBuffer buf,
-                              VoidEnum voidEnum) throws NonRecoverableException {
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws Exception
{
+    Object m = evt.getMessage();
+    if (!(m instanceof ChannelBuffer)) {
+      ctx.sendUpstream(evt);
+      return;
+    }
+    Channel chan = ctx.getChannel();
+    ChannelBuffer buf = (ChannelBuffer)m;
     final long start = System.nanoTime();
     final int rdx = buf.readerIndex();
     LOG.debug("------------------>> ENTERING DECODE >>------------------");
@@ -389,7 +398,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
       throw new NonRecoverableException(statusIOE, e);
     }
     if (buf == null) {
-      return null;
+      return;
     }
 
     CallResponse response = new CallResponse(buf);
@@ -457,7 +466,6 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
     if (LOG.isDebugEnabled()) {
       LOG.debug(getPeerUuidLoggingString() + "rpcid=" + rpcid +
           ", response size=" + (buf.readerIndex() - rdx) + " bytes" +
-          ", " + actualReadableBytes() + " readable bytes left" +
           ", rpc=" + rpc);
     }
 
@@ -474,7 +482,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
     if (!retryableHeaderError.ok()) {
       rpc.addTrace(traceBuilder.callStatus(retryableHeaderError).build());
       kuduClient.handleRetryableError(rpc, new RecoverableException(retryableHeaderError));
-      return null;
+      return;
     }
 
     // We can get this Message from within the RPC's expected type,
@@ -486,7 +494,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
         exception = dispatchTSErrorOrReturnException(rpc, error, traceBuilder);
         if (exception == null) {
           // It was taken care of.
-          return null;
+          return;
         } else {
           // We're going to errback.
           decoded = null;
@@ -497,7 +505,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
         exception = dispatchMasterErrorOrReturnException(rpc, error, traceBuilder);
         if (exception == null) {
           // Exception was taken care of.
-          return null;
+          return;
         } else {
           decoded = null;
         }
@@ -527,7 +535,6 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
       LOG.debug("------------------<< LEAVING  DECODE <<------------------" +
           " time elapsed: " + ((System.nanoTime() - start) / 1000) + "us");
     }
-    return null;  // Stop processing here.  The Deferred does everything else.
   }
 
   /**
@@ -590,42 +597,6 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
   }
 
   /**
-   * Decodes the response of an RPC and triggers its {@link Deferred}.
-   * <p>
-   * This method is used by FrameDecoder when the channel gets
-   * disconnected.  The buffer for that channel is passed to this method in
-   * case there's anything left in it.
-   * @param ctx Unused.
-   * @param chan The channel on which the response came.
-   * @param buf The buffer containing the raw RPC response.
-   * @return {@code null}, always.
-   */
-  @Override
-  protected Object decodeLast(final ChannelHandlerContext ctx,
-                              final Channel chan,
-                              final ChannelBuffer buf,
-                              final VoidEnum unused) throws NonRecoverableException {
-    // When we disconnect, decodeLast is called instead of decode.
-    // We simply check whether there's any data left in the buffer, in which
-    // case we attempt to process it.  But if there's no data left, then we
-    // don't even bother calling decode() as it'll complain that the buffer
-    // doesn't contain enough data, which unnecessarily pollutes the logs.
-    if (buf.readable()) {
-      try {
-        return decode(ctx, chan, buf, unused);
-      } finally {
-        if (buf.readable()) {
-          LOG.error(getPeerUuidLoggingString() + "After decoding the last message on " +
chan +
-              ", there was still some undecoded bytes in the channel's" +
-              " buffer (which are going to be lost)");
-        }
-      }
-    } else {
-      return null;
-    }
-  }
-
-  /**
    * Tells whether or not this handler should be used.
    * <p>
    * @return true if this instance can be used, else false if this handler is known to have
been
@@ -638,27 +609,11 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
     }
   }
 
-  /**
-   * Ensures that at least a {@code nbytes} are readable from the given buffer.
-   * If there aren't enough bytes in the buffer this will raise an exception
-   * and cause the {@link ReplayingDecoder} to undo whatever we did thus far
-   * so we can wait until we read more from the socket.
-   * @param buf Buffer to check.
-   * @param nbytes Number of bytes desired.
-   */
-  static void ensureReadable(final ChannelBuffer buf, final int nbytes) {
-    buf.markReaderIndex();
-    buf.skipBytes(nbytes); // can puke with Throwable
-    buf.resetReaderIndex();
-  }
-
   @Override
   public void channelConnected(final ChannelHandlerContext ctx,
                                final ChannelStateEvent e) {
     final Channel chan = e.getChannel();
-    ChannelBuffer header = connectionHeaderPreamble();
-    header.writerIndex(RPC_HEADER.length);
-    Channels.write(chan, header);
+    Channels.write(chan, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER));
 
     secureRpcHelper = new SecureRpcHelper(this);
     secureRpcHelper.sendHello(chan);
@@ -801,10 +756,6 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
   }
 
 
-  private ChannelBuffer connectionHeaderPreamble() {
-    return ChannelBuffers.wrappedBuffer(RPC_HEADER);
-  }
-
   /**
    * Sends the queued RPCs to the server, once we're connected to it.
    * This gets called after {@link #channelConnected}, once we were able to


Mime
View raw message