Subject: svn commit: r1099284 - in /hadoop/common/trunk: ./ src/java/org/apache/hadoop/ipc/ src/test/core/org/apache/hadoop/ipc/
Date: Tue, 03 May 2011 22:16:14 -0000
From: jitendra@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Message-Id: <20110503221615.0334623888CF@eris.apache.org>
X-Mailer: svnmailer-1.0.8

Author: jitendra
Date: Tue May 3 22:16:14 2011
New Revision: 1099284

URL: http://svn.apache.org/viewvc?rev=1099284&view=rev
Log:
HADOOP-7227. Remove protocol version check at proxy creation in Hadoop RPC. Contributed by jitendra.

Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java

Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=1099284&r1=1099283&r2=1099284&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Tue May 3 22:16:14 2011
@@ -105,6 +105,9 @@ Trunk (unreleased changes)
     HADOOP-7179. Federation: Improve HDFS startup scripts. (Erik Steffl and
     Tanping Wang via suresh)
 
+    HADOOP-7227. Remove protocol version check at proxy creation in Hadoop
+    RPC. (jitendra)
+
   OPTIMIZATIONS
 
   BUG FIXES
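The user-visible effect of this change: RPC.getProxy() and RPC.waitForProxy() no longer call getProtocolSignature() while building the proxy, so the protocol version is only verified when the first real call reaches the server. A minimal client-side sketch of the new behaviour; MyProtocol and its echo() method are hypothetical stand-ins for any VersionedProtocol interface with a public versionID field:

    // Hypothetical protocol; a public static versionID field is required because
    // WritableRpcEngine now reads it through reflection when it builds each call.
    public interface MyProtocol extends VersionedProtocol {
      public static final long versionID = 2L;
      String echo(String value) throws IOException;
    }

    MyProtocol proxy = (MyProtocol) RPC.getProxy(
        MyProtocol.class, MyProtocol.versionID, addr, conf);
    try {
      proxy.echo("ping");   // first invocation; a version mismatch surfaces here now
    } catch (IOException e) {
      // against a mismatched server this arrives as a remote exception whose
      // message contains "VersionMismatch" (see testVersionMismatch below)
    }

Before this patch the same mismatch was thrown as RPC.VersionMismatch from proxy creation itself, which is the code removed from WritableRpcEngine.getProxy() below.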
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1099284&r1=1099283&r2=1099284&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java Tue May 3 22:16:14 2011
@@ -61,6 +61,8 @@ public class AvroRpcEngine implements Rp
 
   /** Tunnel an Avro RPC request and response through Hadoop's RPC. */
   private static interface TunnelProtocol extends VersionedProtocol {
+    //WritableRpcEngine expects a versionID in every protocol.
+    public static final long versionID = 0L;
     /** All Avro methods and responses go through this. */
     BufferListWritable call(BufferListWritable request) throws IOException;
   }
@@ -147,7 +149,7 @@ public class AvroRpcEngine implements Rp
                    protocol.getClassLoader(),
                    new Class[] { protocol },
                    new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)),
-                   null);
+                   false);
   }
 
   /** Stop this proxy. */
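The TunnelProtocol.versionID addition above is needed because the reworked WritableRpcEngine and ProtocolProxy (later in this commit) resolve the protocol version reflectively from a public static long versionID field on the declaring interface. Condensed from the code further down, the lookup both classes perform is roughly:

    // Sketch of the reflective versionID lookup used by Invocation and
    // ProtocolProxy.fetchServerMethods(); an interface without a public static
    // long versionID field hits the NoSuchFieldException branch, which is
    // rethrown as a RuntimeException.
    Field versionField = method.getDeclaringClass().getField("versionID");
    versionField.setAccessible(true);
    long clientVersion = versionField.getLong(method.getDeclaringClass());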
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java?rev=1099284&r1=1099283&r2=1099284&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java Tue May 3 22:16:14 2011
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ipc;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.HashSet;
 
@@ -34,24 +35,55 @@ public class ProtocolProxy {
   private Class<T> protocol;
   private T proxy;
   private HashSet<Integer> serverMethods = null;
+  final private boolean supportServerMethodCheck;
+  private boolean serverMethodsFetched = false;
 
   /**
    * Constructor
    * 
    * @param protocol protocol class
    * @param proxy its proxy
-   * @param serverMethods a list of hash codes of the methods that it supports
-   * @throws ClassNotFoundException
+   * @param supportServerMethodCheck If false proxy will never fetch server
+          methods and isMethodSupported will always return true. If true,
+          server methods will be fetched for the first call to
+          isMethodSupported.
    */
-  public ProtocolProxy(Class<T> protocol, T proxy, int[] serverMethods) {
+  public ProtocolProxy(Class<T> protocol, T proxy,
+      boolean supportServerMethodCheck) {
     this.protocol = protocol;
     this.proxy = proxy;
-    if (serverMethods != null) {
-      this.serverMethods = new HashSet<Integer>(serverMethods.length);
-      for (int method : serverMethods) {
-        this.serverMethods.add(Integer.valueOf(method));
+    this.supportServerMethodCheck = supportServerMethodCheck;
+  }
+  
+  private void fetchServerMethods(Method method) throws IOException {
+    long clientVersion;
+    try {
+      Field versionField = method.getDeclaringClass().getField("versionID");
+      versionField.setAccessible(true);
+      clientVersion = versionField.getLong(method.getDeclaringClass());
+    } catch (NoSuchFieldException ex) {
+      throw new RuntimeException(ex);
+    } catch (IllegalAccessException ex) {
+      throw new RuntimeException(ex);
+    }
+    int clientMethodsHash = ProtocolSignature.getFingerprint(method
+        .getDeclaringClass().getMethods());
+    ProtocolSignature serverInfo = ((VersionedProtocol) proxy)
+        .getProtocolSignature(protocol.getName(), clientVersion,
+            clientMethodsHash);
+    long serverVersion = serverInfo.getVersion();
+    if (serverVersion != clientVersion) {
+      throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
+          serverVersion);
+    }
+    int[] serverMethodsCodes = serverInfo.getMethods();
+    if (serverMethodsCodes != null) {
+      serverMethods = new HashSet<Integer>(serverMethodsCodes.length);
+      for (int m : serverMethodsCodes) {
+        this.serverMethods.add(Integer.valueOf(m));
       }
     }
+    serverMethodsFetched = true;
   }
 
   /*
@@ -68,10 +100,10 @@ public class ProtocolProxy {
    * @param parameterTypes a method's parameter types
    * @return true if the method is supported by the server
    */
-  public boolean isMethodSupported(String methodName,
+  public synchronized boolean isMethodSupported(String methodName,
                                    Class<?>... parameterTypes) throws IOException {
-    if (serverMethods == null) { // client & server have the same protocol
+    if (!supportServerMethodCheck) {
      return true;
    }
    Method method;
@@ -82,6 +114,12 @@ public class ProtocolProxy {
    } catch (NoSuchMethodException e) {
      throw new IOException(e);
    }
+    if (!serverMethodsFetched) {
+      fetchServerMethods(method);
+    }
+    if (serverMethods == null) { // client & server have the same protocol
+      return true;
+    }
    return serverMethods.contains(
        Integer.valueOf(ProtocolSignature.getFingerprint(method)));
  }
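For callers that rely on method-based compatibility, the calling pattern is unchanged, but the getProtocolSignature() round trip now happens on the first isMethodSupported() call instead of at proxy creation. A rough client-side sketch; MyProtocol and newMethod() are hypothetical, and the four-argument RPC.getProtocolProxy overload is assumed to be the factory that returns a ProtocolProxy:

    ProtocolProxy<MyProtocol> protocolProxy =
        RPC.getProtocolProxy(MyProtocol.class, MyProtocol.versionID, addr, conf);
    MyProtocol proxy = protocolProxy.getProxy();

    // The first isMethodSupported() call triggers fetchServerMethods(): versionID is
    // read via reflection, getProtocolSignature() is invoked on the server, and the
    // supported method fingerprints are cached in serverMethods.
    if (protocolProxy.isMethodSupported("newMethod", String.class)) {
      proxy.newMethod("value");
    } else {
      // fall back to an older method the server is known to support
    }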
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1099284&r1=1099283&r2=1099284&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java Tue May 3 22:16:14 2011
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ipc;
 
+import java.lang.reflect.Field;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Method;
 import java.lang.reflect.Array;
@@ -46,6 +47,10 @@ import org.apache.hadoop.metrics.util.Me
 @InterfaceStability.Evolving
 public class WritableRpcEngine implements RpcEngine {
   private static final Log LOG = LogFactory.getLog(RPC.class);
+  
+  //writableRpcVersion should be updated if there is a change
+  //in format of the rpc messages.
+  public static long writableRpcVersion = 1L;
 
   /** A method invocation, including the method name and its parameters.*/
   private static class Invocation implements Writable, Configurable {
@@ -53,6 +58,12 @@ public class WritableRpcEngine implement
     private Class[] parameterClasses;
     private Object[] parameters;
     private Configuration conf;
+    private long clientVersion;
+    private int clientMethodsHash;
+    
+    //This could be different from static writableRpcVersion when received
+    //at server, if client is using a different version.
+    private long rpcVersion;
 
     public Invocation() {}
 
@@ -60,6 +71,24 @@ public class WritableRpcEngine implement
       this.methodName = method.getName();
       this.parameterClasses = method.getParameterTypes();
       this.parameters = parameters;
+      rpcVersion = writableRpcVersion;
+      if (method.getDeclaringClass().equals(VersionedProtocol.class)) {
+        //VersionedProtocol is exempted from version check.
+        clientVersion = 0;
+        clientMethodsHash = 0;
+      } else {
+        try {
+          Field versionField = method.getDeclaringClass().getField("versionID");
+          versionField.setAccessible(true);
+          this.clientVersion = versionField.getLong(method.getDeclaringClass());
+        } catch (NoSuchFieldException ex) {
+          throw new RuntimeException(ex);
+        } catch (IllegalAccessException ex) {
+          throw new RuntimeException(ex);
+        }
+        this.clientMethodsHash = ProtocolSignature.getFingerprint(method
+            .getDeclaringClass().getMethods());
+      }
     }
 
     /** The name of the method invoked. */
@@ -70,9 +99,28 @@ public class WritableRpcEngine implement
 
     /** The parameter instances. */
     public Object[] getParameters() { return parameters; }
+    
+    private long getProtocolVersion() {
+      return clientVersion;
+    }
+
+    private int getClientMethodsHash() {
+      return clientMethodsHash;
+    }
+    
+    /**
+     * Returns the rpc version used by the client.
+     * @return rpcVersion
+     */
+    public long getRpcVersion() {
+      return rpcVersion;
+    }
 
     public void readFields(DataInput in) throws IOException {
+      rpcVersion = in.readLong();
       methodName = UTF8.readString(in);
+      clientVersion = in.readLong();
+      clientMethodsHash = in.readInt();
       parameters = new Object[in.readInt()];
       parameterClasses = new Class[parameters.length];
       ObjectWritable objectWritable = new ObjectWritable();
@@ -83,7 +131,10 @@ public class WritableRpcEngine implement
     }
 
     public void write(DataOutput out) throws IOException {
+      out.writeLong(rpcVersion);
       UTF8.writeString(out, methodName);
+      out.writeLong(clientVersion);
+      out.writeInt(clientMethodsHash);
       out.writeInt(parameterClasses.length);
       for (int i = 0; i < parameterClasses.length; i++) {
         ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
@@ -101,6 +152,9 @@ public class WritableRpcEngine implement
         buffer.append(parameters[i]);
       }
       buffer.append(")");
+      buffer.append(", rpc version="+rpcVersion);
+      buffer.append(", client version="+clientVersion);
+      buffer.append(", methodsFingerPrint="+clientMethodsHash);
       return buffer.toString();
     }
 
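The readFields()/write() changes above alter the serialized form of every Invocation, which is why writableRpcVersion exists and why the server rejects mismatched clients further down. A hedged summary of the new on-wire field order, read off the write(DataOutput) hunk above rather than an authoritative wire specification:

    out.writeLong(rpcVersion);              // writableRpcVersion of the sending client, currently 1L
    UTF8.writeString(out, methodName);      // method name, as before
    out.writeLong(clientVersion);           // versionID of the declaring protocol (0 for VersionedProtocol methods)
    out.writeInt(clientMethodsHash);        // ProtocolSignature.getFingerprint() of the protocol's methods
    out.writeInt(parameterClasses.length);  // parameter count, then each parameter via
                                            // ObjectWritable.writeObject(), unchanged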
@@ -230,22 +284,10 @@ public class WritableRpcEngine implement
                          int rpcTimeout)
     throws IOException {
 
-    T proxy = (T)Proxy.newProxyInstance
-        (protocol.getClassLoader(), new Class[] { protocol },
-         new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
-    int[] serverMethods = null;
-    if (proxy instanceof VersionedProtocol) {
-      ProtocolSignature serverInfo = ((VersionedProtocol)proxy)
-          .getProtocolSignature(protocol.getName(), clientVersion,
-              ProtocolSignature.getFingerprint(protocol.getMethods()));
-      long serverVersion = serverInfo.getVersion();
-      if (serverVersion != clientVersion) {
-        throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
-            serverVersion);
-      }
-      serverMethods = serverInfo.getMethods();
-    }
-    return new ProtocolProxy<T>(protocol, proxy, serverMethods);
+    T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
+        new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
+            factory, rpcTimeout));
+    return new ProtocolProxy<T>(protocol, proxy, true);
   }
 
   /**
@@ -353,6 +395,31 @@ public class WritableRpcEngine implement
                                     call.getParameterClasses());
           method.setAccessible(true);
 
+          // Verify rpc version
+          if (call.getRpcVersion() != writableRpcVersion) {
+            // Client is using a different version of WritableRpc
+            throw new IOException(
+                "WritableRpc version mismatch, client side version="
+                    + call.getRpcVersion() + ", server side version="
+                    + writableRpcVersion);
+          }
+          
+          //Verify protocol version.
+          //Bypass the version check for VersionedProtocol
+          if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
+            long clientVersion = call.getProtocolVersion();
+            ProtocolSignature serverInfo = ((VersionedProtocol) instance)
+                .getProtocolSignature(protocol.getCanonicalName(), call
+                    .getProtocolVersion(), call.getClientMethodsHash());
+            long serverVersion = serverInfo.getVersion();
+            if (serverVersion != clientVersion) {
+              LOG.warn("Version mismatch: client version=" + clientVersion
+                  + ", server version=" + serverVersion);
+              throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
+                  serverVersion);
+            }
+          }
+
           long startTime = System.currentTimeMillis();
           Object value = method.invoke(instance, call.getParameters());
           int processingTime = (int) (System.currentTimeMillis() - startTime);
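A knock-on effect shows up in the test changes below: because proxy creation no longer talks to the server, nothing can fail (version check, authorization, even a connection refusal) until the first real invocation. The tests therefore add a throwaway call right after obtaining the proxy, roughly:

    // Proxy construction is now purely local; the first invocation is what opens the
    // connection and runs the server-side version and authorization checks.
    TestProtocol proxy = (TestProtocol) RPC.getProxy(
        TestProtocol.class, TestProtocol.versionID, addr, conf);
    proxy.echo("");   // forces connection setup so the expected exception can surface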
Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java?rev=1099284&r1=1099283&r2=1099284&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java Tue May 3 22:16:14 2011
@@ -290,8 +290,7 @@ public class TestRPC extends TestCase {
 
     // Check rpcMetrics
     server.rpcMetrics.doUpdates(new NullContext());
-    // Number 4 includes getProtocolVersion()
-    assertEquals(4, server.rpcMetrics.rpcProcessingTime.getPreviousIntervalNumOps());
+    assertEquals(3, server.rpcMetrics.rpcProcessingTime.getPreviousIntervalNumOps());
     assertTrue(server.rpcMetrics.sentBytes.getPreviousIntervalValue() > 0);
     assertTrue(server.rpcMetrics.receivedBytes.getPreviousIntervalValue() > 0);
 
@@ -376,8 +375,9 @@ public class TestRPC extends TestCase {
 
   public void testStandaloneClient() throws IOException {
     try {
-      RPC.waitForProxy(TestProtocol.class,
+      TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
         TestProtocol.versionID, new InetSocketAddress(ADDRESS, 20), conf, 15000L);
+      proxy.echo("");
       fail("We should not have reached here");
     } catch (ConnectException ioe) {
       //this is what we expected
@@ -502,6 +502,7 @@ public class TestRPC extends TestCase {
     try {
       proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
           TestProtocol.versionID, addr, conf);
+      proxy.echo("");
     } catch (RemoteException e) {
       LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
       assertTrue(e.unwrapRemoteException() instanceof AccessControlException);
@@ -527,6 +528,7 @@ public class TestRPC extends TestCase {
     try {
       proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
           TestProtocol.versionID, mulitServerAddr, conf);
+      proxy.echo("");
     } catch (RemoteException e) {
       LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
       assertTrue(e.unwrapRemoteException() instanceof AccessControlException);

Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java?rev=1099284&r1=1099283&r2=1099284&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java Tue May 3 22:16:14 2011
@@ -18,19 +18,21 @@
 
 package org.apache.hadoop.ipc;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 
-import org.apache.commons.logging.*;
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
-
 import org.junit.After;
-
 import org.junit.Test;
 
 /** Unit test for supporting method-name based compatible RPCs. */
@@ -247,4 +249,26 @@ public class TestRPCCompatibility {
     int hash2 = ProtocolSignature.getFingerprint(new Method[] {strMethod, intMethod});
     assertEquals(hash1, hash2);
   }
+  
+  public interface TestProtocol4 extends TestProtocol2 {
+    public static final long versionID = 1L;
+    int echo(int value) throws IOException;
+  }
+  
+  @Test
+  public void testVersionMismatch() throws IOException {
+    server = RPC.getServer(TestProtocol2.class, new TestImpl0(), ADDRESS, 0, 2,
+        false, conf, null);
+    server.start();
+    addr = NetUtils.getConnectAddress(server);
+
+    TestProtocol4 proxy = RPC.getProxy(TestProtocol4.class,
+        TestProtocol4.versionID, addr, conf);
+    try {
+      proxy.echo(21);
+      fail("The call must throw VersionMismatch exception");
+    } catch (IOException ex) {
+      Assert.assertTrue(ex.getMessage().contains("VersionMismatch"));
+    }
+  }
 }
\ No newline at end of file

Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java?rev=1099284&r1=1099283&r2=1099284&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java Tue May 3 22:16:14 2011
@@ -321,17 +321,20 @@ public class TestSaslRPC {
     try {
       proxy1 = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
           TestSaslProtocol.versionID, addr, newConf);
+      proxy1.getAuthMethod();
       Client client = WritableRpcEngine.getClient(conf);
       Set<ConnectionId> conns = client.getConnectionIds();
       assertEquals("number of connections in cache is wrong", 1, conns.size());
       // same conf, connection should be re-used
       proxy2 = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
           TestSaslProtocol.versionID, addr, newConf);
+      proxy2.getAuthMethod();
       assertEquals("number of connections in cache is wrong", 1, conns.size());
       // different conf, new connection should be set up
       newConf.set(SERVER_PRINCIPAL_KEY, SERVER_PRINCIPAL_2);
       proxy3 = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
           TestSaslProtocol.versionID, addr, newConf);
+      proxy3.getAuthMethod();
       ConnectionId[] connsArray = conns.toArray(new ConnectionId[0]);
       assertEquals("number of connections in cache is wrong", 2,
           connsArray.length);