hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1167318 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/ipc/
Date Fri, 09 Sep 2011 18:12:15 GMT
Author: atm
Date: Fri Sep  9 18:12:14 2011
New Revision: 1167318

URL: http://svn.apache.org/viewvc?rev=1167318&view=rev
Log:
HADOOP-7607 and MAPREDUCE-2934. Simplify the RPC proxy cleanup process. (atm)

Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1167318&r1=1167317&r2=1167318&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Fri Sep  9 18:12:14 2011
@@ -5,7 +5,11 @@ Trunk (unreleased changes)
   IMPROVEMENTS
 
     HADOOP-7595. Upgrade dependency to Avro 1.5.3. (Alejandro Abdelnur via atm)
-  HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)
+
+    HADOOP-7524. Change RPC to allow multiple protocols including multuple
+                 versions of the same protocol (sanjay Radia)
+
+    HADOOP-7607. Simplify the RPC proxy cleanup process. (atm)
 
   BUGS
 

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1167318&r1=1167317&r2=1167318&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java Fri Sep  9 18:12:14 2011
@@ -131,7 +131,7 @@ public class AvroRpcEngine implements Rp
     }
 
     public void close() throws IOException {
-      ENGINE.stopProxy(tunnel);
+      RPC.stopProxy(tunnel);
     }
   }
 
@@ -152,15 +152,6 @@ public class AvroRpcEngine implements Rp
        false);
   }
 
-  /** Stop this proxy. */
-  public void stopProxy(Object proxy) {
-    try {
-      ((Invoker)Proxy.getInvocationHandler(proxy)).close();
-    } catch (IOException e) {
-      LOG.warn("Error while stopping "+proxy, e);
-    }
-  }
-
   private class Invoker implements InvocationHandler, Closeable {
     private final ClientTransceiver tx;
     private final SpecificRequestor requestor;

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1167318&r1=1167317&r2=1167318&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Fri Sep  9 18:12:14 2011
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ipc;
 
+import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Method;
 
@@ -26,6 +27,7 @@ import java.net.InetSocketAddress;
 import java.net.NoRouteToHostException;
 import java.net.SocketTimeoutException;
 import java.io.*;
+import java.io.Closeable;
 import java.util.Map;
 import java.util.HashMap;
 
@@ -80,12 +82,8 @@ public class RPC {
   private RPC() {}                                  // no public ctor
 
   // cache of RpcEngines by protocol
-  private static final Map<Class,RpcEngine> PROTOCOL_ENGINES
-    = new HashMap<Class,RpcEngine>();
-
-  // track what RpcEngine is used by a proxy class, for stopProxy()
-  private static final Map<Class,RpcEngine> PROXY_ENGINES
-    = new HashMap<Class,RpcEngine>();
+  private static final Map<Class<?>,RpcEngine> PROTOCOL_ENGINES
+    = new HashMap<Class<?>,RpcEngine>();
 
   private static final String ENGINE_PROP = "rpc.engine";
 
@@ -96,32 +94,23 @@ public class RPC {
    * @param engine the RpcEngine impl
    */
   public static void setProtocolEngine(Configuration conf,
-                                Class protocol, Class engine) {
+                                Class<?> protocol, Class<?> engine) {
     conf.setClass(ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class);
   }
 
   // return the RpcEngine configured to handle a protocol
-  private static synchronized RpcEngine getProtocolEngine(Class protocol,
+  private static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
                                                           Configuration conf) {
     RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
     if (engine == null) {
       Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
                                     WritableRpcEngine.class);
       engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
-      if (protocol.isInterface())
-        PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(),
-                                              protocol),
-                          engine);
       PROTOCOL_ENGINES.put(protocol, engine);
     }
     return engine;
   }
 
-  // return the RpcEngine that handles a proxy object
-  private static synchronized RpcEngine getProxyEngine(Object proxy) {
-    return PROXY_ENGINES.get(proxy.getClass());
-  }
-
   /**
    * A version mismatch for the RPC protocol.
    */
