hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1293964 [2/3] - in /hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common: ./ dev-support/ src/main/java/org/apache/hadoop/io/retry/ src/main/java/org/apache/hadoop/ipc/ src/main/java/org/apache/hadoop/security/to...
Date Sun, 26 Feb 2012 23:32:12 GMT
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Sun Feb 26 23:32:06 2012
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ipc;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Method;
 
@@ -26,6 +28,10 @@ import java.net.InetSocketAddress;
 import java.net.NoRouteToHostException;
 import java.net.SocketTimeoutException;
 import java.io.*;
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 
@@ -34,6 +40,8 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -42,6 +50,8 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import com.google.protobuf.BlockingService;
+
 /** A simple RPC mechanism.
  *
  * A <i>protocol</i> is a Java interface.  All parameters and return types must
@@ -61,17 +71,100 @@ import org.apache.hadoop.util.Reflection
  * the protocol instance is transmitted.
  */
 public class RPC {
+  
+  interface RpcInvoker {   
+    /**
+     * Process a client call on the server side
+     * @param server the server within whose context this rpc call is made
+     * @param protocol - the protocol name (the class of the client proxy
+     *      used to make calls to the rpc server.
+     * @param rpcRequest  - deserialized
+     * @param receiveTime time at which the call received (for metrics)
+     * @return the call's return
+     * @throws IOException
+     **/
+    public Writable call(Server server, String protocol,
+        Writable rpcRequest, long receiveTime) throws IOException ;
+  }
+  
   static final Log LOG = LogFactory.getLog(RPC.class);
+  
+  /**
+   * Get all superInterfaces that extend VersionedProtocol
+   * @param childInterfaces
+   * @return the super interfaces that extend VersionedProtocol
+   */
+  static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
+    List<Class<?>> allInterfaces = new ArrayList<Class<?>>();
+
+    for (Class<?> childInterface : childInterfaces) {
+      if (VersionedProtocol.class.isAssignableFrom(childInterface)) {
+          allInterfaces.add(childInterface);
+          allInterfaces.addAll(
+              Arrays.asList(
+                  getSuperInterfaces(childInterface.getInterfaces())));
+      } else {
+        LOG.warn("Interface " + childInterface +
+              " ignored because it does not extend VersionedProtocol");
+      }
+    }
+    return allInterfaces.toArray(new Class[allInterfaces.size()]);
+  }
+  
+  /**
+   * Get all interfaces that the given protocol implements or extends
+   * which are assignable from VersionedProtocol.
+   */
+  static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
+    Class<?>[] interfaces  = protocol.getInterfaces();
+    return getSuperInterfaces(interfaces);
+  }
+  
+  /**
+   * Get the protocol name.
+   *  If the protocol class has a ProtocolAnnotation, then get the protocol
+   *  name from the annotation; otherwise the class name is the protocol name.
+   */
+  static public String getProtocolName(Class<?> protocol) {
+    if (protocol == null) {
+      return null;
+    }
+    ProtocolInfo anno = protocol.getAnnotation(ProtocolInfo.class);
+    return  (anno == null) ? protocol.getName() : anno.protocolName();
+  }
+  
+  /**
+   * Get the protocol version from protocol class.
+   * If the protocol class has a ProtocolAnnotation, then get the protocol
+   * name from the annotation; otherwise the class name is the protocol name.
+   */
+  static public long getProtocolVersion(Class<?> protocol) {
+    if (protocol == null) {
+      throw new IllegalArgumentException("Null protocol");
+    }
+    long version;
+    ProtocolInfo anno = protocol.getAnnotation(ProtocolInfo.class);
+    if (anno != null) {
+      version = anno.protocolVersion();
+      if (version != -1)
+        return version;
+    }
+    try {
+      Field versionField = protocol.getField("versionID");
+      versionField.setAccessible(true);
+      return versionField.getLong(protocol);
+    } catch (NoSuchFieldException ex) {
+      throw new RuntimeException(ex);
+    } catch (IllegalAccessException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
 
   private RPC() {}                                  // no public ctor
 
   // cache of RpcEngines by protocol
-  private static final Map<Class,RpcEngine> PROTOCOL_ENGINES
-    = new HashMap<Class,RpcEngine>();
-
-  // track what RpcEngine is used by a proxy class, for stopProxy()
-  private static final Map<Class,RpcEngine> PROXY_ENGINES
-    = new HashMap<Class,RpcEngine>();
+  private static final Map<Class<?>,RpcEngine> PROTOCOL_ENGINES
+    = new HashMap<Class<?>,RpcEngine>();
 
   private static final String ENGINE_PROP = "rpc.engine";
 
@@ -82,32 +175,23 @@ public class RPC {
    * @param engine the RpcEngine impl
    */
   public static void setProtocolEngine(Configuration conf,
-                                Class protocol, Class engine) {
+                                Class<?> protocol, Class<?> engine) {
     conf.setClass(ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class);
   }
 
   // return the RpcEngine configured to handle a protocol
-  private static synchronized RpcEngine getProtocolEngine(Class protocol,
-                                                          Configuration conf) {
+  static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
+      Configuration conf) {
     RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
     if (engine == null) {
       Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
                                     WritableRpcEngine.class);
       engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
-      if (protocol.isInterface())
-        PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(),
-                                              protocol),
-                          engine);
       PROTOCOL_ENGINES.put(protocol, engine);
     }
     return engine;
   }
 
-  // return the RpcEngine that handles a proxy object
-  private static synchronized RpcEngine getProxyEngine(Object proxy) {
-    return PROXY_ENGINES.get(proxy.getClass());
-  }
-
   /**
    * A version mismatch for the RPC protocol.
    */
@@ -441,7 +525,16 @@ public class RPC {
 
      return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
    }
