hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1171221 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/io/retry/ src/test/java/org/apache/hadoop/io/retry/ src/test/java/org/apache/hadoop/ipc/
Date Thu, 15 Sep 2011 18:45:36 GMT
Author: atm
Date: Thu Sep 15 18:45:36 2011
New Revision: 1171221

URL: http://svn.apache.org/viewvc?rev=1171221&view=rev
Log:
HADOOP-7635. RetryInvocationHandler should release underlying resources on close (atm)

Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1171221&r1=1171220&r2=1171221&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Thu Sep 15 18:45:36
2011
@@ -11,6 +11,9 @@ Trunk (unreleased changes)
 
     HADOOP-7607. Simplify the RPC proxy cleanup process. (atm)
 
+    HADOOP-7635. RetryInvocationHandler should release underlying resources on
+                 close (atm)
+
   BUGS
 
     HADOOP-7606. Upgrade Jackson to version 1.7.1 to match the version required

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java?rev=1171221&r1=1171220&r2=1171221&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java
Thu Sep 15 18:45:36 2011
@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.io.retry;
 
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.RPC;
 
 /**
  * An implementation of {@link FailoverProxyProvider} which does nothing in the
@@ -49,4 +52,9 @@ public class DefaultFailoverProxyProvide
     // Nothing to do.
   }
 
+  @Override
+  public void close() throws IOException {
+    RPC.stopProxy(proxy);
+  }
+
 }

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java?rev=1171221&r1=1171220&r2=1171221&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java
Thu Sep 15 18:45:36 2011
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.io.retry;
 
+import java.io.Closeable;
+
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
@@ -27,7 +29,7 @@ import org.apache.hadoop.classification.
  * {@link RetryPolicy}.
  */
 @InterfaceStability.Evolving