@@ -477,13 +466,30 @@ public class RPC {
   }
 
   /**
-   * Stop this proxy and release its invoker's resource
-   * @param proxy the proxy to be stopped
+   * Stop this proxy and release its invoker's resource by getting the
+   * invocation handler for the given proxy object and calling
+   * {@link Closeable#close} if that invocation handler implements
+   * {@link Closeable}.
+   * 
+   * @param proxy the RPC proxy object to be stopped
    */
   public static void stopProxy(Object proxy) {
-    RpcEngine rpcEngine;
-    if (proxy!=null && (rpcEngine = getProxyEngine(proxy)) != null) {
-      rpcEngine.stopProxy(proxy);
+    InvocationHandler invocationHandler = null;
+    try {
+      invocationHandler = Proxy.getInvocationHandler(proxy);
+    } catch (IllegalArgumentException e) {
+      LOG.error("Tried to call RPC.stopProxy on an object that is not a proxy.", e);
+    }
+    if (proxy != null && invocationHandler != null &&
+        invocationHandler instanceof Closeable) {
+      try {
+        ((Closeable)invocationHandler).close();
+      } catch (IOException e) {
+        LOG.error("Stopping RPC invocation handler caused exception", e);
+      }
+    } else {
+      LOG.error("Could not get invocation handler " + invocationHandler +
+          " for proxy " + proxy + ", or invocation handler is not closeable.");
     }
   }
 
@@ -532,7 +538,7 @@ public class RPC {
   }
 
   /** Construct a server for a protocol implementation instance. */
-  public static Server getServer(Class protocol,
+  public static Server getServer(Class<?> protocol,
                                  Object instance, String bindAddress,
                                  int port, Configuration conf) 
     throws IOException {
@@ -543,7 +549,7 @@ public class RPC {
    * @deprecated secretManager should be passed.
    */
   @Deprecated
-  public static Server getServer(Class protocol,
+  public static Server getServer(Class<?> protocol,
                                  Object instance, String bindAddress, int port,
                                  int numHandlers,
                                  boolean verbose, Configuration conf) 

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java?rev=1167318&r1=1167317&r2=1167318&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java Fri Sep  9 18:12:14 2011
@@ -41,9 +41,6 @@ public interface RpcEngine {
                   UserGroupInformation ticket, Configuration conf,
                   SocketFactory factory, int rpcTimeout) throws IOException;
 
-  /** Stop this proxy. */
-  void stopProxy(Object proxy);
-
   /** Expert: Make multiple, parallel calls to a set of servers. */
   Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
                 UserGroupInformation ticket, Configuration conf)

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1167318&r1=1167317&r2=1167318&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Fri Sep  9 18:12:14 2011
@@ -30,6 +30,7 @@ import java.io.*;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.io.Closeable;
 import java.util.Map;
 import java.util.HashMap;
 
@@ -219,7 +220,7 @@ public class WritableRpcEngine implement
 
   private static ClientCache CLIENTS=new ClientCache();
   
-  private static class Invoker implements InvocationHandler {
+  private static class Invoker implements InvocationHandler, Closeable {
     private Client.ConnectionId remoteId;
     private Client client;
     private boolean isClosed = false;
@@ -250,7 +251,7 @@ public class WritableRpcEngine implement
     }
     
     /* close the IPC client that's responsible for this invoker's RPCs */ 
-    synchronized private void close() {
+    synchronized public void close() {
       if (!isClosed) {
         isClosed = true;
         CLIENTS.stopClient(client);
@@ -281,15 +282,6 @@ public class WritableRpcEngine implement
             factory, rpcTimeout));
     return new ProtocolProxy<T>(protocol, proxy, true);
   }
-
-  /**
-   * Stop this proxy and release its invoker's resource
-   * @param proxy the proxy to be stopped
-   */
-  public void stopProxy(Object proxy) {
-    ((Invoker)Proxy.getInvocationHandler(proxy)).close();
-  }
-
   
   /** Expert: Make multiple, parallel calls to a set of servers. */
   public Object[] call(Method method, Object[][] params,



Mime
View raw message