hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1228081 - in /hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common: ./ src/main/docs/ src/main/java/ src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/
Date Fri, 06 Jan 2012 10:59:57 GMT
Author: szetszwo
Date: Fri Jan  6 10:59:56 2012
New Revision: 1228081

URL: http://svn.apache.org/viewvc?rev=1228081&view=rev
Log:
svn merge -c 1164771 from trunk for HADOOP-7524.

Added:
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java
      - copied unchanged from r1164771, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
      - copied unchanged from r1164771, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
Modified:
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/   (props changed)
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt
  (contents, props changed)
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/docs/
  (props changed)
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/
  (props changed)
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java

Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan  6 10:59:56 2012
@@ -1 +1 @@
-/hadoop/common/trunk/hadoop-common-project/hadoop-common:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1166009,1166402,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177035,1177487,1177531,1177859,1177864,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1209246,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1213954,1214046,1220510,1221348,1226211,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166009,1166402,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177035,1177487,1177531,1177859,1177864,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1209246,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1213954,1214046,1220510,1221348,1226211,1227091,1227423

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1228081&r1=1228080&r2=1228081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt
(original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt
Fri Jan  6 10:59:56 2012
@@ -1,5 +1,9 @@
 Hadoop Change Log
 
+Release 0.23-PB - Unreleased
+
+  HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same
protocol (sanjay Radia)
+
 Release 0.23.1 - Unreleased
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan  6 10:59:56 2012
@@ -1,5 +1,5 @@
 /hadoop/common/branches/yahoo-merge/CHANGES.txt:1079157,1079163-1079164,1079167
-/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1166009,1166402,1167383,1169986,1170046,1170379,1170459,1171297,1171894,1171909,1172186,1172916,1173402,1176550,1176986,1177035,1177487,1177531,1177859,1177864,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204376,1204388,1205260,1206830,1207694,1208153,1208313,1209246,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1214046,1220510,1221348,1226211,1226351,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164771,1166009,1166402,1167383,1169986,1170046,1170379,1170459,1171297,1171894,1171909,1172186,1172916,1173402,1176550,1176986,1177035,1177487,1177531,1177859,1177864,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204376,1204388,1205260,1206830,1207694,1208153,1208313,1209246,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1214046,1220510,1221348,1226211,1226351,1227091,1227423
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226
 /hadoop/core/branches/branch-0.19/CHANGES.txt:713112
 /hadoop/core/trunk/CHANGES.txt:776175-785643,785929-786278

Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/docs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan  6 10:59:56 2012
@@ -1,2 +1,2 @@
-/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/docs:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1166402,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1177487,1177531,1177859,1177864,1183132,1189932,1189982,1195575,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1209246,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213954,1214046,1220510,1221348,1226211,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/docs:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166402,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1177487,1177531,1177859,1177864,1183132,1189932,1189982,1195575,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1209246,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213954,1214046,1220510,1221348,1226211,1227091,1227423
 /hadoop/core/branches/branch-0.19/src/docs:713112

Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan  6 10:59:56 2012
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1166402,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177035,1177487,1177531,1177859,1177864,1182641,1183132,1189932,1189982,1190109,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213954,1214046,1220510,1221348,1226211,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166402,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177035,1177487,1177531,1177859,1177864,1182641,1183132,1189932,1189982,1190109,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213954,1214046,1220510,1221348,1226211,1227091,1227423
 /hadoop/core/branches/branch-0.19/core/src/java:713112
 /hadoop/core/trunk/src/core:776175-785643,785929-786278

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1228081&r1=1228080&r2=1228081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
(original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
Fri Jan  6 10:59:56 2012
@@ -287,8 +287,8 @@ public class Client {
         authMethod = AuthMethod.KERBEROS;
       }
       
-      header = new ConnectionHeader(protocol == null ? null : protocol
-          .getName(), ticket, authMethod);
+      header = 
+        new ConnectionHeader(RPC.getProtocolName(protocol), ticket, authMethod);
       
       if (LOG.isDebugEnabled())
         LOG.debug("Use " + authMethod + " authentication for protocol "

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1228081&r1=1228080&r2=1228081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
(original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
Fri Jan  6 10:59:56 2012
@@ -62,6 +62,20 @@ import org.apache.hadoop.util.Reflection
  */
 public class RPC {
   static final Log LOG = LogFactory.getLog(RPC.class);
+  
+  
+  /**
+   * Get the protocol name.
+   *  If the protocol class has a ProtocolAnnotation, then get the protocol
+   *  name from the annotation; otherwise the class name is the protocol name.
+   */
+  static public String getProtocolName(Class<?> protocol) {
+    if (protocol == null) {
+      return null;
+    }
+    ProtocolInfo anno = (ProtocolInfo) protocol.getAnnotation(ProtocolInfo.class);
+    return  (anno == null) ? protocol.getName() : anno.protocolName();
+  }
 
   private RPC() {}                                  // no public ctor
 
@@ -553,8 +567,10 @@ public class RPC {
   }
 
   /** Construct a server for a protocol implementation instance. */
-  public static Server getServer(Class<?> protocol,
-                                 Object instance, String bindAddress, int port,
+
+  public static <PROTO extends VersionedProtocol, IMPL extends PROTO> 
+        Server getServer(Class<PROTO> protocol,
+                                 IMPL instance, String bindAddress, int port,
                                  int numHandlers, int numReaders, int queueSizePerHandler,
                                  boolean verbose, Configuration conf,
                                  SecretManager<? extends TokenIdentifier> secretManager)

@@ -576,6 +592,18 @@ public class RPC {
       super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,
             conf, serverName, secretManager);
     }
+    
+    /**
+     * Add a protocol to the existing server.
+     * @param protocolClass - the protocol class
+     * @param protocolImpl - the impl of the protocol that will be called
+     * @return the server (for convenience)
+     */
+    public <PROTO extends VersionedProtocol, IMPL extends PROTO>
+      Server addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl
+    ) throws IOException {
+      throw new IOException("addProtocol Not Implemented");
+    }
   }
 
 }

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1228081&r1=1228080&r2=1228081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
(original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
Fri Jan  6 10:59:56 2012
@@ -893,7 +893,7 @@ public abstract class Server {
     private InetAddress addr;
     
     ConnectionHeader header = new ConnectionHeader();
-    Class<?> protocol;
+    String protocolName;
     boolean useSasl;
     SaslServer saslServer;
     private AuthMethod authMethod;
@@ -1284,15 +1284,8 @@ public abstract class Server {
       DataInputStream in =
         new DataInputStream(new ByteArrayInputStream(buf));
       header.readFields(in);
-      try {
-        String protocolClassName = header.getProtocol();
-        if (protocolClassName != null) {
-          protocol = getProtocolClass(header.getProtocol(), conf);
-          rpcDetailedMetrics.init(protocol);
-        }
-      } catch (ClassNotFoundException cnfe) {
-        throw new IOException("Unknown protocol: " + header.getProtocol());
-      }
+      protocolName = header.getProtocol();
+
       
       UserGroupInformation protocolUser = header.getUgi();
       if (!useSasl) {
@@ -1481,7 +1474,7 @@ public abstract class Server {
             // Make the call as the user via Subject.doAs, thus associating
             // the call with the Subject
             if (call.connection.user == null) {
-              value = call(call.connection.protocol, call.param, 
+              value = call(call.connection.protocolName, call.param, 
                            call.timestamp);
             } else {
               value = 
@@ -1490,7 +1483,7 @@ public abstract class Server {
                      @Override
                      public Writable run() throws Exception {
                        // make the call
-                       return call(call.connection.protocol, 
+                       return call(call.connection.protocolName, 
                                    call.param, call.timestamp);
 
                      }
@@ -1758,7 +1751,7 @@ public abstract class Server {
   
   /** 
    * Called for each call. 
-   * @deprecated Use {@link #call(Class, Writable, long)} instead
+   * @deprecated Use {@link #call(String, Writable, long)} instead
    */
   @Deprecated
   public Writable call(Writable param, long receiveTime) throws IOException {
@@ -1766,7 +1759,7 @@ public abstract class Server {
   }
   
   /** Called for each call. */
-  public abstract Writable call(Class<?> protocol,
+  public abstract Writable call(String protocol,
                                Writable param, long receiveTime)
   throws IOException;
   

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java?rev=1228081&r1=1228080&r2=1228081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java
(original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java
Fri Jan  6 10:59:56 2012
@@ -34,7 +34,6 @@ public interface VersionedProtocol {
    * @return the version that the server will speak
    * @throws IOException if any IO error occurs
    */
-  @Deprecated
   public long getProtocolVersion(String protocol,
                                  long clientVersion) throws IOException;
 

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1228081&r1=1228080&r2=1228081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
(original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
Fri Jan  6 10:59:56 2012
@@ -27,6 +27,9 @@ import java.lang.reflect.InvocationTarge
 
 import java.net.InetSocketAddress;
 import java.io.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 
@@ -35,6 +38,7 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -47,10 +51,46 @@ import org.apache.hadoop.conf.*;
 public class WritableRpcEngine implements RpcEngine {
   private static final Log LOG = LogFactory.getLog(RPC.class);
   
+ 
+  /**
+   * Get all superInterfaces that extend VersionedProtocol
+   * @param childInterfaces
+   * @return the super interfaces that extend VersionedProtocol
+   */
+  private static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
+    List<Class<?>> allInterfaces = new ArrayList<Class<?>>();
+
+    for (Class<?> childInterface : childInterfaces) {
+      if (VersionedProtocol.class.isAssignableFrom(childInterface)) {
+          allInterfaces.add(childInterface);
+          allInterfaces.addAll(
+              Arrays.asList(
+                  getSuperInterfaces(childInterface.getInterfaces())));
+      } else {
+        LOG.warn("Interface " + childInterface +
+              " ignored because it does not extend VersionedProtocol");
+      }
+    }
+    return (Class<?>[]) allInterfaces.toArray(new Class[allInterfaces.size()]);
+  }
+  
+  /**
+   * Get all interfaces that the given protocol implements or extends
+   * which are assignable from VersionedProtocol.
+   */
+  private static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
+    Class<?>[] interfaces  = protocol.getInterfaces();
+    return getSuperInterfaces(interfaces);
+  }
+
+  
   //writableRpcVersion should be updated if there is a change
   //in format of the rpc messages.
-  public static long writableRpcVersion = 1L;
+  
+  // 2L - added declared class to Invocation
+  public static final long writableRpcVersion = 2L; 
 
+  
   /** A method invocation, including the method name and its parameters.*/
   private static class Invocation implements Writable, Configurable {
     private String methodName;
@@ -59,11 +99,13 @@ public class WritableRpcEngine implement
     private Configuration conf;
     private long clientVersion;
     private int clientMethodsHash;
+    private String declaringClassProtocolName;
     
     //This could be different from static writableRpcVersion when received
     //at server, if client is using a different version.
     private long rpcVersion;
 
+    @SuppressWarnings("unused") // called when deserializing an invocation
     public Invocation() {}
 
     public Invocation(Method method, Object[] parameters) {
@@ -88,6 +130,8 @@ public class WritableRpcEngine implement
         this.clientMethodsHash = ProtocolSignature.getFingerprint(method
             .getDeclaringClass().getMethods());
       }
+      this.declaringClassProtocolName = 
+          RPC.getProtocolName(method.getDeclaringClass());
     }
 
     /** The name of the method invoked. */
@@ -103,6 +147,7 @@ public class WritableRpcEngine implement
       return clientVersion;
     }
 
+    @SuppressWarnings("unused")
     private int getClientMethodsHash() {
       return clientMethodsHash;
     }
@@ -115,8 +160,10 @@ public class WritableRpcEngine implement
       return rpcVersion;
     }
 
+    @SuppressWarnings("deprecation")
     public void readFields(DataInput in) throws IOException {
       rpcVersion = in.readLong();
+      declaringClassProtocolName = UTF8.readString(in);
       methodName = UTF8.readString(in);
       clientVersion = in.readLong();
       clientMethodsHash = in.readInt();
@@ -124,13 +171,16 @@ public class WritableRpcEngine implement
       parameterClasses = new Class[parameters.length];
       ObjectWritable objectWritable = new ObjectWritable();
       for (int i = 0; i < parameters.length; i++) {
-        parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
+        parameters[i] = 
+            ObjectWritable.readObject(in, objectWritable, this.conf);
         parameterClasses[i] = objectWritable.getDeclaredClass();
       }
     }
 
+    @SuppressWarnings("deprecation")
     public void write(DataOutput out) throws IOException {
       out.writeLong(rpcVersion);
+      UTF8.writeString(out, declaringClassProtocolName);
       UTF8.writeString(out, methodName);
       out.writeLong(clientVersion);
       out.writeInt(clientMethodsHash);
@@ -273,30 +323,161 @@ public class WritableRpcEngine implement
 
   /** Construct a server for a protocol implementation instance listening on a
    * port and address. */
-  public Server getServer(Class<?> protocol,
-                          Object instance, String bindAddress, int port,
-                          int numHandlers, int numReaders, int queueSizePerHandler,
-                          boolean verbose, Configuration conf,
+  public RPC.Server getServer(Class<?> protocolClass,
+                      Object protocolImpl, String bindAddress, int port,
+                      int numHandlers, int numReaders, int queueSizePerHandler,
+                      boolean verbose, Configuration conf,
                       SecretManager<? extends TokenIdentifier> secretManager) 
     throws IOException {
-    return new Server(instance, conf, bindAddress, port, numHandlers, 
-        numReaders, queueSizePerHandler, verbose, secretManager);
+    return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
+        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager);
   }
 
+
   /** An RPC Server. */
   public static class Server extends RPC.Server {
-    private Object instance;
     private boolean verbose;
+    
+    /**
+     *  The key in Map
+     */
+    static class ProtoNameVer {
+      final String protocol;
+      final long   version;
+      ProtoNameVer(String protocol, long ver) {
+        this.protocol = protocol;
+        this.version = ver;
+      }
+      @Override
+      public boolean equals(Object o) {
+        if (o == null) 
+          return false;
+        if (this == o) 
+          return true;
+        if (! (o instanceof ProtoNameVer))
+          return false;
+        ProtoNameVer pv = (ProtoNameVer) o;
+        return ((pv.protocol.equals(this.protocol)) && 
+            (pv.version == this.version));     
+      }
+      @Override
+      public int hashCode() {
+        return protocol.hashCode() * 37 + (int) version;    
+      }
+    }
+    
+    /**
+     * The value in map
+     */
+    static class ProtoClassProtoImpl {
+      final Class<?> protocolClass;
+      final Object protocolImpl; 
+      ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
+        this.protocolClass = protocolClass;
+        this.protocolImpl = protocolImpl;
+      }
+    }
+    
+    private Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMap = 
+        new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);
+    
+    // Register  protocol and its impl for rpc calls
+    private void registerProtocolAndImpl(Class<?> protocolClass, 
+        Object protocolImpl) throws IOException {
+      String protocolName = RPC.getProtocolName(protocolClass);
+      VersionedProtocol vp = (VersionedProtocol) protocolImpl;
+      long version;
+      try {
+        version = vp.getProtocolVersion(protocolName, 0);
+      } catch (Exception ex) {
+        LOG.warn("Protocol "  + protocolClass + 
+             " NOT registered as getProtocolVersion throws exception ");
+        return;
+      }
+      protocolImplMap.put(new ProtoNameVer(protocolName, version),
+          new ProtoClassProtoImpl(protocolClass, protocolImpl)); 
+      LOG.info("ProtocolImpl=" + protocolImpl.getClass().getName() + 
+          " protocolClass=" + protocolClass.getName() + " version=" + version);
+    }
+    
+    private static class VerProtocolImpl {
+      final long version;
+      final ProtoClassProtoImpl protocolTarget;
+      VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
+        this.version = ver;
+        this.protocolTarget = protocolTarget;
+      }
+    }
+    
+    
+    @SuppressWarnings("unused") // will be useful later.
+    private VerProtocolImpl[] getSupportedProtocolVersions(
+        String protocolName) {
+      VerProtocolImpl[] resultk = new  VerProtocolImpl[protocolImplMap.size()];
+      int i = 0;
+      for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
+                                        protocolImplMap.entrySet()) {
+        if (pv.getKey().protocol.equals(protocolName)) {
+          resultk[i++] = 
+              new VerProtocolImpl(pv.getKey().version, pv.getValue());
+        }
+      }
+      if (i == 0) {
+        return null;
+      }
+      VerProtocolImpl[] result = new VerProtocolImpl[i];
+      System.arraycopy(resultk, 0, result, 0, i);
+      return result;
+    }
+    
+    private VerProtocolImpl getHighestSupportedProtocol(String protocolName) {    
+      Long highestVersion = 0L;
+      ProtoClassProtoImpl highest = null;
+      for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv : protocolImplMap
+          .entrySet()) {
+        if (pv.getKey().protocol.equals(protocolName)) {
+          if ((highest == null) || (pv.getKey().version > highestVersion)) {
+            highest = pv.getValue();
+            highestVersion = pv.getKey().version;
+          } 
+        }
+      }
+      if (highest == null) {
+        return null;
+      }
+      return new VerProtocolImpl(highestVersion,  highest);   
+    }
+ 
 
     /** Construct an RPC server.
      * @param instance the instance whose methods will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
+     * 
+     * @deprecated Use #Server(Class, Object, Configuration, String, int)
+     *    
+     */
+    @Deprecated
+    public Server(Object instance, Configuration conf, String bindAddress,
+        int port) 
+      throws IOException {
+      this(null, instance, conf,  bindAddress, port);
+    }
+    
+    
+    /** Construct an RPC server.
+     * @param protocol class
+     * @param instance the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
      */
-    public Server(Object instance, Configuration conf, String bindAddress, int port) 
+    public Server(Class<?> protocolClass, Object protocolImpl, 
+        Configuration conf, String bindAddress, int port) 
       throws IOException {
-      this(instance, conf,  bindAddress, port, 1, -1, -1, false, null);
+      this(protocolClass, protocolImpl, conf,  bindAddress, port, 1, -1, -1,
+          false, null);
     }
     
     private static String classNameBase(String className) {
@@ -307,35 +488,103 @@ public class WritableRpcEngine implement
       return names[names.length-1];
     }
     
+    
     /** Construct an RPC server.
-     * @param instance the instance whose methods will be called
+     * @param protocolImpl the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
+     * @param numHandlers the number of method handler threads to run
+     * @param verbose whether each call should be logged
+     * 
+     * @deprecated use Server#Server(Class, Object, 
+     *      Configuration, String, int, int, int, int, boolean, SecretManager)
+     */
+    @Deprecated
+    public Server(Object protocolImpl, Configuration conf, String bindAddress,
+        int port, int numHandlers, int numReaders, int queueSizePerHandler,
+        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) 
+            throws IOException {
+       this(null, protocolImpl,  conf,  bindAddress,   port,
+                   numHandlers,  numReaders,  queueSizePerHandler,  verbose, 
+                   secretManager);
+   
+    }
+    
+    /** Construct an RPC server.
+     * @param protocolClass - the protocol being registered
+     *     can be null for compatibility with old usage (see below for details)
+     * @param protocolImpl the protocol impl that will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
      * @param numHandlers the number of method handler threads to run
      * @param verbose whether each call should be logged
      */
-    public Server(Object instance, Configuration conf, String bindAddress,  int port,
-                  int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose,

-                  SecretManager<? extends TokenIdentifier> secretManager) 
+    public Server(Class<?> protocolClass, Object protocolImpl,
+        Configuration conf, String bindAddress,  int port,
+        int numHandlers, int numReaders, int queueSizePerHandler, 
+        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) 
         throws IOException {
       super(bindAddress, port, Invocation.class, numHandlers, numReaders,
           queueSizePerHandler, conf,
-          classNameBase(instance.getClass().getName()), secretManager);
-      this.instance = instance;
+          classNameBase(protocolImpl.getClass().getName()), secretManager);
+
       this.verbose = verbose;
+      
+      
+      Class<?>[] protocols;
+      if (protocolClass == null) { // derive protocol from impl
+        /*
+         * In order to remain compatible with the old usage where a single
+         * target protocolImpl is suppled for all protocol interfaces, and
+         * the protocolImpl is derived from the protocolClass(es) 
+         * we register all interfaces extended by the protocolImpl
+         */
+        protocols = getProtocolInterfaces(protocolImpl.getClass());
+
+      } else {
+        if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
+          throw new IOException("protocolClass "+ protocolClass +
+              " is not implemented by protocolImpl which is of class " +
+              protocolImpl.getClass());
+        }
+        // register protocol class and its super interfaces
+        registerProtocolAndImpl(protocolClass, protocolImpl);
+        protocols = getProtocolInterfaces(protocolClass);
+      }
+      for (Class<?> p : protocols) {
+        if (!p.equals(VersionedProtocol.class)) {
+          registerProtocolAndImpl(p, protocolImpl);
+        }
+      }
+
     }
 
-    public Writable call(Class<?> protocol, Writable param, long receivedTime) 
+ 
+    @Override
+    public <PROTO extends VersionedProtocol, IMPL extends PROTO> Server
+      addProtocol(
+        Class<PROTO> protocolClass, IMPL protocolImpl) throws IOException {
+      registerProtocolAndImpl(protocolClass, protocolImpl);
+      return this;
+    }
+    
+    /**
+     * Process a client call
+     * @param protocolName - the protocol name (the class of the client proxy
+     *      used to make calls to the rpc server.
+     * @param param  parameters
+     * @param receivedTime time at which the call receoved (for metrics)
+     * @return the call's return
+     * @throws IOException
+     */
+    public Writable call(String protocolName, Writable param, long receivedTime) 
     throws IOException {
       try {
         Invocation call = (Invocation)param;
         if (verbose) log("Call: " + call);
 
-        Method method = protocol.getMethod(call.getMethodName(),
-                                           call.getParameterClasses());
-        method.setAccessible(true);
-
         // Verify rpc version
         if (call.getRpcVersion() != writableRpcVersion) {
           // Client is using a different version of WritableRpc
@@ -344,25 +593,51 @@ public class WritableRpcEngine implement
                   + call.getRpcVersion() + ", server side version="
                   + writableRpcVersion);
         }
-        
-        //Verify protocol version.
-        //Bypass the version check for VersionedProtocol
-        if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
-          long clientVersion = call.getProtocolVersion();
-          ProtocolSignature serverInfo = ((VersionedProtocol) instance)
-              .getProtocolSignature(protocol.getCanonicalName(), call
-                  .getProtocolVersion(), call.getClientMethodsHash());
-          long serverVersion = serverInfo.getVersion();
-          if (serverVersion != clientVersion) {
-            LOG.warn("Version mismatch: client version=" + clientVersion
-                + ", server version=" + serverVersion);
-            throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
-                serverVersion);
+
+        long clientVersion = call.getProtocolVersion();
+        final String protoName;
+        ProtoClassProtoImpl protocolImpl;
+        if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
+          // VersionProtocol methods are often used by client to figure out
+          // which version of protocol to use.
+          //
+          // Versioned protocol methods should go the protocolName protocol
+          // rather than the declaring class of the method since the
+          // the declaring class is VersionedProtocol which is not 
+          // registered directly.
+          // Send the call to the highest  protocol version
+          protocolImpl = 
+              getHighestSupportedProtocol(protocolName).protocolTarget;
+        } else {
+          protoName = call.declaringClassProtocolName;
+
+          // Find the right impl for the protocol based on client version.
+          ProtoNameVer pv = 
+              new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
+          protocolImpl = protocolImplMap.get(pv);
+          if (protocolImpl == null) { // no match for Protocol AND Version
+             VerProtocolImpl highest = 
+                 getHighestSupportedProtocol(protoName);
+            if (highest == null) {
+              throw new IOException("Unknown protocol: " + protoName);
+            } else { // protocol supported but not the version that client wants
+              throw new RPC.VersionMismatch(protoName, clientVersion,
+                highest.version);
+            }
           }
         }
+        
+
+        // Invoke the protocol method
 
         long startTime = System.currentTimeMillis();
-        Object value = method.invoke(instance, call.getParameters());
+        Method method = 
+            protocolImpl.protocolClass.getMethod(call.getMethodName(),
+            call.getParameterClasses());
+        method.setAccessible(true);
+        rpcDetailedMetrics.init(protocolImpl.protocolClass);
+        Object value = 
+            method.invoke(protocolImpl.protocolImpl, call.getParameters());
         int processingTime = (int) (System.currentTimeMillis() - startTime);
         int qTime = (int) (startTime-receivedTime);
         if (LOG.isDebugEnabled()) {

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1228081&r1=1228080&r2=1228081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
(original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
Fri Jan  6 10:59:56 2012
@@ -96,7 +96,7 @@ public class TestIPC {
     }
 
     @Override
-    public Writable call(Class<?> protocol, Writable param, long receiveTime)
+    public Writable call(String protocol, Writable param, long receiveTime)
         throws IOException {
       if (sleep) {
         // sleep a bit

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java?rev=1228081&r1=1228080&r2=1228081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
(original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
Fri Jan  6 10:59:56 2012
@@ -72,7 +72,7 @@ public class TestIPCServerResponder exte
     }
 
     @Override
-    public Writable call(Class<?> protocol, Writable param, long receiveTime)
+    public Writable call(String protocol, Writable param, long receiveTime)
         throws IOException {
       if (sleep) {
         try {

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java?rev=1228081&r1=1228080&r2=1228081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
(original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
Fri Jan  6 10:59:56 2012
@@ -39,7 +39,7 @@ import org.junit.Test;
 public class TestRPCCompatibility {
   private static final String ADDRESS = "0.0.0.0";
   private static InetSocketAddress addr;
-  private static Server server;
+  private static RPC.Server server;
   private ProtocolProxy<?> proxy;
 
   public static final Log LOG =
@@ -52,10 +52,12 @@ public class TestRPCCompatibility {
     void ping() throws IOException;    
   }
   
-  public interface TestProtocol1 extends TestProtocol0 {
+  public interface TestProtocol1 extends VersionedProtocol, TestProtocol0 {
     String echo(String value) throws IOException;
   }
 
+  @ProtocolInfo(protocolName=
+      "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1")
   public interface TestProtocol2 extends TestProtocol1 {
     int echo(int value)  throws IOException;
   }
@@ -89,11 +91,23 @@ public class TestRPCCompatibility {
   public static class TestImpl1 extends TestImpl0 implements TestProtocol1 {
     @Override
     public String echo(String value) { return value; }
+    @Override
+    public long getProtocolVersion(String protocol,
+        long clientVersion) throws IOException {
+        return TestProtocol1.versionID;
+    }
   }
 
   public static class TestImpl2 extends TestImpl1 implements TestProtocol2 {
     @Override
     public int echo(int value) { return value; }
+
+    @Override
+    public long getProtocolVersion(String protocol,
+        long clientVersion) throws IOException {
+      return TestProtocol2.versionID;
+    }
+
   }
   
   @After
@@ -109,8 +123,10 @@ public class TestRPCCompatibility {
   @Test  // old client vs new server
   public void testVersion0ClientVersion1Server() throws Exception {
     // create a server with two handlers
+    TestImpl1 impl = new TestImpl1();
     server = RPC.getServer(TestProtocol1.class,
-                              new TestImpl1(), ADDRESS, 0, 2, false, conf, null);
+                            impl, ADDRESS, 0, 2, false, conf, null);
+    server.addProtocol(TestProtocol0.class, impl);
     server.start();
     addr = NetUtils.getConnectAddress(server);
 
@@ -172,8 +188,10 @@ public class TestRPCCompatibility {
   @Test // Compatible new client & old server
   public void testVersion2ClientVersion1Server() throws Exception {
     // create a server with two handlers
+    TestImpl1 impl = new TestImpl1();
     server = RPC.getServer(TestProtocol1.class,
-                              new TestImpl1(), ADDRESS, 0, 2, false, conf, null);
+                              impl, ADDRESS, 0, 2, false, conf, null);
+    server.addProtocol(TestProtocol0.class, impl);
     server.start();
     addr = NetUtils.getConnectAddress(server);
 
@@ -190,8 +208,10 @@ public class TestRPCCompatibility {
   @Test // equal version client and server
   public void testVersion2ClientVersion2Server() throws Exception {
     // create a server with two handlers
+    TestImpl2 impl = new TestImpl2();
     server = RPC.getServer(TestProtocol2.class,
-                              new TestImpl2(), ADDRESS, 0, 2, false, conf, null);
+                             impl, ADDRESS, 0, 2, false, conf, null);
+    server.addProtocol(TestProtocol0.class, impl);
     server.start();
     addr = NetUtils.getConnectAddress(server);
 
@@ -250,14 +270,16 @@ public class TestRPCCompatibility {
     assertEquals(hash1, hash2);
   }
   
+  @ProtocolInfo(protocolName=
+      "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1")
   public interface TestProtocol4 extends TestProtocol2 {
-    public static final long versionID = 1L;
+    public static final long versionID = 4L;
     int echo(int value)  throws IOException;
   }
   
   @Test
   public void testVersionMismatch() throws IOException {
-    server = RPC.getServer(TestProtocol2.class, new TestImpl0(), ADDRESS, 0, 2,
+    server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2,
         false, conf, null);
     server.start();
     addr = NetUtils.getConnectAddress(server);
@@ -268,7 +290,8 @@ public class TestRPCCompatibility {
       proxy.echo(21);
       fail("The call must throw VersionMismatch exception");
     } catch (IOException ex) {
-      Assert.assertTrue(ex.getMessage().contains("VersionMismatch"));
+      Assert.assertTrue("Expected version mismatch but got " + ex.getMessage(), 
+          ex.getMessage().contains("VersionMismatch"));
     }
   }
 }
\ No newline at end of file



Mime
View raw message