From: danburkert@apache.org
To: commits@kudu.apache.org
Date: Wed, 08 Feb 2017 22:03:07 -0000
Subject: [3/7] kudu git commit: java: use a netty frame decoder instead of replaying decoder

java: use a netty frame decoder instead of replaying decoder

All of our inbound packets are length-prefixed. Netty provides a nice class for handling length-prefixed messages.
This avoids a bunch of custom logic we were doing for length checking, etc.

Change-Id: I1ec1e6f9cbbdf694fe85b7668a3c349efc26c08f
Reviewed-on: http://gerrit.cloudera.org:8080/5926
Reviewed-by: Dan Burkert
Reviewed-by: Jean-Daniel Cryans
Tested-by: Kudu Jenkins

Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/24b06245
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/24b06245
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/24b06245

Branch: refs/heads/master
Commit: 24b06245090292589df4093c4080bfedb17f76ca
Parents: 5f4bd61
Author: Todd Lipcon
Authored: Tue Feb 7 01:10:07 2017 -0800
Committer: Todd Lipcon
Committed: Wed Feb 8 21:27:48 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/CallResponse.java    | 23 ++---
 .../org/apache/kudu/client/ConnectionCache.java | 10 ++-
 .../java/org/apache/kudu/client/KuduRpc.java    | 43 +++------
 .../org/apache/kudu/client/SecureRpcHelper.java |  3 +-
 .../org/apache/kudu/client/TabletClient.java    | 91 +++++---------------
 5 files changed, 48 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/24b06245/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
