Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D44999C44 for ; Sun, 26 Feb 2012 23:32:37 +0000 (UTC) Received: (qmail 84361 invoked by uid 500); 26 Feb 2012 23:32:37 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 84324 invoked by uid 500); 26 Feb 2012 23:32:37 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 84317 invoked by uid 99); 26 Feb 2012 23:32:37 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 26 Feb 2012 23:32:37 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 26 Feb 2012 23:32:34 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 7920323889D7; Sun, 26 Feb 2012 23:32:14 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1293964 [3/3] - in /hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common: ./ dev-support/ src/main/java/org/apache/hadoop/io/retry/ src/main/java/org/apache/hadoop/ipc/ src/main/java/org/apache/hadoop/security/to... Date: Sun, 26 Feb 2012 23:32:12 -0000 To: common-commits@hadoop.apache.org From: szetszwo@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120226233214.7920323889D7@eris.apache.org> Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java?rev=1293964&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java (added) +++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java Sun Feb 26 23:32:06 2012 @@ -0,0 +1,420 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl; +import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService; +import org.apache.hadoop.ipc.TestRPC.TestProtocol; +import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto; +import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; +import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.MultithreadedTestUtil; +import org.apache.hadoop.test.MultithreadedTestUtil.TestContext; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import com.google.common.base.Joiner; +import com.google.protobuf.BlockingService; + +/** + * Benchmark for protobuf RPC. + * Run with --help option for usage. + */ +public class RPCCallBenchmark implements Tool, Configurable { + private Configuration conf; + private AtomicLong callCount = new AtomicLong(0); + private static ThreadMXBean threadBean = + ManagementFactory.getThreadMXBean(); + + private static class MyOptions { + private boolean failed = false; + private int serverThreads = 0; + private int serverReaderThreads = 1; + private int clientThreads = 0; + private String host = "0.0.0.0"; + private int port = 12345; + public int secondsToRun = 15; + private int msgSize = 1024; + public Class rpcEngine = + WritableRpcEngine.class; + + private MyOptions(String args[]) { + try { + Options opts = buildOptions(); + CommandLineParser parser = new GnuParser(); + CommandLine line = parser.parse(opts, args, true); + processOptions(line, opts); + validateOptions(); + } catch (ParseException e) { + System.err.println(e.getMessage()); + System.err.println("Try \"--help\" option for details."); + failed = true; + } + } + + private void validateOptions() throws ParseException { + if (serverThreads <= 0 && clientThreads <= 0) { + throw new ParseException("Must specify at least -c or -s"); + } + } + + @SuppressWarnings("static-access") + private Options buildOptions() { + Options opts = new Options(); + opts.addOption( + OptionBuilder.withLongOpt("serverThreads").hasArg(true) + .withArgName("numthreads") + .withDescription("number of server threads (handlers) to run (or 0 to not run server)") + .create("s")); + opts.addOption( + OptionBuilder.withLongOpt("serverReaderThreads").hasArg(true) + .withArgName("threads") + .withDescription("number of server reader threads to run") + .create("r")); + + + opts.addOption( + OptionBuilder.withLongOpt("clientThreads").hasArg(true) + .withArgName("numthreads") + .withDescription("number of client threads to run (or 0 to not run client)") + .create("c")); + + opts.addOption( + OptionBuilder.withLongOpt("messageSize").hasArg(true) + .withArgName("bytes") + .withDescription("size of call parameter in bytes") + .create("m")); + + opts.addOption( + OptionBuilder.withLongOpt("time").hasArg(true) + .withArgName("seconds") + .withDescription("number of seconds to run clients for") + .create("t")); + opts.addOption( + OptionBuilder.withLongOpt("port").hasArg(true) + .withArgName("port") + .withDescription("port to listen or connect on") + .create("p")); + opts.addOption( + OptionBuilder.withLongOpt("host").hasArg(true) + .withArgName("addr") + .withDescription("host to listen or connect on") + .create('h')); + + opts.addOption( + OptionBuilder.withLongOpt("engine").hasArg(true) + .withArgName("writable|protobuf") + .withDescription("engine to use") + .create('e')); + + opts.addOption( + OptionBuilder.withLongOpt("help").hasArg(false) + .withDescription("show this screen") + .create('?')); + + return opts; + } + + private void processOptions(CommandLine line, Options opts) + throws ParseException { + if (line.hasOption("help") || line.hasOption('?')) { + HelpFormatter formatter = new HelpFormatter(); + System.out.println("Protobuf IPC benchmark."); + System.out.println(); + formatter.printHelp(100, + "java ... PBRPCBenchmark [options]", + "\nSupported options:", opts, ""); + return; + } + + if (line.hasOption('s')) { + serverThreads = Integer.parseInt(line.getOptionValue('s')); + } + if (line.hasOption('r')) { + serverReaderThreads = Integer.parseInt(line.getOptionValue('r')); + } + if (line.hasOption('c')) { + clientThreads = Integer.parseInt(line.getOptionValue('c')); + } + if (line.hasOption('t')) { + secondsToRun = Integer.parseInt(line.getOptionValue('t')); + } + if (line.hasOption('m')) { + msgSize = Integer.parseInt(line.getOptionValue('m')); + } + if (line.hasOption('p')) { + port = Integer.parseInt(line.getOptionValue('p')); + } + if (line.hasOption('h')) { + host = line.getOptionValue('h'); + } + if (line.hasOption('e')) { + String eng = line.getOptionValue('e'); + if ("protobuf".equals(eng)) { + rpcEngine = ProtobufRpcEngine.class; + } else if ("writable".equals(eng)) { + rpcEngine = WritableRpcEngine.class; + } else { + throw new ParseException("invalid engine: " + eng); + } + } + + String[] remainingArgs = line.getArgs(); + if (remainingArgs.length != 0) { + throw new ParseException("Extra arguments: " + + Joiner.on(" ").join(remainingArgs)); + } + } + + @Override + public String toString() { + return "rpcEngine=" + rpcEngine + "\nserverThreads=" + serverThreads + + "\nserverReaderThreads=" + serverReaderThreads + "\nclientThreads=" + + clientThreads + "\nhost=" + host + "\nport=" + port + + "\nsecondsToRun=" + secondsToRun + "\nmsgSize=" + msgSize; + } + } + + + + private Server startServer(MyOptions opts) throws IOException { + if (opts.serverThreads <= 0) { + return null; + } + conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, + opts.serverReaderThreads); + + RPC.Server server; + // Get RPC server for server side implementation + if (opts.rpcEngine == ProtobufRpcEngine.class) { + // Create server side implementation + PBServerImpl serverImpl = new PBServerImpl(); + BlockingService service = TestProtobufRpcProto + .newReflectiveBlockingService(serverImpl); + + server = RPC.getServer(TestRpcService.class, service, + opts.host, opts.port, opts.serverThreads, false, conf, null); + } else if (opts.rpcEngine == WritableRpcEngine.class) { + server = RPC.getServer(TestProtocol.class, new TestRPC.TestImpl(), + opts.host, opts.port, opts.serverThreads, false, conf, null); + } else { + throw new RuntimeException("Bad engine: " + opts.rpcEngine); + } + server.start(); + return server; + } + + private long getTotalCpuTime(Iterable threads) { + long total = 0; + for (Thread t : threads) { + long tid = t.getId(); + total += threadBean.getThreadCpuTime(tid); + } + return total; + } + + @Override + public int run(String[] args) throws Exception { + MyOptions opts = new MyOptions(args); + if (opts.failed) { + return -1; + } + + // Set RPC engine to the configured RPC engine + RPC.setProtocolEngine(conf, TestRpcService.class, opts.rpcEngine); + + Server server = startServer(opts); + try { + + TestContext ctx = setupClientTestContext(opts); + if (ctx != null) { + long totalCalls = 0; + ctx.startThreads(); + long veryStart = System.nanoTime(); + + // Loop printing results every second until the specified + // time has elapsed + for (int i = 0; i < opts.secondsToRun ; i++) { + long st = System.nanoTime(); + ctx.waitFor(1000); + long et = System.nanoTime(); + long ct = callCount.getAndSet(0); + totalCalls += ct; + double callsPerSec = (ct * 1000000000)/(et - st); + System.out.println("Calls per second: " + callsPerSec); + } + + // Print results + + if (totalCalls > 0) { + long veryEnd = System.nanoTime(); + double callsPerSec = + (totalCalls * 1000000000)/(veryEnd - veryStart); + long cpuNanosClient = getTotalCpuTime(ctx.getTestThreads()); + long cpuNanosServer = -1; + if (server != null) { + cpuNanosServer = getTotalCpuTime(server.getHandlers());; + } + System.out.println("====== Results ======"); + System.out.println("Options:\n" + opts); + System.out.println("Total calls per second: " + callsPerSec); + System.out.println("CPU time per call on client: " + + (cpuNanosClient / totalCalls) + " ns"); + if (server != null) { + System.out.println("CPU time per call on server: " + + (cpuNanosServer / totalCalls) + " ns"); + } + } else { + System.out.println("No calls!"); + } + + ctx.stop(); + } else { + while (true) { + Thread.sleep(10000); + } + } + } finally { + if (server != null) { + server.stop(); + } + } + + return 0; + } + + + private TestContext setupClientTestContext(final MyOptions opts) + throws IOException, InterruptedException { + if (opts.clientThreads <= 0) { + return null; + } + + // Set up a separate proxy for each client thread, + // rather than making them share TCP pipes. + int numProxies = opts.clientThreads; + final RpcServiceWrapper proxies[] = new RpcServiceWrapper[numProxies]; + for (int i = 0; i < numProxies; i++) { + proxies[i] = + UserGroupInformation.createUserForTesting("proxy-" + i,new String[]{}) + .doAs(new PrivilegedExceptionAction() { + @Override + public RpcServiceWrapper run() throws Exception { + return createRpcClient(opts); + } + }); + } + + // Create an echo message of the desired length + final StringBuilder msgBuilder = new StringBuilder(opts.msgSize); + for (int c = 0; c < opts.msgSize; c++) { + msgBuilder.append('x'); + } + final String echoMessage = msgBuilder.toString(); + + // Create the clients in a test context + TestContext ctx = new TestContext(); + for (int i = 0; i < opts.clientThreads; i++) { + final RpcServiceWrapper proxy = proxies[i % numProxies]; + + ctx.addThread(new MultithreadedTestUtil.RepeatingTestThread(ctx) { + @Override + public void doAnAction() throws Exception { + proxy.doEcho(echoMessage); + callCount.incrementAndGet(); + } + }); + } + return ctx; + } + + /** + * Simple interface that can be implemented either by the + * protobuf or writable implementations. + */ + private interface RpcServiceWrapper { + public String doEcho(String msg) throws Exception; + } + + /** + * Create a client proxy for the specified engine. + */ + private RpcServiceWrapper createRpcClient(MyOptions opts) throws IOException { + InetSocketAddress addr = NetUtils.createSocketAddr(opts.host, opts.port); + + if (opts.rpcEngine == ProtobufRpcEngine.class) { + final TestRpcService proxy = RPC.getProxy(TestRpcService.class, 0, addr, conf); + return new RpcServiceWrapper() { + @Override + public String doEcho(String msg) throws Exception { + EchoRequestProto req = EchoRequestProto.newBuilder() + .setMessage(msg) + .build(); + EchoResponseProto responseProto = proxy.echo(null, req); + return responseProto.getMessage(); + } + }; + } else if (opts.rpcEngine == WritableRpcEngine.class) { + final TestProtocol proxy = (TestProtocol)RPC.getProxy( + TestProtocol.class, TestProtocol.versionID, addr, conf); + return new RpcServiceWrapper() { + @Override + public String doEcho(String msg) throws Exception { + return proxy.echo(msg); + } + }; + } else { + throw new RuntimeException("unsupported engine: " + opts.rpcEngine); + } + } + + public static void main(String []args) throws Exception { + int rc = ToolRunner.run(new RPCCallBenchmark(), args); + System.exit(rc); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } +} Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1293964&r1=1293963&r2=1293964&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original) +++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Sun Feb 26 23:32:06 2012 @@ -23,6 +23,7 @@ import org.apache.commons.logging.*; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.net.NetUtils; @@ -96,8 +97,8 @@ public class TestIPC { } @Override - public Writable call(Class protocol, Writable param, long receiveTime) - throws IOException { + public Writable call(RpcKind rpcKind, String protocol, Writable param, + long receiveTime) throws IOException { if (sleep) { // sleep a bit try { Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java?rev=1293964&r1=1293963&r2=1293964&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java (original) +++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java Sun Feb 26 23:32:06 2012 @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.net.NetUtils; /** @@ -72,8 +73,8 @@ public class TestIPCServerResponder exte } @Override - public Writable call(Class protocol, Writable param, long receiveTime) - throws IOException { + public Writable call(RpcKind rpcKind, String protocol, Writable param, + long receiveTime) throws IOException { if (sleep) { try { Thread.sleep(RANDOM.nextInt(20)); // sleep a bit Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java?rev=1293964&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java (added) +++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java Sun Feb 26 23:32:06 2012 @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.junit.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; +import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl; +import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService; +import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; +import org.apache.hadoop.net.NetUtils; +import org.junit.Before; +import org.junit.After; +import org.junit.Test; +import com.google.protobuf.BlockingService; + +public class TestMultipleProtocolServer { + private static final String ADDRESS = "0.0.0.0"; + private static InetSocketAddress addr; + private static RPC.Server server; + + private static Configuration conf = new Configuration(); + + + @ProtocolInfo(protocolName="Foo") + interface Foo0 extends VersionedProtocol { + public static final long versionID = 0L; + String ping() throws IOException; + + } + + @ProtocolInfo(protocolName="Foo") + interface Foo1 extends VersionedProtocol { + public static final long versionID = 1L; + String ping() throws IOException; + String ping2() throws IOException; + } + + @ProtocolInfo(protocolName="Foo") + interface FooUnimplemented extends VersionedProtocol { + public static final long versionID = 2L; + String ping() throws IOException; + } + + interface Mixin extends VersionedProtocol{ + public static final long versionID = 0L; + void hello() throws IOException; + } + interface Bar extends Mixin, VersionedProtocol { + public static final long versionID = 0L; + int echo(int i) throws IOException; + } + + + + class Foo0Impl implements Foo0 { + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return Foo0.versionID; + } + + @SuppressWarnings("unchecked") + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + Class inter; + try { + inter = (Class)getClass(). + getGenericInterfaces()[0]; + } catch (Exception e) { + throw new IOException(e); + } + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + getProtocolVersion(protocol, clientVersion), inter); + } + + @Override + public String ping() { + return "Foo0"; + } + + } + + class Foo1Impl implements Foo1 { + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return Foo1.versionID; + } + + @SuppressWarnings("unchecked") + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + Class inter; + try { + inter = (Class)getClass(). + getGenericInterfaces()[0]; + } catch (Exception e) { + throw new IOException(e); + } + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + getProtocolVersion(protocol, clientVersion), inter); + } + + @Override + public String ping() { + return "Foo1"; + } + + @Override + public String ping2() { + return "Foo1"; + + } + + } + + + class BarImpl implements Bar { + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return Bar.versionID; + } + + @SuppressWarnings("unchecked") + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + Class inter; + try { + inter = (Class)getClass(). + getGenericInterfaces()[0]; + } catch (Exception e) { + throw new IOException(e); + } + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + getProtocolVersion(protocol, clientVersion), inter); + } + + @Override + public int echo(int i) { + return i; + } + + @Override + public void hello() { + + + } + } + @Before + public void setUp() throws Exception { + // create a server with two handlers + server = RPC.getServer(Foo0.class, + new Foo0Impl(), ADDRESS, 0, 2, false, conf, null); + server.addProtocol(RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl()); + server.addProtocol(RpcKind.RPC_WRITABLE, Bar.class, new BarImpl()); + server.addProtocol(RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl()); + + + // Add Protobuf server + // Create server side implementation + PBServerImpl pbServerImpl = + new PBServerImpl(); + BlockingService service = TestProtobufRpcProto + .newReflectiveBlockingService(pbServerImpl); + server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class, + service); + server.start(); + addr = NetUtils.getConnectAddress(server); + } + + @After + public void tearDown() throws Exception { + server.stop(); + } + + @Test + public void test1() throws IOException { + ProtocolProxy proxy; + proxy = RPC.getProtocolProxy(Foo0.class, Foo0.versionID, addr, conf); + + Foo0 foo0 = (Foo0)proxy.getProxy(); + Assert.assertEquals("Foo0", foo0.ping()); + + + proxy = RPC.getProtocolProxy(Foo1.class, Foo1.versionID, addr, conf); + + + Foo1 foo1 = (Foo1)proxy.getProxy(); + Assert.assertEquals("Foo1", foo1.ping()); + Assert.assertEquals("Foo1", foo1.ping()); + + + proxy = RPC.getProtocolProxy(Bar.class, Foo1.versionID, addr, conf); + + + Bar bar = (Bar)proxy.getProxy(); + Assert.assertEquals(99, bar.echo(99)); + + // Now test Mixin class method + + Mixin mixin = bar; + mixin.hello(); + } + + + // Server does not implement the FooUnimplemented version of protocol Foo. + // See that calls to it fail. + @Test(expected=IOException.class) + public void testNonExistingProtocol() throws IOException { + ProtocolProxy proxy; + proxy = RPC.getProtocolProxy(FooUnimplemented.class, + FooUnimplemented.versionID, addr, conf); + + FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); + foo.ping(); + } + + + /** + * getProtocolVersion of an unimplemented version should return highest version + * Similarly getProtocolSignature should work. + * @throws IOException + */ + @Test + public void testNonExistingProtocol2() throws IOException { + ProtocolProxy proxy; + proxy = RPC.getProtocolProxy(FooUnimplemented.class, + FooUnimplemented.versionID, addr, conf); + + FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); + Assert.assertEquals(Foo1.versionID, + foo.getProtocolVersion(RPC.getProtocolName(FooUnimplemented.class), + FooUnimplemented.versionID)); + foo.getProtocolSignature(RPC.getProtocolName(FooUnimplemented.class), + FooUnimplemented.versionID, 0); + } + + @Test(expected=IOException.class) + public void testIncorrectServerCreation() throws IOException { + RPC.getServer(Foo1.class, + new Foo0Impl(), ADDRESS, 0, 2, false, conf, null); + } + + // Now test a PB service - a server hosts both PB and Writable Rpcs. + @Test + public void testPBService() throws Exception { + // Set RPC engine to protobuf RPC engine + Configuration conf2 = new Configuration(); + RPC.setProtocolEngine(conf2, TestRpcService.class, + ProtobufRpcEngine.class); + TestRpcService client = RPC.getProxy(TestRpcService.class, 0, addr, conf2); + TestProtoBufRpc.testProtoBufRpc(client); + } +} Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java?rev=1293964&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java (added) +++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java Sun Feb 26 23:32:06 2012 @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; +import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto; +import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; +import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto; +import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto; +import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; +import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.net.NetUtils; +import org.junit.Assert; +import org.junit.Test; +import org.junit.Before; +import org.junit.After; + +import com.google.protobuf.BlockingService; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * Test for testing protocol buffer based RPC mechanism. + * This test depends on test.proto definition of types in src/test/proto + * and protobuf service definition from src/test/test_rpc_service.proto + */ +public class TestProtoBufRpc { + public final static String ADDRESS = "0.0.0.0"; + public final static int PORT = 0; + private static InetSocketAddress addr; + private static Configuration conf; + private static RPC.Server server; + + @ProtocolInfo(protocolName = "testProto", protocolVersion = 1) + public interface TestRpcService + extends TestProtobufRpcProto.BlockingInterface { + } + + @ProtocolInfo(protocolName = "testProto2", protocolVersion = 1) + public interface TestRpcService2 extends + TestProtobufRpc2Proto.BlockingInterface { + } + + public static class PBServerImpl implements TestRpcService { + + @Override + public EmptyResponseProto ping(RpcController unused, + EmptyRequestProto request) throws ServiceException { + return EmptyResponseProto.newBuilder().build(); + } + + @Override + public EchoResponseProto echo(RpcController unused, EchoRequestProto request) + throws ServiceException { + return EchoResponseProto.newBuilder().setMessage(request.getMessage()) + .build(); + } + + @Override + public EmptyResponseProto error(RpcController unused, + EmptyRequestProto request) throws ServiceException { + throw new ServiceException("error", new RpcServerException("error")); + } + } + + public static class PBServer2Impl implements TestRpcService2 { + + @Override + public EmptyResponseProto ping2(RpcController unused, + EmptyRequestProto request) throws ServiceException { + return EmptyResponseProto.newBuilder().build(); + } + + @Override + public EchoResponseProto echo2(RpcController unused, EchoRequestProto request) + throws ServiceException { + return EchoResponseProto.newBuilder().setMessage(request.getMessage()) + .build(); + } + } + + @Before + public void setUp() throws IOException { // Setup server for both protocols + conf = new Configuration(); + // Set RPC engine to protobuf RPC engine + RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class); + + // Create server side implementation + PBServerImpl serverImpl = new PBServerImpl(); + BlockingService service = TestProtobufRpcProto + .newReflectiveBlockingService(serverImpl); + + // Get RPC server for server side implementation + server = RPC.getServer(TestRpcService.class, service, ADDRESS, PORT, conf); + addr = NetUtils.getConnectAddress(server); + + // now the second protocol + PBServer2Impl server2Impl = new PBServer2Impl(); + BlockingService service2 = TestProtobufRpc2Proto + .newReflectiveBlockingService(server2Impl); + + server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class, + service2); + server.start(); + } + + + @After + public void tearDown() throws Exception { + server.stop(); + } + + private static TestRpcService getClient() throws IOException { + // Set RPC engine to protobuf RPC engine + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); + return RPC.getProxy(TestRpcService.class, 0, addr, + conf); + } + + private static TestRpcService2 getClient2() throws IOException { + // Set RPC engine to protobuf RPC engine + RPC.setProtocolEngine(conf, TestRpcService2.class, + ProtobufRpcEngine.class); + return RPC.getProxy(TestRpcService2.class, 0, addr, + conf); + } + + @Test + public void testProtoBufRpc() throws Exception { + TestRpcService client = getClient(); + testProtoBufRpc(client); + } + + // separated test out so that other tests can call it. + public static void testProtoBufRpc(TestRpcService client) throws Exception { + // Test ping method + EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build(); + client.ping(null, emptyRequest); + + // Test echo method + EchoRequestProto echoRequest = EchoRequestProto.newBuilder() + .setMessage("hello").build(); + EchoResponseProto echoResponse = client.echo(null, echoRequest); + Assert.assertEquals(echoResponse.getMessage(), "hello"); + + // Test error method - error should be thrown as RemoteException + try { + client.error(null, emptyRequest); + Assert.fail("Expected exception is not thrown"); + } catch (ServiceException e) { + RemoteException re = (RemoteException)e.getCause(); + RpcServerException rse = (RpcServerException) re + .unwrapRemoteException(RpcServerException.class); + } + } + + @Test + public void testProtoBufRpc2() throws Exception { + TestRpcService2 client = getClient2(); + + // Test ping method + EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build(); + client.ping2(null, emptyRequest); + + // Test echo method + EchoRequestProto echoRequest = EchoRequestProto.newBuilder() + .setMessage("hello").build(); + EchoResponseProto echoResponse = client.echo2(null, echoRequest); + Assert.assertEquals(echoResponse.getMessage(), "hello"); + + // Ensure RPC metrics are updated + MetricsRecordBuilder rpcMetrics = getMetrics(server.getRpcMetrics().name()); + assertCounterGt("RpcQueueTimeNumOps", 0L, rpcMetrics); + assertCounterGt("RpcProcessingTimeNumOps", 0L, rpcMetrics); + + MetricsRecordBuilder rpcDetailedMetrics = + getMetrics(server.getRpcDetailedMetrics().name()); + assertCounterGt("Echo2NumOps", 0L, rpcDetailedMetrics); + } +} \ No newline at end of file Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1293964&r1=1293963&r2=1293964&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java (original) +++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java Sun Feb 26 23:32:06 2012 @@ -18,28 +18,39 @@ package org.apache.hadoop.ipc; +import java.io.Closeable; import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; +import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.util.Arrays; -import junit.framework.TestCase; +import javax.net.SocketFactory; import org.apache.commons.logging.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.Service; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Test; +import static org.junit.Assert.*; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.DescriptorProtos.EnumDescriptorProto; @@ -49,18 +60,22 @@ import static org.apache.hadoop.test.Met import static org.mockito.Mockito.*; /** Unit tests for RPC. */ -public class TestRPC extends TestCase { +@SuppressWarnings("deprecation") +public class TestRPC { private static final String ADDRESS = "0.0.0.0"; public static final Log LOG = LogFactory.getLog(TestRPC.class); private static Configuration conf = new Configuration(); + + static { + conf.setClass("rpc.engine." + StoppedProtocol.class.getName(), + StoppedRpcEngine.class, RpcEngine.class); + } int datasize = 1024*100; int numThreads = 50; - - public TestRPC(String name) { super(name); } public interface TestProtocol extends VersionedProtocol { public static final long versionID = 1L; @@ -207,6 +222,80 @@ public class TestRPC extends TestCase { } } + /** + * A basic interface for testing client-side RPC resource cleanup. + */ + private static interface StoppedProtocol { + long versionID = 0; + + public void stop(); + } + + /** + * A class used for testing cleanup of client side RPC resources. + */ + private static class StoppedRpcEngine implements RpcEngine { + + @Override + public Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs, + UserGroupInformation ticket, Configuration conf) + throws IOException, InterruptedException { + return null; + } + + @SuppressWarnings("unchecked") + @Override + public ProtocolProxy getProxy(Class protocol, long clientVersion, + InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, + SocketFactory factory, int rpcTimeout) throws IOException { + T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), + new Class[] { protocol }, new StoppedInvocationHandler()); + return new ProtocolProxy(protocol, proxy, false); + } + + @Override + public org.apache.hadoop.ipc.RPC.Server getServer(Class protocol, + Object instance, String bindAddress, int port, int numHandlers, + int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, + SecretManager secretManager) throws IOException { + return null; + } + + @Override + public ProtocolProxy getProtocolMetaInfoProxy( + ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException { + throw new UnsupportedOperationException("This proxy is not supported"); + } + } + + /** + * An invocation handler which does nothing when invoking methods, and just + * counts the number of times close() is called. + */ + private static class StoppedInvocationHandler + implements InvocationHandler, Closeable { + + private int closeCalled = 0; + + @Override + public Object invoke(Object proxy, Method method, Object[] args) + throws Throwable { + return null; + } + + @Override + public void close() throws IOException { + closeCalled++; + } + + public int getCloseCalled() { + return closeCalled; + } + + } + + @Test public void testConfRpc() throws Exception { Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS, 0, 1, false, conf, null); @@ -229,6 +318,7 @@ public class TestRPC extends TestCase { server.stop(); } + @Test public void testSlowRpc() throws Exception { System.out.println("Testing Slow RPC"); // create a server with two handlers @@ -273,11 +363,12 @@ public class TestRPC extends TestCase { } } - public void testRPCConf(Configuration conf) throws Exception { - + @Test + public void testCalls() throws Exception { + testCallsInternal(conf); } - - public void testCalls(Configuration conf) throws Exception { + + private void testCallsInternal(Configuration conf) throws Exception { Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS, 0, conf); TestProtocol proxy = null; @@ -384,6 +475,7 @@ public class TestRPC extends TestCase { } } + @Test public void testStandaloneClient() throws IOException { try { TestProtocol proxy = RPC.waitForProxy(TestProtocol.class, @@ -450,6 +542,7 @@ public class TestRPC extends TestCase { } } + @Test public void testAuthorization() throws Exception { Configuration conf = new Configuration(); conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, @@ -481,20 +574,48 @@ public class TestRPC extends TestCase { Configuration conf = new Configuration(); conf.setBoolean("ipc.client.ping", false); - new TestRPC("testnoPings").testCalls(conf); + new TestRPC().testCallsInternal(conf); conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2); - new TestRPC("testnoPings").testCalls(conf); + new TestRPC().testCallsInternal(conf); } /** * Test stopping a non-registered proxy * @throws Exception */ + @Test public void testStopNonRegisteredProxy() throws Exception { RPC.stopProxy(mock(TestProtocol.class)); } + @Test + public void testStopProxy() throws IOException { + StoppedProtocol proxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class, + StoppedProtocol.versionID, null, conf); + StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler) + Proxy.getInvocationHandler(proxy); + assertEquals(invocationHandler.getCloseCalled(), 0); + RPC.stopProxy(proxy); + assertEquals(invocationHandler.getCloseCalled(), 1); + } + + @Test + public void testWrappedStopProxy() throws IOException { + StoppedProtocol wrappedProxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class, + StoppedProtocol.versionID, null, conf); + StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler) + Proxy.getInvocationHandler(wrappedProxy); + + StoppedProtocol proxy = (StoppedProtocol) RetryProxy.create(StoppedProtocol.class, + wrappedProxy, RetryPolicies.RETRY_FOREVER); + + assertEquals(invocationHandler.getCloseCalled(), 0); + RPC.stopProxy(proxy); + assertEquals(invocationHandler.getCloseCalled(), 1); + } + + @Test public void testErrorMsgForInsecureClient() throws Exception { final Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null); @@ -567,10 +688,10 @@ public class TestRPC extends TestCase { return count; } - /** * Test that server.stop() properly stops all threads */ + @Test public void testStopsAllThreads() throws Exception { int threadsBefore = countThreads("Server$Listener$Reader"); assertEquals("Expect no Reader threads running before test", @@ -591,8 +712,7 @@ public class TestRPC extends TestCase { } public static void main(String[] args) throws Exception { - - new TestRPC("test").testCalls(conf); + new TestRPC().testCallsInternal(conf); } } Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java?rev=1293964&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java (added) +++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java Sun Feb 26 23:32:06 2012 @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + +import static org.junit.Assert.*; + +import org.apache.hadoop.util.ToolRunner; +import org.junit.Test; + + +public class TestRPCCallBenchmark { + + @Test(timeout=20000) + public void testBenchmarkWithWritable() throws Exception { + int rc = ToolRunner.run(new RPCCallBenchmark(), + new String[] { + "--clientThreads", "30", + "--serverThreads", "30", + "--time", "5", + "--serverReaderThreads", "4", + "--messageSize", "1024", + "--engine", "writable"}); + assertEquals(0, rc); + } + + @Test(timeout=20000) + public void testBenchmarkWithProto() throws Exception { + int rc = ToolRunner.run(new RPCCallBenchmark(), + new String[] { + "--clientThreads", "30", + "--serverThreads", "30", + "--time", "5", + "--serverReaderThreads", "4", + "--messageSize", "1024", + "--engine", "protobuf"}); + assertEquals(0, rc); + } +} Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java?rev=1293964&r1=1293963&r2=1293964&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java (original) +++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java Sun Feb 26 23:32:06 2012 @@ -31,6 +31,10 @@ import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; import org.apache.hadoop.net.NetUtils; import org.junit.After; import org.junit.Test; @@ -39,7 +43,7 @@ import org.junit.Test; public class TestRPCCompatibility { private static final String ADDRESS = "0.0.0.0"; private static InetSocketAddress addr; - private static Server server; + private static RPC.Server server; private ProtocolProxy proxy; public static final Log LOG = @@ -52,10 +56,14 @@ public class TestRPCCompatibility { void ping() throws IOException; } - public interface TestProtocol1 extends TestProtocol0 { + public interface TestProtocol1 extends VersionedProtocol, TestProtocol0 { String echo(String value) throws IOException; } + + // TestProtocol2 is a compatible impl of TestProtocol1 - hence use its name + @ProtocolInfo(protocolName= + "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1") public interface TestProtocol2 extends TestProtocol1 { int echo(int value) throws IOException; } @@ -89,28 +97,44 @@ public class TestRPCCompatibility { public static class TestImpl1 extends TestImpl0 implements TestProtocol1 { @Override public String echo(String value) { return value; } + @Override + public long getProtocolVersion(String protocol, + long clientVersion) throws IOException { + return TestProtocol1.versionID; + } } public static class TestImpl2 extends TestImpl1 implements TestProtocol2 { @Override public int echo(int value) { return value; } + + @Override + public long getProtocolVersion(String protocol, + long clientVersion) throws IOException { + return TestProtocol2.versionID; + } + } @After public void tearDown() throws IOException { if (proxy != null) { RPC.stopProxy(proxy.getProxy()); + proxy = null; } if (server != null) { server.stop(); + server = null; } } @Test // old client vs new server public void testVersion0ClientVersion1Server() throws Exception { // create a server with two handlers + TestImpl1 impl = new TestImpl1(); server = RPC.getServer(TestProtocol1.class, - new TestImpl1(), ADDRESS, 0, 2, false, conf, null); + impl, ADDRESS, 0, 2, false, conf, null); + server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.start(); addr = NetUtils.getConnectAddress(server); @@ -154,8 +178,10 @@ public class TestRPCCompatibility { public int echo(int value) throws IOException, NumberFormatException { if (serverInfo.isMethodSupported("echo", int.class)) { +System.out.println("echo int is supported"); return -value; // use version 3 echo long } else { // server is version 2 +System.out.println("echo int is NOT supported"); return Integer.parseInt(proxy2.echo(String.valueOf(value))); } } @@ -172,8 +198,10 @@ public class TestRPCCompatibility { @Test // Compatible new client & old server public void testVersion2ClientVersion1Server() throws Exception { // create a server with two handlers + TestImpl1 impl = new TestImpl1(); server = RPC.getServer(TestProtocol1.class, - new TestImpl1(), ADDRESS, 0, 2, false, conf, null); + impl, ADDRESS, 0, 2, false, conf, null); + server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.start(); addr = NetUtils.getConnectAddress(server); @@ -189,9 +217,12 @@ public class TestRPCCompatibility { @Test // equal version client and server public void testVersion2ClientVersion2Server() throws Exception { + ProtocolSignature.resetCache(); // create a server with two handlers + TestImpl2 impl = new TestImpl2(); server = RPC.getServer(TestProtocol2.class, - new TestImpl2(), ADDRESS, 0, 2, false, conf, null); + impl, ADDRESS, 0, 2, false, conf, null); + server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.start(); addr = NetUtils.getConnectAddress(server); @@ -250,14 +281,16 @@ public class TestRPCCompatibility { assertEquals(hash1, hash2); } + @ProtocolInfo(protocolName= + "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1") public interface TestProtocol4 extends TestProtocol2 { - public static final long versionID = 1L; + public static final long versionID = 4L; int echo(int value) throws IOException; } @Test public void testVersionMismatch() throws IOException { - server = RPC.getServer(TestProtocol2.class, new TestImpl0(), ADDRESS, 0, 2, + server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2, false, conf, null); server.start(); addr = NetUtils.getConnectAddress(server); @@ -268,7 +301,76 @@ public class TestRPCCompatibility { proxy.echo(21); fail("The call must throw VersionMismatch exception"); } catch (IOException ex) { - Assert.assertTrue(ex.getMessage().contains("VersionMismatch")); + Assert.assertTrue("Expected version mismatch but got " + ex.getMessage(), + ex.getMessage().contains("VersionMismatch")); + } + } + + @Test + public void testIsMethodSupported() throws IOException { + server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2, + false, conf, null); + server.start(); + addr = NetUtils.getConnectAddress(server); + + TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class, + TestProtocol2.versionID, addr, conf); + boolean supported = RpcClientUtil.isMethodSupported(proxy, + TestProtocol2.class, RpcKind.RPC_WRITABLE, + RPC.getProtocolVersion(TestProtocol2.class), "echo"); + Assert.assertTrue(supported); + supported = RpcClientUtil.isMethodSupported(proxy, + TestProtocol2.class, RpcKind.RPC_PROTOCOL_BUFFER, + RPC.getProtocolVersion(TestProtocol2.class), "echo"); + Assert.assertFalse(supported); + } + + /** + * Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up + * the server registry to extract protocol signatures and versions. + */ + @Test + public void testProtocolMetaInfoSSTranslatorPB() throws Exception { + TestImpl1 impl = new TestImpl1(); + server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false, + conf, null); + server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.start(); + + ProtocolMetaInfoServerSideTranslatorPB xlator = + new ProtocolMetaInfoServerSideTranslatorPB(server); + + GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature( + null, + createGetProtocolSigRequestProto(TestProtocol1.class, + RpcKind.RPC_PROTOCOL_BUFFER)); + //No signatures should be found + Assert.assertEquals(0, resp.getProtocolSignatureCount()); + resp = xlator.getProtocolSignature( + null, + createGetProtocolSigRequestProto(TestProtocol1.class, + RpcKind.RPC_WRITABLE)); + Assert.assertEquals(1, resp.getProtocolSignatureCount()); + ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0); + Assert.assertEquals(TestProtocol1.versionID, sig.getVersion()); + boolean found = false; + int expected = ProtocolSignature.getFingerprint(TestProtocol1.class + .getMethod("echo", String.class)); + for (int m : sig.getMethodsList()) { + if (expected == m) { + found = true; + break; + } } + Assert.assertTrue(found); + } + + private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto( + Class protocol, RpcKind rpcKind) { + GetProtocolSignatureRequestProto.Builder builder = + GetProtocolSignatureRequestProto.newBuilder(); + builder.setProtocol(protocol.getName()); + builder.setRpcKind(rpcKind.toString()); + return builder.build(); } } \ No newline at end of file Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java?rev=1293964&r1=1293963&r2=1293964&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java (original) +++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java Sun Feb 26 23:32:06 2012 @@ -164,6 +164,10 @@ public abstract class MultithreadedTestU } checkException(); } + + public Iterable getTestThreads() { + return testThreads; + } } /** Copied: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/proto/test.proto (from r1293950, hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java) URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/proto/test.proto?p2=hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/proto/test.proto&p1=hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java&r1=1293950&r2=1293964&rev=1293964&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java (original) +++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/proto/test.proto Sun Feb 26 23:32:06 2012 @@ -16,24 +16,20 @@ * limitations under the License. */ -package org.apache.hadoop.hdfs.security.token.block; +option java_package = "org.apache.hadoop.ipc.protobuf"; +option java_outer_classname = "TestProtos"; +option java_generate_equals_and_hash = true; -import javax.crypto.SecretKey; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.security.token.delegation.DelegationKey; +message EmptyRequestProto { +} -/** - * Key used for generating and verifying block tokens - */ -@InterfaceAudience.Private -public class BlockKey extends DelegationKey { +message EmptyResponseProto { +} - public BlockKey() { - super(); - } +message EchoRequestProto { + required string message = 1; +} - public BlockKey(int keyId, long expiryDate, SecretKey key) { - super(keyId, expiryDate, key); - } +message EchoResponseProto { + required string message = 1; } Copied: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto (from r1293950, hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java) URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto?p2=hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto&p1=hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java&r1=1293950&r2=1293964&rev=1293964&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java (original) +++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto Sun Feb 26 23:32:06 2012 @@ -15,25 +15,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +option java_package = "org.apache.hadoop.ipc.protobuf"; +option java_outer_classname = "TestRpcServiceProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; -package org.apache.hadoop.hdfs.security.token.block; +import "test.proto"; -import javax.crypto.SecretKey; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.security.token.delegation.DelegationKey; /** - * Key used for generating and verifying block tokens + * A protobuf service for use in tests */ -@InterfaceAudience.Private -public class BlockKey extends DelegationKey { - - public BlockKey() { - super(); - } +service TestProtobufRpcProto { + rpc ping(EmptyRequestProto) returns (EmptyResponseProto); + rpc echo(EchoRequestProto) returns (EchoResponseProto); + rpc error(EmptyRequestProto) returns (EmptyResponseProto); +} - public BlockKey(int keyId, long expiryDate, SecretKey key) { - super(keyId, expiryDate, key); - } +service TestProtobufRpc2Proto { + rpc ping2(EmptyRequestProto) returns (EmptyResponseProto); + rpc echo2(EchoRequestProto) returns (EchoResponseProto); }