ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aonis...@apache.org
Subject [1/2] ambari git commit: AMBARI-18513. Optimize ClustersDeadlockTest (aonishuk)
Date Wed, 05 Oct 2016 11:47:09 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-2.5 e8ec4d0b0 -> b9b39d219
  refs/heads/trunk 62dc775ee -> a901d8a7a


AMBARI-18513. Optimize ClustersDeadlockTest (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a901d8a7
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a901d8a7
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a901d8a7

Branch: refs/heads/trunk
Commit: a901d8a7a8f21cf9e6055a6669883abe47d5e370
Parents: 62dc775
Author: Andrew Onishuk <aonishuk@hortonworks.com>
Authored: Wed Oct 5 14:46:46 2016 +0300
Committer: Andrew Onishuk <aonishuk@hortonworks.com>
Committed: Wed Oct 5 14:46:46 2016 +0300

----------------------------------------------------------------------
 .../state/cluster/ClustersDeadlockTest.java     | 137 +++++++++++++------
 1 file changed, 97 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/a901d8a7/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
index a0a6444..190f64d 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
@@ -22,8 +22,10 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.inject.Provider;
 import junit.framework.Assert;
 
 import org.apache.ambari.server.AmbariException;
@@ -69,6 +71,9 @@ public class ClustersDeadlockTest {
 
   private final AtomicInteger hostNameCounter = new AtomicInteger(0);
 
+  private CountDownLatch writerStoppedSignal;
+  private CountDownLatch readerStoppedSignal;
+
   private final StackId stackId = new StackId("HDP-0.1");
 
   @Inject
@@ -109,6 +114,9 @@ public class ClustersDeadlockTest {
 
     // install HDFS
     installService("HDFS");
+
+    writerStoppedSignal = new CountDownLatch(NUMBER_OF_THREADS);
+    readerStoppedSignal = new CountDownLatch(NUMBER_OF_THREADS);
   }
 
   @After
@@ -117,6 +125,42 @@ public class ClustersDeadlockTest {
   }
 
   /**
+   * Launches reader and writer threads simultaneously to check for a deadlock.
+   * The numbers of launched reader and writer threads are equal to
+   * the {@code}numberOfThreads{@code}. This method expects that reader
+   * and writer threads are using {@code}readerStoppedSignal{@code}
+   * and {@code}writerStoppedSignal{@code} correctly.
+   *
+   * Reader threads should be stopped after writer threads are finished.
+   */
+  private void doLoadTest(Provider<? extends Thread> readerProvider,
+                          Provider<? extends Thread> writerProvider,
+                          final int numberOfThreads,
+                          CountDownLatch writerStoppedSignal,
+                          CountDownLatch readerStoppedSignal) throws Exception {
+    List<Thread> writerThreads = new ArrayList<Thread>();
+    for (int i = 0; i < numberOfThreads; i++) {
+      Thread readerThread = readerProvider.get();
+      Thread writerThread = writerProvider.get();
+
+      writerThreads.add(writerThread);
+
+      readerThread.start();
+      writerThread.start();
+    }
+
+    for (Thread writerThread : writerThreads) {
+      writerThread.join();
+      // Notify that one writer thread is stopped
+      writerStoppedSignal.countDown();
+    }
+
+    // All writer threads are stopped. Reader threads should finish now.
+    // Await for all reader threads to stop
+    readerStoppedSignal.await();
+  }
+
+  /**
    * Tests that no deadlock exists when adding hosts while reading from the
    * cluster.
    *
@@ -124,21 +168,20 @@ public class ClustersDeadlockTest {
    */
   @Test(timeout = 40000)
   public void testDeadlockWhileMappingHosts() throws Exception {
-    List<Thread> threads = new ArrayList<Thread>();
-    for (int i = 0; i < NUMBER_OF_THREADS; i++) {
-      ClusterReaderThread readerThread = new ClusterReaderThread();
-      ClustersHostMapperThread writerThread = new ClustersHostMapperThread();
-
-      threads.add(readerThread);
-      threads.add(writerThread);
+    Provider<ClustersHostMapperThread> clustersHostMapperThreadFactory =
+        new Provider<ClustersHostMapperThread>() {
 
-      readerThread.start();
-      writerThread.start();
-    }
+      @Override
+      public ClustersHostMapperThread get() {
+        return new ClustersHostMapperThread();
+      }
+    };
 
-    for (Thread thread : threads) {
-      thread.join();
-    }
+    doLoadTest(new ClusterReaderThreadFactory(),
+               clustersHostMapperThreadFactory,
+               NUMBER_OF_THREADS,
+               writerStoppedSignal,
+               readerStoppedSignal);
 
     Assert.assertEquals(NUMBER_OF_THREADS * NUMBER_OF_HOSTS,
         clusters.getHostsForCluster(CLUSTER_NAME).size());
@@ -154,21 +197,20 @@ public class ClustersDeadlockTest {
   @Test(timeout = 40000)
   public void testDeadlockWhileMappingHostsWithExistingServices()
       throws Exception {
-    List<Thread> threads = new ArrayList<Thread>();
-    for (int i = 0; i < NUMBER_OF_THREADS; i++) {
-      ClusterReaderThread readerThread = new ClusterReaderThread();
-      ClustersHostAndComponentMapperThread writerThread = new ClustersHostAndComponentMapperThread();
-
-      threads.add(readerThread);
-      threads.add(writerThread);
+    Provider<ClustersHostAndComponentMapperThread> clustersHostAndComponentMapperThreadFactory
=
+        new Provider<ClustersHostAndComponentMapperThread>() {
 
-      readerThread.start();
-      writerThread.start();
-    }
+      @Override
+      public ClustersHostAndComponentMapperThread get() {
+        return new ClustersHostAndComponentMapperThread();
+      }
+    };
 
-    for (Thread thread : threads) {
-      thread.join();
-    }
+    doLoadTest(new ClusterReaderThreadFactory(),
+        clustersHostAndComponentMapperThreadFactory,
+        NUMBER_OF_THREADS,
+        writerStoppedSignal,
+        readerStoppedSignal);
   }
 
   /**
@@ -179,26 +221,33 @@ public class ClustersDeadlockTest {
    */
   @Test(timeout = 40000)
   public void testDeadlockWhileUnmappingHosts() throws Exception {
-    List<Thread> threads = new ArrayList<Thread>();
-    for (int i = 0; i < NUMBER_OF_THREADS; i++) {
-      ClusterReaderThread readerThread = new ClusterReaderThread();
-      ClustersHostUnMapperThread writerThread = new ClustersHostUnMapperThread();
-
-      threads.add(readerThread);
-      threads.add(writerThread);
+    Provider<ClustersHostUnMapperThread> clustersHostUnMapperThreadFactory =
+        new Provider<ClustersHostUnMapperThread>() {
 
-      readerThread.start();
-      writerThread.start();
-    }
+      @Override
+      public ClustersHostUnMapperThread get() {
+        return new ClustersHostUnMapperThread();
+      }
+    };
 
-    for (Thread thread : threads) {
-      thread.join();
-    }
+    doLoadTest(new ClusterReaderThreadFactory(),
+        clustersHostUnMapperThreadFactory,
+        NUMBER_OF_THREADS,
+        writerStoppedSignal,
+        readerStoppedSignal);
 
     Assert.assertEquals(0,
         clusters.getHostsForCluster(CLUSTER_NAME).size());
   }
 
+  private final class ClusterReaderThreadFactory implements Provider<ClusterReaderThread>
 {
+
+    @Override
+    public ClusterReaderThread get() {
+      return new ClusterReaderThread();
+    }
+  }
+
   /**
    * The {@link ClusterReaderThread} reads from a cluster over and over again
    * with a slight pause.
@@ -211,12 +260,20 @@ public class ClustersDeadlockTest {
     @Override
     public void run() {
       try {
-        for (int i = 0; i < 1000; i++) {
+        // Repeat until writer threads exist
+        while (true) {
+          if (writerStoppedSignal.getCount() == 0) {
+            break;
+          }
+
           cluster.convertToResponse();
           Thread.sleep(10);
         }
       } catch (Exception exception) {
         throw new RuntimeException(exception);
+      } finally {
+        // Notify that one reader was stopped
+        readerStoppedSignal.countDown();
       }
     }
   }


Mime
View raw message