hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1432246 [3/3] - in /hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common: ./ dev-support/ src/main/bin/ src/main/docs/ src/main/docs/src/documentation/content/xdocs/ src/main/java/ src/main/java/org/apache/hadoop/fs/ src/ma...
Date Fri, 11 Jan 2013 19:40:28 GMT
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java Fri Jan 11 19:40:23 2013
@@ -38,6 +38,10 @@ import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.SocketFactory;
 
@@ -823,6 +827,96 @@ public class TestRPC {
     }
   }
   
+  @Test(timeout=90000)
+  public void testRPCInterruptedSimple() throws Exception {
+    final Configuration conf = new Configuration();
+    Server server = RPC.getServer(
+      TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null
+    );
+    server.start();
+    InetSocketAddress addr = NetUtils.getConnectAddress(server);
+
+    final TestProtocol proxy = (TestProtocol) RPC.getProxy(
+        TestProtocol.class, TestProtocol.versionID, addr, conf);
+    // Connect to the server
+    proxy.ping();
+    // Interrupt self, try another call
+    Thread.currentThread().interrupt();
+    try {
+      proxy.ping();
+      fail("Interruption did not cause IPC to fail");
+    } catch (IOException ioe) {
+      if (!ioe.toString().contains("InterruptedException")) {
+        throw ioe;
+      }
+      // clear interrupt status for future tests
+      Thread.interrupted();
+    }
+  }
+  
+  @Test(timeout=30000)
+  public void testRPCInterrupted() throws IOException, InterruptedException {
+    final Configuration conf = new Configuration();
+    Server server = RPC.getServer(
+      TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null
+    );
+
+    server.start();
+
+    int numConcurrentRPC = 200;
+    InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    final CyclicBarrier barrier = new CyclicBarrier(numConcurrentRPC);
+    final CountDownLatch latch = new CountDownLatch(numConcurrentRPC);
+    final AtomicBoolean leaderRunning = new AtomicBoolean(true);
+    final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+    Thread leaderThread = null;
+    
+    for (int i = 0; i < numConcurrentRPC; i++) {
+      final int num = i;
+      final TestProtocol proxy = (TestProtocol) RPC.getProxy(
+      TestProtocol.class, TestProtocol.versionID, addr, conf);
+      Thread rpcThread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            barrier.await();
+            while (num == 0 || leaderRunning.get()) {
+              proxy.slowPing(false);
+            }
+
+            proxy.slowPing(false);
+          } catch (Exception e) {
+            if (num == 0) {
+              leaderRunning.set(false);
+            } else {
+              error.set(e);
+            }
+
+            LOG.error(e);
+          } finally {
+            latch.countDown();
+          }
+        }
+      });
+      rpcThread.start();
+
+      if (leaderThread == null) {
+       leaderThread = rpcThread;
+      }
+    }
+    // let threads get past the barrier
+    Thread.sleep(1000);
+    // stop a single thread
+    while (leaderRunning.get()) {
+      leaderThread.interrupt();
+    }
+    
+    latch.await();
+    
+    // should not cause any other thread to get an error
+    assertTrue("rpc got exception " + error.get(), error.get() == null);
+  }
+
   public static void main(String[] args) throws Exception {
     new TestRPC().testCallsInternal(conf);
 



Mime
View raw message