Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B87F018401 for ; Sun, 13 Dec 2015 07:27:26 +0000 (UTC) Received: (qmail 7832 invoked by uid 500); 13 Dec 2015 07:27:20 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 7651 invoked by uid 500); 13 Dec 2015 07:27:20 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 6831 invoked by uid 99); 13 Dec 2015 07:27:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 13 Dec 2015 07:27:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 43A69E050C; Sun, 13 Dec 2015 07:27:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: asuresh@apache.org To: common-commits@hadoop.apache.org Date: Sun, 13 Dec 2015 07:27:34 -0000 Message-Id: In-Reply-To: <86a0ec04f28349c7a2a7344cda610774@git.apache.org> References: <86a0ec04f28349c7a2a7344cda610774@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/37] hadoop git commit: HADOOP-10729. Add tests for PB RPC in case version mismatch of client and server. Contributed by Junping Du. HADOOP-10729. Add tests for PB RPC in case version mismatch of client and server. Contributed by Junping Du. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c4084d9b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c4084d9b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c4084d9b Branch: refs/heads/yarn-2877 Commit: c4084d9bc3b5c20405d9da6623b330d5720b64a1 Parents: 832b3cb Author: Haohui Mai Authored: Wed Dec 9 05:41:44 2015 +0800 Committer: Haohui Mai Committed: Wed Dec 9 05:41:44 2015 +0800 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 + .../ipc/TestProtoBufRPCCompatibility.java | 178 +++++++++++++++++++ .../hadoop-common/src/test/proto/test.proto | 8 + .../src/test/proto/test_rpc_service.proto | 15 ++ 4 files changed, 204 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4084d9b/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index e7da77b..4cd295e 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -1497,6 +1497,9 @@ Release 2.8.0 - UNRELEASED HADOOP-12609. Fix intermittent failure of TestDecayRpcScheduler. (Masatake Iwasaki via Arpit Agarwal) + HADOOP-10729. Add tests for PB RPC in case version mismatch of client and + server. (Junping Du via wheat9) + Release 2.7.3 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4084d9b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRPCCompatibility.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRPCCompatibility.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRPCCompatibility.java new file mode 100644 index 0000000..76a93cf --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRPCCompatibility.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; + +import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto; +import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; +import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto; +import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto; +import org.apache.hadoop.ipc.protobuf.TestProtos.OptRequestProto; +import org.apache.hadoop.ipc.protobuf.TestProtos.OptResponseProto; + +import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.OldProtobufRpcProto; +import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.NewProtobufRpcProto; +import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.NewerProtobufRpcProto; +import org.apache.hadoop.net.NetUtils; +import org.junit.Assert; +import org.junit.Test; + +import com.google.protobuf.BlockingService; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +public class TestProtoBufRPCCompatibility { + + private static final String ADDRESS = "0.0.0.0"; + public final static int PORT = 0; + private static InetSocketAddress addr; + private static RPC.Server server; + private static Configuration conf; + + @ProtocolInfo(protocolName = "testProto", protocolVersion = 1) + public interface OldRpcService extends + OldProtobufRpcProto.BlockingInterface { + } + + @ProtocolInfo(protocolName = "testProto", protocolVersion = 2) + public interface NewRpcService extends + NewProtobufRpcProto.BlockingInterface { + } + + @ProtocolInfo(protocolName = "testProto", protocolVersion = 2) + public interface NewerRpcService extends + NewerProtobufRpcProto.BlockingInterface { + } + + public static class OldServerImpl implements OldRpcService { + + @Override + public EmptyResponseProto ping(RpcController unused, + EmptyRequestProto request) throws ServiceException { + // Ensure clientId is received + byte[] clientId = Server.getClientId(); + Assert.assertNotNull(Server.getClientId()); + Assert.assertEquals(16, clientId.length); + return EmptyResponseProto.newBuilder().build(); + } + + @Override + public EmptyResponseProto echo(RpcController unused, + EmptyRequestProto request) throws ServiceException { + // Ensure clientId is received + byte[] clientId = Server.getClientId(); + Assert.assertNotNull(Server.getClientId()); + Assert.assertEquals(16, clientId.length); + return EmptyResponseProto.newBuilder().build(); + } + } + + public static class NewServerImpl implements NewRpcService { + + @Override + public EmptyResponseProto ping(RpcController unused, + EmptyRequestProto request) throws ServiceException { + // Ensure clientId is received + byte[] clientId = Server.getClientId(); + Assert.assertNotNull(Server.getClientId()); + Assert.assertEquals(16, clientId.length); + return EmptyResponseProto.newBuilder().build(); + } + + @Override + public OptResponseProto echo(RpcController unused, OptRequestProto request) + throws ServiceException { + return OptResponseProto.newBuilder().setMessage(request.getMessage()) + .build(); + } + } + + @ProtocolInfo(protocolName = "testProto", protocolVersion = 2) + public static class NewerServerImpl implements NewerRpcService { + + @Override + public EmptyResponseProto ping(RpcController unused, + EmptyRequestProto request) throws ServiceException { + // Ensure clientId is received + byte[] clientId = Server.getClientId(); + Assert.assertNotNull(Server.getClientId()); + Assert.assertEquals(16, clientId.length); + return EmptyResponseProto.newBuilder().build(); + } + + @Override + public EmptyResponseProto echo(RpcController unused, EmptyRequestProto request) + throws ServiceException { + // Ensure clientId is received + byte[] clientId = Server.getClientId(); + Assert.assertNotNull(Server.getClientId()); + Assert.assertEquals(16, clientId.length); + return EmptyResponseProto.newBuilder().build(); + } + } + + @Test + public void testProtocolVersionMismatch() throws IOException, ServiceException { + conf = new Configuration(); + conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 1024); + // Set RPC engine to protobuf RPC engine + RPC.setProtocolEngine(conf, NewRpcService.class, ProtobufRpcEngine.class); + + // Create server side implementation + NewServerImpl serverImpl = new NewServerImpl(); + BlockingService service = NewProtobufRpcProto + .newReflectiveBlockingService(serverImpl); + // Get RPC server for server side implementation + server = new RPC.Builder(conf).setProtocol(NewRpcService.class) + .setInstance(service).setBindAddress(ADDRESS).setPort(PORT).build(); + addr = NetUtils.getConnectAddress(server); + + server.start(); + + RPC.setProtocolEngine(conf, OldRpcService.class, ProtobufRpcEngine.class); + + OldRpcService proxy = RPC.getProxy(OldRpcService.class, 0, addr, conf); + // Verify that exception is thrown if protocolVersion is mismatch between + // client and server. + EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build(); + try { + proxy.ping(null, emptyRequest); + fail("Expected an exception to occur as version mismatch."); + } catch (Exception e) { + if (! (e.getMessage().contains("version mismatch"))){ + // Exception type is not what we expected, re-throw it. + throw new IOException(e); + } + } + + // Verify that missing of optional field is still compatible in RPC call. + RPC.setProtocolEngine(conf, NewerRpcService.class, ProtobufRpcEngine.class); + NewerRpcService newProxy = RPC.getProxy(NewerRpcService.class, 0, addr, + conf); + newProxy.echo(null, emptyRequest); + + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4084d9b/hadoop-common-project/hadoop-common/src/test/proto/test.proto ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test.proto b/hadoop-common-project/hadoop-common/src/test/proto/test.proto index 9965f24..4ab590e 100644 --- a/hadoop-common-project/hadoop-common/src/test/proto/test.proto +++ b/hadoop-common-project/hadoop-common/src/test/proto/test.proto @@ -35,6 +35,14 @@ message EchoResponseProto { required string message = 1; } +message OptRequestProto { + optional string message = 1; +} + +message OptResponseProto { + optional string message = 1; +} + message SleepRequestProto{ required int32 milliSeconds = 1; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4084d9b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto index 4f64088..722af89 100644 --- a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto +++ b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto @@ -39,3 +39,18 @@ service TestProtobufRpc2Proto { rpc echo2(EchoRequestProto) returns (EchoResponseProto); rpc sleep(SleepRequestProto) returns (SleepResponseProto); } + +service OldProtobufRpcProto { + rpc ping(EmptyRequestProto) returns (EmptyResponseProto); + rpc echo(EmptyRequestProto) returns (EmptyResponseProto); +} + +service NewProtobufRpcProto { + rpc ping(EmptyRequestProto) returns (EmptyResponseProto); + rpc echo(OptRequestProto) returns (OptResponseProto); +} + +service NewerProtobufRpcProto { + rpc ping(EmptyRequestProto) returns (EmptyResponseProto); + rpc echo(EmptyRequestProto) returns (EmptyResponseProto); +}