-
+  
+  /**
+   * Returns the server address for a given proxy.
+   */
+  public static InetSocketAddress getServerAddress(Object proxy) {
+    RpcInvocationHandler inv = (RpcInvocationHandler) Proxy
+        .getInvocationHandler(proxy);
+    return inv.getConnectionId().getAddress();
+  }
+   
   /**
    * Get a protocol proxy that contains a proxy connection to a remote server
    * and a set of methods that are supported by the server
@@ -463,13 +556,31 @@ public class RPC {
   }
 
   /**
-   * Stop this proxy and release its invoker's resource
-   * @param proxy the proxy to be stopped
+   * Stop this proxy and release its invoker's resource by getting the
+   * invocation handler for the given proxy object and calling
+   * {@link Closeable#close} if that invocation handler implements
+   * {@link Closeable}.
+   * 
+   * @param proxy the RPC proxy object to be stopped
    */
   public static void stopProxy(Object proxy) {
-    RpcEngine rpcEngine;
-    if (proxy!=null && (rpcEngine = getProxyEngine(proxy)) != null) {
-      rpcEngine.stopProxy(proxy);
+    InvocationHandler invocationHandler = null;
+    try {
+      invocationHandler = Proxy.getInvocationHandler(proxy);
+    } catch (IllegalArgumentException e) {
+      LOG.error("Tried to call RPC.stopProxy on an object that is not a proxy.", e);
+    }
+    if (proxy != null && invocationHandler != null &&
+        invocationHandler instanceof Closeable) {
+      try {
+        ((Closeable)invocationHandler).close();
+      } catch (IOException e) {
+        LOG.error("Stopping RPC invocation handler caused exception", e);
+      }
+    } else {
+      LOG.error("Could not get invocation handler " + invocationHandler +
+          " for proxy class " + (proxy == null ? null : proxy.getClass()) +
+          ", or invocation handler is not closeable.");
     }
   }
 
@@ -518,7 +629,7 @@ public class RPC {
   }
 
   /** Construct a server for a protocol implementation instance. */
-  public static Server getServer(Class protocol,
+  public static Server getServer(Class<?> protocol,
                                  Object instance, String bindAddress,
                                  int port, Configuration conf) 
     throws IOException {
@@ -529,7 +640,7 @@ public class RPC {
    * @deprecated secretManager should be passed.
    */
   @Deprecated
-  public static Server getServer(Class protocol,
+  public static Server getServer(Class<?> protocol,
                                  Object instance, String bindAddress, int port,
                                  int numHandlers,
                                  boolean verbose, Configuration conf) 
@@ -553,8 +664,10 @@ public class RPC {
   }
 
   /** Construct a server for a protocol implementation instance. */
-  public static Server getServer(Class<?> protocol,
-                                 Object instance, String bindAddress, int port,
+
+  public static <PROTO extends VersionedProtocol, IMPL extends PROTO> 
+        Server getServer(Class<PROTO> protocol,
+                                 IMPL instance, String bindAddress, int port,
                                  int numHandlers, int numReaders, int queueSizePerHandler,
                                  boolean verbose, Configuration conf,
                                  SecretManager<? extends TokenIdentifier> secretManager) 
@@ -567,6 +680,147 @@ public class RPC {
 
   /** An RPC Server. */
   public abstract static class Server extends org.apache.hadoop.ipc.Server {
+   boolean verbose;
+   static String classNameBase(String className) {
+      String[] names = className.split("\\.", -1);
+      if (names == null || names.length == 0) {
+        return className;
+      }
+      return names[names.length-1];
+    }
+   
+   /**
+    * Store a map of protocol and version to its implementation
+    */
+   /**
+    *  The key in Map
+    */
+   static class ProtoNameVer {
+     final String protocol;
+     final long   version;
+     ProtoNameVer(String protocol, long ver) {
+       this.protocol = protocol;
+       this.version = ver;
+     }
+     @Override
+     public boolean equals(Object o) {
+       if (o == null) 
+         return false;
+       if (this == o) 
+         return true;
+       if (! (o instanceof ProtoNameVer))
+         return false;
+       ProtoNameVer pv = (ProtoNameVer) o;
+       return ((pv.protocol.equals(this.protocol)) && 
+           (pv.version == this.version));     
+     }
+     @Override
+     public int hashCode() {
+       return protocol.hashCode() * 37 + (int) version;    
+     }
+   }
+   
+   /**
+    * The value in map
+    */
+   static class ProtoClassProtoImpl {
+     final Class<?> protocolClass;
+     final Object protocolImpl; 
+     ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
+       this.protocolClass = protocolClass;
+       this.protocolImpl = protocolImpl;
+     }
+   }
+
+   ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>> protocolImplMapArray = 
+       new ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>>(RpcKind.MAX_INDEX);
+   
+   Map<ProtoNameVer, ProtoClassProtoImpl> getProtocolImplMap(RpcKind rpcKind) {
+     if (protocolImplMapArray.size() == 0) {// initialize for all rpc kinds
+       for (int i=0; i <= RpcKind.MAX_INDEX; ++i) {
+         protocolImplMapArray.add(
+             new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10));
+       }
+     }
+     return protocolImplMapArray.get(rpcKind.ordinal());   
+   }
+   
+   // Register  protocol and its impl for rpc calls
+   void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass, 
+       Object protocolImpl) throws IOException {
+     String protocolName = RPC.getProtocolName(protocolClass);
+     long version;
+     
+
+     try {
+       version = RPC.getProtocolVersion(protocolClass);
+     } catch (Exception ex) {
+       LOG.warn("Protocol "  + protocolClass + 
+            " NOT registered as cannot get protocol version ");
+       return;
+     }
+
+
+     getProtocolImplMap(rpcKind).put(new ProtoNameVer(protocolName, version),
+         new ProtoClassProtoImpl(protocolClass, protocolImpl)); 
+     LOG.debug("RpcKind = " + rpcKind + " Protocol Name = " + protocolName +  " version=" + version +
+         " ProtocolImpl=" + protocolImpl.getClass().getName() + 
+         " protocolClass=" + protocolClass.getName());
+   }
+   
+   static class VerProtocolImpl {
+     final long version;
+     final ProtoClassProtoImpl protocolTarget;
+     VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
+       this.version = ver;
+       this.protocolTarget = protocolTarget;
+     }
+   }
+   
+   
+   @SuppressWarnings("unused") // will be useful later.
+   VerProtocolImpl[] getSupportedProtocolVersions(RpcKind rpcKind,
+       String protocolName) {
+     VerProtocolImpl[] resultk = 
+         new  VerProtocolImpl[getProtocolImplMap(rpcKind).size()];
+     int i = 0;
+     for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
+                                       getProtocolImplMap(rpcKind).entrySet()) {
+       if (pv.getKey().protocol.equals(protocolName)) {
+         resultk[i++] = 
+             new VerProtocolImpl(pv.getKey().version, pv.getValue());
+       }
+     }
+     if (i == 0) {
+       return null;
+     }
+     VerProtocolImpl[] result = new VerProtocolImpl[i];
+     System.arraycopy(resultk, 0, result, 0, i);
+     return result;
+   }
+   
+   VerProtocolImpl getHighestSupportedProtocol(RpcKind rpcKind, 
+       String protocolName) {    
+     Long highestVersion = 0L;
+     ProtoClassProtoImpl highest = null;
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("Size of protoMap for " + rpcKind + " ="
+           + getProtocolImplMap(rpcKind).size());
+     }
+     for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv : 
+           getProtocolImplMap(rpcKind).entrySet()) {
+       if (pv.getKey().protocol.equals(protocolName)) {
+         if ((highest == null) || (pv.getKey().version > highestVersion)) {
+           highest = pv.getValue();
+           highestVersion = pv.getKey().version;
+         } 
+       }
+     }
+     if (highest == null) {
+       return null;
+     }
+     return new VerProtocolImpl(highestVersion,  highest);   
+   }
   
     protected Server(String bindAddress, int port, 
                      Class<? extends Writable> paramClass, int handlerCount,
@@ -575,7 +829,38 @@ public class RPC {
                      SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
       super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,
             conf, serverName, secretManager);
+      initProtocolMetaInfo(conf);
+    }
+    
+    private void initProtocolMetaInfo(Configuration conf)
+        throws IOException {
+      RPC.setProtocolEngine(conf, ProtocolMetaInfoPB.class,
+          ProtobufRpcEngine.class);
+      ProtocolMetaInfoServerSideTranslatorPB xlator = 
+          new ProtocolMetaInfoServerSideTranslatorPB(this);
+      BlockingService protocolInfoBlockingService = ProtocolInfoService
+          .newReflectiveBlockingService(xlator);
+      addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, ProtocolMetaInfoPB.class,
+          protocolInfoBlockingService);
+    }
+    
+    /**
+     * Add a protocol to the existing server.
+     * @param protocolClass - the protocol class
+     * @param protocolImpl - the impl of the protocol that will be called
+     * @return the server (for convenience)
+     */
+    public Server addProtocol(RpcKind rpcKind, Class<?> protocolClass,
+        Object protocolImpl) throws IOException {
+      registerProtocolAndImpl(rpcKind, protocolClass, protocolImpl);
+      return this;
+    }
+    
+    @Override
+    public Writable call(RpcKind rpcKind, String protocol,
+        Writable rpcRequest, long receiveTime) throws IOException {
+      return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
+          receiveTime);
     }
   }
