kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [1/2] kudu git commit: java: use an outbound Encoder, add a test for Negotiator
Date Thu, 09 Feb 2017 08:20:15 GMT
Repository: kudu
Updated Branches:
  refs/heads/master a806ce003 -> 0430ea630


java: use an outbound Encoder, add a test for Negotiator

This adds an Encoder to the front of the Netty pipeline, so that we can
send 'RpcOutboundMessage' objects instead of raw ChannelBuffers. This
doesn't really have any major effect, except that it makes testing a bit
easier -- we can stick parts of the pipeline in an Embedder and more
easily inspect the messages they are sending.

As proof of this, this patch adds a basic test for the Negotiator.

Change-Id: I75419b8c38dea9226ccac6a49ea00e17c74038f6
Reviewed-on: http://gerrit.cloudera.org:8080/5947
Reviewed-by: Jean-Daniel Cryans <jdcryans@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/88a6e140
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/88a6e140
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/88a6e140

Branch: refs/heads/master
Commit: 88a6e1402db1e980dce964396a44a0c62752c60e
Parents: a806ce0
Author: Todd Lipcon <todd@apache.org>
Authored: Wed Feb 8 10:30:24 2017 -0800
Committer: Todd Lipcon <todd@apache.org>
Committed: Thu Feb 9 03:40:01 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/CallResponse.java    |   2 +-
 .../org/apache/kudu/client/ConnectionCache.java |   3 +-
 .../java/org/apache/kudu/client/KuduRpc.java    |   2 +
 .../java/org/apache/kudu/client/Negotiator.java |  13 +--
 .../apache/kudu/client/RpcOutboundMessage.java  |  65 +++++++++++
 .../org/apache/kudu/client/TestNegotiator.java  | 107 +++++++++++++++++++
 6 files changed, 182 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/88a6e140/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
index 3a9a26f..17007c8 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
@@ -53,7 +53,7 @@ final class CallResponse {
    * @throws IndexOutOfBoundsException if any length prefix inside the
    * response points outside the bounds of the buffer.
    */
-  private CallResponse(final ChannelBuffer buf) {
+  CallResponse(final ChannelBuffer buf) {
     this.buf = buf;
 
     this.totalResponseSize = buf.readableBytes();

http://git-wip-us.apache.org/repos/asf/kudu/blob/88a6e140/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index aa47e5d..3c0a055 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -255,7 +255,8 @@ class ConnectionCache {
           4, // length prefix is 4 bytes long
           0, // no "length adjustment"
           4 /* strip the length prefix */));
-      super.addLast("decode-responses", new CallResponse.Decoder());
+      super.addLast("decode-inbound", new CallResponse.Decoder());
+      super.addLast("encode-outbound", new RpcOutboundMessage.Encoder());
       AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient;
       final TabletClient client = new TabletClient(kuduClient, serverInfo);
       if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/88a6e140/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index ea19f2b..387a6fa 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -378,6 +378,8 @@ public abstract class KuduRpc<R> {
     }
   }
 
+  // TODO(todd): make this private and have all RPCs send RpcOutboundMessage
+  // instances instead of ChannelBuffers
   static ChannelBuffer toChannelBuffer(Message header, Message pb) {
     int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, pb);
     byte[] buf = new byte[totalSize + 4];

http://git-wip-us.apache.org/repos/asf/kudu/blob/88a6e140/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index 307145c..11cd8db 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -49,7 +49,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.protobuf.ZeroCopyLiteralByteString;
 
-import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.Channels;
@@ -86,8 +85,8 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
 
   private boolean finished;
   private SaslClient saslClient;
-  public static final int CONNECTION_CTX_CALL_ID = -3;
-  private static final int SASL_CALL_ID = -33;
+  static final int CONNECTION_CTX_CALL_ID = -3;
+  static final int SASL_CALL_ID = -33;
   private static final Set<RpcHeader.RpcFeatureFlag> SUPPORTED_RPC_FEATURES =
       ImmutableSet.of(RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS);
   private Set<RpcHeader.RpcFeatureFlag> serverFeatures;
@@ -121,9 +120,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     RpcHeader.RequestHeader.Builder builder = RpcHeader.RequestHeader.newBuilder();
     builder.setCallId(SASL_CALL_ID);
     RpcHeader.RequestHeader header = builder.build();
-
-    ChannelBuffer buffer = KuduRpc.toChannelBuffer(header, msg);
-    Channels.write(channel, buffer);
+    Channels.write(channel, new RpcOutboundMessage(header, msg));
   }
 
   @Override
@@ -251,7 +248,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     Channels.fireMessageReceived(chan, new Result(serverFeatures));
   }
 
