hadoop-common-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1293964 [3/3] - in /hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common: ./ dev-support/ src/main/java/org/apache/hadoop/io/retry/ src/main/java/org/apache/hadoop/ipc/ src/main/java/org/apache/hadoop/security/to...
Date: Sun, 26 Feb 2012 23:32:12 GMT
Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java?rev=1293964&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java (added)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java Sun Feb 26 23:32:06 2012
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl;
+import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService;
+import org.apache.hadoop.ipc.TestRPC.TestProtocol;
+import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
+import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
+import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.MultithreadedTestUtil;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.base.Joiner;
+import com.google.protobuf.BlockingService;
+
+/**
+ * Benchmark for protobuf RPC.
+ * Run with --help option for usage.
+ */
+public class RPCCallBenchmark implements Tool, Configurable {
+  private Configuration conf;
+  private AtomicLong callCount = new AtomicLong(0);
+  private static ThreadMXBean threadBean =
+    ManagementFactory.getThreadMXBean();
+  
+  private static class MyOptions {
+    private boolean failed = false;
+    private int serverThreads = 0;
+    private int serverReaderThreads = 1;
+    private int clientThreads = 0;
+    private String host = "0.0.0.0";
+    private int port = 12345;
+    public int secondsToRun = 15;
+    private int msgSize = 1024;
+    public Class<? extends RpcEngine> rpcEngine =
+      WritableRpcEngine.class;
+    
+    private MyOptions(String args[]) {
+      try {
+        Options opts = buildOptions();
+        CommandLineParser parser = new GnuParser();
+        CommandLine line = parser.parse(opts, args, true);
+        processOptions(line, opts);
+        validateOptions();
+      } catch (ParseException e) {
+        System.err.println(e.getMessage());
+        System.err.println("Try \"--help\" option for details.");
+        failed = true;
+      }
+    }
+
+    private void validateOptions() throws ParseException {
+      if (serverThreads <= 0 && clientThreads <= 0) {
+        throw new ParseException("Must specify at least -c or -s");
+      }
+    }
+
+    @SuppressWarnings("static-access")
+    private Options buildOptions() {
+      Options opts = new Options();
+      opts.addOption(
+        OptionBuilder.withLongOpt("serverThreads").hasArg(true)
+        .withArgName("numthreads")
+        .withDescription("number of server threads (handlers) to run (or 0 to not run server)")
+        .create("s"));
+      opts.addOption(
+        OptionBuilder.withLongOpt("serverReaderThreads").hasArg(true)
+        .withArgName("threads")
+        .withDescription("number of server reader threads to run")
+        .create("r"));
+
+      
+      opts.addOption(
+        OptionBuilder.withLongOpt("clientThreads").hasArg(true)
+        .withArgName("numthreads")
+        .withDescription("number of client threads to run (or 0 to not run client)")
+        .create("c"));
+
+      opts.addOption(
+        OptionBuilder.withLongOpt("messageSize").hasArg(true)
+        .withArgName("bytes")
+        .withDescription("size of call parameter in bytes")
+        .create("m"));
+
+      opts.addOption(
+          OptionBuilder.withLongOpt("time").hasArg(true)
+          .withArgName("seconds")
+          .withDescription("number of seconds to run clients for")
+          .create("t"));
+      opts.addOption(
+          OptionBuilder.withLongOpt("port").hasArg(true)
+          .withArgName("port")
+          .withDescription("port to listen or connect on")
+          .create("p"));
+      opts.addOption(
+          OptionBuilder.withLongOpt("host").hasArg(true)
+          .withArgName("addr")
+          .withDescription("host to listen or connect on")
+          .create('h'));
+      
+      opts.addOption(
+          OptionBuilder.withLongOpt("engine").hasArg(true)
+          .withArgName("writable|protobuf")
+          .withDescription("engine to use")
+          .create('e'));
+      
+      opts.addOption(
+          OptionBuilder.withLongOpt("help").hasArg(false)
+          .withDescription("show this screen")
+          .create('?'));
+
+      return opts;
+    }
+    
+    private void processOptions(CommandLine line, Options opts)
+      throws ParseException {
+      if (line.hasOption("help") || line.hasOption('?')) {
+        HelpFormatter formatter = new HelpFormatter();
+        System.out.println("Protobuf IPC benchmark.");
+        System.out.println();
+        formatter.printHelp(100,
+            "java ... PBRPCBenchmark [options]",
+            "\nSupported options:", opts, "");
+        return;
+      }
+
+      if (line.hasOption('s')) {
+        serverThreads = Integer.parseInt(line.getOptionValue('s'));
+      }
+      if (line.hasOption('r')) {
+        serverReaderThreads = Integer.parseInt(line.getOptionValue('r'));
+      }
+      if (line.hasOption('c')) {
+        clientThreads = Integer.parseInt(line.getOptionValue('c'));
+      }
+      if (line.hasOption('t')) {
+        secondsToRun = Integer.parseInt(line.getOptionValue('t'));
+      }
+      if (line.hasOption('m')) {
+        msgSize = Integer.parseInt(line.getOptionValue('m'));
+      }
+      if (line.hasOption('p')) {
+        port = Integer.parseInt(line.getOptionValue('p'));
+      }
+      if (line.hasOption('h')) {
+        host = line.getOptionValue('h');
+      }
+      if (line.hasOption('e')) {
+        String eng = line.getOptionValue('e');
+        if ("protobuf".equals(eng)) {
+          rpcEngine = ProtobufRpcEngine.class;
+        } else if ("writable".equals(eng)) {
+          rpcEngine = WritableRpcEngine.class;
+        } else {
+          throw new ParseException("invalid engine: " + eng);
+        }
+      }
+      
+      String[] remainingArgs = line.getArgs();
+      if (remainingArgs.length != 0) {
+        throw new ParseException("Extra arguments: " +
+            Joiner.on(" ").join(remainingArgs));
+      }
+    }
+    
+    @Override
+    public String toString() {
+      return "rpcEngine=" + rpcEngine + "\nserverThreads=" + serverThreads
+          + "\nserverReaderThreads=" + serverReaderThreads + "\nclientThreads="
+          + clientThreads + "\nhost=" + host + "\nport=" + port
+          + "\nsecondsToRun=" + secondsToRun + "\nmsgSize=" + msgSize;
+    }
+  }
+
+
+  
+  private Server startServer(MyOptions opts) throws IOException {
+    if (opts.serverThreads <= 0) {
+      return null;
+    }
+    conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
+        opts.serverReaderThreads);
+    
+    RPC.Server server;
+    // Get RPC server for server side implementation
+    if (opts.rpcEngine == ProtobufRpcEngine.class) {
+      // Create server side implementation
+      PBServerImpl serverImpl = new PBServerImpl();
+      BlockingService service = TestProtobufRpcProto
+          .newReflectiveBlockingService(serverImpl);
+
+      server = RPC.getServer(TestRpcService.class, service,
+          opts.host, opts.port, opts.serverThreads, false, conf, null);
+    } else if (opts.rpcEngine == WritableRpcEngine.class) {
+      server = RPC.getServer(TestProtocol.class, new TestRPC.TestImpl(),
+          opts.host, opts.port, opts.serverThreads, false, conf, null);
+    } else {
+      throw new RuntimeException("Bad engine: " + opts.rpcEngine);
+    }
+    server.start();
+    return server;
+  }
+  
+  private long getTotalCpuTime(Iterable<? extends Thread> threads) {
+    long total = 0;
+    for (Thread t : threads) {
+      long tid = t.getId();
+      total += threadBean.getThreadCpuTime(tid);
+    }
+    return total;
+  }
+  
+  @Override
+  public int run(String[] args) throws Exception {
+    MyOptions opts = new MyOptions(args);
+    if (opts.failed) {
+      return -1;
+    }
+    
+    // Set RPC engine to the configured RPC engine
+    RPC.setProtocolEngine(conf, TestRpcService.class, opts.rpcEngine);
+
+    Server server = startServer(opts);
+    try {
+      
+      TestContext ctx = setupClientTestContext(opts);
+      if (ctx != null) {
+        long totalCalls = 0;
+        ctx.startThreads();
+        long veryStart = System.nanoTime();
+
+        // Loop printing results every second until the specified
+        // time has elapsed
+        for (int i = 0; i < opts.secondsToRun ; i++) {
+          long st = System.nanoTime();
+          ctx.waitFor(1000);
+          long et = System.nanoTime();
+          long ct = callCount.getAndSet(0);
+          totalCalls += ct;
+          double callsPerSec = (ct * 1000000000)/(et - st);
+          System.out.println("Calls per second: " + callsPerSec);
+        }
+        
+        // Print results
+
+        if (totalCalls > 0) {
+          long veryEnd = System.nanoTime();
+          double callsPerSec =
+            (totalCalls * 1000000000)/(veryEnd - veryStart);
+          long cpuNanosClient = getTotalCpuTime(ctx.getTestThreads());
+          long cpuNanosServer = -1;
+          if (server != null) {
+            cpuNanosServer = getTotalCpuTime(server.getHandlers());
+          }
+          System.out.println("====== Results ======");
+          System.out.println("Options:\n" + opts);
+          System.out.println("Total calls per second: " + callsPerSec);
+          System.out.println("CPU time per call on client: " +
+              (cpuNanosClient / totalCalls) + " ns");
+          if (server != null) {
+            System.out.println("CPU time per call on server: " +
+                (cpuNanosServer / totalCalls) + " ns");
+          }
+        } else {
+          System.out.println("No calls!");
+        }
+
+        ctx.stop();
+      } else {
+        while (true) {
+          Thread.sleep(10000);
+        }
+      }
+    } finally {
+      if (server != null) {
+        server.stop();
+      }
+    }
+    
+    return 0;
+  }
+
+
+  private TestContext setupClientTestContext(final MyOptions opts)
+      throws IOException, InterruptedException {
+    if (opts.clientThreads <= 0) {
+      return null;
+    }
+
+    // Set up a separate proxy for each client thread,
+    // rather than making them share TCP pipes.
+    int numProxies = opts.clientThreads;
+    final RpcServiceWrapper proxies[] = new RpcServiceWrapper[numProxies];
+    for (int i = 0; i < numProxies; i++) {
+      proxies[i] =
+        UserGroupInformation.createUserForTesting("proxy-" + i,new String[]{})
+        .doAs(new PrivilegedExceptionAction<RpcServiceWrapper>() {
+          @Override
+          public RpcServiceWrapper run() throws Exception {
+            return createRpcClient(opts);
+          }
+        });
+    }
+
+    // Create an echo message of the desired length
+    final StringBuilder msgBuilder = new StringBuilder(opts.msgSize);
+    for (int c = 0; c < opts.msgSize; c++) {
+      msgBuilder.append('x');
+    }
+    final String echoMessage = msgBuilder.toString();
+
+    // Create the clients in a test context
+    TestContext ctx = new TestContext();
+    for (int i = 0; i < opts.clientThreads; i++) {
+      final RpcServiceWrapper proxy = proxies[i % numProxies];
+      
+      ctx.addThread(new MultithreadedTestUtil.RepeatingTestThread(ctx) {
+        @Override
+        public void doAnAction() throws Exception {
+          proxy.doEcho(echoMessage);
+          callCount.incrementAndGet();
+        }
+      });
+    }
+    return ctx;
+  }
+
+  /**
+   * Simple interface that can be backed by either the protobuf
+   * or the writable implementation.
+   */
+  private interface RpcServiceWrapper {
+    public String doEcho(String msg) throws Exception;
+  }
+
+  /**
+   * Create a client proxy for the specified engine.
+   */
+  private RpcServiceWrapper createRpcClient(MyOptions opts) throws IOException {
+    InetSocketAddress addr = NetUtils.createSocketAddr(opts.host, opts.port);
+    
+    if (opts.rpcEngine == ProtobufRpcEngine.class) {
+      final TestRpcService proxy = RPC.getProxy(TestRpcService.class, 0, addr, conf);
+      return new RpcServiceWrapper() {
+        @Override
+        public String doEcho(String msg) throws Exception {
+          EchoRequestProto req = EchoRequestProto.newBuilder()
+            .setMessage(msg)
+            .build();
+          EchoResponseProto responseProto = proxy.echo(null, req);
+          return responseProto.getMessage();
+        }
+      };
+    } else if (opts.rpcEngine == WritableRpcEngine.class) {
+      final TestProtocol proxy = (TestProtocol)RPC.getProxy(
+          TestProtocol.class, TestProtocol.versionID, addr, conf);
+      return new RpcServiceWrapper() {
+        @Override
+        public String doEcho(String msg) throws Exception {
+          return proxy.echo(msg);
+        }
+      };
+    } else {
+      throw new RuntimeException("unsupported engine: " + opts.rpcEngine);
+    }
+  }
+
+  public static void main(String []args) throws Exception {
+    int rc = ToolRunner.run(new RPCCallBenchmark(), args);
+    System.exit(rc);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+}
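
The benchmark above implements Tool, so it can be launched through ToolRunner, either via its own main() or from a small harness, using the long-form flags defined in buildOptions(). A minimal sketch follows; the harness class name and every option value are illustrative, not defaults. (TestRPCCallBenchmark later in this commit drives it the same way.)

import org.apache.hadoop.ipc.RPCCallBenchmark;
import org.apache.hadoop.util.ToolRunner;

public class RunRpcCallBenchmark {
  public static void main(String[] args) throws Exception {
    // Run server and client threads in one JVM for 10 seconds using the
    // protobuf engine; pass --help instead to print the supported options.
    int rc = ToolRunner.run(new RPCCallBenchmark(), new String[] {
        "--serverThreads", "4",        // -s: RPC handler threads
        "--serverReaderThreads", "2",  // -r: IPC reader threads
        "--clientThreads", "8",        // -c: concurrent client callers
        "--messageSize", "1024",       // -m: echo payload size in bytes
        "--time", "10",                // -t: seconds to run the clients
        "--engine", "protobuf"});      // -e: "protobuf" or "writable"
    System.exit(rc);
  }
}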

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Sun Feb 26 23:32:06 2012
@@ -23,6 +23,7 @@ import org.apache.commons.logging.*;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.net.NetUtils;
 
@@ -96,8 +97,8 @@ public class TestIPC {
     }
 
     @Override
-    public Writable call(Class<?> protocol, Writable param, long receiveTime)
-        throws IOException {
+    public Writable call(RpcKind rpcKind, String protocol, Writable param,
+        long receiveTime) throws IOException {
       if (sleep) {
         // sleep a bit
         try {

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java Sun Feb 26 23:32:06 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.net.NetUtils;
 
 /**
@@ -72,8 +73,8 @@ public class TestIPCServerResponder exte
     }
 
     @Override
-    public Writable call(Class<?> protocol, Writable param, long receiveTime)
-        throws IOException {
+    public Writable call(RpcKind rpcKind, String protocol, Writable param,
+        long receiveTime) throws IOException {
       if (sleep) {
         try {
           Thread.sleep(RANDOM.nextInt(20)); // sleep a bit

Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java?rev=1293964&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java (added)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java Sun Feb 26 23:32:06 2012
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.junit.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl;
+import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService;
+import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
+import org.apache.hadoop.net.NetUtils;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+import com.google.protobuf.BlockingService;
+
+public class TestMultipleProtocolServer {
+  private static final String ADDRESS = "0.0.0.0";
+  private static InetSocketAddress addr;
+  private static RPC.Server server;
+
+  private static Configuration conf = new Configuration();
+  
+  
+  @ProtocolInfo(protocolName="Foo")
+  interface Foo0 extends VersionedProtocol {
+    public static final long versionID = 0L;
+    String ping() throws IOException;
+    
+  }
+  
+  @ProtocolInfo(protocolName="Foo")
+  interface Foo1 extends VersionedProtocol {
+    public static final long versionID = 1L;
+    String ping() throws IOException;
+    String ping2() throws IOException;
+  }
+  
+  @ProtocolInfo(protocolName="Foo")
+  interface FooUnimplemented extends VersionedProtocol {
+    public static final long versionID = 2L;
+    String ping() throws IOException;  
+  }
+  
+  interface Mixin extends VersionedProtocol{
+    public static final long versionID = 0L;
+    void hello() throws IOException;
+  }
+  interface Bar extends Mixin, VersionedProtocol {
+    public static final long versionID = 0L;
+    int echo(int i) throws IOException;
+  }
+  
+  
+  
+  class Foo0Impl implements Foo0 {
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return Foo0.versionID;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol,
+        long clientVersion, int clientMethodsHash) throws IOException {
+      Class<? extends VersionedProtocol> inter;
+      try {
+        inter = (Class<? extends VersionedProtocol>)getClass().
+                                          getGenericInterfaces()[0];
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      return ProtocolSignature.getProtocolSignature(clientMethodsHash, 
+          getProtocolVersion(protocol, clientVersion), inter);
+    }
+
+    @Override
+    public String ping() {
+      return "Foo0";     
+    }
+    
+  }
+  
+  class Foo1Impl implements Foo1 {
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return Foo1.versionID;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol,
+        long clientVersion, int clientMethodsHash) throws IOException {
+      Class<? extends VersionedProtocol> inter;
+      try {
+        inter = (Class<? extends VersionedProtocol>)getClass().
+                                        getGenericInterfaces()[0];
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      return ProtocolSignature.getProtocolSignature(clientMethodsHash, 
+          getProtocolVersion(protocol, clientVersion), inter);
+    }
+
+    @Override
+    public String ping() {
+      return "Foo1";
+    }
+
+    @Override
+    public String ping2() {
+      return "Foo1";
+      
+    }
+    
+  }
+
+  
+  class BarImpl implements Bar {
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return Bar.versionID;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol,
+        long clientVersion, int clientMethodsHash) throws IOException {
+      Class<? extends VersionedProtocol> inter;
+      try {
+        inter = (Class<? extends VersionedProtocol>)getClass().
+                                          getGenericInterfaces()[0];
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      return ProtocolSignature.getProtocolSignature(clientMethodsHash, 
+          getProtocolVersion(protocol, clientVersion), inter);
+    }
+
+    @Override
+    public int echo(int i) {
+      return i;
+    }
+
+    @Override
+    public void hello() {
+
+      
+    }
+  }
+  @Before
+  public void setUp() throws Exception {
+    // create a server with two handlers
+    server = RPC.getServer(Foo0.class,
+                              new Foo0Impl(), ADDRESS, 0, 2, false, conf, null);
+    server.addProtocol(RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl());
+    server.addProtocol(RpcKind.RPC_WRITABLE, Bar.class, new BarImpl());
+    server.addProtocol(RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl());
+    
+    
+    // Add Protobuf server
+    // Create server side implementation
+    PBServerImpl pbServerImpl = 
+        new PBServerImpl();
+    BlockingService service = TestProtobufRpcProto
+        .newReflectiveBlockingService(pbServerImpl);
+    server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class,
+        service);
+    server.start();
+    addr = NetUtils.getConnectAddress(server);
+  }
+  
+  @After
+  public void tearDown() throws Exception {
+    server.stop();
+  }
+
+  @Test
+  public void test1() throws IOException {
+    ProtocolProxy<?> proxy;
+    proxy = RPC.getProtocolProxy(Foo0.class, Foo0.versionID, addr, conf);
+
+    Foo0 foo0 = (Foo0)proxy.getProxy(); 
+    Assert.assertEquals("Foo0", foo0.ping());
+    
+    
+    proxy = RPC.getProtocolProxy(Foo1.class, Foo1.versionID, addr, conf);
+    
+    
+    Foo1 foo1 = (Foo1)proxy.getProxy(); 
+    Assert.assertEquals("Foo1", foo1.ping());
+    Assert.assertEquals("Foo1", foo1.ping2());
+    
+    
+    proxy = RPC.getProtocolProxy(Bar.class, Foo1.versionID, addr, conf);
+    
+    
+    Bar bar = (Bar)proxy.getProxy(); 
+    Assert.assertEquals(99, bar.echo(99));
+    
+    // Now test Mixin class method
+    
+    Mixin mixin = bar;
+    mixin.hello();
+  }
+  
+  
+  // Server does not implement the FooUnimplemented version of protocol Foo.
+  // See that calls to it fail.
+  @Test(expected=IOException.class)
+  public void testNonExistingProtocol() throws IOException {
+    ProtocolProxy<?> proxy;
+    proxy = RPC.getProtocolProxy(FooUnimplemented.class, 
+        FooUnimplemented.versionID, addr, conf);
+
+    FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); 
+    foo.ping();
+  }
+  
+  
+  /**
+   * getProtocolVersion of an unimplemented version should return the highest
+   * implemented version; similarly, getProtocolSignature should still work.
+   * @throws IOException
+   */
+  @Test
+  public void testNonExistingProtocol2() throws IOException {
+    ProtocolProxy<?> proxy;
+    proxy = RPC.getProtocolProxy(FooUnimplemented.class, 
+        FooUnimplemented.versionID, addr, conf);
+
+    FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); 
+    Assert.assertEquals(Foo1.versionID, 
+        foo.getProtocolVersion(RPC.getProtocolName(FooUnimplemented.class), 
+        FooUnimplemented.versionID));
+    foo.getProtocolSignature(RPC.getProtocolName(FooUnimplemented.class), 
+        FooUnimplemented.versionID, 0);
+  }
+  
+  @Test(expected=IOException.class)
+  public void testIncorrectServerCreation() throws IOException {
+    RPC.getServer(Foo1.class,
+        new Foo0Impl(), ADDRESS, 0, 2, false, conf, null);
+  } 
+  
+  // Now test a PB service - a server  hosts both PB and Writable Rpcs.
+  @Test
+  public void testPBService() throws Exception {
+    // Set RPC engine to protobuf RPC engine
+    Configuration conf2 = new Configuration();
+    RPC.setProtocolEngine(conf2, TestRpcService.class,
+        ProtobufRpcEngine.class);
+    TestRpcService client = RPC.getProxy(TestRpcService.class, 0, addr, conf2);
+    TestProtoBufRpc.testProtoBufRpc(client);
+  }
+}

Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java?rev=1293964&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java (added)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java Sun Feb 26 23:32:06 2012
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
+import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
+import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
+import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto;
+import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
+import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.net.NetUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Tests the protocol buffer based RPC mechanism.
+ * This test depends on the type definitions in src/test/proto/test.proto
+ * and the protobuf service definition in src/test/proto/test_rpc_service.proto.
+ */
+public class TestProtoBufRpc {
+  public final static String ADDRESS = "0.0.0.0";
+  public final static int PORT = 0;
+  private static InetSocketAddress addr;
+  private static Configuration conf;
+  private static RPC.Server server;
+  
+  @ProtocolInfo(protocolName = "testProto", protocolVersion = 1)
+  public interface TestRpcService
+      extends TestProtobufRpcProto.BlockingInterface {
+  }
+
+  @ProtocolInfo(protocolName = "testProto2", protocolVersion = 1)
+  public interface TestRpcService2 extends
+      TestProtobufRpc2Proto.BlockingInterface {
+  }
+
+  public static class PBServerImpl implements TestRpcService {
+
+    @Override
+    public EmptyResponseProto ping(RpcController unused,
+        EmptyRequestProto request) throws ServiceException {
+      return EmptyResponseProto.newBuilder().build();
+    }
+
+    @Override
+    public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
+        throws ServiceException {
+      return EchoResponseProto.newBuilder().setMessage(request.getMessage())
+          .build();
+    }
+
+    @Override
+    public EmptyResponseProto error(RpcController unused,
+        EmptyRequestProto request) throws ServiceException {
+      throw new ServiceException("error", new RpcServerException("error"));
+    }
+  }
+  
+  public static class PBServer2Impl implements TestRpcService2 {
+
+    @Override
+    public EmptyResponseProto ping2(RpcController unused,
+        EmptyRequestProto request) throws ServiceException {
+      return EmptyResponseProto.newBuilder().build();
+    }
+
+    @Override
+    public EchoResponseProto echo2(RpcController unused, EchoRequestProto request)
+        throws ServiceException {
+      return EchoResponseProto.newBuilder().setMessage(request.getMessage())
+          .build();
+    }
+  }
+
+  @Before
+  public  void setUp() throws IOException { // Setup server for both protocols
+    conf = new Configuration();
+    // Set RPC engine to protobuf RPC engine
+    RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
+
+    // Create server side implementation
+    PBServerImpl serverImpl = new PBServerImpl();
+    BlockingService service = TestProtobufRpcProto
+        .newReflectiveBlockingService(serverImpl);
+
+    // Get RPC server for server side implementation
+    server = RPC.getServer(TestRpcService.class, service, ADDRESS, PORT, conf);
+    addr = NetUtils.getConnectAddress(server);
+    
+    // now the second protocol
+    PBServer2Impl server2Impl = new PBServer2Impl();
+    BlockingService service2 = TestProtobufRpc2Proto
+        .newReflectiveBlockingService(server2Impl);
+    
+    server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class,
+        service2);
+    server.start();
+  }
+  
+  
+  @After
+  public void tearDown() throws Exception {
+    server.stop();
+  }
+
+  private static TestRpcService getClient() throws IOException {
+    // Set RPC engine to protobuf RPC engine
+    RPC.setProtocolEngine(conf, TestRpcService.class,
+        ProtobufRpcEngine.class);
+        return RPC.getProxy(TestRpcService.class, 0, addr,
+        conf);
+  }
+  
+  private static TestRpcService2 getClient2() throws IOException {
+    // Set RPC engine to protobuf RPC engine
+    RPC.setProtocolEngine(conf, TestRpcService2.class,
+        ProtobufRpcEngine.class);
+        return RPC.getProxy(TestRpcService2.class, 0, addr,
+        conf);
+  }
+
+  @Test
+  public void testProtoBufRpc() throws Exception {
+    TestRpcService client = getClient();
+    testProtoBufRpc(client);
+  }
+  
+  // separated test out so that other tests can call it.
+  public static void testProtoBufRpc(TestRpcService client) throws Exception {  
+    // Test ping method
+    EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
+    client.ping(null, emptyRequest);
+    
+    // Test echo method
+    EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
+        .setMessage("hello").build();
+    EchoResponseProto echoResponse = client.echo(null, echoRequest);
+    Assert.assertEquals(echoResponse.getMessage(), "hello");
+    
+    // Test error method - error should be thrown as RemoteException
+    try {
+      client.error(null, emptyRequest);
+      Assert.fail("Expected exception is not thrown");
+    } catch (ServiceException e) {
+      RemoteException re = (RemoteException)e.getCause();
+      RpcServerException rse = (RpcServerException) re
+          .unwrapRemoteException(RpcServerException.class);
+    }
+  }
+  
+  @Test
+  public void testProtoBufRpc2() throws Exception {
+    TestRpcService2 client = getClient2();
+    
+    // Test ping method
+    EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
+    client.ping2(null, emptyRequest);
+    
+    // Test echo method
+    EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
+        .setMessage("hello").build();
+    EchoResponseProto echoResponse = client.echo2(null, echoRequest);
+    Assert.assertEquals(echoResponse.getMessage(), "hello");
+    
+    // Ensure RPC metrics are updated
+    MetricsRecordBuilder rpcMetrics = getMetrics(server.getRpcMetrics().name());
+    assertCounterGt("RpcQueueTimeNumOps", 0L, rpcMetrics);
+    assertCounterGt("RpcProcessingTimeNumOps", 0L, rpcMetrics);
+    
+    MetricsRecordBuilder rpcDetailedMetrics = 
+        getMetrics(server.getRpcDetailedMetrics().name());
+    assertCounterGt("Echo2NumOps", 0L, rpcDetailedMetrics);
+  }
+}
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java Sun Feb 26 23:32:06 2012
@@ -18,28 +18,39 @@
 
 package org.apache.hadoop.ipc;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
+import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.util.Arrays;
 
-import junit.framework.TestCase;
+import javax.net.SocketFactory;
 
 import org.apache.commons.logging.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.Service;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+import static org.junit.Assert.*;
 
 import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.DescriptorProtos.EnumDescriptorProto;
@@ -49,18 +60,22 @@ import static org.apache.hadoop.test.Met
 import static org.mockito.Mockito.*;
 
 /** Unit tests for RPC. */
-public class TestRPC extends TestCase {
+@SuppressWarnings("deprecation")
+public class TestRPC {
   private static final String ADDRESS = "0.0.0.0";
 
   public static final Log LOG =
     LogFactory.getLog(TestRPC.class);
   
   private static Configuration conf = new Configuration();
+  
+  static {
+    conf.setClass("rpc.engine." + StoppedProtocol.class.getName(),
+        StoppedRpcEngine.class, RpcEngine.class);
+  }
 
   int datasize = 1024*100;
   int numThreads = 50;
-
-  public TestRPC(String name) { super(name); }
 	
   public interface TestProtocol extends VersionedProtocol {
     public static final long versionID = 1L;
@@ -207,6 +222,80 @@ public class TestRPC extends TestCase {
     }
   }
   
+  /**
+   * A basic interface for testing client-side RPC resource cleanup.
+   */
+  private static interface StoppedProtocol {
+    long versionID = 0;
+
+    public void stop();
+  }
+  
+  /**
+   * A class used for testing cleanup of client side RPC resources.
+   */
+  private static class StoppedRpcEngine implements RpcEngine {
+
+    @Override
+    public Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
+        UserGroupInformation ticket, Configuration conf)
+        throws IOException, InterruptedException {
+      return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+        InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+        SocketFactory factory, int rpcTimeout) throws IOException {
+      T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
+              new Class[] { protocol }, new StoppedInvocationHandler());
+      return new ProtocolProxy<T>(protocol, proxy, false);
+    }
+
+    @Override
+    public org.apache.hadoop.ipc.RPC.Server getServer(Class<?> protocol,
+        Object instance, String bindAddress, int port, int numHandlers,
+        int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf,
+        SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
+      return null;
+    }
+
+    @Override
+    public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
+        ConnectionId connId, Configuration conf, SocketFactory factory)
+        throws IOException {
+      throw new UnsupportedOperationException("This proxy is not supported");
+    }
+  }
+
+  /**
+   * An invocation handler which does nothing when invoking methods, and just
+   * counts the number of times close() is called.
+   */
+  private static class StoppedInvocationHandler
+      implements InvocationHandler, Closeable {
+    
+    private int closeCalled = 0;
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args)
+        throws Throwable {
+          return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+      closeCalled++;
+    }
+    
+    public int getCloseCalled() {
+      return closeCalled;
+    }
+    
+  }
+  
+  @Test
   public void testConfRpc() throws Exception {
     Server server = RPC.getServer(TestProtocol.class,
                                   new TestImpl(), ADDRESS, 0, 1, false, conf, null);
@@ -229,6 +318,7 @@ public class TestRPC extends TestCase {
     server.stop();    
   }
 
+  @Test
   public void testSlowRpc() throws Exception {
     System.out.println("Testing Slow RPC");
     // create a server with two handlers
@@ -273,11 +363,12 @@ public class TestRPC extends TestCase {
     }
   }
   
-  public void testRPCConf(Configuration conf) throws Exception {
-    
+  @Test
+  public void testCalls() throws Exception {
+    testCallsInternal(conf);
   }
-
-  public void testCalls(Configuration conf) throws Exception {
+  
+  private void testCallsInternal(Configuration conf) throws Exception {
     Server server = RPC.getServer(TestProtocol.class,
                                   new TestImpl(), ADDRESS, 0, conf);
     TestProtocol proxy = null;
@@ -384,6 +475,7 @@ public class TestRPC extends TestCase {
     }
   }
   
+  @Test
   public void testStandaloneClient() throws IOException {
     try {
       TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
@@ -450,6 +542,7 @@ public class TestRPC extends TestCase {
     }
   }
   
+  @Test
   public void testAuthorization() throws Exception {
     Configuration conf = new Configuration();
     conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
@@ -481,20 +574,48 @@ public class TestRPC extends TestCase {
     Configuration conf = new Configuration();
     
     conf.setBoolean("ipc.client.ping", false);
-    new TestRPC("testnoPings").testCalls(conf);
+    new TestRPC().testCallsInternal(conf);
     
     conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
-    new TestRPC("testnoPings").testCalls(conf);
+    new TestRPC().testCallsInternal(conf);
   }
 
   /**
    * Test stopping a non-registered proxy
    * @throws Exception
    */
+  @Test
   public void testStopNonRegisteredProxy() throws Exception {
     RPC.stopProxy(mock(TestProtocol.class));
   }
   
+  @Test
+  public void testStopProxy() throws IOException {
+    StoppedProtocol proxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class,
+        StoppedProtocol.versionID, null, conf);
+    StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
+        Proxy.getInvocationHandler(proxy);
+    assertEquals(invocationHandler.getCloseCalled(), 0);
+    RPC.stopProxy(proxy);
+    assertEquals(invocationHandler.getCloseCalled(), 1);
+  }
+  
+  @Test
+  public void testWrappedStopProxy() throws IOException {
+    StoppedProtocol wrappedProxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class,
+        StoppedProtocol.versionID, null, conf);
+    StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
+        Proxy.getInvocationHandler(wrappedProxy);
+    
+    StoppedProtocol proxy = (StoppedProtocol) RetryProxy.create(StoppedProtocol.class,
+        wrappedProxy, RetryPolicies.RETRY_FOREVER);
+    
+    assertEquals(invocationHandler.getCloseCalled(), 0);
+    RPC.stopProxy(proxy);
+    assertEquals(invocationHandler.getCloseCalled(), 1);
+  }
+  
+  @Test
   public void testErrorMsgForInsecureClient() throws Exception {
     final Server server = RPC.getServer(TestProtocol.class,
         new TestImpl(), ADDRESS, 0, 5, true, conf, null);
@@ -567,10 +688,10 @@ public class TestRPC extends TestCase {
     return count;
   }
 
-
   /**
    * Test that server.stop() properly stops all threads
    */
+  @Test
   public void testStopsAllThreads() throws Exception {
     int threadsBefore = countThreads("Server$Listener$Reader");
     assertEquals("Expect no Reader threads running before test",
@@ -591,8 +712,7 @@ public class TestRPC extends TestCase {
   }
   
   public static void main(String[] args) throws Exception {
-
-    new TestRPC("test").testCalls(conf);
+    new TestRPC().testCallsInternal(conf);
 
   }
 }

Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java?rev=1293964&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java (added)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java Sun Feb 26 23:32:06 2012
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+
+public class TestRPCCallBenchmark {
+
+  @Test(timeout=20000)
+  public void testBenchmarkWithWritable() throws Exception {
+    int rc = ToolRunner.run(new RPCCallBenchmark(),
+        new String[] {
+      "--clientThreads", "30",
+      "--serverThreads", "30",
+      "--time", "5",
+      "--serverReaderThreads", "4",
+      "--messageSize", "1024",
+      "--engine", "writable"});
+    assertEquals(0, rc);
+  }
+  
+  @Test(timeout=20000)
+  public void testBenchmarkWithProto() throws Exception {
+    int rc = ToolRunner.run(new RPCCallBenchmark(),
+        new String[] {
+      "--clientThreads", "30",
+      "--serverThreads", "30",
+      "--time", "5",
+      "--serverReaderThreads", "4",
+      "--messageSize", "1024",
+      "--engine", "protobuf"});
+    assertEquals(0, rc);
+  }
+}

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java Sun Feb 26 23:32:06 2012
@@ -31,6 +31,10 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
 import org.apache.hadoop.net.NetUtils;
 import org.junit.After;
 import org.junit.Test;
@@ -39,7 +43,7 @@ import org.junit.Test;
 public class TestRPCCompatibility {
   private static final String ADDRESS = "0.0.0.0";
   private static InetSocketAddress addr;
-  private static Server server;
+  private static RPC.Server server;
   private ProtocolProxy<?> proxy;
 
   public static final Log LOG =
@@ -52,10 +56,14 @@ public class TestRPCCompatibility {
     void ping() throws IOException;    
   }
   
-  public interface TestProtocol1 extends TestProtocol0 {
+  public interface TestProtocol1 extends VersionedProtocol, TestProtocol0 {
     String echo(String value) throws IOException;
   }
 
+  
+  // TestProtocol2 is a compatible impl of TestProtocol1 - hence use its name
+  @ProtocolInfo(protocolName=
+      "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1")
   public interface TestProtocol2 extends TestProtocol1 {
     int echo(int value)  throws IOException;
   }
@@ -89,28 +97,44 @@ public class TestRPCCompatibility {
   public static class TestImpl1 extends TestImpl0 implements TestProtocol1 {
     @Override
     public String echo(String value) { return value; }
+    @Override
+    public long getProtocolVersion(String protocol,
+        long clientVersion) throws IOException {
+        return TestProtocol1.versionID;
+    }
   }
 
   public static class TestImpl2 extends TestImpl1 implements TestProtocol2 {
     @Override
     public int echo(int value) { return value; }
+
+    @Override
+    public long getProtocolVersion(String protocol,
+        long clientVersion) throws IOException {
+      return TestProtocol2.versionID;
+    }
+
   }
   
   @After
   public void tearDown() throws IOException {
     if (proxy != null) {
       RPC.stopProxy(proxy.getProxy());
+      proxy = null;
     }
     if (server != null) {
       server.stop();
+      server = null;
     }
   }
   
   @Test  // old client vs new server
   public void testVersion0ClientVersion1Server() throws Exception {
     // create a server with two handlers
+    TestImpl1 impl = new TestImpl1();
     server = RPC.getServer(TestProtocol1.class,
-                              new TestImpl1(), ADDRESS, 0, 2, false, conf, null);
+                            impl, ADDRESS, 0, 2, false, conf, null);
+    server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
     server.start();
     addr = NetUtils.getConnectAddress(server);
 
@@ -154,8 +178,10 @@ public class TestRPCCompatibility {
     
     public int echo(int value) throws IOException, NumberFormatException {
       if (serverInfo.isMethodSupported("echo", int.class)) {
+        System.out.println("echo int is supported");
         return -value;  // use version 3 echo long
       } else { // server is version 2
+        System.out.println("echo int is NOT supported");
         return Integer.parseInt(proxy2.echo(String.valueOf(value)));
       }
     }
@@ -172,8 +198,10 @@ public class TestRPCCompatibility {
   @Test // Compatible new client & old server
   public void testVersion2ClientVersion1Server() throws Exception {
     // create a server with two handlers
+    TestImpl1 impl = new TestImpl1();
     server = RPC.getServer(TestProtocol1.class,
-                              new TestImpl1(), ADDRESS, 0, 2, false, conf, null);
+                              impl, ADDRESS, 0, 2, false, conf, null);
+    server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
     server.start();
     addr = NetUtils.getConnectAddress(server);
 
@@ -189,9 +217,12 @@ public class TestRPCCompatibility {
   
   @Test // equal version client and server
   public void testVersion2ClientVersion2Server() throws Exception {
+    ProtocolSignature.resetCache();
     // create a server with two handlers
+    TestImpl2 impl = new TestImpl2();
     server = RPC.getServer(TestProtocol2.class,
-                              new TestImpl2(), ADDRESS, 0, 2, false, conf, null);
+                             impl, ADDRESS, 0, 2, false, conf, null);
+    server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
     server.start();
     addr = NetUtils.getConnectAddress(server);
 
@@ -250,14 +281,16 @@ public class TestRPCCompatibility {
     assertEquals(hash1, hash2);
   }
   
+  @ProtocolInfo(protocolName=
+      "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1")
   public interface TestProtocol4 extends TestProtocol2 {
-    public static final long versionID = 1L;
+    public static final long versionID = 4L;
     int echo(int value)  throws IOException;
   }
   
   @Test
   public void testVersionMismatch() throws IOException {
-    server = RPC.getServer(TestProtocol2.class, new TestImpl0(), ADDRESS, 0, 2,
+    server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2,
         false, conf, null);
     server.start();
     addr = NetUtils.getConnectAddress(server);
@@ -268,7 +301,76 @@ public class TestRPCCompatibility {
       proxy.echo(21);
       fail("The call must throw VersionMismatch exception");
     } catch (IOException ex) {
-      Assert.assertTrue(ex.getMessage().contains("VersionMismatch"));
+      Assert.assertTrue("Expected version mismatch but got " + ex.getMessage(), 
+          ex.getMessage().contains("VersionMismatch"));
+    }
+  }
+  
+  @Test
+  public void testIsMethodSupported() throws IOException {
+    server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2,
+        false, conf, null);
+    server.start();
+    addr = NetUtils.getConnectAddress(server);
+
+    TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class,
+        TestProtocol2.versionID, addr, conf);
+    boolean supported = RpcClientUtil.isMethodSupported(proxy,
+        TestProtocol2.class, RpcKind.RPC_WRITABLE,
+        RPC.getProtocolVersion(TestProtocol2.class), "echo");
+    Assert.assertTrue(supported);
+    supported = RpcClientUtil.isMethodSupported(proxy,
+        TestProtocol2.class, RpcKind.RPC_PROTOCOL_BUFFER,
+        RPC.getProtocolVersion(TestProtocol2.class), "echo");
+    Assert.assertFalse(supported);
+  }
+
+  /**
+   * Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up
+   * the server registry to extract protocol signatures and versions.
+   */
+  @Test
+  public void testProtocolMetaInfoSSTranslatorPB() throws Exception {
+    TestImpl1 impl = new TestImpl1();
+    server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false,
+        conf, null);
+    server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+    server.start();
+
+    ProtocolMetaInfoServerSideTranslatorPB xlator = 
+        new ProtocolMetaInfoServerSideTranslatorPB(server);
+
+    GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature(
+        null,
+        createGetProtocolSigRequestProto(TestProtocol1.class,
+            RpcKind.RPC_PROTOCOL_BUFFER));
+    //No signatures should be found
+    Assert.assertEquals(0, resp.getProtocolSignatureCount());
+    resp = xlator.getProtocolSignature(
+        null,
+        createGetProtocolSigRequestProto(TestProtocol1.class,
+            RpcKind.RPC_WRITABLE));
+    Assert.assertEquals(1, resp.getProtocolSignatureCount());
+    ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0);
+    Assert.assertEquals(TestProtocol1.versionID, sig.getVersion());
+    boolean found = false;
+    int expected = ProtocolSignature.getFingerprint(TestProtocol1.class
+        .getMethod("echo", String.class));
+    for (int m : sig.getMethodsList()) {
+      if (expected == m) {
+        found = true;
+        break;
+      }
     }
+    Assert.assertTrue(found);
+  }
+  
+  private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto(
+      Class<?> protocol, RpcKind rpcKind) {
+    GetProtocolSignatureRequestProto.Builder builder = 
+        GetProtocolSignatureRequestProto.newBuilder();
+    builder.setProtocol(protocol.getName());
+    builder.setRpcKind(rpcKind.toString());
+    return builder.build();
   }
 }
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java Sun Feb 26 23:32:06 2012
@@ -164,6 +164,10 @@ public abstract class MultithreadedTestU
       }
       checkException();
     }
+
+    public Iterable<? extends Thread> getTestThreads() {
+      return testThreads;
+    }
   }
 
   /**

Copied: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/proto/test.proto (from r1293950, hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java)
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/proto/test.proto?p2=hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/proto/test.proto&p1=hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java&r1=1293950&r2=1293964&rev=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/proto/test.proto Sun Feb 26 23:32:06 2012
@@ -16,24 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hdfs.security.token.block;
+option java_package = "org.apache.hadoop.ipc.protobuf";
+option java_outer_classname = "TestProtos";
+option java_generate_equals_and_hash = true;
 
-import javax.crypto.SecretKey;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
+message EmptyRequestProto {
+}
 
-/**
- * Key used for generating and verifying block tokens
- */
-@InterfaceAudience.Private
-public class BlockKey extends DelegationKey {
+message EmptyResponseProto {
+}
 
-  public BlockKey() {
-    super();
-  }
+message EchoRequestProto {
+  required string message = 1;
+}
 
-  public BlockKey(int keyId, long expiryDate, SecretKey key) {
-    super(keyId, expiryDate, key);
-  }
+message EchoResponseProto {
+  required string message = 1;
 }

Copied: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto (from r1293950, hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java)
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto?p2=hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto&p1=hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java&r1=1293950&r2=1293964&rev=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto Sun Feb 26 23:32:06 2012
@@ -15,25 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+option java_package = "org.apache.hadoop.ipc.protobuf";
+option java_outer_classname = "TestRpcServiceProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
 
-package org.apache.hadoop.hdfs.security.token.block;
+import "test.proto";
 
-import javax.crypto.SecretKey;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
 
 /**
- * Key used for generating and verifying block tokens
+ * A protobuf service for use in tests
  */
-@InterfaceAudience.Private
-public class BlockKey extends DelegationKey {
-
-  public BlockKey() {
-    super();
-  }
+service TestProtobufRpcProto {
+  rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
+  rpc echo(EchoRequestProto) returns (EchoResponseProto);
+  rpc error(EmptyRequestProto) returns (EmptyResponseProto);
+}
 
-  public BlockKey(int keyId, long expiryDate, SecretKey key) {
-    super(keyId, expiryDate, key);
-  }
+service TestProtobufRpc2Proto {
+  rpc ping2(EmptyRequestProto) returns (EmptyResponseProto);
+  rpc echo2(EchoRequestProto) returns (EchoResponseProto);
 }


