Author: todd
Date: Tue Feb 14 19:29:42 2012
New Revision: 1244185
URL: http://svn.apache.org/viewvc?rev=1244185&view=rev
Log:
HADOOP-8070. Add a standalone benchmark for RPC call performance. Contributed by Todd Lipcon.
Added:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1244185&r1=1244184&r2=1244185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Tue Feb 14 19:29:42
2012
@@ -84,6 +84,8 @@ Trunk (unreleased changes)
HADOOP-7988. Upper case in hostname part of the principals doesn't work with
kerberos. (jitendra)
+ HADOOP-8070. Add a standalone benchmark for RPC call performance. (todd)
+
BUG FIXES
HADOOP-8018. Hudson auto test for HDFS has started throwing javadoc
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1244185&r1=1244184&r2=1244185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
Tue Feb 14 19:29:42 2012
@@ -42,6 +42,7 @@ import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -92,6 +93,8 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.annotations.VisibleForTesting;
+
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
* a port and is defined by a parameter class and a value class.
@@ -318,6 +321,12 @@ public abstract class Server {
return rpcMetrics;
}
+
+ @VisibleForTesting
+ Iterable<? extends Thread> getHandlers() {
+ return Arrays.asList(handlers);
+ }
+
/**
* Refresh the service authorization ACL for the service handled by this server.
*/
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java?rev=1244185&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java
(added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java
Tue Feb 14 19:29:42 2012
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl;
+import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService;
+import org.apache.hadoop.ipc.TestRPC.TestProtocol;
+import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
+import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
+import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.MultithreadedTestUtil;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.base.Joiner;
+import com.google.protobuf.BlockingService;
+
+/**
+ * Benchmark for protobuf RPC.
+ * Run with --help option for usage.
+ */
+public class RPCCallBenchmark implements Tool, Configurable {
+ private Configuration conf;
+ private AtomicLong callCount = new AtomicLong(0);
+ private static ThreadMXBean threadBean =
+ ManagementFactory.getThreadMXBean();
+
+ private static class MyOptions {
+ private boolean failed = false;
+ private int serverThreads = 0;
+ private int serverReaderThreads = 1;
+ private int clientThreads = 0;
+ private String host = "0.0.0.0";
+ private int port = 12345;
+ public int secondsToRun = 15;
+ private int msgSize = 1024;
+ public Class<? extends RpcEngine> rpcEngine =
+ WritableRpcEngine.class;
+
+ private MyOptions(String args[]) {
+ try {
+ Options opts = buildOptions();
+ CommandLineParser parser = new GnuParser();
+ CommandLine line = parser.parse(opts, args, true);
+ processOptions(line, opts);
+ validateOptions();
+ } catch (ParseException e) {
+ System.err.println(e.getMessage());
+ System.err.println("Try \"--help\" option for details.");
+ failed = true;
+ }
+ }
+
+ private void validateOptions() throws ParseException {
+ if (serverThreads <= 0 && clientThreads <= 0) {
+ throw new ParseException("Must specify at least -c or -s");
+ }
+ }
+
+ @SuppressWarnings("static-access")
+ private Options buildOptions() {
+ Options opts = new Options();
+ opts.addOption(
+ OptionBuilder.withLongOpt("serverThreads").hasArg(true)
+ .withArgName("numthreads")
+ .withDescription("number of server threads (handlers) to run (or 0 to not run server)")
+ .create("s"));
+ opts.addOption(
+ OptionBuilder.withLongOpt("serverReaderThreads").hasArg(true)
+ .withArgName("threads")
+ .withDescription("number of server reader threads to run")
+ .create("r"));
+
+
+ opts.addOption(
+ OptionBuilder.withLongOpt("clientThreads").hasArg(true)
+ .withArgName("numthreads")
+ .withDescription("number of client threads to run (or 0 to not run client)")
+ .create("c"));
+
+ opts.addOption(
+ OptionBuilder.withLongOpt("messageSize").hasArg(true)
+ .withArgName("bytes")
+ .withDescription("size of call parameter in bytes")
+ .create("m"));
+
+ opts.addOption(
+ OptionBuilder.withLongOpt("time").hasArg(true)
+ .withArgName("seconds")
+ .withDescription("number of seconds to run clients for")
+ .create("t"));
+ opts.addOption(
+ OptionBuilder.withLongOpt("port").hasArg(true)
+ .withArgName("port")
+ .withDescription("port to listen or connect on")
+ .create("p"));
+ opts.addOption(
+ OptionBuilder.withLongOpt("host").hasArg(true)
+ .withArgName("addr")
+ .withDescription("host to listen or connect on")
+ .create('h'));
+
+ opts.addOption(
+ OptionBuilder.withLongOpt("engine").hasArg(true)
+ .withArgName("writable|protobuf")
+ .withDescription("engine to use")
+ .create('e'));
+
+ opts.addOption(
+ OptionBuilder.withLongOpt("help").hasArg(false)
+ .withDescription("show this screen")
+ .create('?'));
+
+ return opts;
+ }
+
+ private void processOptions(CommandLine line, Options opts)
+ throws ParseException {
+ if (line.hasOption("help") || line.hasOption('?')) {
+ HelpFormatter formatter = new HelpFormatter();
+ System.out.println("Protobuf IPC benchmark.");
+ System.out.println();
+ formatter.printHelp(100,
+ "java ... PBRPCBenchmark [options]",
+ "\nSupported options:", opts, "");
+ return;
+ }
+
+ if (line.hasOption('s')) {
+ serverThreads = Integer.parseInt(line.getOptionValue('s'));
+ }
+ if (line.hasOption('r')) {
+ serverReaderThreads = Integer.parseInt(line.getOptionValue('r'));
+ }
+ if (line.hasOption('c')) {
+ clientThreads = Integer.parseInt(line.getOptionValue('c'));
+ }
+ if (line.hasOption('t')) {
+ secondsToRun = Integer.parseInt(line.getOptionValue('t'));
+ }
+ if (line.hasOption('m')) {
+ msgSize = Integer.parseInt(line.getOptionValue('m'));
+ }
+ if (line.hasOption('p')) {
+ port = Integer.parseInt(line.getOptionValue('p'));
+ }
+ if (line.hasOption('h')) {
+ host = line.getOptionValue('h');
+ }
+ if (line.hasOption('e')) {
+ String eng = line.getOptionValue('e');
+ if ("protobuf".equals(eng)) {
+ rpcEngine = ProtobufRpcEngine.class;
+ } else if ("writable".equals(eng)) {
+ rpcEngine = WritableRpcEngine.class;
+ } else {
+ throw new ParseException("invalid engine: " + eng);
+ }
+ }
+
+ String[] remainingArgs = line.getArgs();
+ if (remainingArgs.length != 0) {
+ throw new ParseException("Extra arguments: " +
+ Joiner.on(" ").join(remainingArgs));
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "rpcEngine=" + rpcEngine + "\nserverThreads=" + serverThreads
+ + "\nserverReaderThreads=" + serverReaderThreads + "\nclientThreads="
+ + clientThreads + "\nhost=" + host + "\nport=" + port
+ + "\nsecondsToRun=" + secondsToRun + "\nmsgSize=" + msgSize;
+ }
+ }
+
+
+
+ private Server startServer(MyOptions opts) throws IOException {
+ if (opts.serverThreads <= 0) {
+ return null;
+ }
+ conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
+ opts.serverReaderThreads);
+
+ RPC.Server server;
+ // Get RPC server for server side implementation
+ if (opts.rpcEngine == ProtobufRpcEngine.class) {
+ // Create server side implementation
+ PBServerImpl serverImpl = new PBServerImpl();
+ BlockingService service = TestProtobufRpcProto
+ .newReflectiveBlockingService(serverImpl);
+
+ server = RPC.getServer(TestRpcService.class, service,
+ opts.host, opts.port, opts.serverThreads, false, conf, null);
+ } else if (opts.rpcEngine == WritableRpcEngine.class) {
+ server = RPC.getServer(TestProtocol.class, new TestRPC.TestImpl(),
+ opts.host, opts.port, opts.serverThreads, false, conf, null);
+ } else {
+ throw new RuntimeException("Bad engine: " + opts.rpcEngine);
+ }
+ server.start();
+ return server;
+ }
+
+ private long getTotalCpuTime(Iterable<? extends Thread> threads) {
+ long total = 0;
+ for (Thread t : threads) {
+ long tid = t.getId();
+ total += threadBean.getThreadCpuTime(tid);
+ }
+ return total;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ MyOptions opts = new MyOptions(args);
+ if (opts.failed) {
+ return -1;
+ }
+
+ // Set RPC engine to the configured RPC engine
+ RPC.setProtocolEngine(conf, TestRpcService.class, opts.rpcEngine);
+
+ Server server = startServer(opts);
+ try {
+
+ TestContext ctx = setupClientTestContext(opts);
+ if (ctx != null) {
+ long totalCalls = 0;
+ ctx.startThreads();
+ long veryStart = System.nanoTime();
+
+ // Loop printing results every second until the specified
+ // time has elapsed
+ for (int i = 0; i < opts.secondsToRun ; i++) {
+ long st = System.nanoTime();
+ ctx.waitFor(1000);
+ long et = System.nanoTime();
+ long ct = callCount.getAndSet(0);
+ totalCalls += ct;
+ double callsPerSec = (ct * 1000000000)/(et - st);
+ System.out.println("Calls per second: " + callsPerSec);
+ }
+
+ // Print results
+
+ if (totalCalls > 0) {
+ long veryEnd = System.nanoTime();
+ double callsPerSec =
+ (totalCalls * 1000000000)/(veryEnd - veryStart);
+ long cpuNanosClient = getTotalCpuTime(ctx.getTestThreads());
+ long cpuNanosServer = -1;
+ if (server != null) {
+ cpuNanosServer = getTotalCpuTime(server.getHandlers());;
+ }
+ System.out.println("====== Results ======");
+ System.out.println("Options:\n" + opts);
+ System.out.println("Total calls per second: " + callsPerSec);
+ System.out.println("CPU time per call on client: " +
+ (cpuNanosClient / totalCalls) + " ns");
+ if (server != null) {
+ System.out.println("CPU time per call on server: " +
+ (cpuNanosServer / totalCalls) + " ns");
+ }
+ } else {
+ System.out.println("No calls!");
+ }
+
+ ctx.stop();
+ } else {
+ while (true) {
+ Thread.sleep(10000);
+ }
+ }
+ } finally {
+ if (server != null) {
+ server.stop();
+ }
+ }
+
+ return 0;
+ }
+
+
+ private TestContext setupClientTestContext(final MyOptions opts)
+ throws IOException, InterruptedException {
+ if (opts.clientThreads <= 0) {
+ return null;
+ }
+
+ // Set up a separate proxy for each client thread,
+ // rather than making them share TCP pipes.
+ int numProxies = opts.clientThreads;
+ final RpcServiceWrapper proxies[] = new RpcServiceWrapper[numProxies];
+ for (int i = 0; i < numProxies; i++) {
+ proxies[i] =
+ UserGroupInformation.createUserForTesting("proxy-" + i,new String[]{})
+ .doAs(new PrivilegedExceptionAction<RpcServiceWrapper>() {
+ @Override
+ public RpcServiceWrapper run() throws Exception {
+ return createRpcClient(opts);
+ }
+ });
+ }
+
+ // Create an echo message of the desired length
+ final StringBuilder msgBuilder = new StringBuilder(opts.msgSize);
+ for (int c = 0; c < opts.msgSize; c++) {
+ msgBuilder.append('x');
+ }
+ final String echoMessage = msgBuilder.toString();
+
+ // Create the clients in a test context
+ TestContext ctx = new TestContext();
+ for (int i = 0; i < opts.clientThreads; i++) {
+ final RpcServiceWrapper proxy = proxies[i % numProxies];
+
+ ctx.addThread(new MultithreadedTestUtil.RepeatingTestThread(ctx) {
+ @Override
+ public void doAnAction() throws Exception {
+ proxy.doEcho(echoMessage);
+ callCount.incrementAndGet();
+ }
+ });
+ }
+ return ctx;
+ }
+
+ /**
+ * Simple interface that can be implemented either by the
+ * protobuf or writable implementations.
+ */
+ private interface RpcServiceWrapper {
+ public String doEcho(String msg) throws Exception;
+ }
+
+ /**
+ * Create a client proxy for the specified engine.
+ */
+ private RpcServiceWrapper createRpcClient(MyOptions opts) throws IOException {
+ InetSocketAddress addr = NetUtils.createSocketAddr(opts.host, opts.port);
+
+ if (opts.rpcEngine == ProtobufRpcEngine.class) {
+ final TestRpcService proxy = RPC.getProxy(TestRpcService.class, 0, addr, conf);
+ return new RpcServiceWrapper() {
+ @Override
+ public String doEcho(String msg) throws Exception {
+ EchoRequestProto req = EchoRequestProto.newBuilder()
+ .setMessage(msg)
+ .build();
+ EchoResponseProto responseProto = proxy.echo(null, req);
+ return responseProto.getMessage();
+ }
+ };
+ } else if (opts.rpcEngine == WritableRpcEngine.class) {
+ final TestProtocol proxy = (TestProtocol)RPC.getProxy(
+ TestProtocol.class, TestProtocol.versionID, addr, conf);
+ return new RpcServiceWrapper() {
+ @Override
+ public String doEcho(String msg) throws Exception {
+ return proxy.echo(msg);
+ }
+ };
+ } else {
+ throw new RuntimeException("unsupported engine: " + opts.rpcEngine);
+ }
+ }
+
+ public static void main(String []args) throws Exception {
+ int rc = ToolRunner.run(new RPCCallBenchmark(), args);
+ System.exit(rc);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java?rev=1244185&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java
(added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java
Tue Feb 14 19:29:42 2012
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+
+public class TestRPCCallBenchmark {
+
+ @Test(timeout=20000)
+ public void testBenchmarkWithWritable() throws Exception {
+ int rc = ToolRunner.run(new RPCCallBenchmark(),
+ new String[] {
+ "--clientThreads", "30",
+ "--serverThreads", "30",
+ "--time", "5",
+ "--serverReaderThreads", "4",
+ "--messageSize", "1024",
+ "--engine", "writable"});
+ assertEquals(0, rc);
+ }
+
+ @Test(timeout=20000)
+ public void testBenchmarkWithProto() throws Exception {
+ int rc = ToolRunner.run(new RPCCallBenchmark(),
+ new String[] {
+ "--clientThreads", "30",
+ "--serverThreads", "30",
+ "--time", "5",
+ "--serverReaderThreads", "4",
+ "--messageSize", "1024",
+ "--engine", "protobuf"});
+ assertEquals(0, rc);
+ }
+}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java?rev=1244185&r1=1244184&r2=1244185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java
Tue Feb 14 19:29:42 2012
@@ -164,6 +164,10 @@ public abstract class MultithreadedTestU
}
checkException();
}
+
+ public Iterable<? extends Thread> getTestThreads() {
+ return testThreads;
+ }
}
/**
|