hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1210547 [1/2] - in /hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common: ./ src/main/bin/ src/main/docs/ src/main/docs/src/documentation/content/xdocs/ src/main/java/ src/main/java/org/apache/hadoop/ipc/ src/main/java/org/...
Date Mon, 05 Dec 2011 18:00:49 GMT
Author: atm
Date: Mon Dec  5 18:00:44 2011
New Revision: 1210547

URL: http://svn.apache.org/viewvc?rev=1210547&view=rev
Log:
Merge trunk into HA branch.

Added:
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufHelper.java
      - copied unchanged from r1210544, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufHelper.java
Modified:
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/bin/hadoop-config.sh
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/docs/   (props changed)
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/commands_manual.xml
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protobuf/HadoopRpcProtos.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/proto/hadoop_rpc.proto
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/core/   (props changed)
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/protobuf/TestRpcServiceProtos.java
    hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1210547&r1=1210546&r2=1210547&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/CHANGES.txt Mon Dec  5 18:00:44 2011
@@ -5,6 +5,11 @@ Trunk (unreleased changes)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    HADOOP-7773. Add support for protocol buffer based RPC engine.
+    (suresh)
+
+    HADOOP-7875. Add helper class to unwrap protobuf ServiceException.
+    (suresh)
 
   IMPROVEMENTS
 
@@ -61,6 +66,12 @@ Trunk (unreleased changes)
 
     HADOOP-7590. Mavenize streaming and MR examples. (tucu)
 
+		HADOOP-7862. Move the support for multiple protocols to lower layer so 
+    that Writable, PB and Avro can all use it (Sanjay)
+
+    HADOOP-7876. Provided access to encoded key in DelegationKey for
+    use in protobuf based RPCs. (suresh)
+
   BUGS
 
     HADOOP-7606. Upgrade Jackson to version 1.7.1 to match the version required
@@ -102,13 +113,12 @@ Trunk (unreleased changes)
 
     HDFS-2614. hadoop dist tarball is missing hdfs headers. (tucu)
 
+    HADOOP-7874. native libs should be under lib/native/ dir. (tucu)
+
   OPTIMIZATIONS
 
     HADOOP-7761. Improve the performance of raw comparisons. (todd)
 
-    HADOOP-7773. Add support for protocol buffer based RPC engine.
-    (suresh)
-
 Release 0.23.1 - Unreleased
 
   INCOMPATIBLE CHANGES
@@ -134,6 +144,9 @@ Release 0.23.1 - Unreleased
     HADOOP-7804. Enable hadoop config generator to set configurations to enable
     short circuit read. (Arpit Gupta via jitendra)
 
+    HADOOP-7877. Update balancer CLI usage documentation to include the new
+    -policy option.  (szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES

Propchange: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec  5 18:00:44 2011
@@ -1,5 +1,5 @@
 /hadoop/common/branches/yahoo-merge/CHANGES.txt:1079157,1079163-1079164,1079167
-/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:1161333-1209165
+/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:1161333-1210544
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226
 /hadoop/core/branches/branch-0.19/CHANGES.txt:713112
 /hadoop/core/trunk/CHANGES.txt:776175-785643,785929-786278

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/bin/hadoop-config.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/bin/hadoop-config.sh?rev=1210547&r1=1210546&r2=1210547&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/bin/hadoop-config.sh (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/bin/hadoop-config.sh Mon Dec  5 18:00:44 2011
@@ -186,29 +186,16 @@ fi
 # setup 'java.library.path' for native-hadoop code if necessary
 
 if [ -d "${HADOOP_PREFIX}/build/native" -o -d "${HADOOP_PREFIX}/lib/native" ]; then
-  JAVA_PLATFORM=`CLASSPATH=${CLASSPATH} ${JAVA} -Xmx32m ${HADOOP_JAVA_PLATFORM_OPTS} org.apache.hadoop.util.PlatformName | sed -e "s/ /_/g"`
-  
-  if [ -d "$HADOOP_PREFIX/build/native" ]; then
-    if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then
-        JAVA_LIBRARY_PATH=${JAVA_LIBRARY_PATH}:${HADOOP_PREFIX}/build/native/${JAVA_PLATFORM}/lib
-    else
-        JAVA_LIBRARY_PATH=${HADOOP_PREFIX}/build/native/${JAVA_PLATFORM}/lib
-    fi
-  fi
-  
+    
   if [ -d "${HADOOP_PREFIX}/lib/native" ]; then
     if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then
-      JAVA_LIBRARY_PATH=${JAVA_LIBRARY_PATH}:${HADOOP_PREFIX}/lib/native/${JAVA_PLATFORM}
+      JAVA_LIBRARY_PATH=${JAVA_LIBRARY_PATH}:${HADOOP_PREFIX}/lib/native
     else
-      JAVA_LIBRARY_PATH=${HADOOP_PREFIX}/lib/native/${JAVA_PLATFORM}
+      JAVA_LIBRARY_PATH=${HADOOP_PREFIX}/lib/native
     fi
   fi
 fi
 
-if [ -e "${HADOOP_PREFIX}/lib/libhadoop.a" ]; then
-  JAVA_LIBRARY_PATH=${HADOOP_PREFIX}/lib
-fi
-
 # cygwin path translation
 if $cygwin; then
   JAVA_LIBRARY_PATH=`cygpath -p "$JAVA_LIBRARY_PATH"`

Propchange: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/docs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec  5 18:00:44 2011
@@ -1,2 +1,2 @@
-/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/docs:1152502-1209165
+/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/docs:1152502-1210544
 /hadoop/core/branches/branch-0.19/src/docs:713112

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/commands_manual.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/commands_manual.xml?rev=1210547&r1=1210546&r2=1210547&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/commands_manual.xml (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/commands_manual.xml Mon Dec  5 18:00:44 2011
@@ -445,14 +445,22 @@
 					<a href="http://hadoop.apache.org/hdfs/docs/current/hdfs_user_guide.html#Rebalancer">Rebalancer</a>.
 				</p>
 				<p>
-					<code>Usage: hadoop balancer [-threshold &lt;threshold&gt;]</code>
+					<code>Usage: hadoop balancer [-policy &lt;blockpool|datanode&gt;] [-threshold &lt;threshold&gt;]</code>
 				</p>
 				<table>
 			          <tr><th> COMMAND_OPTION </th><th> Description </th></tr>
-			
+			           <tr>
+					<td><code>-policy &lt;blockpool|datanode&gt;</code></td>
+					<td>The balancing policy.
+					    <br /><code>datanode</code>: Cluster is balance if the disk usage of each datanode is balance.
+					    <br /><code>blockpool</code>: Cluster is balance if the disk usage of each block pool in each datanode is balance.
+					    <br />Note that <code>blockpool</code> is a condition stronger than <code>datanode</code>.
+					    The default policy is <code>datanode</code>.
+					</td>
+			           </tr>
 			           <tr>
 			          	<td><code>-threshold &lt;threshold&gt;</code></td>
-			            <td>Percentage of disk capacity. This overwrites the default threshold.</td>
+			            <td>Percentage of disk capacity. This default threshold is 10%.</td>
 			           </tr>
 			     </table>
 			</section>

Propchange: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec  5 18:00:44 2011
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:1152502-1209165
+/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:1152502-1210544
 /hadoop/core/branches/branch-0.19/core/src/java:713112
 /hadoop/core/trunk/src/core:776175-785643,785929-786278

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1210547&r1=1210546&r2=1210547&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java Mon Dec  5 18:00:44 2011
@@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -237,14 +238,15 @@ public class AvroRpcEngine implements Rp
       super((Class)null, new Object(), conf,
             bindAddress, port, numHandlers, numReaders,
             queueSizePerHandler, verbose, secretManager);
-      super.addProtocol(TunnelProtocol.class, responder);
+      // RpcKind is WRITABLE since Avro is tunneled through WRITABLE
+      super.addProtocol(RpcKind.RPC_WRITABLE, TunnelProtocol.class, responder);
       responder.addProtocol(iface, impl);
     }
 
 
     @Override
-    public <PROTO, IMPL extends PROTO> Server
-      addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl)
+    public Server
+      addProtocol(RpcKind rpcKind, Class<?> protocolClass, Object protocolImpl)
         throws IOException {
       responder.addProtocol(protocolClass, protocolImpl);
       return this;

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1210547&r1=1210546&r2=1210547&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Mon Dec  5 18:00:44 2011
@@ -1002,17 +1002,19 @@ public class Client {
   }
 
   /**
-   * Same as {@link #call(RpcKind, Writable, ConnectionId)} for Writable
+   * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)}
+   *  for RPC_BUILTIN
    */
   public Writable call(Writable param, InetSocketAddress address)
   throws InterruptedException, IOException {
-    return call(RpcKind.RPC_WRITABLE, param, address);
+    return call(RpcKind.RPC_BUILTIN, param, address);
     
   }
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code>, returning the value.  Throws exceptions if there are
    * network problems or if the remote code threw an exception.
-   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
+   *  ConnectionId)} instead 
    */
   @Deprecated
   public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address)
