From: suresh@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Subject: svn commit: r1356504 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/
Date: Mon, 02 Jul 2012 22:15:47 -0000

Author: suresh
Date: Mon Jul 2 22:15:44 2012
New Revision: 1356504

URL: http://svn.apache.org/viewvc?rev=1356504&view=rev
Log:
HADOOP-8533. Remove unused parallel call capability in RPC. Contributed by Brandon Li.

Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1356504&r1=1356503&r2=1356504&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Mon Jul 2 22:15:44 2012
@@ -82,6 +82,9 @@ Trunk (unreleased changes)
     HADOOP-8059. Add javadoc to InterfaceAudience and InterfaceStability.
     (Brandon Li via suresh)
 
+    HADOOP-8533. Remove parallel call ununsed capability in RPC.
+    (Brandon Li via suresh)
+
   BUG FIXES
 
     HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName.

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1356504&r1=1356503&r2=1356504&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Mon Jul 2 22:15:44 2012
@@ -971,43 +971,6 @@ public class Client {
     }
   }
 
-  /** Call implementation used for parallel calls. */
-  private class ParallelCall extends Call {
-    private ParallelResults results;
-    private int index;
-
-    public ParallelCall(Writable param, ParallelResults results, int index) {
-      super(RPC.RpcKind.RPC_WRITABLE, param);
-      this.results = results;
-      this.index = index;
-    }
-
-    /** Deliver result to result collector. */
-    protected void callComplete() {
-      results.callComplete(this);
-    }
-  }
-
-  /** Result collector for parallel calls. */
-  private static class ParallelResults {
-    private Writable[] values;
-    private int size;
-    private int count;
-
-    public ParallelResults(int size) {
-      this.values = new Writable[size];
-      this.size = size;
-    }
-
-    /** Collect a result. */
-    public synchronized void callComplete(ParallelCall call) {
-      values[call.index] = call.getRpcResult();  // store the value
-      count++;                                   // count it
-      if (count == size)                         // if all values are in
-        notify();                                // then notify waiting caller
-    }
-  }
-
   /** Construct an IPC client whose values are of the given {@link Writable}
    * class. */
   public Client(Class valueClass, Configuration conf,
@@ -1209,63 +1172,6 @@ public class Client {
     }
   }
 
-  /**
-   * @deprecated Use {@link #call(Writable[], InetSocketAddress[],
-   * Class, UserGroupInformation, Configuration)} instead
-   */
-  @Deprecated
-  public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
-    throws IOException, InterruptedException {
-    return call(params, addresses, null, null, conf);
-  }
-
-  /**
-   * @deprecated Use {@link #call(Writable[], InetSocketAddress[],
-   * Class, UserGroupInformation, Configuration)} instead
-   */
-  @Deprecated
-  public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
-                         Class protocol, UserGroupInformation ticket)
-    throws IOException, InterruptedException {
-    return call(params, addresses, protocol, ticket, conf);
-  }
-
-
-  /** Makes a set of calls in parallel.  Each parameter is sent to the
-   * corresponding address.  When all values are available, or have timed out
-   * or errored, the collected results are returned in an array.  The array
-   * contains nulls for calls that timed out or errored.
-   */
-  public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
-                         Class protocol, UserGroupInformation ticket, Configuration conf)
-    throws IOException, InterruptedException {
-    if (addresses.length == 0) return new Writable[0];
-
-    ParallelResults results = new ParallelResults(params.length);
-    synchronized (results) {
-      for (int i = 0; i < params.length; i++) {
-        ParallelCall call = new ParallelCall(params[i], results, i);
-        try {
-          ConnectionId remoteId = ConnectionId.getConnectionId(addresses[i],
-              protocol, ticket, 0, conf);
-          Connection connection = getConnection(remoteId, call);
-          connection.sendParam(call);             // send each parameter
-        } catch (IOException e) {
-          // log errors
-          LOG.info("Calling "+addresses[i]+" caught: " +
-                   e.getMessage(),e);
-          results.size--;                         // wait for one fewer result
-        }
-      }
-      while (results.count != results.size) {
-        try {
-          results.wait();                         // wait for all results
-        } catch (InterruptedException e) {}
-      }
-
-      return results.values;
-    }
-  }
-
   // for unit testing only
   @InterfaceAudience.Private
   @InterfaceStability.Unstable

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1356504&r1=1356503&r2=1356504&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Mon Jul 2 22:15:44 2012
@@ -244,12 +244,6 @@ public class ProtobufRpcEngine implement
     }
   }
 
-  @Override
-  public Object[] call(Method method, Object[][] params,
-      InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf) {
-    throw new UnsupportedOperationException();
-  }
-
   /**
    * Writable Wrapper for Protocol Buffer Requests
    */

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1356504&r1=1356503&r2=1356504&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Mon Jul 2 22:15:44 2012
@@ -21,7 +21,6 @@ package org.apache.hadoop.ipc;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
-import java.lang.reflect.Method;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
@@ -627,27 +626,6 @@ public class RPC {
             + proxy.getClass());
   }
 
-  /**
-   * Expert: Make multiple, parallel calls to a set of servers.
-   * @deprecated Use {@link #call(Method, Object[][], InetSocketAddress[], UserGroupInformation, Configuration)} instead
-   */
-  @Deprecated
-  public static Object[] call(Method method, Object[][] params,
-                              InetSocketAddress[] addrs, Configuration conf)
-    throws IOException, InterruptedException {
-    return call(method, params, addrs, null, conf);
-  }
-
-  /** Expert: Make multiple, parallel calls to a set of servers. */
-  public static Object[] call(Method method, Object[][] params,
-                              InetSocketAddress[] addrs,
-                              UserGroupInformation ticket, Configuration conf)
-    throws IOException, InterruptedException {
-
-    return getProtocolEngine(method.getDeclaringClass(), conf)
-      .call(method, params, addrs, ticket, conf);
-  }
-
   /** Construct a server for a protocol implementation instance listening on a
    * port and address.
    * @deprecated protocol interface should be passed.

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java?rev=1356504&r1=1356503&r2=1356504&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java Mon Jul 2 22:15:44 2012
@@ -19,7 +19,6 @@ package org.apache.hadoop.ipc;
 
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 
 import javax.net.SocketFactory;
 
@@ -44,11 +43,6 @@ public interface RpcEngine {
                   SocketFactory factory, int rpcTimeout,
                   RetryPolicy connectionRetryPolicy) throws IOException;
 
-  /** Expert: Make multiple, parallel calls to a set of servers. */
-  Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
-                UserGroupInformation ticket, Configuration conf)
-      throws IOException, InterruptedException;
-
   /**
    * Construct a server for a protocol implementation instance.
    *

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1356504&r1=1356503&r2=1356504&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Mon Jul 2 22:15:44 2012
@@ -20,7 +20,6 @@ package org.apache.hadoop.ipc;
 
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Method;
-import java.lang.reflect.Array;
 import java.lang.reflect.InvocationTargetException;
 
 import java.net.InetSocketAddress;
@@ -274,36 +273,6 @@ public class WritableRpcEngine implement
     return new ProtocolProxy(protocol, proxy, true);
   }
 
-  /** Expert: Make multiple, parallel calls to a set of servers. */
-  public Object[] call(Method method, Object[][] params,
-                       InetSocketAddress[] addrs,
-                       UserGroupInformation ticket, Configuration conf)
-    throws IOException, InterruptedException {
-
-    Invocation[] invocations = new Invocation[params.length];
-    for (int i = 0; i < params.length; i++)
-      invocations[i] = new Invocation(method, params[i]);
-    Client client = CLIENTS.getClient(conf);
-    try {
-      Writable[] wrappedValues =
-        client.call(invocations, addrs, method.getDeclaringClass(), ticket, conf);
-
-      if (method.getReturnType() == Void.TYPE) {
-        return null;
-      }
-
-      Object[] values =
-        (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
-      for (int i = 0; i < values.length; i++)
-        if (wrappedValues[i] != null)
-          values[i] = ((ObjectWritable)wrappedValues[i]).get();
-
-      return values;
-    } finally {
-      CLIENTS.stopClient(client);
-    }
-  }
-
   /* Construct a server for a protocol implementation instance listening on a
    * port and address. */
   @Override

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1356504&r1=1356503&r2=1356504&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Mon Jul 2 22:15:44 2012
@@ -149,41 +149,6 @@ public class TestIPC {
     }
   }
 
-  private static class ParallelCaller extends Thread {
-    private Client client;
-    private int count;
-    private InetSocketAddress[] addresses;
-    private boolean failed;
-
-    public ParallelCaller(Client client, InetSocketAddress[] addresses,
-        int count) {
-      this.client = client;
-      this.addresses = addresses;
-      this.count = count;
-    }
-
-    public void run() {
-      for (int i = 0; i < count; i++) {
-        try {
-          Writable[] params = new Writable[addresses.length];
-          for (int j = 0; j < addresses.length; j++)
-            params[j] = new LongWritable(RANDOM.nextLong());
-          Writable[] values = client.call(params, addresses, null, null, conf);
-          for (int j = 0; j < addresses.length; j++) {
-            if (!params[j].equals(values[j])) {
-              LOG.fatal("Call failed!");
-              failed = true;
-              break;
-            }
-          }
-        } catch (Exception e) {
-          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
-          failed = true;
-        }
-      }
-    }
-  }
-
   @Test
   public void testSerial() throws Exception {
     testSerial(3, false, 2, 5, 100);
@@ -218,51 +183,7 @@ public class TestIPC {
   }
 
   @Test
-  public void testParallel() throws Exception {
-    testParallel(10, false, 2, 4, 2, 4, 100);
-  }
-
-  public void testParallel(int handlerCount, boolean handlerSleep,
-                           int serverCount, int addressCount,
-                           int clientCount, int callerCount, int callCount)
-    throws Exception {
-    Server[] servers = new Server[serverCount];
-    for (int i = 0; i < serverCount; i++) {
-      servers[i] = new TestServer(handlerCount, handlerSleep);
-      servers[i].start();
-    }
-
-    InetSocketAddress[] addresses = new InetSocketAddress[addressCount];
-    for (int i = 0; i < addressCount; i++) {
-      addresses[i] = NetUtils.getConnectAddress(servers[i%serverCount]);
-    }
-
-    Client[] clients = new Client[clientCount];
-    for (int i = 0; i < clientCount; i++) {
-      clients[i] = new Client(LongWritable.class, conf);
-    }
-
-    ParallelCaller[] callers = new ParallelCaller[callerCount];
-    for (int i = 0; i < callerCount; i++) {
-      callers[i] =
-        new ParallelCaller(clients[i%clientCount], addresses, callCount);
-      callers[i].start();
-    }
-    for (int i = 0; i < callerCount; i++) {
-      callers[i].join();
-      assertFalse(callers[i].failed);
-    }
-    for (int i = 0; i < clientCount; i++) {
-      clients[i].stop();
-    }
-    for (int i = 0; i < serverCount; i++) {
-      servers[i].stop();
-    }
-  }
-
-  @Test
   public void testStandAloneClient() throws Exception {
-    testParallel(10, false, 2, 4, 2, 4, 100);
     Client client = new Client(LongWritable.class, conf);
     InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
     try {
@@ -781,13 +702,4 @@ public class TestIPC {
         Ints.toByteArray(HADOOP0_21_ERROR_MSG.length()),
         HADOOP0_21_ERROR_MSG.getBytes());
   }
-
-  public static void main(String[] args) throws Exception {
-
-    //new TestIPC().testSerial(5, false, 2, 10, 1000);
-
-    new TestIPC().testParallel(10, false, 2, 4, 2, 4, 1000);
-
-  }
-
 }

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1356504&r1=1356503&r2=1356504&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java Mon Jul 2 22:15:44 2012
@@ -244,13 +244,6 @@ public class TestRPC {
    */
   private static class StoppedRpcEngine implements RpcEngine {
 
-    @Override
-    public Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
-        UserGroupInformation ticket, Configuration conf)
-        throws IOException, InterruptedException {
-      return null;
-    }
-
     @SuppressWarnings("unchecked")
     @Override
     public ProtocolProxy getProxy(Class protocol, long clientVersion,
@@ -491,17 +484,6 @@ public class TestRPC {
       }
     }
 
-      // try some multi-calls
-      Method echo =
-        TestProtocol.class.getMethod("echo", new Class[] { String.class });
-      String[] strings = (String[])RPC.call(echo, new String[][]{{"a"},{"b"}},
-                                            new InetSocketAddress[] {addr, addr}, conf);
-      assertTrue(Arrays.equals(strings, new String[]{"a","b"}));
-
-      Method ping = TestProtocol.class.getMethod("ping", new Class[] {});
-      Object[] voids = RPC.call(ping, new Object[][]{{},{}},
-                                new InetSocketAddress[] {addr, addr}, conf);
-      assertEquals(voids, null);
     } finally {
       server.stop();
      if(proxy!=null) RPC.stopProxy(proxy);
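
For callers migrating off the removed helpers: Client.call(Writable[], InetSocketAddress[], ...) and RPC.call(Method, Object[][], InetSocketAddress[], ...) fanned a single request out to several servers and gathered the replies into one array, with nulls for calls that timed out or errored. Below is a minimal sketch of the equivalent pattern built only on what this change keeps, one proxy per server via RPC.getProxy/RPC.stopProxy, with the fan-out threading supplied by the caller. EchoProtocol, echoAll, and the particular getProxy overload shown are illustrative assumptions for this note, not code from the patch.

// Illustrative sketch only (not part of r1356504): replacing the removed
// parallel-call helpers with per-server proxies and caller-side threading.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class ParallelEchoExample {

  /** Hypothetical protocol interface, standing in for a real IPC protocol. */
  public interface EchoProtocol {
    long versionID = 1L;
    String echo(String value) throws IOException;
  }

  /** Sends the same argument to every address and collects the replies. */
  public static List<String> echoAll(final InetSocketAddress[] addrs,
      final String value, final Configuration conf) throws Exception {
    List<String> results = new ArrayList<String>();
    if (addrs.length == 0) {
      return results;                // mirrors the removed zero-address shortcut
    }
    ExecutorService pool = Executors.newFixedThreadPool(addrs.length);
    try {
      List<Future<String>> futures = new ArrayList<Future<String>>();
      for (final InetSocketAddress addr : addrs) {
        futures.add(pool.submit(new Callable<String>() {
          @Override
          public String call() throws Exception {
            // One proxy per server; getProxy/stopProxy are the entry points
            // this change leaves in place.
            EchoProtocol proxy = RPC.getProxy(EchoProtocol.class,
                EchoProtocol.versionID, addr, conf);
            try {
              return proxy.echo(value);
            } finally {
              RPC.stopProxy(proxy);
            }
          }
        }));
      }
      for (Future<String> f : futures) {
        results.add(f.get());        // error handling is left to the caller
      }
      return results;
    } finally {
      pool.shutdown();
    }
  }
}

The only structural difference from the removed capability is that the fan-out loop now lives in caller code instead of inside org.apache.hadoop.ipc.Client.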