-  private ChannelBuffer makeConnectionContext() {
+  private RpcOutboundMessage makeConnectionContext() {
     RpcHeader.ConnectionContextPB.Builder builder = RpcHeader.ConnectionContextPB.newBuilder();
 
     // The UserInformationPB is deprecated, but used by servers prior to Kudu 1.1.
@@ -262,7 +259,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     RpcHeader.ConnectionContextPB pb = builder.build();
     RpcHeader.RequestHeader header =
         RpcHeader.RequestHeader.newBuilder().setCallId(CONNECTION_CTX_CALL_ID).build();
-    return KuduRpc.toChannelBuffer(header, pb);
+    return new RpcOutboundMessage(header, pb);
   }
 
   private byte[] evaluateChallenge(final byte[] challenge) throws SaslException {

http://git-wip-us.apache.org/repos/asf/kudu/blob/88a6e140/java/kudu-client/src/main/java/org/apache/kudu/client/RpcOutboundMessage.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcOutboundMessage.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcOutboundMessage.java
new file mode 100644
index 0000000..fb96185
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcOutboundMessage.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import com.google.protobuf.Message;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+import org.apache.kudu.rpc.RpcHeader.RequestHeader;
+
+/**
+ * An RPC header and associated body protobuf which can be sent outbound
+ * through the Netty pipeline. The 'Encoder' inner class is responsible
+ * for serializing these instances into wire-format-compatible buffers.
+ */
+class RpcOutboundMessage {
+  private final RequestHeader header;
+  private final Message body;
+
+  RpcOutboundMessage(RequestHeader header, Message body) {
+    this.header = header;
+    this.body = body;
+  }
+
+  public RequestHeader getHeader() {
+    return header;
+  }
+
+  public Message getBody() {
+    return body;
+  }
+
+  /**
+   * Netty encoder implementation to serialize outbound messages.
+   */
+  static class Encoder extends OneToOneEncoder {
+    @Override
+    protected Object encode(ChannelHandlerContext ctx, Channel chan,
+        Object obj) throws Exception {
+      if (!(obj instanceof RpcOutboundMessage)) {
+        return obj;
+      }
+      RpcOutboundMessage msg = (RpcOutboundMessage)obj;
+      // TODO(todd): move this impl into this class and remove external
+      // callers.
+      return KuduRpc.toChannelBuffer(msg.getHeader(), msg.getBody());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/88a6e140/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
new file mode 100644
index 0000000..1fc6ed9
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import static org.junit.Assert.*;
+
+import java.security.AccessControlContext;
+import java.security.AccessController;
+
+import javax.security.auth.Subject;
+
+import org.apache.kudu.client.Negotiator.Result;
+import org.apache.kudu.rpc.RpcHeader.ConnectionContextPB;
+import org.apache.kudu.rpc.RpcHeader.NegotiatePB;
+import org.apache.kudu.rpc.RpcHeader.NegotiatePB.NegotiateStep;
+import org.apache.kudu.rpc.RpcHeader.NegotiatePB.SaslMechanism;
+import org.apache.kudu.rpc.RpcHeader.ResponseHeader;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.protobuf.Message;
+
+public class TestNegotiator {
+  private Negotiator negotiator;
+  private DecoderEmbedder<Object> embedder;
+
+  @Before
+  public void setup() {
+    AccessControlContext context = AccessController.getContext();
+    Subject subject = Subject.getSubject(context);
+    negotiator = new Negotiator(subject, "127.0.0.1");
+    embedder = new DecoderEmbedder<Object>(negotiator);
+  }
+
+  static CallResponse fakeResponse(ResponseHeader header, Message body) {
+    ChannelBuffer buf = KuduRpc.toChannelBuffer(header, body);
+    buf = buf.slice(4, buf.readableBytes() - 4);
+    return new CallResponse(buf);
+  }
+
+  /**
+   * Simple test case for a PLAIN negotiation.
+   */
+  @Test
+  public void testNegotiation() {
+    negotiator.sendHello(embedder.getPipeline().getChannel());
+
+    // Expect client->server: NEGOTIATE.
+    RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+    NegotiatePB body = (NegotiatePB) msg.getBody();
+    assertEquals(Negotiator.SASL_CALL_ID, msg.getHeader().getCallId());
+    assertEquals(NegotiateStep.NEGOTIATE, ((NegotiatePB)msg.getBody()).getStep());
+
+    // Respond with NEGOTIATE.
+    embedder.offer(fakeResponse(
+        ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
+        NegotiatePB.newBuilder()
+          .addSaslMechanisms(SaslMechanism.newBuilder().setMechanism("PLAIN"))
+          .setStep(NegotiateStep.NEGOTIATE)
+          .build()));
+
+    // Expect client->server: SASL_INITIATE (PLAIN)
+    msg = (RpcOutboundMessage)embedder.poll();
+    body = (NegotiatePB) msg.getBody();
+
+    assertEquals(Negotiator.SASL_CALL_ID, msg.getHeader().getCallId());
+    assertEquals(NegotiateStep.SASL_INITIATE, body.getStep());
+    assertEquals(1, body.getSaslMechanismsCount());
+    assertEquals("PLAIN", body.getSaslMechanisms(0).getMechanism());
+    assertTrue(body.hasToken());
+
+    // Respond with SASL_SUCCESS:
+    embedder.offer(fakeResponse(
+        ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
+        NegotiatePB.newBuilder()
+          .setStep(NegotiateStep.SASL_SUCCESS)
+          .build()));
+
+    // Expect client->server: ConnectionContext
+    msg = (RpcOutboundMessage)embedder.poll();
+    ConnectionContextPB connCtx = (ConnectionContextPB)msg.getBody();
+    assertEquals(Negotiator.CONNECTION_CTX_CALL_ID, msg.getHeader().getCallId());
+    assertEquals("java_client", connCtx.getDEPRECATEDUserInfo().getRealUser());
+
+    // Expect the client to also emit a negotiation Result.
+    Result result = (Result)embedder.poll();
+    assertNotNull(result);
+  }
+
+}


Mime
View raw message