-
 }

Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java?rev=1293964&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java (added)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java Sun Feb 26 23:32:06 2012
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
+import org.apache.hadoop.net.NetUtils;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class maintains a cache of protocol versions and corresponding protocol
+ * signatures, keyed by server address, protocol and rpc kind.
+ * The cache is lazily populated. 
+ */
+public class RpcClientUtil {
+  private static RpcController NULL_CONTROLLER = null;
+  private static final int PRIME = 16777619;
+  
+  private static class ProtoSigCacheKey {
+    private InetSocketAddress serverAddress;
+    private String protocol;
+    private String rpcKind;
+    
+    ProtoSigCacheKey(InetSocketAddress addr, String p, String rk) {
+      this.serverAddress = addr;
+      this.protocol = p;
+      this.rpcKind = rk;
+    }
+    
+    @Override //Object
+    public int hashCode() {
+      int result = 1;
+      result = PRIME * result
+          + ((serverAddress == null) ? 0 : serverAddress.hashCode());
+      result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());
+      result = PRIME * result + ((rpcKind == null) ? 0 : rpcKind.hashCode());
+      return result;
+    }
+    
+    @Override //Object
+    public boolean equals(Object other) {
+      if (other == this) {
+        return true;
+      }
+      if (other instanceof ProtoSigCacheKey) {
+        ProtoSigCacheKey otherKey = (ProtoSigCacheKey) other;
+        return (serverAddress.equals(otherKey.serverAddress) &&
+            protocol.equals(otherKey.protocol) &&
+            rpcKind.equals(otherKey.rpcKind));
+      }
+      return false;
+    }
+  }
+  
+  private static ConcurrentHashMap<ProtoSigCacheKey, Map<Long, ProtocolSignature>> 
+  signatureMap = new ConcurrentHashMap<ProtoSigCacheKey, Map<Long, ProtocolSignature>>();
+
+  private static void putVersionSignatureMap(InetSocketAddress addr,
+      String protocol, String rpcKind, Map<Long, ProtocolSignature> map) {
+    signatureMap.put(new ProtoSigCacheKey(addr, protocol, rpcKind), map);
+  }
+  
+  private static Map<Long, ProtocolSignature> getVersionSignatureMap(
+      InetSocketAddress addr, String protocol, String rpcKind) {
+    return signatureMap.get(new ProtoSigCacheKey(addr, protocol, rpcKind));
+  }
+
+  /**
+   * Returns whether the given method is supported or not.
+   * The protocol signatures are fetched and cached. The connection id for the
+   * proxy provided is re-used.
+   * @param rpcProxy Proxy which provides an existing connection id.
+   * @param protocol Protocol for which the method check is required.
+   * @param rpcKind The RpcKind for which the method check is required.
+   * @param version The version at the client.
+   * @param methodName Name of the method.
+   * @return true if the method is supported, false otherwise.
+   * @throws IOException
+   */
+  public static boolean isMethodSupported(Object rpcProxy, Class<?> protocol,
+      RpcKind rpcKind, long version, String methodName) throws IOException {
+    InetSocketAddress serverAddress = RPC.getServerAddress(rpcProxy);
+    Map<Long, ProtocolSignature> versionMap = getVersionSignatureMap(
+        serverAddress, protocol.getName(), rpcKind.toString());
+
+    if (versionMap == null) {
+      Configuration conf = new Configuration();
+      RPC.setProtocolEngine(conf, ProtocolMetaInfoPB.class,
+          ProtobufRpcEngine.class);
+      ProtocolMetaInfoPB protocolInfoProxy = getProtocolMetaInfoProxy(rpcProxy,
+          conf);
+      GetProtocolSignatureRequestProto.Builder builder = 
+          GetProtocolSignatureRequestProto.newBuilder();
+      builder.setProtocol(protocol.getName());
+      builder.setRpcKind(rpcKind.toString());
+      GetProtocolSignatureResponseProto resp;
+      try {
+        resp = protocolInfoProxy.getProtocolSignature(NULL_CONTROLLER,
+            builder.build());
+      } catch (ServiceException se) {
+        throw ProtobufHelper.getRemoteException(se);
+      }
+      versionMap = convertProtocolSignatureProtos(resp
+          .getProtocolSignatureList());
+      putVersionSignatureMap(serverAddress, protocol.getName(),
+          rpcKind.toString(), versionMap);
+    }
+    // Assuming unique method names.
+    Method desiredMethod;
+    Method[] allMethods = protocol.getMethods();
+    desiredMethod = null;
+    for (Method m : allMethods) {
+      if (m.getName().equals(methodName)) {
+        desiredMethod = m;
+        break;
+      }
+    }
+    if (desiredMethod == null) {
+      return false;
+    }
+    int methodHash = ProtocolSignature.getFingerprint(desiredMethod);
+    return methodExists(methodHash, version, versionMap);
+  }
+  
+  private static Map<Long, ProtocolSignature> 
+  convertProtocolSignatureProtos(List<ProtocolSignatureProto> protoList) {
+    Map<Long, ProtocolSignature> map = new TreeMap<Long, ProtocolSignature>();
+    for (ProtocolSignatureProto p : protoList) {
+      int [] methods = new int[p.getMethodsList().size()];
+      int index=0;
+      for (int m : p.getMethodsList()) {
+        methods[index++] = m;
+      }
+      map.put(p.getVersion(), new ProtocolSignature(p.getVersion(), methods));
+    }
+    return map;
+  }
+
+  private static boolean methodExists(int methodHash, long version,
+      Map<Long, ProtocolSignature> versionMap) {
+    ProtocolSignature sig = versionMap.get(version);
+    if (sig != null) {
+      for (int m : sig.getMethods()) {
+        if (m == methodHash) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+  
+  // The proxy returned re-uses the underlying connection. This is a special 
+  // mechanism for ProtocolMetaInfoPB.
+  // Don't do this for any other protocol, it might cause a security hole.
+  private static ProtocolMetaInfoPB getProtocolMetaInfoProxy(Object proxy,
+      Configuration conf) throws IOException {
+    RpcInvocationHandler inv = (RpcInvocationHandler) Proxy
+        .getInvocationHandler(proxy);
+    return RPC
+        .getProtocolEngine(ProtocolMetaInfoPB.class, conf)
+        .getProtocolMetaInfoProxy(inv.getConnectionId(), conf,
+            NetUtils.getDefaultSocketFactory(conf)).getProxy();
+  }
+}

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java Sun Feb 26 23:32:06 2012
@@ -26,6 +26,7 @@ import javax.net.SocketFactory;
 
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -41,9 +42,6 @@ public interface RpcEngine {
                   UserGroupInformation ticket, Configuration conf,
                   SocketFactory factory, int rpcTimeout) throws IOException;
 
-  /** Stop this proxy. */
-  void stopProxy(Object proxy);
-
   /** Expert: Make multiple, parallel calls to a set of servers. */
   Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
                 UserGroupInformation ticket, Configuration conf)
@@ -57,4 +55,16 @@ public interface RpcEngine {
                        SecretManager<? extends TokenIdentifier> secretManager
                        ) throws IOException;
 
+  /**
+   * Returns a proxy for ProtocolMetaInfoPB, which uses the given connection
+   * id.
+   * @param connId, ConnectionId to be used for the proxy.
+   * @param conf, Configuration.
+   * @param factory, Socket factory.
+   * @return Proxy object.
+   * @throws IOException
+   */
+  ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
+      ConnectionId connId, Configuration conf, SocketFactory factory)
+      throws IOException;
 }

Copied: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcInvocationHandler.java (from r1293950, hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java)
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcInvocationHandler.java?p2=hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcInvocationHandler.java&p1=hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java&r1=1293950&r2=1293964&rev=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcInvocationHandler.java Sun Feb 26 23:32:06 2012
@@ -15,25 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.hadoop.ipc;
 
-package org.apache.hadoop.hdfs.security.token.block;
+import java.io.Closeable;
+import java.lang.reflect.InvocationHandler;
 
-import javax.crypto.SecretKey;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.ipc.Client.ConnectionId;
 
 /**
- * Key used for generating and verifying block tokens
+ * This interface must be implemented by all InvocationHandler
+ * implementations.
  */