index 71b1a8f..3b93f60 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
@@ -45,20 +45,15 @@ final class CallResponse {
    *
    * Afterwards, this constructs the RpcHeader from the buffer.
    * @param buf Channel buffer which call response reads from.
-   * @throws IllegalArgumentException If either the entire recorded packet
-   * size or recorded response header PB size are not within reasonable
-   * limits as defined by {@link KuduRpc#checkArrayLength(ChannelBuffer, long)}.
-   * @throws IndexOutOfBoundsException if the ChannelBuffer does not contain
-   * the amount of bytes specified by its length prefix.
+   * @throws IndexOutOfBoundsException if any length prefix inside the
+   * response points outside the bounds of the buffer.
    */
   public CallResponse(final ChannelBuffer buf) {
     this.buf = buf;
 
-    this.totalResponseSize = buf.readInt();
-    KuduRpc.checkArrayLength(buf, this.totalResponseSize);
-    TabletClient.ensureReadable(buf, this.totalResponseSize);
-
+    this.totalResponseSize = buf.readableBytes();
     final int headerSize = Bytes.readVarInt32(buf);
+    // No needs to bounds-check the size since 'buf' is already sized appropriately.
     final Slice headerSlice = nextBytes(buf, headerSize);
     RpcHeader.ResponseHeader.Builder builder = RpcHeader.ResponseHeader.newBuilder();
     KuduRpc.readProtobuf(headerSlice, builder);
@@ -82,9 +77,6 @@ final class CallResponse {
   /**
    * @return A slice pointing to the section of the packet reserved for the main
    * protobuf message.
-   * @throws IllegalArgumentException If the recorded size for the main message
-   * is not within reasonable limits as defined by
-   * {@link KuduRpc#checkArrayLength(ChannelBuffer, long)}.
    * @throws IllegalStateException If the offset for the main protobuf message
    * is not valid.
    */
@@ -107,9 +99,6 @@ final class CallResponse {
    * header response PB are not valid offsets for the array.
    * @throws IllegalArgumentException If the sidecar with the specified index
    * does not exist.
-   * @throws IllegalArgumentException If the recorded size for the main message
-   * is not within reasonable limits as defined by
-   * {@link KuduRpc#checkArrayLength(ChannelBuffer, long)}.
    */
   public Slice getSidecar(int sidecar) {
     cacheMessage();
@@ -149,14 +138,14 @@ final class CallResponse {
   }
 
   // After checking the length, generates a slice for the next 'length'
-  // bytes of 'buf'.
+  // bytes of 'buf'. Advances the buffer's read index by 'length' bytes.
   private static Slice nextBytes(final ChannelBuffer buf, final int length) {
-    KuduRpc.checkArrayLength(buf, length);
     byte[] payload;
     int offset;
     if (buf.hasArray()) { // Zero copy.
       payload = buf.array();
       offset = buf.arrayOffset() + buf.readerIndex();
+      buf.skipBytes(length);
     } else { // We have to copy the entire payload out of the buffer :(
       payload = new byte[length];
       buf.readBytes(payload);


http://git-wip-us.apache.org/repos/asf/kudu/blob/24b06245/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index 1115aed..da2725e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -27,15 +27,18 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.net.HostAndPort;
 import com.stumbleupon.async.Deferred;
+
 import org.jboss.netty.channel.DefaultChannelPipeline;
 import org.jboss.netty.channel.socket.SocketChannel;
 import org.jboss.netty.channel.socket.SocketChannelConfig;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
 import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -245,8 +248,13 @@ class ConnectionCache {
   }
 
   private final class TabletClientPipeline extends DefaultChannelPipeline {
-
     TabletClient init(ServerInfo serverInfo) {
+      super.addFirst("decode-frames", new LengthFieldBasedFrameDecoder(
+          KuduRpc.MAX_RPC_SIZE,
+          0, // length comes at offset 0
+          4, // length prefix is 4 bytes long
+          0, // no "length adjustment"
+          4 /* strip the length prefix */));
       AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient;
       final TabletClient client = new TabletClient(kuduClient, serverInfo);
       if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) {


http://git-wip-us.apache.org/repos/asf/kudu/blob/24b06245/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 0f50627..ea19f2b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -70,6 +70,16 @@ public abstract class KuduRpc {
   @VisibleForTesting
   public static final int MAX_TRACES_SIZE = 100;
 
+  /**
+   * Upper bound on the size of a byte array we de-serialize.
+   * This is to prevent Kudu from OOM'ing us, should there be a bug or
+   * undetected corruption of an RPC on the network, which would turn a
+   * an innocuous RPC into something allocating a ton of memory.
+   * The Hadoop RPC protocol doesn't do any checksumming as they probably
+   * assumed that TCP checksums would be sufficient (they're not).
+   */
+  static final int MAX_RPC_SIZE = 256 * 1024 * 1024; // 256MB
+
   // Service names.
   protected static final String MASTER_SERVICE_NAME = "kudu.master.MasterService";
   protected static final String TABLET_SERVER_SERVICE_NAME = "kudu.tserver.TabletServerService";
@@ -388,37 +398,4 @@ public abstract class KuduRpc {
     chanBuf.writerIndex(buf.length);
     return chanBuf;
   }
-
-  /**
-   * Upper bound on the size of a byte array we de-serialize.
-   * This is to prevent Kudu from OOM'ing us, should there be a bug or
-   * undetected corruption of an RPC on the network, which would turn a
-   * an innocuous RPC into something allocating a ton of memory.
-   * The Hadoop RPC protocol doesn't do any checksumming as they probably
-   * assumed that TCP checksums would be sufficient (they're not).
-   */
-  static final long MAX_BYTE_ARRAY_MASK =
-    0xFFFFFFFFF0000000L; // => max = 256MB
-
-  /**
-   * Verifies that the given length looks like a reasonable array length.
-   * This method accepts 0 as a valid length.
-   * @param buf The buffer from which the length was read.
-   * @param length The length to validate.
-   * @throws IllegalArgumentException if the length is negative or
-   * suspiciously large.
-   */
-  static void checkArrayLength(final ChannelBuffer buf, final long length) {
-    // 2 checks in 1. If any of the high bits are set, we know the value is
-    // either too large, or is negative (if the most-significant bit is set).
-    if ((length & MAX_BYTE_ARRAY_MASK) != 0) {
-      if (length < 0) {
-        throw new IllegalArgumentException("Read negative byte array length: " +
-            length + " in buf=" + buf);
-      } else {
-        throw new IllegalArgumentException("Read byte array length that's too" +
-            " large: " + length + " > " + ~MAX_BYTE_ARRAY_MASK + " in buf=" + buf);
-      }
-    }
-  }
 }


http://git-wip-us.apache.org/repos/asf/kudu/blob/24b06245/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
index a0cac5a..8c7cdee 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
@@ -110,6 +110,7 @@ public class SecureRpcHelper {
   }
 
   private void sendSaslMessage(Channel channel, RpcHeader.NegotiatePB msg) {
+    Preconditions.checkNotNull(channel);
     RpcHeader.RequestHeader.Builder builder = RpcHeader.RequestHeader.newBuilder();
     builder.setCallId(SASL_CALL_ID);
     RpcHeader.RequestHeader header = builder.build();
@@ -155,7 +156,7 @@ public class SecureRpcHelper {
 
   private void handleNegotiateResponse(Channel chan, RpcHeader.NegotiatePB response) throws
       SaslException {
-
+    Preconditions.checkNotNull(chan);
     // Store the supported features advertised by the server.
     ImmutableSet.Builder features = ImmutableSet.builder();
     for (RpcHeader.RpcFeatureFlag feature : response.getSupportedFeaturesList()) {


http://git-wip-us.apache.org/repos/asf/kudu/blob/24b06245/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index 592aaff..f17e5a1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -32,11 +32,13 @@ import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.security.auth.Subject;
 import javax.security.sasl.SaslException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.stumbleupon.async.Deferred;
+
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
@@ -47,8 +49,8 @@ import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
-import org.jboss.netty.handler.codec.replay.VoidEnum;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.handler.timeout.ReadTimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,15 +82,15 @@ import org.apache.kudu.util.Pair;
  * channel isn't connected.
  */
 @InterfaceAudience.Private
-public class TabletClient extends ReplayingDecoder {
+public class TabletClient extends SimpleChannelUpstreamHandler {
 
   public static final Logger LOG = LoggerFactory.getLogger(TabletClient.class);
 
   private ArrayList> pendingRpcs;
 
   public static final byte RPC_CURRENT_VERSION = 9;
-  /** Initial part of the header for 0.95 and up. */
-  private static final byte[] RPC_HEADER = new byte[] { 'h', 'r', 'p', 'c',
+  /** Initial header sent by the client upon connection establishment */
+  private static final byte[] CONNECTION_HEADER = new byte[] { 'h', 'r', 'p', 'c',
       RPC_CURRENT_VERSION, // RPC version.
       0,
       0
@@ -170,6 +172,7 @@ public class TabletClient extends ReplayingDecoder {
       Status statusNotSupported = Status.NotSupported("the server does not support the" +
           "APPLICATION_FEATURE_FLAGS RPC feature");
       rpc.errback(new NonRecoverableException(statusNotSupported));
+      // TODO(todd): this should return here. We seem to lack test coverage!
     }
 
     encodedRpcAndId = encode(rpc);
@@ -374,8 +377,14 @@ public class TabletClient extends ReplayingDecoder {
    */
   @Override
   @SuppressWarnings("unchecked")
-  protected Object decode(ChannelHandlerContext ctx, Channel chan, ChannelBuffer buf,
-      VoidEnum voidEnum) throws NonRecoverableException {
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
+    Object m = evt.getMessage();
+    if (!(m instanceof ChannelBuffer)) {
+      ctx.sendUpstream(evt);
+      return;
+    }
+    Channel chan = ctx.getChannel();
+    ChannelBuffer buf = (ChannelBuffer)m;
     final long start = System.nanoTime();
     final int rdx = buf.readerIndex();
     LOG.debug("------------------>> ENTERING DECODE >>------------------");
@@ -389,7 +398,7 @@ public class TabletClient extends ReplayingDecoder {
       throw new NonRecoverableException(statusIOE, e);
     }
     if (buf == null) {
-      return null;
+      return;
     }
 
     CallResponse response = new CallResponse(buf);
@@ -457,7 +466,6 @@ public class TabletClient extends ReplayingDecoder {
     if (LOG.isDebugEnabled()) {
       LOG.debug(getPeerUuidLoggingString() + "rpcid=" + rpcid +
           ", response size=" + (buf.readerIndex() - rdx) + " bytes" +
-          ", " + actualReadableBytes() + " readable bytes left" +
           ", rpc=" + rpc);
     }
 
@@ -474,7 +482,7 @@ public class TabletClient extends ReplayingDecoder {
     if (!retryableHeaderError.ok()) {
       rpc.addTrace(traceBuilder.callStatus(retryableHeaderError).build());
       kuduClient.handleRetryableError(rpc, new RecoverableException(retryableHeaderError));
-      return null;
+      return;
     }
 
     // We can get this Message from within the RPC's expected type,
@@ -486,7 +494,7 @@ public class TabletClient extends ReplayingDecoder {
       exception = dispatchTSErrorOrReturnException(rpc, error, traceBuilder);
       if (exception == null) {
         // It was taken care of.
-        return null;
+        return;
       } else {
         // We're going to errback.
         decoded = null;
@@ -497,7 +505,7 @@ public class TabletClient extends ReplayingDecoder {
       exception = dispatchMasterErrorOrReturnException(rpc, error, traceBuilder);
       if (exception == null) {
         // Exception was taken care of.
-        return null;
+        return;
       } else {
         decoded = null;
       }
@@ -527,7 +535,6 @@ public class TabletClient extends ReplayingDecoder {
       LOG.debug("------------------<< LEAVING DECODE <<------------------"
           + " time elapsed: " + ((System.nanoTime() - start) / 1000) + "us");
     }
-    return null; // Stop processing here. The Deferred does everything else.
   }
 
   /**
@@ -590,42 +597,6 @@ public class TabletClient extends ReplayingDecoder {
   }
 
   /**
-   * Decodes the response of an RPC and triggers its {@link Deferred}.
-   *
-   * This method is used by FrameDecoder when the channel gets
-   * disconnected. The buffer for that channel is passed to this method in
-   * case there's anything left in it.
-   * @param ctx Unused.
-   * @param chan The channel on which the response came.
-   * @param buf The buffer containing the raw RPC response.
-   * @return {@code null}, always.
-   */
-  @Override
-  protected Object decodeLast(final ChannelHandlerContext ctx,
-                              final Channel chan,
-                              final ChannelBuffer buf,
-                              final VoidEnum unused) throws NonRecoverableException {
-    // When we disconnect, decodeLast is called instead of decode.
-    // We simply check whether there's any data left in the buffer, in which
-    // case we attempt to process it. But if there's no data left, then we
-    // don't even bother calling decode() as it'll complain that the buffer
-    // doesn't contain enough data, which unnecessarily pollutes the logs.
-    if (buf.readable()) {
-      try {
-        return decode(ctx, chan, buf, unused);
-      } finally {
-        if (buf.readable()) {
-          LOG.error(getPeerUuidLoggingString() + "After decoding the last message on " + chan +
-              ", there was still some undecoded bytes in the channel's" +
-              " buffer (which are going to be lost)");
-        }
-      }
-    } else {
-      return null;
-    }
-  }
-
-  /**
    * Tells whether or not this handler should be used.
    *
    * @return true if this instance can be used, else false if this handler is known to have been
@@ -638,27 +609,11 @@ public class TabletClient extends ReplayingDecoder {
     }
   }
 
-  /**
-   * Ensures that at least a {@code nbytes} are readable from the given buffer.
-   * If there aren't enough bytes in the buffer this will raise an exception
-   * and cause the {@link ReplayingDecoder} to undo whatever we did thus far
-   * so we can wait until we read more from the socket.
-   * @param buf Buffer to check.
-   * @param nbytes Number of bytes desired.
-   */
-  static void ensureReadable(final ChannelBuffer buf, final int nbytes) {
-    buf.markReaderIndex();
-    buf.skipBytes(nbytes); // can puke with Throwable
-    buf.resetReaderIndex();
-  }
-
   @Override
   public void channelConnected(final ChannelHandlerContext ctx,
                                final ChannelStateEvent e) {
     final Channel chan = e.getChannel();
-    ChannelBuffer header = connectionHeaderPreamble();
-    header.writerIndex(RPC_HEADER.length);
-    Channels.write(chan, header);
+    Channels.write(chan, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER));
 
     secureRpcHelper = new SecureRpcHelper(this);
     secureRpcHelper.sendHello(chan);
@@ -801,10 +756,6 @@ public class TabletClient extends ReplayingDecoder {
   }
 
 
-  private ChannelBuffer connectionHeaderPreamble() {
-    return ChannelBuffers.wrappedBuffer(RPC_HEADER);
-  }
-
   /**
    * Sends the queued RPCs to the server, once we're connected to it.
    * This gets called after {@link #channelConnected}, once we were able to