-public interface FailoverProxyProvider {
+public interface FailoverProxyProvider extends Closeable {
 
   /**
    * Get the proxy object which should be used until the next failover event

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?rev=1171221&r1=1171220&r2=1171221&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
Thu Sep 15 18:45:36 2011
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.io.retry;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -27,7 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 
-class RetryInvocationHandler implements InvocationHandler {
+class RetryInvocationHandler implements InvocationHandler, Closeable {
   public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
   private FailoverProxyProvider proxyProvider;
   
@@ -103,4 +105,9 @@ class RetryInvocationHandler implements 
     }
   }
 
+  @Override
+  public void close() throws IOException {
+    proxyProvider.close();
+  }
+
 }

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java?rev=1171221&r1=1171220&r2=1171221&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
Thu Sep 15 18:45:36 2011
@@ -57,6 +57,11 @@ public class TestFailoverProxy {
     public Class<?> getInterface() {
       return iface;
     }
+
+    @Override
+    public void close() throws IOException {
+      // Nothing to do.
+    }
     
   }
   

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1171221&r1=1171220&r2=1171221&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
Thu Sep 15 18:45:36 2011
@@ -18,28 +18,38 @@
 
 package org.apache.hadoop.ipc;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
+import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.util.Arrays;
 
-import junit.framework.TestCase;
+import javax.net.SocketFactory;
 
 import org.apache.commons.logging.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.Service;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+import static org.junit.Assert.*;
 
 import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.DescriptorProtos.EnumDescriptorProto;
@@ -49,18 +59,22 @@ import static org.apache.hadoop.test.Met
 import static org.mockito.Mockito.*;
 
 /** Unit tests for RPC. */
-public class TestRPC extends TestCase {
+@SuppressWarnings("deprecation")
+public class TestRPC {
   private static final String ADDRESS = "0.0.0.0";
 
   public static final Log LOG =
     LogFactory.getLog(TestRPC.class);
   
   private static Configuration conf = new Configuration();
+  
+  static {
+    conf.setClass("rpc.engine." + StoppedProtocol.class.getName(),
+        StoppedRpcEngine.class, RpcEngine.class);
+  }
 
   int datasize = 1024*100;
   int numThreads = 50;
-
-  public TestRPC(String name) { super(name); }
 	
   public interface TestProtocol extends VersionedProtocol {
     public static final long versionID = 1L;
@@ -207,6 +221,74 @@ public class TestRPC extends TestCase {
     }
   }
   
+  /**
+   * A basic interface for testing client-side RPC resource cleanup.
+   */
+  private static interface StoppedProtocol {
+    long versionID = 0;
+
+    public void stop();
+  }
+  
+  /**
+   * A class used for testing cleanup of client side RPC resources.
+   */
+  private static class StoppedRpcEngine implements RpcEngine {
+
+    @Override
+    public Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
+        UserGroupInformation ticket, Configuration conf)
+        throws IOException, InterruptedException {
+      return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+        InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+        SocketFactory factory, int rpcTimeout) throws IOException {
+      T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
+              new Class[] { protocol }, new StoppedInvocationHandler());
+      return new ProtocolProxy<T>(protocol, proxy, false);
+    }
+
+    @Override
+    public org.apache.hadoop.ipc.RPC.Server getServer(Class<?> protocol,
+        Object instance, String bindAddress, int port, int numHandlers,
+        int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf,
+        SecretManager<? extends TokenIdentifier> secretManager) throws IOException
{
+      return null;
+    }
+    
+  }
+
+  /**
+   * An invocation handler which does nothing when invoking methods, and just
+   * counts the number of times close() is called.
+   */
+  private static class StoppedInvocationHandler
+      implements InvocationHandler, Closeable {
+    
+    private int closeCalled = 0;
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args)
+        throws Throwable {
+          return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+      closeCalled++;
+    }
+    
+    public int getCloseCalled() {
+      return closeCalled;
+    }
+    
+  }
+  
+  @Test
   public void testConfRpc() throws Exception {
     Server server = RPC.getServer(TestProtocol.class,
                                   new TestImpl(), ADDRESS, 0, 1, false, conf, null);
@@ -229,6 +311,7 @@ public class TestRPC extends TestCase {
     server.stop();    
   }
 
+  @Test
   public void testSlowRpc() throws Exception {
     System.out.println("Testing Slow RPC");
     // create a server with two handlers
@@ -273,11 +356,12 @@ public class TestRPC extends TestCase {
     }
   }
   
-  public void testRPCConf(Configuration conf) throws Exception {
-    
+  @Test
+  public void testCalls() throws Exception {
+    testCallsInternal(conf);
   }
-
-  public void testCalls(Configuration conf) throws Exception {
+  
+  private void testCallsInternal(Configuration conf) throws Exception {
     Server server = RPC.getServer(TestProtocol.class,
                                   new TestImpl(), ADDRESS, 0, conf);
     TestProtocol proxy = null;
@@ -384,6 +468,7 @@ public class TestRPC extends TestCase {
     }
   }
   
+  @Test
   public void testStandaloneClient() throws IOException {
     try {
       TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
@@ -450,6 +535,7 @@ public class TestRPC extends TestCase {
     }
   }
   
+  @Test
   public void testAuthorization() throws Exception {
     Configuration conf = new Configuration();
     conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
@@ -481,20 +567,48 @@ public class TestRPC extends TestCase {
     Configuration conf = new Configuration();
     
     conf.setBoolean("ipc.client.ping", false);
-    new TestRPC("testnoPings").testCalls(conf);
+    new TestRPC().testCallsInternal(conf);
     
     conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
-    new TestRPC("testnoPings").testCalls(conf);
+    new TestRPC().testCallsInternal(conf);
   }
 
   /**
    * Test stopping a non-registered proxy
    * @throws Exception
    */
+  @Test
   public void testStopNonRegisteredProxy() throws Exception {
     RPC.stopProxy(mock(TestProtocol.class));
   }
   
+  @Test
+  public void testStopProxy() throws IOException {
+    StoppedProtocol proxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class,
+        StoppedProtocol.versionID, null, conf);
+    StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
+        Proxy.getInvocationHandler(proxy);
+    assertEquals(invocationHandler.getCloseCalled(), 0);
+    RPC.stopProxy(proxy);
+    assertEquals(invocationHandler.getCloseCalled(), 1);
+  }
+  
+  @Test
+  public void testWrappedStopProxy() throws IOException {
+    StoppedProtocol wrappedProxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class,
+        StoppedProtocol.versionID, null, conf);
+    StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
+        Proxy.getInvocationHandler(wrappedProxy);
+    
+    StoppedProtocol proxy = (StoppedProtocol) RetryProxy.create(StoppedProtocol.class,
+        wrappedProxy, RetryPolicies.RETRY_FOREVER);
+    
+    assertEquals(invocationHandler.getCloseCalled(), 0);
+    RPC.stopProxy(proxy);
+    assertEquals(invocationHandler.getCloseCalled(), 1);
+  }
+  
+  @Test
   public void testErrorMsgForInsecureClient() throws Exception {
     final Server server = RPC.getServer(TestProtocol.class,
         new TestImpl(), ADDRESS, 0, 5, true, conf, null);
@@ -567,10 +681,10 @@ public class TestRPC extends TestCase {
     return count;
   }
 
-
   /**
    * Test that server.stop() properly stops all threads
    */
+  @Test
   public void testStopsAllThreads() throws Exception {
     int threadsBefore = countThreads("Server$Listener$Reader");
     assertEquals("Expect no Reader threads running before test",
@@ -591,8 +705,7 @@ public class TestRPC extends TestCase {
   }
   
   public static void main(String[] args) throws Exception {
-
-    new TestRPC("test").testCalls(conf);
+    new TestRPC().testCallsInternal(conf);
 
   }
 }



Mime
View raw message