-@InterfaceAudience.Private
-public class BlockKey extends DelegationKey {
-
-  public BlockKey() {
-    super();
-  }
-
-  public BlockKey(int keyId, long expiryDate, SecretKey key) {
-    super(keyId, expiryDate, key);
-  }
+public interface RpcInvocationHandler extends InvocationHandler, Closeable {
+  
+  /**
+   * Returns the connection id associated with the InvocationHandler instance.
+   * @return ConnectionId
+   */
+  ConnectionId getConnectionId();
 }

Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java?rev=1293964&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java (added)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java Sun Feb 26 23:32:06 2012
@@ -0,0 +1,118 @@
+package org.apache.hadoop.ipc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This is the rpc payload header. It is sent with every rpc call
+ * <pre>
+ * The format of RPC call is as follows:
+ * +---------------------------------------------------+
+ * |  Rpc length in bytes (header + payload length)    |
+ * +---------------------------------------------------+
+ * |      Rpc Header       |       Rpc Payload         |
+ * +---------------------------------------------------+
+ * 
+ * The format of Rpc Header is:
+ * +----------------------------------+
+ * |  RpcKind (1 bytes)               |      
+ * +----------------------------------+
+ * |  RpcPayloadOperation (1 bytes)   |      
+ * +----------------------------------+
+ * |  Call ID (4 bytes)               |      
+ * +----------------------------------+
+ * 
+ * {@link RpcKind} determines the type of serialization used for Rpc Payload.
+ * </pre>
+ * <p>
+ * <b>Note this header does NOT have its own version number, 
+ * it used the version number from the connection header. </b>
+ */
+public class RpcPayloadHeader implements Writable {
+  public enum RpcPayloadOperation {
+    RPC_FINAL_PAYLOAD ((short)1),
+    RPC_CONTINUATION_PAYLOAD ((short)2), // not implemented yet
+    RPC_CLOSE_CONNECTION ((short)3);     // close the rpc connection
+    
+    private final short code;
+    private static final short FIRST_INDEX = RPC_FINAL_PAYLOAD.code;
+    RpcPayloadOperation(short val) {
+      this.code = val;
+    }
+    
+    public void write(DataOutput out) throws IOException {  
+      out.writeByte(code);
+    }
+    
+    static RpcPayloadOperation readFields(DataInput in) throws IOException {
+      short inValue = in.readByte();
+      return RpcPayloadOperation.values()[inValue - FIRST_INDEX];
+    }
+  }
+  
+  public enum RpcKind {
+    RPC_BUILTIN ((short) 1),         // Used for built in calls by tests
+    RPC_WRITABLE ((short) 2),        // Use WritableRpcEngine 
+    RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
+    final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size
+    private static final short FIRST_INDEX = RPC_BUILTIN.value;    
+    private final short value;
+
+    RpcKind(short val) {
+      this.value = val;
+    }
+    
+    public void write(DataOutput out) throws IOException {
+      out.writeByte(value);
+    }
+    
+    static RpcKind readFields(DataInput in) throws IOException {
+      short inValue = in.readByte();
+      return RpcKind.values()[inValue - FIRST_INDEX];
+    }  
+  }
+  
+  private RpcKind kind;
+  private RpcPayloadOperation operation;
+  private int callId;
+  
+  public RpcPayloadHeader() {
+    kind = RpcKind.RPC_WRITABLE;
+    operation = RpcPayloadOperation.RPC_CLOSE_CONNECTION;
+  }
+  
+  public RpcPayloadHeader(RpcKind kind, RpcPayloadOperation op, int callId) {
+    this.kind  = kind;
+    this.operation = op;
+    this.callId = callId;
+  }
+  
+  int getCallId() {
+    return callId;
+  }
+  
+  RpcKind getkind() {
+    return kind;
+  }
+  
+  RpcPayloadOperation getOperation() {
+    return operation;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    kind.write(out);
+    operation.write(out);
+    out.writeInt(callId); 
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    kind = RpcKind.readFields(in);
+    operation = RpcPayloadOperation.readFields(in);
+    this.callId = in.readInt();
+  }
+}

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcServerException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcServerException.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcServerException.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcServerException.java Sun Feb 26 23:32:06 2012
@@ -25,10 +25,9 @@ public class RpcServerException extends 
 
   /**
    * Constructs exception with the specified detail message.
-   * 
-   * @param messages detailed message.
+   * @param message detailed message.
    */
-  RpcServerException(final String message) {
+  public RpcServerException(final String message) {
     super(message);
   }
   
@@ -36,12 +35,11 @@ public class RpcServerException extends 
    * Constructs exception with the specified detail message and cause.
    * 
    * @param message message.
-   * @param cause that cause this exception
    * @param cause the cause (can be retried by the {@link #getCause()} method).
    *          (A <tt>null</tt> value is permitted, and indicates that the cause
    *          is nonexistent or unknown.)
    */
-  RpcServerException(final String message, final Throwable cause) {
+  public RpcServerException(final String message, final Throwable cause) {
     super(message, cause);
   }
 }

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Sun Feb 26 23:32:06 2012
@@ -42,7 +42,9 @@ import java.nio.channels.SocketChannel;
 import java.nio.channels.WritableByteChannel;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -66,28 +68,33 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
 import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
-import org.apache.hadoop.security.SaslRpcServer.SaslStatus;
 import org.apache.hadoop.security.SaslRpcServer.SaslDigestCallbackHandler;
 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.SaslRpcServer.SaslStatus;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
  * a port and is defined by a parameter class and a value class.
@@ -108,12 +115,66 @@ public abstract class Server {
   // 4 : Introduced SASL security layer
   // 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
   //     in ObjectWritable to efficiently transmit arrays of primitives
-  public static final byte CURRENT_VERSION = 5;
+  // 6 : Made RPC payload header explicit
+  public static final byte CURRENT_VERSION = 6;
 
   /**
    * Initial and max size of response buffer
    */
   static int INITIAL_RESP_BUF_SIZE = 10240;
+  
+  static class RpcKindMapValue {
+    final Class<? extends Writable> rpcRequestWrapperClass;
+    final RpcInvoker rpcInvoker;
+    RpcKindMapValue (Class<? extends Writable> rpcRequestWrapperClass,
+          RpcInvoker rpcInvoker) {
+      this.rpcInvoker = rpcInvoker;
+      this.rpcRequestWrapperClass = rpcRequestWrapperClass;
+    }   
+  }
+  static Map<RpcKind, RpcKindMapValue> rpcKindMap = new
+      HashMap<RpcKind, RpcKindMapValue>(4);
+  
+  
+
+  /**
+   * Register a RPC kind and the class to deserialize the rpc request.
+   * 
+   * Called by static initializers of rpcKind Engines
+   * @param rpcKind
+   * @param rpcRequestWrapperClass - this class is used to deserialze the
+   *  the rpc request.
+   *  @param rpcInvoker - use to process the calls on SS.
+   */
+  
+  public static void registerProtocolEngine(RpcKind rpcKind, 
+          Class<? extends Writable> rpcRequestWrapperClass,
+          RpcInvoker rpcInvoker) {
+    RpcKindMapValue  old = 
+        rpcKindMap.put(rpcKind, new RpcKindMapValue(rpcRequestWrapperClass, rpcInvoker));
+    if (old != null) {
+      rpcKindMap.put(rpcKind, old);
+      throw new IllegalArgumentException("ReRegistration of rpcKind: " +
+          rpcKind);      
+    }
+    LOG.debug("rpcKind=" + rpcKind + 
+        ", rpcRequestWrapperClass=" + rpcRequestWrapperClass + 
+        ", rpcInvoker=" + rpcInvoker);
+  }
+  
+  public Class<? extends Writable> getRpcRequestWrapper(
+      RpcKind rpcKind) {
+    if (rpcRequestClass != null)
+       return rpcRequestClass;
+    RpcKindMapValue val = rpcKindMap.get(rpcKind);
+    return (val == null) ? null : val.rpcRequestWrapperClass; 
+  }
+  
+  public static RpcInvoker  getRpcInvoker(RpcKind rpcKind) {
+    RpcKindMapValue val = rpcKindMap.get(rpcKind);
+    return (val == null) ? null : val.rpcInvoker; 
+  }
+  
 
   public static final Log LOG = LogFactory.getLog(Server.class);
   public static final Log AUDITLOG = 
@@ -178,7 +239,7 @@ public abstract class Server {
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
   private int readThreads;                        // number of read threads
-  private Class<? extends Writable> paramClass;   // class of call parameters
+  private Class<? extends Writable> rpcRequestClass;   // class used for deserializing the rpc request
   private int maxIdleTime;                        // the maximum idle time after 
                                                   // which a client may be disconnected
   private int thresholdIdleConnections;           // the number of idle connections
@@ -239,10 +300,21 @@ public abstract class Server {
    * Returns a handle to the rpcMetrics (required in tests)
    * @return rpc metrics
    */
+  @VisibleForTesting
   public RpcMetrics getRpcMetrics() {
     return rpcMetrics;
   }
 
+  @VisibleForTesting
+  public RpcDetailedMetrics getRpcDetailedMetrics() {
+    return rpcDetailedMetrics;
+  }
+  
+  @VisibleForTesting
+  Iterable<? extends Thread> getHandlers() {
+    return Arrays.asList(handlers);
+  }
+
   /**
    * Refresh the service authorization ACL for the service handled by this server.
    */
@@ -261,28 +333,33 @@ public abstract class Server {
 
   /** A call queued for handling. */
   private static class Call {
-    private int id;                               // the client's call id
-    private Writable param;                       // the parameter passed
-    private Connection connection;                // connection to client
-    private long timestamp;     // the time received when response is null
-                                   // the time served when response is not null
-    private ByteBuffer response;                      // the response for this call
-
-    public Call(int id, Writable param, Connection connection) { 
-      this.id = id;
-      this.param = param;
+    private final int callId;             // the client's call id
+    private final Writable rpcRequest;    // Serialized Rpc request from client
+    private final Connection connection;  // connection to client
+    private long timestamp;               // time received when response is null
+                                          // time served when response is not null
+    private ByteBuffer rpcResponse;       // the response for this call
+    private final RpcKind rpcKind;
+
+    public Call(int id, Writable param, Connection connection) {
+      this( id,  param,  connection, RpcKind.RPC_BUILTIN );    
+    }
+    public Call(int id, Writable param, Connection connection, RpcKind kind) { 
+      this.callId = id;
+      this.rpcRequest = param;
       this.connection = connection;
       this.timestamp = System.currentTimeMillis();
-      this.response = null;
+      this.rpcResponse = null;
+      this.rpcKind = kind;
     }
     
     @Override
     public String toString() {
-      return param.toString() + " from " + connection.toString();
+      return rpcRequest.toString() + " from " + connection.toString();
     }
 
     public void setResponse(ByteBuffer response) {
-      this.response = response;
+      this.rpcResponse = response;
     }
   }
 
@@ -781,17 +858,17 @@ public abstract class Server {
           call = responseQueue.removeFirst();
           SocketChannel channel = call.connection.channel;
           if (LOG.isDebugEnabled()) {
-            LOG.debug(getName() + ": responding to #" + call.id + " from " +
+            LOG.debug(getName() + ": responding to #" + call.callId + " from " +
                       call.connection);
           }
           //
           // Send as much data as we can in the non-blocking fashion
           //
-          int numBytes = channelWrite(channel, call.response);
+          int numBytes = channelWrite(channel, call.rpcResponse);
           if (numBytes < 0) {
             return true;
           }
-          if (!call.response.hasRemaining()) {
+          if (!call.rpcResponse.hasRemaining()) {
             call.connection.decRpcCount();
             if (numElements == 1) {    // last call fully processes.
               done = true;             // no more data for this channel.
@@ -799,7 +876,7 @@ public abstract class Server {
               done = false;            // more calls pending to be sent.
             }
             if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+              LOG.debug(getName() + ": responding to #" + call.callId + " from " +
                         call.connection + " Wrote " + numBytes + " bytes.");
             }
           } else {
@@ -827,7 +904,7 @@ public abstract class Server {
               }
             }
             if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+              LOG.debug(getName() + ": responding to #" + call.callId + " from " +
                         call.connection + " Wrote partial " + numBytes + 
                         " bytes.");
             }
@@ -893,7 +970,7 @@ public abstract class Server {
     private InetAddress addr;
     
     ConnectionHeader header = new ConnectionHeader();
-    Class<?> protocol;
+    String protocolName;
     boolean useSasl;
     SaslServer saslServer;
     private AuthMethod authMethod;
@@ -1284,15 +1361,8 @@ public abstract class Server {
       DataInputStream in =
         new DataInputStream(new ByteArrayInputStream(buf));
       header.readFields(in);
-      try {
-        String protocolClassName = header.getProtocol();
-        if (protocolClassName != null) {
-          protocol = getProtocolClass(header.getProtocol(), conf);
-          rpcDetailedMetrics.init(protocol);
-        }
-      } catch (ClassNotFoundException cnfe) {
-        throw new IOException("Unknown protocol: " + header.getProtocol());
-      }
+      protocolName = header.getProtocol();
+
       
       UserGroupInformation protocolUser = header.getUgi();
       if (!useSasl) {
@@ -1384,18 +1454,43 @@ public abstract class Server {
     private void processData(byte[] buf) throws  IOException, InterruptedException {
       DataInputStream dis =
         new DataInputStream(new ByteArrayInputStream(buf));
-      int id = dis.readInt();                    // try to read an id
+      RpcPayloadHeader header = new RpcPayloadHeader();
+      header.readFields(dis);           // Read the RpcPayload header
         
       if (LOG.isDebugEnabled())
-        LOG.debug(" got #" + id);
-      Writable param;
-      try {
-        param = ReflectionUtils.newInstance(paramClass, conf);//read param
-        param.readFields(dis);
+        LOG.debug(" got #" + header.getCallId());
+      if (header.getOperation() != RpcPayloadOperation.RPC_FINAL_PAYLOAD) {
+        throw new IOException("IPC Server does not implement operation" + 
+              header.getOperation());
+      }
+      // If we know the rpc kind, get its class so that we can deserialize
+      // (Note it would make more sense to have the handler deserialize but 
+      // we continue with this original design.
+      Class<? extends Writable> rpcRequestClass = 
+          getRpcRequestWrapper(header.getkind());
+      if (rpcRequestClass == null) {
+        LOG.warn("Unknown rpc kind "  + header.getkind() + 
+            " from client " + getHostAddress());
+        final Call readParamsFailedCall = 
+            new Call(header.getCallId(), null, this);
+        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+
+        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+            IOException.class.getName(),
+            "Unknown rpc kind "  + header.getkind());
+        responder.doRespond(readParamsFailedCall);
+        return;   
+      }
+      Writable rpcRequest;
+      try { //Read the rpc request
+        rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
+        rpcRequest.readFields(dis);
       } catch (Throwable t) {
         LOG.warn("Unable to read call parameters for client " +
-                 getHostAddress(), t);
-        final Call readParamsFailedCall = new Call(id, null, this);
+                 getHostAddress() + "on connection protocol " +
+            this.protocolName + " for rpcKind " + header.getkind(),  t);
+        final Call readParamsFailedCall = 
+            new Call(header.getCallId(), null, this);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
 
         setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
@@ -1405,7 +1500,7 @@ public abstract class Server {
         return;
       }
         
-      Call call = new Call(id, param, this);
+      Call call = new Call(header.getCallId(), rpcRequest, this, header.getkind());
       callQueue.put(call);              // queue the call; maybe blocked here
       incRpcCount();  // Increment the rpc count
     }
@@ -1469,8 +1564,8 @@ public abstract class Server {
           final Call call = callQueue.take(); // pop the queue; maybe blocked here
 
           if (LOG.isDebugEnabled())
-            LOG.debug(getName() + ": has #" + call.id + " from " +
-                      call.connection);
+            LOG.debug(getName() + ": has Call#" + call.callId + 
+                "for RpcKind " + call.rpcKind + " from " + call.connection);
           
           String errorClass = null;
           String error = null;
@@ -1481,7 +1576,7 @@ public abstract class Server {
             // Make the call as the user via Subject.doAs, thus associating
             // the call with the Subject
             if (call.connection.user == null) {
-              value = call(call.connection.protocol, call.param, 
+              value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, 
                            call.timestamp);
             } else {
               value = 
@@ -1490,8 +1585,8 @@ public abstract class Server {
                      @Override
                      public Writable run() throws Exception {
                        // make the call
-                       return call(call.connection.protocol, 
-                                   call.param, call.timestamp);
+                       return call(call.rpcKind, call.connection.protocolName, 
+                                   call.rpcRequest, call.timestamp);
 
                      }
                    }
@@ -1543,24 +1638,33 @@ public abstract class Server {
                   Configuration conf)
     throws IOException 
   {
-    this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null);
+    this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer
+        .toString(port), null);
   }
   
-  /** Constructs a server listening on the named port and address.  Parameters passed must
+  /** 
+   * Constructs a server listening on the named port and address.  Parameters passed must
    * be of the named class.  The <code>handlerCount</handlerCount> determines
    * the number of handler threads that will be used to process calls.
    * If queueSizePerHandler or numReaders are not -1 they will be used instead of parameters
    * from configuration. Otherwise the configuration will be picked up.
+   * 
+   * If rpcRequestClass is null then the rpcRequestClass must have been 
+   * registered via {@link #registerProtocolEngine(RpcPayloadHeader.RpcKind,
+   *  Class, RPC.RpcInvoker)}
+   * This parameter has been retained for compatibility with existing tests
+   * and usage.
    */
   @SuppressWarnings("unchecked")
-  protected Server(String bindAddress, int port, 
-                  Class<? extends Writable> paramClass, int handlerCount, int numReaders, int queueSizePerHandler,
-                  Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager) 
+  protected Server(String bindAddress, int port,
+      Class<? extends Writable> rpcRequestClass, int handlerCount,
+      int numReaders, int queueSizePerHandler, Configuration conf,
+      String serverName, SecretManager<? extends TokenIdentifier> secretManager)
     throws IOException {
     this.bindAddress = bindAddress;
     this.conf = conf;
     this.port = port;
-    this.paramClass = paramClass;
+    this.rpcRequestClass = rpcRequestClass; 
     this.handlerCount = handlerCount;
     this.socketSendBufferSize = 0;
     if (queueSizePerHandler != -1) {
@@ -1641,7 +1745,7 @@ public abstract class Server {
   throws IOException {
     response.reset();
     DataOutputStream out = new DataOutputStream(response);
-    out.writeInt(call.id);                // write call id
+    out.writeInt(call.callId);                // write call id
     out.writeInt(status.state);           // write status
 
     if (status == Status.SUCCESS) {
@@ -1758,17 +1862,17 @@ public abstract class Server {
   
   /** 
    * Called for each call. 
-   * @deprecated Use {@link #call(Class, Writable, long)} instead
+   * @deprecated Use  {@link #call(RpcPayloadHeader.RpcKind, String,
+   *  Writable, long)} instead
    */
   @Deprecated
   public Writable call(Writable param, long receiveTime) throws IOException {
-    return call(null, param, receiveTime);
+    return call(RpcKind.RPC_BUILTIN, null, param, receiveTime);
   }
   
   /** Called for each call. */
-  public abstract Writable call(Class<?> protocol,
-                               Writable param, long receiveTime)
-  throws IOException;
+  public abstract Writable call(RpcKind rpcKind, String protocol,
+      Writable param, long receiveTime) throws IOException;
   
   /**
    * Authorize the incoming client connection.
@@ -1918,5 +2022,5 @@ public abstract class Server {
 
     int nBytes = initialRemaining - buf.remaining(); 
     return (nBytes > 0) ? nBytes : ret;
-  }      
+  }
 }

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java Sun Feb 26 23:32:06 2012
@@ -34,7 +34,6 @@ public interface VersionedProtocol {
    * @return the version that the server will speak
    * @throws IOException if any IO error occurs
    */
-  @Deprecated
   public long getProtocolVersion(String protocol,
                                  long clientVersion) throws IOException;
 

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Sun Feb 26 23:32:06 2012
@@ -18,23 +18,23 @@
 
 package org.apache.hadoop.ipc;
 
-import java.lang.reflect.Field;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Method;
 import java.lang.reflect.Array;
-import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 
 import java.net.InetSocketAddress;
 import java.io.*;
-import java.util.Map;
-import java.util.HashMap;
 
 import javax.net.SocketFactory;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.Client.ConnectionId;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -49,8 +49,38 @@ public class WritableRpcEngine implement
   
   //writableRpcVersion should be updated if there is a change
   //in format of the rpc messages.
-  public static long writableRpcVersion = 1L;
+  
+  // 2L - added declared class to Invocation
+  public static final long writableRpcVersion = 2L;
+  
+  /**
+   * Whether or not this class has been initialized.
+   */
+  private static boolean isInitialized = false;
+  
+  static { 
+    ensureInitialized();
+  }
+  
+  /**
+   * Initialize this class if it isn't already.
+   */
+  public static synchronized void ensureInitialized() {
+    if (!isInitialized) {
+      initialize();
+    }
+  }
+  
+  /**
+   * Register the rpcRequest deserializer for WritableRpcEngine
+   */
+  private static synchronized void initialize() {
+    org.apache.hadoop.ipc.Server.registerProtocolEngine(RpcKind.RPC_WRITABLE,
+        Invocation.class, new Server.WritableRpcInvoker());
+    isInitialized = true;
+  }
 
+  
   /** A method invocation, including the method name and its parameters.*/
   private static class Invocation implements Writable, Configurable {
     private String methodName;
@@ -59,11 +89,13 @@ public class WritableRpcEngine implement
     private Configuration conf;
     private long clientVersion;
     private int clientMethodsHash;
+    private String declaringClassProtocolName;
     
     //This could be different from static writableRpcVersion when received
     //at server, if client is using a different version.
     private long rpcVersion;
 
+    @SuppressWarnings("unused") // called when deserializing an invocation
     public Invocation() {}
 
     public Invocation(Method method, Object[] parameters) {
@@ -76,18 +108,12 @@ public class WritableRpcEngine implement
         clientVersion = 0;
         clientMethodsHash = 0;
       } else {
-        try {
-          Field versionField = method.getDeclaringClass().getField("versionID");
-          versionField.setAccessible(true);
-          this.clientVersion = versionField.getLong(method.getDeclaringClass());
-        } catch (NoSuchFieldException ex) {
-          throw new RuntimeException(ex);
-        } catch (IllegalAccessException ex) {
-          throw new RuntimeException(ex);
-        }
+        this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
         this.clientMethodsHash = ProtocolSignature.getFingerprint(method
             .getDeclaringClass().getMethods());
       }
+      this.declaringClassProtocolName = 
+          RPC.getProtocolName(method.getDeclaringClass());
     }
 
     /** The name of the method invoked. */
@@ -103,6 +129,7 @@ public class WritableRpcEngine implement
       return clientVersion;
     }
 
+    @SuppressWarnings("unused")
     private int getClientMethodsHash() {
       return clientMethodsHash;
     }
@@ -115,8 +142,10 @@ public class WritableRpcEngine implement
       return rpcVersion;
     }
 
+    @SuppressWarnings("deprecation")
     public void readFields(DataInput in) throws IOException {
       rpcVersion = in.readLong();
+      declaringClassProtocolName = UTF8.readString(in);
       methodName = UTF8.readString(in);
       clientVersion = in.readLong();
       clientMethodsHash = in.readInt();
@@ -124,13 +153,16 @@ public class WritableRpcEngine implement
       parameterClasses = new Class[parameters.length];
       ObjectWritable objectWritable = new ObjectWritable();
       for (int i = 0; i < parameters.length; i++) {
-        parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
+        parameters[i] = 
+            ObjectWritable.readObject(in, objectWritable, this.conf);
         parameterClasses[i] = objectWritable.getDeclaredClass();
       }
     }
 
+    @SuppressWarnings("deprecation")
     public void write(DataOutput out) throws IOException {
       out.writeLong(rpcVersion);
+      UTF8.writeString(out, declaringClassProtocolName);
       UTF8.writeString(out, methodName);
       out.writeLong(clientVersion);
       out.writeInt(clientMethodsHash);
@@ -169,7 +201,7 @@ public class WritableRpcEngine implement
 
   private static ClientCache CLIENTS=new ClientCache();
   
-  private static class Invoker implements InvocationHandler {
+  private static class Invoker implements RpcInvocationHandler {
     private Client.ConnectionId remoteId;
     private Client client;
     private boolean isClosed = false;
@@ -191,7 +223,7 @@ public class WritableRpcEngine implement
       }
 
       ObjectWritable value = (ObjectWritable)
-        client.call(new Invocation(method, args), remoteId);
+        client.call(RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
       if (LOG.isDebugEnabled()) {
         long callTime = System.currentTimeMillis() - startTime;
         LOG.debug("Call: " + method.getName() + " " + callTime);
@@ -200,12 +232,17 @@ public class WritableRpcEngine implement
     }
     
     /* close the IPC client that's responsible for this invoker's RPCs */ 
-    synchronized private void close() {
+    synchronized public void close() {
       if (!isClosed) {
         isClosed = true;
         CLIENTS.stopClient(client);
       }
     }
+
+    @Override
+    public ConnectionId getConnectionId() {
+      return remoteId;
+    }
   }
   
   // for unit testing only
@@ -231,15 +268,6 @@ public class WritableRpcEngine implement
             factory, rpcTimeout));
     return new ProtocolProxy<T>(protocol, proxy, true);
   }
-
-  /**
-   * Stop this proxy and release its invoker's resource
-   * @param proxy the proxy to be stopped
-   */
-  public void stopProxy(Object proxy) {
-    ((Invoker)Proxy.getInvocationHandler(proxy)).close();
-  }
-
   
   /** Expert: Make multiple, parallel calls to a set of servers. */
   public Object[] call(Method method, Object[][] params,
@@ -273,134 +301,238 @@ public class WritableRpcEngine implement
 
   /** Construct a server for a protocol implementation instance listening on a
    * port and address. */
-  public Server getServer(Class<?> protocol,
-                          Object instance, String bindAddress, int port,
-                          int numHandlers, int numReaders, int queueSizePerHandler,
-                          boolean verbose, Configuration conf,
+  public RPC.Server getServer(Class<?> protocolClass,
+                      Object protocolImpl, String bindAddress, int port,
+                      int numHandlers, int numReaders, int queueSizePerHandler,
+                      boolean verbose, Configuration conf,
                       SecretManager<? extends TokenIdentifier> secretManager) 
     throws IOException {
-    return new Server(instance, conf, bindAddress, port, numHandlers, 
-        numReaders, queueSizePerHandler, verbose, secretManager);
+    return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
+        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager);
   }
 
+
   /** An RPC Server. */
   public static class Server extends RPC.Server {
-    private Object instance;
-    private boolean verbose;
-
-    /** Construct an RPC server.
+    /**
+     * Construct an RPC server.
      * @param instance the instance whose methods will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
+     * 
+     * @deprecated Use #Server(Class, Object, Configuration, String, int)    
+     */
+    @Deprecated
+    public Server(Object instance, Configuration conf, String bindAddress,
+        int port) throws IOException {
+      this(null, instance, conf,  bindAddress, port);
+    }
+    
+    
+    /** Construct an RPC server.
+     * @param protocolClass class
+     * @param protocolImpl the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
      */
-    public Server(Object instance, Configuration conf, String bindAddress, int port) 
+    public Server(Class<?> protocolClass, Object protocolImpl, 
+        Configuration conf, String bindAddress, int port) 
       throws IOException {
-      this(instance, conf,  bindAddress, port, 1, -1, -1, false, null);
+      this(protocolClass, protocolImpl, conf,  bindAddress, port, 1, -1, -1,
+          false, null);
     }
     
-    private static String classNameBase(String className) {
-      String[] names = className.split("\\.", -1);
-      if (names == null || names.length == 0) {
-        return className;
-      }
-      return names[names.length-1];
+    /** 
+     * Construct an RPC server.
+     * @param protocolImpl the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
+     * @param numHandlers the number of method handler threads to run
+     * @param verbose whether each call should be logged
+     * 
+     * @deprecated use Server#Server(Class, Object, 
+     *      Configuration, String, int, int, int, int, boolean, SecretManager)
+     */
+    @Deprecated
+    public Server(Object protocolImpl, Configuration conf, String bindAddress,
+        int port, int numHandlers, int numReaders, int queueSizePerHandler,
+        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) 
+            throws IOException {
+       this(null, protocolImpl,  conf,  bindAddress,   port,
+                   numHandlers,  numReaders,  queueSizePerHandler,  verbose, 
+                   secretManager);
+   
     }
     
-    /** Construct an RPC server.
-     * @param instance the instance whose methods will be called
+    /** 
+     * Construct an RPC server.
+     * @param protocolClass - the protocol being registered
+     *     can be null for compatibility with old usage (see below for details)
+     * @param protocolImpl the protocol impl that will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
      * @param numHandlers the number of method handler threads to run
      * @param verbose whether each call should be logged
      */
-    public Server(Object instance, Configuration conf, String bindAddress,  int port,
-                  int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, 
-                  SecretManager<? extends TokenIdentifier> secretManager) 
+    public Server(Class<?> protocolClass, Object protocolImpl,
+        Configuration conf, String bindAddress,  int port,
+        int numHandlers, int numReaders, int queueSizePerHandler, 
+        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) 
         throws IOException {
-      super(bindAddress, port, Invocation.class, numHandlers, numReaders,
+      super(bindAddress, port, null, numHandlers, numReaders,
           queueSizePerHandler, conf,
-          classNameBase(instance.getClass().getName()), secretManager);
-      this.instance = instance;
+          classNameBase(protocolImpl.getClass().getName()), secretManager);
+
       this.verbose = verbose;
-    }
+      
+      
+      Class<?>[] protocols;
+      if (protocolClass == null) { // derive protocol from impl
+        /*
+         * In order to remain compatible with the old usage where a single
+         * target protocolImpl is suppled for all protocol interfaces, and
+         * the protocolImpl is derived from the protocolClass(es) 
+         * we register all interfaces extended by the protocolImpl
+         */
+        protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());
 
-    public Writable call(Class<?> protocol, Writable param, long receivedTime) 
-    throws IOException {
-      try {
-        Invocation call = (Invocation)param;
-        if (verbose) log("Call: " + call);
-
-        Method method = protocol.getMethod(call.getMethodName(),
-                                           call.getParameterClasses());
-        method.setAccessible(true);
-
-        // Verify rpc version
-        if (call.getRpcVersion() != writableRpcVersion) {
-          // Client is using a different version of WritableRpc
-          throw new IOException(
-              "WritableRpc version mismatch, client side version="
-                  + call.getRpcVersion() + ", server side version="
-                  + writableRpcVersion);
+      } else {
+        if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
+          throw new IOException("protocolClass "+ protocolClass +
+              " is not implemented by protocolImpl which is of class " +
+              protocolImpl.getClass());
         }
-        
-        //Verify protocol version.
-        //Bypass the version check for VersionedProtocol
-        if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
+        // register protocol class and its super interfaces
+        registerProtocolAndImpl(RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
+        protocols = RPC.getProtocolInterfaces(protocolClass);
+      }
+      for (Class<?> p : protocols) {
+        if (!p.equals(VersionedProtocol.class)) {
+          registerProtocolAndImpl(RpcKind.RPC_WRITABLE, p, protocolImpl);
+        }
+      }
+
+    }
+
+    private static void log(String value) {
+      if (value!= null && value.length() > 55)
+        value = value.substring(0, 55)+"...";
+      LOG.info(value);
+    }
+    
+    static class WritableRpcInvoker implements RpcInvoker {
+
+     @Override
+      public Writable call(org.apache.hadoop.ipc.RPC.Server server,
+          String protocolName, Writable rpcRequest, long receivedTime)
+          throws IOException {
+        try {
+          Invocation call = (Invocation)rpcRequest;
+          if (server.verbose) log("Call: " + call);
+
+          // Verify rpc version
+          if (call.getRpcVersion() != writableRpcVersion) {
+            // Client is using a different version of WritableRpc
+            throw new IOException(
+                "WritableRpc version mismatch, client side version="
+                    + call.getRpcVersion() + ", server side version="
+                    + writableRpcVersion);
+          }
+
           long clientVersion = call.getProtocolVersion();
-          ProtocolSignature serverInfo = ((VersionedProtocol) instance)
-              .getProtocolSignature(protocol.getCanonicalName(), call
-                  .getProtocolVersion(), call.getClientMethodsHash());
-          long serverVersion = serverInfo.getVersion();
-          if (serverVersion != clientVersion) {
-            LOG.warn("Version mismatch: client version=" + clientVersion
-                + ", server version=" + serverVersion);
-            throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
-                serverVersion);
+          final String protoName;
+          ProtoClassProtoImpl protocolImpl;
+          if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
+            // VersionProtocol methods are often used by client to figure out
+            // which version of protocol to use.
+            //
+            // Versioned protocol methods should go the protocolName protocol
+            // rather than the declaring class of the method since the
+            // the declaring class is VersionedProtocol which is not 
+            // registered directly.
+            // Send the call to the highest  protocol version
+            VerProtocolImpl highest = server.getHighestSupportedProtocol(
+                RpcKind.RPC_WRITABLE, protocolName);
+            if (highest == null) {
+              throw new IOException("Unknown protocol: " + protocolName);
+            }
+            protocolImpl = highest.protocolTarget;
+          } else {
+            protoName = call.declaringClassProtocolName;
+
+            // Find the right impl for the protocol based on client version.
+            ProtoNameVer pv = 
+                new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
+            protocolImpl = 
+                server.getProtocolImplMap(RpcKind.RPC_WRITABLE).get(pv);
+            if (protocolImpl == null) { // no match for Protocol AND Version
+               VerProtocolImpl highest = 
+                   server.getHighestSupportedProtocol(RpcKind.RPC_WRITABLE, 
+                       protoName);
+              if (highest == null) {
+                throw new IOException("Unknown protocol: " + protoName);
+              } else { // protocol supported but not the version that client wants
+                throw new RPC.VersionMismatch(protoName, clientVersion,
+                  highest.version);
+              }
+            }
           }
-        }
+          
 
-        long startTime = System.currentTimeMillis();
-        Object value = method.invoke(instance, call.getParameters());
-        int processingTime = (int) (System.currentTimeMillis() - startTime);
-        int qTime = (int) (startTime-receivedTime);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Served: " + call.getMethodName() +
-                    " queueTime= " + qTime +
-                    " procesingTime= " + processingTime);
-        }
-        rpcMetrics.addRpcQueueTime(qTime);
-        rpcMetrics.addRpcProcessingTime(processingTime);
-        rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
-                                             processingTime);
-        if (verbose) log("Return: "+value);
-
-        return new ObjectWritable(method.getReturnType(), value);
-
-      } catch (InvocationTargetException e) {
-        Throwable target = e.getTargetException();
-        if (target instanceof IOException) {
-          throw (IOException)target;
-        } else {
-          IOException ioe = new IOException(target.toString());
-          ioe.setStackTrace(target.getStackTrace());
+          // Invoke the protocol method
+
+          long startTime = System.currentTimeMillis();
+          Method method = 
+              protocolImpl.protocolClass.getMethod(call.getMethodName(),
+              call.getParameterClasses());
+          method.setAccessible(true);
+          server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
+          Object value = 
+              method.invoke(protocolImpl.protocolImpl, call.getParameters());
+          int processingTime = (int) (System.currentTimeMillis() - startTime);
+          int qTime = (int) (startTime-receivedTime);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Served: " + call.getMethodName() +
+                      " queueTime= " + qTime +
+                      " procesingTime= " + processingTime);
+          }
+          server.rpcMetrics.addRpcQueueTime(qTime);
+          server.rpcMetrics.addRpcProcessingTime(processingTime);
+          server.rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
+                                               processingTime);
+          if (server.verbose) log("Return: "+value);
+
+          return new ObjectWritable(method.getReturnType(), value);
+
+        } catch (InvocationTargetException e) {
+          Throwable target = e.getTargetException();
+          if (target instanceof IOException) {
+            throw (IOException)target;
+          } else {
+            IOException ioe = new IOException(target.toString());
+            ioe.setStackTrace(target.getStackTrace());
+            throw ioe;
+          }
+        } catch (Throwable e) {
+          if (!(e instanceof IOException)) {
+            LOG.error("Unexpected throwable object ", e);
+          }
+          IOException ioe = new IOException(e.toString());
+          ioe.setStackTrace(e.getStackTrace());
           throw ioe;
         }
-      } catch (Throwable e) {
-        if (!(e instanceof IOException)) {
-          LOG.error("Unexpected throwable object ", e);
-        }
-        IOException ioe = new IOException(e.toString());
-        ioe.setStackTrace(e.getStackTrace());
-        throw ioe;
       }
     }
   }
 
-  private static void log(String value) {
-    if (value!= null && value.length() > 55)
-      value = value.substring(0, 55)+"...";
-    LOG.info(value);
+  @Override
+  public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
+      ConnectionId connId, Configuration conf, SocketFactory factory)
+      throws IOException {
+    throw new UnsupportedOperationException("This proxy is not supported");
   }
 }

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java Sun Feb 26 23:32:06 2012
@@ -42,15 +42,20 @@ public class DelegationKey implements Wr
   @Nullable
   private byte[] keyBytes = null;
 
+  /** Default constructore required for Writable */
   public DelegationKey() {
-    this(0, 0L, null);
+    this(0, 0L, (SecretKey)null);
   }
 
   public DelegationKey(int keyId, long expiryDate, SecretKey key) {
+    this(keyId, expiryDate, key != null ? key.getEncoded() : null);
+  }
+  
+  public DelegationKey(int keyId, long expiryDate, byte[] encodedKey) {
     this.keyId = keyId;
     this.expiryDate = expiryDate;
-    if (key!=null) {
-      this.keyBytes = key.getEncoded();
+    if (encodedKey != null) {
+      this.keyBytes = encodedKey;
     }
   }
 
@@ -70,6 +75,10 @@ public class DelegationKey implements Wr
       return key;
     }
   }
+  
+  public byte[] getEncodedKey() {
+    return keyBytes;
+  }
 
   public void setExpiryDate(long expiryDate) {
     this.expiryDate = expiryDate;

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/GetGroupsBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/GetGroupsBase.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/GetGroupsBase.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/GetGroupsBase.java Sun Feb 26 23:32:06 2012
@@ -94,7 +94,7 @@ public abstract class GetGroupsBase exte
    * @return A {@link GetUserMappingsProtocol} client proxy.
    * @throws IOException
    */
-  private GetUserMappingsProtocol getUgmProtocol() throws IOException {
+  protected GetUserMappingsProtocol getUgmProtocol() throws IOException {
     GetUserMappingsProtocol userGroupMappingProtocol =
       RPC.getProxy(GetUserMappingsProtocol.class, 
           GetUserMappingsProtocol.versionID,

Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/proto/ProtocolInfo.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/proto/ProtocolInfo.proto?rev=1293964&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/proto/ProtocolInfo.proto (added)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/proto/ProtocolInfo.proto Sun Feb 26 23:32:06 2012
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.ipc.protobuf";
+option java_outer_classname = "ProtocolInfoProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+/**
+ * Request to get protocol versions for all supported rpc kinds.
+ */
+message GetProtocolVersionsRequestProto {
+  required string protocol = 1; // Protocol name
+}
+
+/**
+ * Protocol version with corresponding rpc kind.
+ */
+message ProtocolVersionProto {
+  required string rpcKind = 1; //RPC kind
+  repeated uint64 versions = 2; //Protocol version corresponding to the rpc kind.
+}
+
+/**
+ * Get protocol version response.
+ */
+message GetProtocolVersionsResponseProto { 
+  repeated ProtocolVersionProto protocolVersions = 1;
+}
+
+/**
+ * Get protocol signature request.
+ */
+message GetProtocolSignatureRequestProto {
+  required string protocol = 1; // Protocol name
+  required string rpcKind = 2; // RPC kind
+}
+
+/**
+ * Get protocol signature response.
+ */ 
+message GetProtocolSignatureResponseProto {
+  repeated ProtocolSignatureProto protocolSignature = 1;
+}
+
+message ProtocolSignatureProto {
+  required uint64 version = 1;
+  repeated uint32 methods = 2;
+}
+
+/**
+ * Protocol to get information about protocols.
+ */
+service ProtocolInfoService {
+  /**
+   * Return protocol version corresponding to protocol interface for each
+   * supported rpc kind.
+   */
+  rpc getProtocolVersions(GetProtocolVersionsRequestProto) 
+      returns (GetProtocolVersionsResponseProto);
+
+  /**
+   * Return protocol version corresponding to protocol interface.
+   */
+  rpc getProtocolSignature(GetProtocolSignatureRequestProto) 
+      returns (GetProtocolSignatureResponseProto);
+}

Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto?rev=1293964&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto (added)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto Sun Feb 26 23:32:06 2012
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These are the messages used by Hadoop RPC to marshal the
+ * request and response in the RPC layer.
+ */
+option java_package = "org.apache.hadoop.ipc.protobuf";
+option java_outer_classname = "HadoopRpcProtos";
+option java_generate_equals_and_hash = true;
+
+/**
+ * Message used to marshal the client request
+ * from RPC client to the RPC server.
+ */
+message HadoopRpcRequestProto {
+  /** Name of the RPC method */
+  required string methodName = 1;
+
+  /** Bytes corresponding to the client protobuf request */
+  optional bytes request = 2;
+  
+  /** protocol name of class declaring the called method */ 
+  required string declaringClassProtocolName = 3;
+  
+  /** protocol version of class declaring the called method */
+  required uint64 clientProtocolVersion = 4;
+}
+
+/**
+ * At the RPC layer, this message is used to indicate
+ * the server side exception the the RPC client.
+ *
+ * Hadoop RPC client throws an exception indicated
+ * by exceptionName with the stackTrace.
+ */
+message HadoopRpcExceptionProto {
+  /** Class name of the exception thrown from the server */
+
+  optional string exceptionName = 1;
+  /** Exception stack trace from the server side */
+  optional string stackTrace = 2;
+}
+
+/**
+ * This message is used to marshal the response from
+ * RPC server to the client.
+ */
+message HadoopRpcResponseProto {
+  /** Status of IPC call */
+  enum ResponseStatus {
+    SUCCESS = 1;
+    ERRROR = 2;
+  }
+
+  required ResponseStatus status = 1;
+
+  // Protobuf response payload from the server, when status is SUCCESS.
+  optional bytes response = 2;
+
+  // Exception when status is ERROR or FATAL
+  optional HadoopRpcExceptionProto exception = 3;
+}
+

Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java Sun Feb 26 23:32:06 2012
@@ -57,6 +57,11 @@ public class TestFailoverProxy {
     public Class<?> getInterface() {
       return iface;
     }
+
+    @Override
+    public void close() throws IOException {
+      // Nothing to do.
+    }
     
   }
   



Mime
View raw message