@@ -1025,7 +1027,8 @@ public class Client {
    * the value.  
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception.
-   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable, 
+   * ConnectionId)} instead 
    */
   @Deprecated
   public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
@@ -1042,7 +1045,8 @@ public class Client {
    * timeout, returning the value.  
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception. 
-   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
+   *  ConnectionId)} instead 
    */
   @Deprecated
   public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
@@ -1056,7 +1060,7 @@ public class Client {
 
   
   /**
-   * Same as {@link #call(RpcKind, Writable, InetSocketAddress, 
+   * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, InetSocketAddress, 
    * Class, UserGroupInformation, int, Configuration)}
    * except that rpcKind is writable.
    */
@@ -1066,7 +1070,7 @@ public class Client {
       throws InterruptedException, IOException {
         ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
         ticket, rpcTimeout, conf);
-        return call(RpcKind.RPC_WRITABLE, param, remoteId);
+    return call(RpcKind.RPC_BUILTIN, param, remoteId);
   }
   
   /**
@@ -1087,21 +1091,28 @@ public class Client {
   }
   
   /**
-   * Same as {link {@link #call(RpcKind, Writable, ConnectionId)}
-   * except the rpcKind is RPC_WRITABLE
+   * Same as {link {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)}
+   * except the rpcKind is RPC_BUILTIN
    */
   public Writable call(Writable param, ConnectionId remoteId)  
       throws InterruptedException, IOException {
-     return call(RpcKind.RPC_WRITABLE, param, remoteId);
+     return call(RpcKind.RPC_BUILTIN, param, remoteId);
   }
   
-  /** Make a call, passing <code>param</code>, to the IPC server defined by
-   * <code>remoteId</code>, returning the value.  
+  /** 
+   * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
+   * <code>remoteId</code>, returning the rpc respond.
+   * 
+   * @param rpcKind
+   * @param rpcRequest -  contains serialized method and method parameters
+   * @param remoteId - the target rpc server
+   * @returns the rpc response
    * Throws exceptions if there are network problems or if the remote code 
-   * threw an exception. */
-  public Writable call(RpcKind rpcKind, Writable param, ConnectionId remoteId)  
-      throws InterruptedException, IOException {
-    Call call = new Call(rpcKind, param);
+   * threw an exception.
+   */
+  public Writable call(RpcKind rpcKind, Writable rpcRequest,
+      ConnectionId remoteId) throws InterruptedException, IOException {
+    Call call = new Call(rpcKind, rpcRequest);
     Connection connection = getConnection(remoteId, call);
     connection.sendParam(call);                 // send the parameter
     boolean interrupted = false;

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1210547&r1=1210546&r2=1210547&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Mon Dec  5 18:00:44 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
@@ -60,6 +61,12 @@ import com.google.protobuf.ServiceExcept
 @InterfaceStability.Evolving
 public class ProtobufRpcEngine implements RpcEngine {
   private static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
+  
+  static { // Register the rpcRequest deserializer for WritableRpcEngine 
+    org.apache.hadoop.ipc.Server.registerProtocolEngine(
+        RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWritable.class,
+        new Server.ProtoBufRpcInvoker());
+  }
 
   private static final ClientCache CLIENTS = new ClientCache();
 
@@ -75,10 +82,13 @@ public class ProtobufRpcEngine implement
   }
 
   private static class Invoker implements InvocationHandler, Closeable {
-    private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();
+    private final Map<String, Message> returnTypes = 
+        new ConcurrentHashMap<String, Message>();
     private boolean isClosed = false;
-    private Client.ConnectionId remoteId;
-    private Client client;
+    private final Client.ConnectionId remoteId;
+    private final Client client;
+    private final long clientProtocolVersion;
+    private final String protocolName;
 
     public Invoker(Class<?> protocol, InetSocketAddress addr,
         UserGroupInformation ticket, Configuration conf, SocketFactory factory,
@@ -87,6 +97,8 @@ public class ProtobufRpcEngine implement
           ticket, rpcTimeout, conf);
       this.client = CLIENTS.getClient(conf, factory,
           RpcResponseWritable.class);
+      this.clientProtocolVersion = RPC.getProtocolVersion(protocol);
+      this.protocolName = RPC.getProtocolName(protocol);
     }
 
     private HadoopRpcRequestProto constructRpcRequest(Method method,
@@ -108,6 +120,19 @@ public class ProtobufRpcEngine implement
 
       Message param = (Message) params[1];
       builder.setRequest(param.toByteString());
+      // For protobuf, {@code protocol} used when creating client side proxy is
+      // the interface extending BlockingInterface, which has the annotations 
+      // such as ProtocolName etc.
+      //
+      // Using Method.getDeclaringClass(), as in WritableEngine to get at
+      // the protocol interface will return BlockingInterface, from where 
+      // the annotation ProtocolName and Version cannot be
+      // obtained.
+      //
+      // Hence we simply use the protocol class used to create the proxy.
+      // For PB this may limit the use of mixins on client side.
+      builder.setDeclaringClassProtocolName(protocolName);
+      builder.setClientProtocolVersion(clientProtocolVersion);
       rpcRequest = builder.build();
       return rpcRequest;
     }
@@ -272,15 +297,16 @@ public class ProtobufRpcEngine implement
         RpcResponseWritable.class);
   }
   
+ 
 
   @Override
-  public RPC.Server getServer(Class<?> protocol, Object instance,
+  public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
       String bindAddress, int port, int numHandlers, int numReaders,
       int queueSizePerHandler, boolean verbose, Configuration conf,
       SecretManager<? extends TokenIdentifier> secretManager)
       throws IOException {
-    return new Server(instance, conf, bindAddress, port, numHandlers,
-        numReaders, queueSizePerHandler, verbose, secretManager);
+    return new Server(protocol, protocolImpl, conf, bindAddress, port,
+        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager);
   }
   
   private static RemoteException getRemoteException(Exception e) {
@@ -289,87 +315,31 @@ public class ProtobufRpcEngine implement
   }
 
   public static class Server extends RPC.Server {
-    private BlockingService service;
-    private boolean verbose;
-
-    private static String classNameBase(String className) {
-      String[] names = className.split("\\.", -1);
-      if (names == null || names.length == 0) {
-        return className;
-      }
-      return names[names.length - 1];
-    }
-
     /**
      * Construct an RPC server.
      * 
-     * @param instance the instance whose methods will be called
+     * @param protocolClass the class of protocol
+     * @param protocolImpl the protocolImpl whose methods will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
      * @param numHandlers the number of method handler threads to run
      * @param verbose whether each call should be logged
      */
-    public Server(Object instance, Configuration conf, String bindAddress,
-        int port, int numHandlers, int numReaders, int queueSizePerHandler,
-        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
+    public Server(Class<?> protocolClass, Object protocolImpl,
+        Configuration conf, String bindAddress, int port, int numHandlers,
+        int numReaders, int queueSizePerHandler, boolean verbose,
+        SecretManager<? extends TokenIdentifier> secretManager)
         throws IOException {
       super(bindAddress, port, RpcRequestWritable.class, numHandlers,
-          numReaders, queueSizePerHandler, conf, classNameBase(instance
+          numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
               .getClass().getName()), secretManager);
-      this.service = (BlockingService) instance;
-      this.verbose = verbose;
+      this.verbose = verbose;  
+      registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER, 
+          protocolClass, protocolImpl);
     }
 
-    /**
-     * This is a server side method, which is invoked over RPC. On success
-     * the return response has protobuf response payload. On failure, the
-     * exception name and the stack trace are return in the resposne. See {@link HadoopRpcResponseProto}
-     * 
-     * In this method there three types of exceptions possible and they are
-     * returned in response as follows.
-     * <ol>
-     * <li> Exceptions encountered in this method that are returned as {@link RpcServerException} </li>
-     * <li> Exceptions thrown by the service is wrapped in ServiceException. In that
-     * this method returns in response the exception thrown by the service.</li>
-     * <li> Other exceptions thrown by the service. They are returned as
-     * it is.</li>
-     * </ol>
-     */
-    @Override
-    public Writable call(String protocol, Writable writableRequest,
-        long receiveTime) throws IOException {
-      RpcRequestWritable request = (RpcRequestWritable) writableRequest;
-      HadoopRpcRequestProto rpcRequest = request.message;
-      String methodName = rpcRequest.getMethodName();
-      if (verbose)
-        LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
-      MethodDescriptor methodDescriptor = service.getDescriptorForType()
-          .findMethodByName(methodName);
-      if (methodDescriptor == null) {
-        String msg = "Unknown method " + methodName + " called on " + protocol
-            + " protocol.";
-        LOG.warn(msg);
-        return handleException(new RpcServerException(msg));
-      }
-      Message prototype = service.getRequestPrototype(methodDescriptor);
-      Message param = prototype.newBuilderForType()
-          .mergeFrom(rpcRequest.getRequest()).build();
-      Message result;
-      try {
-        result = service.callBlockingMethod(methodDescriptor, null, param);
-      } catch (ServiceException e) {
-        Throwable cause = e.getCause();
-        return handleException(cause != null ? cause : e);
-      } catch (Exception e) {
-        return handleException(e);
-      }
-
-      HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
-      return new RpcResponseWritable(response);
-    }
-
-    private RpcResponseWritable handleException(Throwable e) {
+    private static RpcResponseWritable handleException(Throwable e) {
       HadoopRpcExceptionProto exception = HadoopRpcExceptionProto.newBuilder()
           .setExceptionName(e.getClass().getName())
           .setStackTrace(StringUtils.stringifyException(e)).build();
@@ -378,7 +348,7 @@ public class ProtobufRpcEngine implement
       return new RpcResponseWritable(response);
     }
 
-    private HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
+    private static HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
         Message message) {
       HadoopRpcResponseProto res = HadoopRpcResponseProto.newBuilder()
           .setResponse(message.toByteString())
@@ -386,5 +356,81 @@ public class ProtobufRpcEngine implement
           .build();
       return res;
     }
+    
+    /**
+     * Protobuf invoker for {@link RpcInvoker}
+     */
+    static class ProtoBufRpcInvoker implements RpcInvoker {
+
+      @Override 
+      /**
+       * This is a server side method, which is invoked over RPC. On success
+       * the return response has protobuf response payload. On failure, the
+       * exception name and the stack trace are return in the resposne.
+       * See {@link HadoopRpcResponseProto}
+       * 
+       * In this method there three types of exceptions possible and they are
+       * returned in response as follows.
+       * <ol>
+       * <li> Exceptions encountered in this method that are returned 
+       * as {@link RpcServerException} </li>
+       * <li> Exceptions thrown by the service is wrapped in ServiceException. 
+       * In that this method returns in response the exception thrown by the 
+       * service.</li>
+       * <li> Other exceptions thrown by the service. They are returned as
+       * it is.</li>
+       * </ol>
+       */
+      public Writable call(RPC.Server server, String protocol,
+          Writable writableRequest, long receiveTime) throws IOException {
+        RpcRequestWritable request = (RpcRequestWritable) writableRequest;
+        HadoopRpcRequestProto rpcRequest = request.message;
+        String methodName = rpcRequest.getMethodName();
+        String protoName = rpcRequest.getDeclaringClassProtocolName();
+        long clientVersion = rpcRequest.getClientProtocolVersion();
+        if (server.verbose)
+          LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
+        
+        ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);
+        ProtoClassProtoImpl protocolImpl = 
+            server.getProtocolImplMap(RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
+        if (protocolImpl == null) { // no match for Protocol AND Version
+          VerProtocolImpl highest = 
+              server.getHighestSupportedProtocol(RpcKind.RPC_PROTOCOL_BUFFER, 
+                  protoName);
+          if (highest == null) {
+            throw new IOException("Unknown protocol: " + protoName);
+          }
+          // protocol supported but not the version that client wants
+          throw new RPC.VersionMismatch(protoName, clientVersion,
+              highest.version);
+        }
+        
+        BlockingService service = (BlockingService) protocolImpl.protocolImpl;
+        MethodDescriptor methodDescriptor = service.getDescriptorForType()
+            .findMethodByName(methodName);
+        if (methodDescriptor == null) {
+          String msg = "Unknown method " + methodName + " called on " + protocol
+              + " protocol.";
+          LOG.warn(msg);
+          return handleException(new RpcServerException(msg));
+        }
+        Message prototype = service.getRequestPrototype(methodDescriptor);
+        Message param = prototype.newBuilderForType()
+            .mergeFrom(rpcRequest.getRequest()).build();
+        Message result;
+        try {
+          result = service.callBlockingMethod(methodDescriptor, null, param);
+        } catch (ServiceException e) {
+          Throwable cause = e.getCause();
+          return handleException(cause != null ? cause : e);
+        } catch (Exception e) {
+          return handleException(e);
+        }
+  
+        HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
+        return new RpcResponseWritable(response);
+      }
+    }
   }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java?rev=1210547&r1=1210546&r2=1210547&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java Mon Dec  5 18:00:44 2011
@@ -35,4 +35,5 @@ import java.lang.annotation.RetentionPol
 @Retention(RetentionPolicy.RUNTIME)
 public @interface ProtocolInfo {
   String protocolName();  // the name of the protocol (i.e. rpc service)
+  long protocolVersion() default -1; // default means not defined use old way
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java?rev=1210547&r1=1210546&r2=1210547&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java Mon Dec  5 18:00:44 2011
@@ -57,19 +57,11 @@ public class ProtocolProxy<T> {
   
   private void fetchServerMethods(Method method) throws IOException {
     long clientVersion;
-    try {
-      Field versionField = method.getDeclaringClass().getField("versionID");
-      versionField.setAccessible(true);
-      clientVersion = versionField.getLong(method.getDeclaringClass());
-    } catch (NoSuchFieldException ex) {
-      throw new RuntimeException(ex);
-    } catch (IllegalAccessException ex) {
-      throw new RuntimeException(ex);
-    }
+    clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
     int clientMethodsHash = ProtocolSignature.getFingerprint(method
         .getDeclaringClass().getMethods());
     ProtocolSignature serverInfo = ((VersionedProtocol) proxy)
-        .getProtocolSignature(protocol.getName(), clientVersion,
+        .getProtocolSignature(RPC.getProtocolName(protocol), clientVersion,
             clientMethodsHash);
     long serverVersion = serverInfo.getVersion();
     if (serverVersion != clientVersion) {

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java?rev=1210547&r1=1210546&r2=1210547&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java Mon Dec  5 18:00:44 2011
@@ -29,6 +29,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class ProtocolSignature implements Writable {
   static {               // register a ctor
     WritableFactories.setFactory
@@ -164,10 +166,15 @@ public class ProtocolSignature implement
   /**
    * A cache that maps a protocol's name to its signature & finger print
    */
-  final private static HashMap<String, ProtocolSigFingerprint> 
+  private final static HashMap<String, ProtocolSigFingerprint> 
      PROTOCOL_FINGERPRINT_CACHE = 
        new HashMap<String, ProtocolSigFingerprint>();
   
+  @VisibleForTesting
+  public static void resetCache() {
+    PROTOCOL_FINGERPRINT_CACHE.clear();
+  }
+  
   /**
    * Return a protocol's signature and finger print from cache
    * 
@@ -177,7 +184,7 @@ public class ProtocolSignature implement
    */
   private static ProtocolSigFingerprint getSigFingerprint(
       Class <? extends VersionedProtocol> protocol, long serverVersion) {
-    String protocolName = protocol.getName();
+    String protocolName = RPC.getProtocolName(protocol);
     synchronized (PROTOCOL_FINGERPRINT_CACHE) {
       ProtocolSigFingerprint sig = PROTOCOL_FINGERPRINT_CACHE.get(protocolName);
       if (sig == null) {

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1210547&r1=1210546&r2=1210547&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Mon Dec  5 18:00:44 2011
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ipc;
 
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Method;
@@ -28,6 +29,9 @@ import java.net.NoRouteToHostException;
 import java.net.SocketTimeoutException;
 import java.io.*;
 import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 
@@ -36,6 +40,7 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -63,8 +68,54 @@ import org.apache.hadoop.util.Reflection
  * the protocol instance is transmitted.
  */
 public class RPC {
+  
+  interface RpcInvoker {   
+    /**
+     * Process a client call on the server side
+     * @param server the server within whose context this rpc call is made
+     * @param protocol - the protocol name (the class of the client proxy
+     *      used to make calls to the rpc server.
+     * @param rpcRequest  - deserialized
+     * @param receiveTime time at which the call received (for metrics)
+     * @return the call's return
+     * @throws IOException
+     **/
+    public Writable call(Server server, String protocol,
+        Writable rpcRequest, long receiveTime) throws IOException ;
+  }
+  
   static final Log LOG = LogFactory.getLog(RPC.class);
   
+  /**
+   * Get all superInterfaces that extend VersionedProtocol
+   * @param childInterfaces
+   * @return the super interfaces that extend VersionedProtocol
+   */
+  static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
+    List<Class<?>> allInterfaces = new ArrayList<Class<?>>();
+
+    for (Class<?> childInterface : childInterfaces) {
+      if (VersionedProtocol.class.isAssignableFrom(childInterface)) {
+          allInterfaces.add(childInterface);
+          allInterfaces.addAll(
+              Arrays.asList(
+                  getSuperInterfaces(childInterface.getInterfaces())));
+      } else {
+        LOG.warn("Interface " + childInterface +
+              " ignored because it does not extend VersionedProtocol");
+      }
+    }
+    return allInterfaces.toArray(new Class[allInterfaces.size()]);
+  }
+  
+  /**
+   * Get all interfaces that the given protocol implements or extends
+   * which are assignable from VersionedProtocol.
+   */
+  static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
+    Class<?>[] interfaces  = protocol.getInterfaces();
+    return getSuperInterfaces(interfaces);
+  }
   
   /**
    * Get the protocol name.
@@ -75,9 +126,36 @@ public class RPC {
     if (protocol == null) {
       return null;
     }
-    ProtocolInfo anno = (ProtocolInfo) protocol.getAnnotation(ProtocolInfo.class);
+    ProtocolInfo anno = protocol.getAnnotation(ProtocolInfo.class);
     return  (anno == null) ? protocol.getName() : anno.protocolName();
   }
+  
+  /**
+   * Get the protocol version from protocol class.
+   * If the protocol class has a ProtocolAnnotation, then get the protocol
+   * name from the annotation; otherwise the class name is the protocol name.
+   */
+  static public long getProtocolVersion(Class<?> protocol) {
+    if (protocol == null) {
+      throw new IllegalArgumentException("Null protocol");
+    }
+    long version;
+    ProtocolInfo anno = protocol.getAnnotation(ProtocolInfo.class);
+    if (anno != null) {
+      version = anno.protocolVersion();
+      if (version != -1)
+        return version;
+    }
+    try {
+      Field versionField = protocol.getField("versionID");
+      versionField.setAccessible(true);
+      return versionField.getLong(protocol);
+    } catch (NoSuchFieldException ex) {
+      throw new RuntimeException(ex);
+    } catch (IllegalAccessException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
 
   private RPC() {}                                  // no public ctor
 
@@ -590,6 +668,144 @@ public class RPC {
 
   /** An RPC Server. */
   public abstract static class Server extends org.apache.hadoop.ipc.Server {
+   boolean verbose;
+   static String classNameBase(String className) {
+      String[] names = className.split("\\.", -1);
+      if (names == null || names.length == 0) {
+        return className;
+      }
+      return names[names.length-1];
+    }
+   
+   /**
+    * Store a map of protocol and version to its implementation
+    */
+   /**
+    *  The key in Map
+    */
+   static class ProtoNameVer {
+     final String protocol;
+     final long   version;
+     ProtoNameVer(String protocol, long ver) {
+       this.protocol = protocol;
+       this.version = ver;
+     }
+     @Override
+     public boolean equals(Object o) {
+       if (o == null) 
+         return false;
+       if (this == o) 
+         return true;
+       if (! (o instanceof ProtoNameVer))
+         return false;
+       ProtoNameVer pv = (ProtoNameVer) o;
+       return ((pv.protocol.equals(this.protocol)) && 
+           (pv.version == this.version));     
+     }
+     @Override
+     public int hashCode() {
+       return protocol.hashCode() * 37 + (int) version;    
+     }
+   }
+   
+   /**
+    * The value in map
+    */
+   static class ProtoClassProtoImpl {
+     final Class<?> protocolClass;
+     final Object protocolImpl; 
+     ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
+       this.protocolClass = protocolClass;
+       this.protocolImpl = protocolImpl;
+     }
+   }
+
+   ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>> protocolImplMapArray = 
+       new ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>>(RpcKind.MAX_INDEX);
+   
+   Map<ProtoNameVer, ProtoClassProtoImpl> getProtocolImplMap(RpcKind rpcKind) {
+     if (protocolImplMapArray.size() == 0) {// initialize for all rpc kinds
+       for (int i=0; i <= RpcKind.MAX_INDEX; ++i) {
+         protocolImplMapArray.add(
+             new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10));
+       }
+     }
+     return protocolImplMapArray.get(rpcKind.ordinal());   
+   }
+   
+   // Register  protocol and its impl for rpc calls
+   void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass, 
+       Object protocolImpl) throws IOException {
+     String protocolName = RPC.getProtocolName(protocolClass);
+     long version;
+     
+
+     try {
+       version = RPC.getProtocolVersion(protocolClass);
+     } catch (Exception ex) {
+       LOG.warn("Protocol "  + protocolClass + 
+            " NOT registered as cannot get protocol version ");
+       return;
+     }
+
+
+     getProtocolImplMap(rpcKind).put(new ProtoNameVer(protocolName, version),
+         new ProtoClassProtoImpl(protocolClass, protocolImpl)); 
+     LOG.debug("RpcKind = " + rpcKind + " Protocol Name = " + protocolName +  " version=" + version +
+         " ProtocolImpl=" + protocolImpl.getClass().getName() + 
+         " protocolClass=" + protocolClass.getName());
+   }
+   
+   static class VerProtocolImpl {
+     final long version;
+     final ProtoClassProtoImpl protocolTarget;
+     VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
+       this.version = ver;
+       this.protocolTarget = protocolTarget;
+     }
+   }
+   
+   
+   @SuppressWarnings("unused") // will be useful later.
+   VerProtocolImpl[] getSupportedProtocolVersions(RpcKind rpcKind,
+       String protocolName) {
+     VerProtocolImpl[] resultk = 
+         new  VerProtocolImpl[getProtocolImplMap(rpcKind).size()];
+     int i = 0;
+     for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
+                                       getProtocolImplMap(rpcKind).entrySet()) {
+       if (pv.getKey().protocol.equals(protocolName)) {
+         resultk[i++] = 
+             new VerProtocolImpl(pv.getKey().version, pv.getValue());
+       }
+     }
+     if (i == 0) {
+       return null;
+     }
+     VerProtocolImpl[] result = new VerProtocolImpl[i];
+     System.arraycopy(resultk, 0, result, 0, i);
+     return result;
+   }
+   
+   VerProtocolImpl getHighestSupportedProtocol(RpcKind rpcKind, 
+       String protocolName) {    
+     Long highestVersion = 0L;
+     ProtoClassProtoImpl highest = null;
+ System.out.println("Size of protoMap for " + rpcKind + " =" + getProtocolImplMap(rpcKind).size());
+     for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv : 
+           getProtocolImplMap(rpcKind).entrySet()) {
+       if (pv.getKey().protocol.equals(protocolName)) {
+         if ((highest == null) || (pv.getKey().version > highestVersion)) {
+           highest = pv.getValue();
+           highestVersion = pv.getKey().version;
+         } 
+       }
+     }
+     if (highest == null) {
+       return null;
+     }
+     return new VerProtocolImpl(highestVersion,  highest);   
+   }
   
     protected Server(String bindAddress, int port, 
                      Class<? extends Writable> paramClass, int handlerCount,
@@ -606,11 +822,17 @@ public class RPC {
      * @param protocolImpl - the impl of the protocol that will be called
      * @return the server (for convenience)
      */
-    public <PROTO, IMPL extends PROTO>
-      Server addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl
-    ) throws IOException {
-      throw new IOException("addProtocol Not Implemented");
+    public Server addProtocol(RpcKind rpcKind, Class<?> protocolClass,
+        Object protocolImpl) throws IOException {
+      registerProtocolAndImpl(rpcKind, protocolClass, protocolImpl);
+      return this;
+    }
+    
+    @Override
+    public Writable call(RpcKind rpcKind, String protocol,
+        Writable rpcRequest, long receiveTime) throws IOException {
+      return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
+          receiveTime);
     }
   }
-
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java?rev=1210547&r1=1210546&r2=1210547&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java Mon Dec  5 18:00:44 2011
@@ -54,13 +54,14 @@ public class RpcPayloadHeader implements
   }
   
   public enum RpcKind {
-    RPC_BUILTIN ((short ) 1),  // Used for built in calls
-    RPC_WRITABLE ((short ) 2),
-    RPC_PROTOCOL_BUFFER ((short)3), 
-    RPC_AVRO ((short)4);
-    
+    RPC_BUILTIN ((short) 1),         // Used for built in calls by tests
+    RPC_WRITABLE ((short) 2),        // Use WritableRpcEngine 
+    RPC_PROTOCOL_BUFFER ((short) 3), // Use ProtobufRpcEngine
+    RPC_AVRO ((short) 4);            // Use AvroRpcEngine 
+    static final short MAX_INDEX = RPC_AVRO.value; // used for array size
+    private static final short FIRST_INDEX = RPC_BUILTIN.value;    
     private final short value;
-    private static final short FIRST_INDEX = RPC_BUILTIN.value;
+
     RpcKind(short val) {
       this.value = val;
     }

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1210547&r1=1210546&r2=1210547&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Mon Dec  5 18:00:44 2011
@@ -43,6 +43,7 @@ import java.nio.channels.WritableByteCha
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -66,6 +67,7 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
 import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
@@ -133,6 +135,59 @@ public abstract class Server {
    * Initial and max size of response buffer
    */
   static int INITIAL_RESP_BUF_SIZE = 10240;
+  
+  static class RpcKindMapValue {
+    final Class<? extends Writable> rpcRequestWrapperClass;
+    final RpcInvoker rpcInvoker;
+    RpcKindMapValue (Class<? extends Writable> rpcRequestWrapperClass,
+          RpcInvoker rpcInvoker) {
+      this.rpcInvoker = rpcInvoker;
+      this.rpcRequestWrapperClass = rpcRequestWrapperClass;
+    }   
+  }
+  static Map<RpcKind, RpcKindMapValue> rpcKindMap = new
+      HashMap<RpcKind, RpcKindMapValue>(4);
+  
+  
+
+  /**
+   * Register a RPC kind and the class to deserialize the rpc request.
+   * 
+   * Called by static initializers of rpcKind Engines
+   * @param rpcKind
+   * @param rpcRequestWrapperClass - this class is used to deserialze the
+   *  the rpc request.
+   *  @param rpcInvoker - use to process the calls on SS.
+   */
+  
+  public static void registerProtocolEngine(RpcKind rpcKind, 
+          Class<? extends Writable> rpcRequestWrapperClass,
+          RpcInvoker rpcInvoker) {
+    RpcKindMapValue  old = 
+        rpcKindMap.put(rpcKind, new RpcKindMapValue(rpcRequestWrapperClass, rpcInvoker));
+    if (old != null) {
+      rpcKindMap.put(rpcKind, old);
+      throw new IllegalArgumentException("ReRegistration of rpcKind: " +
+          rpcKind);      
+    }
+    LOG.info("rpcKind=" + rpcKind + 
+        ", rpcRequestWrapperClass=" + rpcRequestWrapperClass + 
+        ", rpcInvoker=" + rpcInvoker);
+  }
+  
+  public Class<? extends Writable> getRpcRequestWrapper(
+      RpcKind rpcKind) {
+    if (rpcRequestClass != null)
+       return rpcRequestClass;
+    RpcKindMapValue val = rpcKindMap.get(rpcKind);
+    return (val == null) ? null : val.rpcRequestWrapperClass; 
+  }
+  
+  public static RpcInvoker  getRpcInvoker(RpcKind rpcKind) {
+    RpcKindMapValue val = rpcKindMap.get(rpcKind);
+    return (val == null) ? null : val.rpcInvoker; 
+  }
+  
 
   public static final Log LOG = LogFactory.getLog(Server.class);
   public static final Log AUDITLOG = 
@@ -197,7 +252,7 @@ public abstract class Server {
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
   private int readThreads;                        // number of read threads
-  private Class<? extends Writable> paramClass;   // class of call parameters
+  private Class<? extends Writable> rpcRequestClass;   // class used for deserializing the rpc request
   private int maxIdleTime;                        // the maximum idle time after 
                                                   // which a client may be disconnected
   private int thresholdIdleConnections;           // the number of idle connections
@@ -1425,9 +1480,27 @@ public abstract class Server {
         throw new IOException("IPC Server does not implement operation" + 
               header.getOperation());
       }
+      // If we know the rpc kind, get its class so that we can deserialize
+      // (Note it would make more sense to have the handler deserialize but 
+      // we continue with this original design.
+      Class<? extends Writable> rpcRequestClass = 
+          getRpcRequestWrapper(header.getkind());
+      if (rpcRequestClass == null) {
+        LOG.warn("Unknown rpc kind "  + header.getkind() + 
+            " from client " + getHostAddress());
+        final Call readParamsFailedCall = 
+            new Call(header.getCallId(), null, this);
+        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+
+        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+            IOException.class.getName(),
+            "Unknown rpc kind "  + header.getkind());
+        responder.doRespond(readParamsFailedCall);
+        return;   
+      }
       Writable rpcRequest;
       try { //Read the rpc request
-        rpcRequest = ReflectionUtils.newInstance(paramClass, conf);
+        rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
         rpcRequest.readFields(dis);
       } catch (Throwable t) {
         LOG.warn("Unable to read call parameters for client " +
@@ -1519,7 +1592,7 @@ public abstract class Server {
             // Make the call as the user via Subject.doAs, thus associating
             // the call with the Subject
             if (call.connection.user == null) {
-              value = call(call.connection.protocolName, call.rpcRequest, 
+              value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, 
                            call.timestamp);
             } else {
               value = 
@@ -1528,7 +1601,7 @@ public abstract class Server {
                      @Override
                      public Writable run() throws Exception {
                        // make the call
-                       return call(call.connection.protocolName, 
+                       return call(call.rpcKind, call.connection.protocolName, 
                                    call.rpcRequest, call.timestamp);
 
                      }
@@ -1590,24 +1663,33 @@ public abstract class Server {
                   Configuration conf)
     throws IOException 
   {
-    this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null);
+    this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer
+        .toString(port), null);
   }
   
-  /** Constructs a server listening on the named port and address.  Parameters passed must
+  /** 
+   * Constructs a server listening on the named port and address.  Parameters passed must
    * be of the named class.  The <code>handlerCount</handlerCount> determines
    * the number of handler threads that will be used to process calls.
    * If queueSizePerHandler or numReaders are not -1 they will be used instead of parameters
    * from configuration. Otherwise the configuration will be picked up.
+   * 
+   * If rpcRequestClass is null then the rpcRequestClass must have been 
+   * registered via {@link #registerProtocolEngine(RpcPayloadHeader.RpcKind,
+   *  Class, RPC.RpcInvoker)}
+   * This parameter has been retained for compatibility with existing tests
+   * and usage.
    */
   @SuppressWarnings("unchecked")
-  protected Server(String bindAddress, int port, 
-                  Class<? extends Writable> paramClass, int handlerCount, int numReaders, int queueSizePerHandler,
-                  Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager) 
+  protected Server(String bindAddress, int port,
+      Class<? extends Writable> rpcRequestClass, int handlerCount,
+      int numReaders, int queueSizePerHandler, Configuration conf,
+      String serverName, SecretManager<? extends TokenIdentifier> secretManager)
     throws IOException {
     this.bindAddress = bindAddress;
     this.conf = conf;
     this.port = port;
-    this.paramClass = paramClass;
+    this.rpcRequestClass = rpcRequestClass; 
     this.handlerCount = handlerCount;
     this.socketSendBufferSize = 0;
     if (queueSizePerHandler != -1) {
@@ -1797,17 +1879,17 @@ public abstract class Server {
   
   /** 
    * Called for each call. 
-   * @deprecated Use {@link #call(String, Writable, long)} instead
+   * @deprecated Use  {@link #call(RpcPayloadHeader.RpcKind, String,
+   *  Writable, long)} instead
    */
   @Deprecated
   public Writable call(Writable param, long receiveTime) throws IOException {
-    return call(null, param, receiveTime);
+    return call(RpcKind.RPC_BUILTIN, null, param, receiveTime);
   }
   
   /** Called for each call. */
-  public abstract Writable call(String protocol,
-                               Writable param, long receiveTime)
-  throws IOException;
+  public abstract Writable call(RpcKind rpcKind, String protocol,
+      Writable param, long receiveTime) throws IOException;
   
   /**
    * Authorize the incoming client connection.
@@ -1957,5 +2039,5 @@ public abstract class Server {
 
     int nBytes = initialRemaining - buf.remaining(); 
     return (nBytes > 0) ? nBytes : ret;
-  }      
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1210547&r1=1210546&r2=1210547&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Mon Dec  5 18:00:44 2011
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.ipc;
 
-import java.lang.reflect.Field;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Method;
 import java.lang.reflect.Array;
@@ -27,18 +26,14 @@ import java.lang.reflect.InvocationTarge
 
 import java.net.InetSocketAddress;
 import java.io.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 import java.io.Closeable;
-import java.util.Map;
-import java.util.HashMap;
 
 import javax.net.SocketFactory;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -53,36 +48,9 @@ import org.apache.hadoop.conf.*;
 public class WritableRpcEngine implements RpcEngine {
   private static final Log LOG = LogFactory.getLog(RPC.class);
   
- 
-  /**
-   * Get all superInterfaces that extend VersionedProtocol
-   * @param childInterfaces
-   * @return the super interfaces that extend VersionedProtocol
-   */
-  private static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
-    List<Class<?>> allInterfaces = new ArrayList<Class<?>>();
-
-    for (Class<?> childInterface : childInterfaces) {
-      if (VersionedProtocol.class.isAssignableFrom(childInterface)) {
-          allInterfaces.add(childInterface);
-          allInterfaces.addAll(
-              Arrays.asList(
-                  getSuperInterfaces(childInterface.getInterfaces())));
-      } else {
-        LOG.warn("Interface " + childInterface +
-              " ignored because it does not extend VersionedProtocol");
-      }
-    }
-    return (Class<?>[]) allInterfaces.toArray(new Class[allInterfaces.size()]);
-  }
-  
-  /**
-   * Get all interfaces that the given protocol implements or extends
-   * which are assignable from VersionedProtocol.
-   */
-  private static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
-    Class<?>[] interfaces  = protocol.getInterfaces();
-    return getSuperInterfaces(interfaces);
+  static { // Register the rpcRequest deserializer for WritableRpcEngine 
+    org.apache.hadoop.ipc.Server.registerProtocolEngine(RpcKind.RPC_WRITABLE,
+        Invocation.class, new Server.WritableRpcInvoker());
   }
 
   
@@ -120,15 +88,7 @@ public class WritableRpcEngine implement
         clientVersion = 0;
         clientMethodsHash = 0;
       } else {
-        try {
-          Field versionField = method.getDeclaringClass().getField("versionID");
-          versionField.setAccessible(true);
-          this.clientVersion = versionField.getLong(method.getDeclaringClass());
-        } catch (NoSuchFieldException ex) {
-          throw new RuntimeException(ex);
-        } catch (IllegalAccessException ex) {
-          throw new RuntimeException(ex);
-        }
+        this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
         this.clientMethodsHash = ProtocolSignature.getFingerprint(method
             .getDeclaringClass().getMethods());
       }
@@ -329,140 +289,25 @@ public class WritableRpcEngine implement
 
   /** An RPC Server. */
   public static class Server extends RPC.Server {
-    private boolean verbose;
-    
-    /**
-     *  The key in Map
-     */
-    static class ProtoNameVer {
-      final String protocol;
-      final long   version;
-      ProtoNameVer(String protocol, long ver) {
-        this.protocol = protocol;
-        this.version = ver;
-      }
-      @Override
-      public boolean equals(Object o) {
-        if (o == null) 
-          return false;
-        if (this == o) 
-          return true;
-        if (! (o instanceof ProtoNameVer))
-          return false;
-        ProtoNameVer pv = (ProtoNameVer) o;
-        return ((pv.protocol.equals(this.protocol)) && 
-            (pv.version == this.version));     
-      }
-      @Override
-      public int hashCode() {
-        return protocol.hashCode() * 37 + (int) version;    
-      }
-    }
-    
-    /**
-     * The value in map
-     */
-    static class ProtoClassProtoImpl {
-      final Class<?> protocolClass;
-      final Object protocolImpl; 
-      ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
-        this.protocolClass = protocolClass;
-        this.protocolImpl = protocolImpl;
-      }
-    }
-    
-    private Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMap = 
-        new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);
-    
-    // Register  protocol and its impl for rpc calls
-    private void registerProtocolAndImpl(Class<?> protocolClass, 
-        Object protocolImpl) throws IOException {
-      String protocolName = RPC.getProtocolName(protocolClass);
-      VersionedProtocol vp = (VersionedProtocol) protocolImpl;
-      long version;
-      try {
-        version = vp.getProtocolVersion(protocolName, 0);
-      } catch (Exception ex) {
-        LOG.warn("Protocol "  + protocolClass + 
-             " NOT registered as getProtocolVersion throws exception ");
-        return;
-      }
-      protocolImplMap.put(new ProtoNameVer(protocolName, version),
-          new ProtoClassProtoImpl(protocolClass, protocolImpl)); 
-      LOG.debug("Protocol Name = " + protocolName +  " version=" + version +
-          " ProtocolImpl=" + protocolImpl.getClass().getName() + 
-          " protocolClass=" + protocolClass.getName());
-    }
-    
-    private static class VerProtocolImpl {
-      final long version;
-      final ProtoClassProtoImpl protocolTarget;
-      VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
-        this.version = ver;
-        this.protocolTarget = protocolTarget;
-      }
-    }
-    
-    
-    @SuppressWarnings("unused") // will be useful later.
-    private VerProtocolImpl[] getSupportedProtocolVersions(
-        String protocolName) {
-      VerProtocolImpl[] resultk = new  VerProtocolImpl[protocolImplMap.size()];
-      int i = 0;
-      for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
-                                        protocolImplMap.entrySet()) {
-        if (pv.getKey().protocol.equals(protocolName)) {
-          resultk[i++] = 
-              new VerProtocolImpl(pv.getKey().version, pv.getValue());
-        }
-      }
-      if (i == 0) {
-        return null;
-      }
-      VerProtocolImpl[] result = new VerProtocolImpl[i];
-      System.arraycopy(resultk, 0, result, 0, i);
-      return result;
-    }
-    
-    private VerProtocolImpl getHighestSupportedProtocol(String protocolName) {    
-      Long highestVersion = 0L;
-      ProtoClassProtoImpl highest = null;
-      for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv : protocolImplMap
-          .entrySet()) {
-        if (pv.getKey().protocol.equals(protocolName)) {
-          if ((highest == null) || (pv.getKey().version > highestVersion)) {
-            highest = pv.getValue();
-            highestVersion = pv.getKey().version;
-          } 
-        }
-      }
-      if (highest == null) {
-        return null;
-      }
-      return new VerProtocolImpl(highestVersion,  highest);   
-    }
- 
-
-    /** Construct an RPC server.
+    /** 
+     * Construct an RPC server.
      * @param instance the instance whose methods will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
      * 
-     * @deprecated Use #Server(Class, Object, Configuration, String, int)
-     *    
+     * @deprecated Use #Server(Class, Object, Configuration, String, int)    
      */
     @Deprecated
     public Server(Object instance, Configuration conf, String bindAddress,
-        int port) 
-      throws IOException {
+        int port) throws IOException {
       this(null, instance, conf,  bindAddress, port);
     }
     
     
     /** Construct an RPC server.
-     * @param protocol class
-     * @param instance the instance whose methods will be called
+     * @param protocolClass class
+     * @param protocolImpl the instance whose methods will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
@@ -474,16 +319,8 @@ public class WritableRpcEngine implement
           false, null);
     }
     
-    private static String classNameBase(String className) {
-      String[] names = className.split("\\.", -1);
-      if (names == null || names.length == 0) {
-        return className;
-      }
-      return names[names.length-1];
-    }
-    
-    
-    /** Construct an RPC server.
+    /** 
+     * Construct an RPC server.
      * @param protocolImpl the instance whose methods will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
@@ -505,7 +342,8 @@ public class WritableRpcEngine implement
    
     }
     
-    /** Construct an RPC server.
+    /** 
+     * Construct an RPC server.
      * @param protocolClass - the protocol being registered
      *     can be null for compatibility with old usage (see below for details)
      * @param protocolImpl the protocol impl that will be called
@@ -520,7 +358,7 @@ public class WritableRpcEngine implement
         int numHandlers, int numReaders, int queueSizePerHandler, 
         boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) 
         throws IOException {
-      super(bindAddress, port, Invocation.class, numHandlers, numReaders,
+      super(bindAddress, port, null, numHandlers, numReaders,
           queueSizePerHandler, conf,
           classNameBase(protocolImpl.getClass().getName()), secretManager);
 
@@ -535,7 +373,7 @@ public class WritableRpcEngine implement
          * the protocolImpl is derived from the protocolClass(es) 
          * we register all interfaces extended by the protocolImpl
          */
-        protocols = getProtocolInterfaces(protocolImpl.getClass());
+        protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());
 
       } else {
         if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
@@ -544,132 +382,125 @@ public class WritableRpcEngine implement
               protocolImpl.getClass());
         }
         // register protocol class and its super interfaces
-        registerProtocolAndImpl(protocolClass, protocolImpl);
-        protocols = getProtocolInterfaces(protocolClass);
+        registerProtocolAndImpl(RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
+        protocols = RPC.getProtocolInterfaces(protocolClass);
       }
       for (Class<?> p : protocols) {
         if (!p.equals(VersionedProtocol.class)) {
-          registerProtocolAndImpl(p, protocolImpl);
+          registerProtocolAndImpl(RpcKind.RPC_WRITABLE, p, protocolImpl);
         }
       }
 
     }
 
- 
-    @Override
-    public <PROTO, IMPL extends PROTO> Server
-      addProtocol(
-        Class<PROTO> protocolClass, IMPL protocolImpl) throws IOException {
-      registerProtocolAndImpl(protocolClass, protocolImpl);
-      return this;
+    private static void log(String value) {
+      if (value!= null && value.length() > 55)
+        value = value.substring(0, 55)+"...";
+      LOG.info(value);
     }
     
-    /**
-     * Process a client call
-     * @param protocolName - the protocol name (the class of the client proxy
-     *      used to make calls to the rpc server.
-     * @param param  parameters
-     * @param receivedTime time at which the call receoved (for metrics)
-     * @return the call's return
-     * @throws IOException
-     */
-    public Writable call(String protocolName, Writable param, long receivedTime) 
-    throws IOException {
-      try {
-        Invocation call = (Invocation)param;
-        if (verbose) log("Call: " + call);
-
-        // Verify rpc version
-        if (call.getRpcVersion() != writableRpcVersion) {
-          // Client is using a different version of WritableRpc
-          throw new IOException(
-              "WritableRpc version mismatch, client side version="
-                  + call.getRpcVersion() + ", server side version="
-                  + writableRpcVersion);
-        }
+    static class WritableRpcInvoker implements RpcInvoker {
+
+     @Override
+      public Writable call(org.apache.hadoop.ipc.RPC.Server server,
+          String protocolName, Writable rpcRequest, long receivedTime)
+          throws IOException {
+        try {
+          Invocation call = (Invocation)rpcRequest;
+          if (server.verbose) log("Call: " + call);
+
+          // Verify rpc version
+          if (call.getRpcVersion() != writableRpcVersion) {
+            // Client is using a different version of WritableRpc
+            throw new IOException(
+                "WritableRpc version mismatch, client side version="
+                    + call.getRpcVersion() + ", server side version="
+                    + writableRpcVersion);
+          }
 
-        long clientVersion = call.getProtocolVersion();
-        final String protoName;
-        ProtoClassProtoImpl protocolImpl;
-        if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
-          // VersionProtocol methods are often used by client to figure out
-          // which version of protocol to use.
-          //
-          // Versioned protocol methods should go the protocolName protocol
-          // rather than the declaring class of the method since the
-          // the declaring class is VersionedProtocol which is not 
-          // registered directly.
-          // Send the call to the highest  protocol version
-          protocolImpl = 
-              getHighestSupportedProtocol(protocolName).protocolTarget;
-        } else {
-          protoName = call.declaringClassProtocolName;
-
-          // Find the right impl for the protocol based on client version.
-          ProtoNameVer pv = 
-              new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
-          protocolImpl = protocolImplMap.get(pv);
-          if (protocolImpl == null) { // no match for Protocol AND Version
-             VerProtocolImpl highest = 
-                 getHighestSupportedProtocol(protoName);
+          long clientVersion = call.getProtocolVersion();
+          final String protoName;
+          ProtoClassProtoImpl protocolImpl;
+          if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
+            // VersionProtocol methods are often used by client to figure out
+            // which version of protocol to use.
+            //
+            // Versioned protocol methods should go the protocolName protocol
+            // rather than the declaring class of the method since the
+            // the declaring class is VersionedProtocol which is not 
+            // registered directly.
+            // Send the call to the highest  protocol version
+            VerProtocolImpl highest = server.getHighestSupportedProtocol(
+                RpcKind.RPC_WRITABLE, protocolName);
             if (highest == null) {
-              throw new IOException("Unknown protocol: " + protoName);
-            } else { // protocol supported but not the version that client wants
-              throw new RPC.VersionMismatch(protoName, clientVersion,
-                highest.version);
+              throw new IOException("Unknown protocol: " + protocolName);
+            }
+            protocolImpl = highest.protocolTarget;
+          } else {
+            protoName = call.declaringClassProtocolName;
+
+            // Find the right impl for the protocol based on client version.
+            ProtoNameVer pv = 
+                new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
+            protocolImpl = 
+                server.getProtocolImplMap(RpcKind.RPC_WRITABLE).get(pv);
+            if (protocolImpl == null) { // no match for Protocol AND Version
+               VerProtocolImpl highest = 
+                   server.getHighestSupportedProtocol(RpcKind.RPC_WRITABLE, 
+                       protoName);
+              if (highest == null) {
+                throw new IOException("Unknown protocol: " + protoName);
+              } else { // protocol supported but not the version that client wants
+                throw new RPC.VersionMismatch(protoName, clientVersion,
+                  highest.version);
+              }
             }
           }
-        }
-        
+          
 
-        // Invoke the protocol method
+          // Invoke the protocol method
 
-        long startTime = System.currentTimeMillis();
-        Method method = 
-            protocolImpl.protocolClass.getMethod(call.getMethodName(),
-            call.getParameterClasses());
-        method.setAccessible(true);
-        rpcDetailedMetrics.init(protocolImpl.protocolClass);
-        Object value = 
-            method.invoke(protocolImpl.protocolImpl, call.getParameters());
-        int processingTime = (int) (System.currentTimeMillis() - startTime);
-        int qTime = (int) (startTime-receivedTime);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Served: " + call.getMethodName() +
-                    " queueTime= " + qTime +
-                    " procesingTime= " + processingTime);
-        }
-        rpcMetrics.addRpcQueueTime(qTime);
-        rpcMetrics.addRpcProcessingTime(processingTime);
-        rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
-                                             processingTime);
-        if (verbose) log("Return: "+value);
-
-        return new ObjectWritable(method.getReturnType(), value);
-
-      } catch (InvocationTargetException e) {
-        Throwable target = e.getTargetException();
-        if (target instanceof IOException) {
-          throw (IOException)target;
-        } else {
-          IOException ioe = new IOException(target.toString());
-          ioe.setStackTrace(target.getStackTrace());
+          long startTime = System.currentTimeMillis();
+          Method method = 
+              protocolImpl.protocolClass.getMethod(call.getMethodName(),
+              call.getParameterClasses());
+          method.setAccessible(true);
+          server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
+          Object value = 
+              method.invoke(protocolImpl.protocolImpl, call.getParameters());
+          int processingTime = (int) (System.currentTimeMillis() - startTime);
+          int qTime = (int) (startTime-receivedTime);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Served: " + call.getMethodName() +
+                      " queueTime= " + qTime +
+                      " procesingTime= " + processingTime);
+          }
+          server.rpcMetrics.addRpcQueueTime(qTime);
+          server.rpcMetrics.addRpcProcessingTime(processingTime);
+          server.rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
+                                               processingTime);
+          if (server.verbose) log("Return: "+value);
+
+          return new ObjectWritable(method.getReturnType(), value);
+
+        } catch (InvocationTargetException e) {
+          Throwable target = e.getTargetException();
+          if (target instanceof IOException) {
+            throw (IOException)target;
+          } else {
+            IOException ioe = new IOException(target.toString());
+            ioe.setStackTrace(target.getStackTrace());
+            throw ioe;
+          }
+        } catch (Throwable e) {
+          if (!(e instanceof IOException)) {
+            LOG.error("Unexpected throwable object ", e);
+          }
+          IOException ioe = new IOException(e.toString());
+          ioe.setStackTrace(e.getStackTrace());
           throw ioe;
         }
-      } catch (Throwable e) {
-        if (!(e instanceof IOException)) {
-          LOG.error("Unexpected throwable object ", e);
-        }
-        IOException ioe = new IOException(e.toString());
-        ioe.setStackTrace(e.getStackTrace());
-        throw ioe;
       }
     }
   }
-
-  private static void log(String value) {
-    if (value!= null && value.length() > 55)
-      value = value.substring(0, 55)+"...";
-    LOG.info(value);
-  }
 }



Mime
View raw message