ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jonathanhur...@apache.org
Subject ambari git commit: AMBARI-10456 - Ambari Server Deadlock When Mapping Hosts (jonathanhurley)
Date Tue, 14 Apr 2015 18:00:10 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk ceb196b83 -> f11e2f063


AMBARI-10456 - Ambari Server Deadlock When Mapping Hosts (jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f11e2f06
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f11e2f06
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f11e2f06

Branch: refs/heads/trunk
Commit: f11e2f0630f922cf7102fe4291d5c80230b4cb8c
Parents: ceb196b
Author: Jonathan Hurley <jhurley@hortonworks.com>
Authored: Tue Apr 14 10:59:57 2015 -0400
Committer: Jonathan Hurley <jhurley@hortonworks.com>
Committed: Tue Apr 14 13:58:27 2015 -0400

----------------------------------------------------------------------
 .../events/publishers/AmbariEventPublisher.java |  10 +-
 .../server/state/cluster/ClustersImpl.java      | 101 ++++----------
 .../services/AmbariServerAlertService.java      |  28 ++--
 .../server/agent/TestHeartbeatHandler.java      |   4 +
 .../apache/ambari/server/events/EventsTest.java |  13 +-
 .../server/orm/dao/AlertDispatchDAOTest.java    |  19 +--
 .../ambari/server/orm/dao/AlertsDAOTest.java    |  15 +-
 .../state/alerts/AlertEventPublisherTest.java   |  25 +---
 .../alerts/AlertStateChangedEventTest.java      |  20 +--
 .../state/alerts/InitialAlertEventTest.java     |  26 +---
 .../state/cluster/AlertDataManagerTest.java     |   9 +-
 .../state/cluster/ClusterDeadlockTest.java      |   4 -
 .../state/cluster/ClustersDeadlockTest.java     | 136 +++++++++++++++++-
 .../server/utils/EventBusSynchronizer.java      | 137 +++++++++++++++++++
 14 files changed, 350 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java
index 96e66a62..05194e4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java
@@ -17,16 +17,19 @@
  */
 package org.apache.ambari.server.events.publishers;
 
+import java.util.concurrent.Executors;
+
 import org.apache.ambari.server.events.AmbariEvent;
 
+import com.google.common.eventbus.AsyncEventBus;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Singleton;
 
 /**
  * The {@link AmbariEventPublisher} is used to publish instances of
- * {@link AmbariEvent} to any {@link Subscribe} interested. It uses a
- * single-threaded, serial {@link EventBus}.
+ * {@link AmbariEvent} to any {@link Subscribe} methods interested. It uses a
+ * single-threaded {@link AsyncEventBus}.
  */
 @Singleton
 public class AmbariEventPublisher {
@@ -40,7 +43,8 @@ public class AmbariEventPublisher {
    * Constructor.
    */
   public AmbariEventPublisher() {
-    m_eventBus = new EventBus("ambari-event-bus");
+    m_eventBus = new AsyncEventBus("ambari-event-bus",
+        Executors.newSingleThreadExecutor());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
index 9cf1f5a..c7a8ddb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.persistence.RollbackException;
@@ -230,29 +229,25 @@ public class ClustersImpl implements Clusters {
   public Cluster getCluster(String clusterName)
       throws AmbariException {
     checkLoaded();
-    r.lock();
-    try {
-      if (!clusters.containsKey(clusterName)) {
-        throw new ClusterNotFoundException(clusterName);
-      }
-      return clusters.get(clusterName);
-    } finally {
-      r.unlock();
+
+    Cluster cluster = clusters.get(clusterName);
+    if (null == cluster) {
+      throw new ClusterNotFoundException(clusterName);
     }
+
+    return cluster;
   }
 
   @Override
   public Cluster getClusterById(long id) throws AmbariException {
     checkLoaded();
-    r.lock();
-    try {
-      if (!clustersById.containsKey(id)) {
-        throw new ClusterNotFoundException("clusterID=" + id);
-      }
-      return clustersById.get(id);
-    } finally {
-      r.unlock();
+
+    Cluster cluster = clustersById.get(id);
+    if (null == cluster) {
+      throw new ClusterNotFoundException("clusterID=" + id);
     }
+
+    return clustersById.get(id);
   }
 
   @Override
@@ -281,15 +276,8 @@ public class ClustersImpl implements Clusters {
   @Override
   public List<Host> getHosts() {
     checkLoaded();
-    r.lock();
 
-    try {
-      List<Host> hostList = new ArrayList<Host>(hosts.size());
-      hostList.addAll(hosts.values());
-      return hostList;
-    } finally {
-      r.unlock();
-    }
+    return new ArrayList<Host>(hosts.values());
   }
 
   @Override
@@ -315,15 +303,12 @@ public class ClustersImpl implements Clusters {
   @Override
   public Host getHost(String hostname) throws AmbariException {
     checkLoaded();
-    r.lock();
-    try {
-      if (!hosts.containsKey(hostname)) {
-        throw new HostNotFoundException(hostname);
-      }
-      return hosts.get(hostname);
-    } finally {
-      r.unlock();
+
+    if (!hosts.containsKey(hostname)) {
+      throw new HostNotFoundException(hostname);
     }
+
+    return hosts.get(hostname);
   }
 
   /**
@@ -421,41 +406,19 @@ public class ClustersImpl implements Clusters {
   private Map<String, Host> getHostsMap(Collection<String> hostSet) throws
       HostNotFoundException {
     checkLoaded();
+
     Map<String, Host> hostMap = new HashMap<String, Host>();
-    r.lock();
-    try {
-      for (String host : hostSet) {
-        if (!hosts.containsKey(host)) {
-          throw new HostNotFoundException(host);
-        } else {
-          hostMap.put(host, hosts.get(host));
-        }
-      }
-    } finally {
-      r.unlock();
-    }
-    return hostMap;
-  }
 
-  private Map<String, Cluster> getClustersMap(Collection<String> clusterSet) throws
-      ClusterNotFoundException {
-    checkLoaded();
-    Map<String, Cluster> clusterMap = new HashMap<String, Cluster>();
-    r.lock();
-    try {
-      for (String c : clusterSet) {
-        if (c != null) {
-          if (!clusters.containsKey(c)) {
-            throw new ClusterNotFoundException(c);
-          } else {
-            clusterMap.put(c, clusters.get(c));
-          }
-        }
+    for (String hostName : hostSet) {
+      Host host = hosts.get(hostName);
+      if (null == hostName) {
+        throw new HostNotFoundException(hostName);
       }
-    } finally {
-      r.unlock();
+
+      hostMap.put(hostName, host);
     }
-    return clusterMap;
+
+    return hostMap;
   }
 
   /**
@@ -533,14 +496,8 @@ public class ClustersImpl implements Clusters {
       w.unlock();
     }
 
-    ReadWriteLock clusterLock = cluster.getClusterGlobalLock();
-    clusterLock.writeLock().lock();
-    try {
-      host.refresh();
-      cluster.refresh();
-    } finally {
-      clusterLock.writeLock().unlock();
-    }
+    cluster.refresh();
+    host.refresh();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/main/java/org/apache/ambari/server/state/services/AmbariServerAlertService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/services/AmbariServerAlertService.java b/ambari-server/src/main/java/org/apache/ambari/server/state/services/AmbariServerAlertService.java
index 89f9656..6669e7b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/services/AmbariServerAlertService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/services/AmbariServerAlertService.java
@@ -146,7 +146,7 @@ public class AmbariServerAlertService extends AbstractScheduledService {
   protected void runOneIteration() throws Exception {
     Map<String, Cluster> clusterMap = m_clustersProvider.get().getClusters();
     for (Cluster cluster : clusterMap.values()) {
-      // get all of the cluster alerts for the server
+      // get all of the cluster alerts for AMBARI/AMBARI_SERVER
       List<AlertDefinitionEntity> entities = m_dao.findByServiceComponent(
           cluster.getClusterId(), Services.AMBARI.name(),
           Components.AMBARI_SERVER.name());
@@ -155,17 +155,25 @@ public class AmbariServerAlertService extends AbstractScheduledService {
       for (AlertDefinitionEntity entity : entities) {
         String definitionName = entity.getDefinitionName();
         ScheduledAlert scheduledAlert = m_futureMap.get(definitionName);
-        ScheduledFuture<?> scheduledFuture = scheduledAlert.getScheduledFuture();
+
+        // disabled or new alerts may not have anything mapped yet
+        ScheduledFuture<?> scheduledFuture = null;
+        if (null != scheduledAlert) {
+          scheduledFuture = scheduledAlert.getScheduledFuture();
+        }
 
         // if the definition is not enabled, ensure it's not scheduled and
         // then continue to the next one
         if (!entity.getEnabled()) {
-          unschedule(definitionName, scheduledFuture);
+          if (null != scheduledFuture) {
+            unschedule(definitionName, scheduledFuture);
+          }
+
           continue;
         }
 
-        // if there is no future, then schedule it
-        if (null == scheduledFuture) {
+        // if the definition hasn't been scheduled, then schedule it
+        if (null == scheduledAlert || null == scheduledFuture) {
           scheduleRunnable(entity);
           continue;
         }
@@ -190,12 +198,14 @@ public class AmbariServerAlertService extends AbstractScheduledService {
    *
    * @param scheduledFuture
    */
-  private void unschedule(String definitionName,
-      ScheduledFuture<?> scheduledFuture) {
-    scheduledFuture.cancel(true);
+  private void unschedule(String definitionName, ScheduledFuture<?> scheduledFuture) {
+
     m_futureMap.remove(definitionName);
 
-    LOG.info("Unscheduled server alert {}", definitionName);
+    if (null != scheduledFuture) {
+      scheduledFuture.cancel(true);
+      LOG.info("Unscheduled server alert {}", definitionName);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
index fe4ba60..9cf7a99 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
@@ -105,6 +105,7 @@ import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
 import org.apache.ambari.server.utils.StageUtils;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.codec.digest.DigestUtils;
@@ -2423,6 +2424,9 @@ public class TestHeartbeatHandler {
 
   @Test
   public void testInstallPackagesWithVersion() throws Exception {
+    // required since this test method checks the DAO result of handling a
+    // heartbeat which performs some async tasks
+    EventBusSynchronizer.synchronizeAmbariEventPublisher(injector);
 
     final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
         Role.DATANODE, null, null);

http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java
index 09c335a..6073677 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java
@@ -17,14 +17,12 @@
  */
 package org.apache.ambari.server.events;
 
-import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import junit.framework.Assert;
 
-import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.OrmTestHelper;
@@ -42,6 +40,7 @@ import org.apache.ambari.server.state.ServiceComponentHostFactory;
 import org.apache.ambari.server.state.ServiceFactory;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -66,7 +65,6 @@ public class EventsTest {
   private ServiceFactory m_serviceFactory;
   private ServiceComponentFactory m_componentFactory;
   private ServiceComponentHostFactory m_schFactory;
-  private AmbariEventPublisher m_eventPublisher;
   private MockEventListener m_listener;
   private OrmTestHelper m_helper;
 
@@ -78,20 +76,13 @@ public class EventsTest {
     m_injector = Guice.createInjector(new InMemoryDefaultTestModule());
     m_injector.getInstance(GuiceJpaInitializer.class);
 
-    m_eventPublisher = m_injector.getInstance(AmbariEventPublisher.class);
-    EventBus synchronizedBus = new EventBus();
-
     m_helper = m_injector.getInstance(OrmTestHelper.class);
 
     // register mock listener
+    EventBus synchronizedBus = EventBusSynchronizer.synchronizeAmbariEventPublisher(m_injector);
     m_listener = m_injector.getInstance(MockEventListener.class);
     synchronizedBus.register(m_listener);
 
-    // !!! need a synchronous op for testing
-    Field field = AmbariEventPublisher.class.getDeclaredField("m_eventBus");
-    field.setAccessible(true);
-    field.set(m_eventPublisher, synchronizedBus);
-
     m_clusters = m_injector.getInstance(Clusters.class);
     m_serviceFactory = m_injector.getInstance(ServiceFactory.class);
     m_componentFactory = m_injector.getInstance(ServiceComponentFactory.class);

http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
index 8768ffc..92866d7 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -44,8 +43,6 @@ import org.apache.ambari.server.controller.spi.SortRequest;
 import org.apache.ambari.server.controller.spi.SortRequest.Order;
 import org.apache.ambari.server.controller.spi.SortRequestProperty;
 import org.apache.ambari.server.controller.utilities.PredicateBuilder;
-import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
-import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.AlertDaoHelper;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -64,11 +61,11 @@ import org.apache.ambari.server.state.ServiceComponentHostFactory;
 import org.apache.ambari.server.state.ServiceFactory;
 import org.apache.ambari.server.state.alert.Scope;
 import org.apache.ambari.server.state.alert.SourceType;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.eventbus.EventBus;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.persist.PersistService;
@@ -92,8 +89,6 @@ public class AlertDispatchDAOTest {
   private ServiceComponentFactory m_componentFactory;
   private ServiceComponentHostFactory m_schFactory;
   private AlertDaoHelper m_alertHelper;
-  private AmbariEventPublisher m_eventPublisher;
-  private EventBus m_synchronizedBus;
 
   /**
    *
@@ -111,13 +106,9 @@ public class AlertDispatchDAOTest {
     m_schFactory = m_injector.getInstance(ServiceComponentHostFactory.class);
     m_clusters = m_injector.getInstance(Clusters.class);
     m_alertHelper = m_injector.getInstance(AlertDaoHelper.class);
-    m_eventPublisher = m_injector.getInstance(AmbariEventPublisher.class);
 
     // !!! need a synchronous op for testing
-    m_synchronizedBus = new EventBus();
-    Field field = AmbariEventPublisher.class.getDeclaredField("m_eventBus");
-    field.setAccessible(true);
-    field.set(m_eventPublisher, m_synchronizedBus);
+    EventBusSynchronizer.synchronizeAmbariEventPublisher(m_injector);
 
     m_cluster = m_clusters.getClusterById(m_helper.createCluster());
     m_helper.initializeClusterWithStack(m_cluster);
@@ -844,8 +835,6 @@ public class AlertDispatchDAOTest {
    */
   @Test
   public void testFindDefaultGroup() throws Exception {
-    m_synchronizedBus.register(m_injector.getInstance(AlertServiceStateListener.class));
-
     List<AlertGroupEntity> groups = m_dao.findAllGroups();
     assertNotNull(groups);
     assertEquals(10, groups.size());
@@ -870,8 +859,6 @@ public class AlertDispatchDAOTest {
    */
   @Test
   public void testDefaultGroupAutomaticCreation() throws Exception {
-    m_synchronizedBus.register(m_injector.getInstance(AlertServiceStateListener.class));
-
     List<AlertGroupEntity> groups = m_dao.findAllGroups();
     assertNotNull(groups);
     assertEquals(10, groups.size());
@@ -916,8 +903,6 @@ public class AlertDispatchDAOTest {
    */
   @Test(expected = AmbariException.class)
   public void testDefaultGroupInvalidServiceNoCreation() throws Exception {
-    m_synchronizedBus.register(m_injector.getInstance(AlertServiceStateListener.class));
-
     List<AlertGroupEntity> groups = m_dao.findAllGroups();
     assertNotNull(groups);
     assertEquals(10, groups.size());

http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
index 9c8ea7d..e6a95ae 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collections;
@@ -45,8 +44,6 @@ import org.apache.ambari.server.controller.spi.SortRequest;
 import org.apache.ambari.server.controller.spi.SortRequest.Order;
 import org.apache.ambari.server.controller.spi.SortRequestProperty;
 import org.apache.ambari.server.controller.utilities.PredicateBuilder;
-import org.apache.ambari.server.events.listeners.alerts.AlertMaintenanceModeListener;
-import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.AlertDaoHelper;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -66,11 +63,11 @@ import org.apache.ambari.server.state.ServiceComponentHostFactory;
 import org.apache.ambari.server.state.ServiceFactory;
 import org.apache.ambari.server.state.alert.Scope;
 import org.apache.ambari.server.state.alert.SourceType;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.eventbus.EventBus;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.persist.PersistService;
@@ -93,8 +90,6 @@ public class AlertsDAOTest {
   private ServiceFactory m_serviceFactory;
   private ServiceComponentFactory m_componentFactory;
   private ServiceComponentHostFactory m_schFactory;
-  private AmbariEventPublisher m_eventPublisher;
-  private EventBus m_synchronizedBus;
 
   private AlertDaoHelper m_alertHelper;
 
@@ -111,15 +106,11 @@ public class AlertsDAOTest {
     m_serviceFactory = m_injector.getInstance(ServiceFactory.class);
     m_componentFactory = m_injector.getInstance(ServiceComponentFactory.class);
     m_schFactory = m_injector.getInstance(ServiceComponentHostFactory.class);
-    m_eventPublisher = m_injector.getInstance(AmbariEventPublisher.class);
     m_clusters = m_injector.getInstance(Clusters.class);
     m_alertHelper = m_injector.getInstance(AlertDaoHelper.class);
 
     // !!! need a synchronous op for testing
-    m_synchronizedBus = new EventBus();
-    Field field = AmbariEventPublisher.class.getDeclaredField("m_eventBus");
-    field.setAccessible(true);
-    field.set(m_eventPublisher, m_synchronizedBus);
+    EventBusSynchronizer.synchronizeAmbariEventPublisher(m_injector);
 
     // install YARN so there is at least 1 service installed and no
     // unexpected alerts since the test YARN service doesn't have any alerts
@@ -908,8 +899,6 @@ public class AlertsDAOTest {
    */
   @Test
   public void testMaintenanceMode() throws Exception {
-    m_synchronizedBus.register(m_injector.getInstance(AlertMaintenanceModeListener.class));
-
     m_helper.installHdfsService(m_cluster, m_serviceFactory,
         m_componentFactory, m_schFactory, HOSTNAME);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java
index 19b5d46..1c4567f 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java
@@ -17,19 +17,13 @@
  */
 package org.apache.ambari.server.state.alerts;
 
-import java.lang.reflect.Field;
 import java.util.UUID;
 
 import junit.framework.Assert;
 
-import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.events.AlertDefinitionChangedEvent;
 import org.apache.ambari.server.events.AlertDefinitionDeleteEvent;
 import org.apache.ambari.server.events.AmbariEvent;
-import org.apache.ambari.server.events.listeners.alerts.AlertLifecycleListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener;
-import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.OrmTestHelper;
@@ -53,11 +47,11 @@ import org.apache.ambari.server.state.alert.Reporting;
 import org.apache.ambari.server.state.alert.Reporting.ReportTemplate;
 import org.apache.ambari.server.state.alert.Scope;
 import org.apache.ambari.server.state.alert.SourceType;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.eventbus.EventBus;
 import com.google.gson.Gson;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -77,10 +71,8 @@ public class AlertEventPublisherTest {
   private String clusterName;
   private Injector injector;
   private ServiceFactory serviceFactory;
-  private AmbariMetaInfo metaInfo;
   private OrmTestHelper ormHelper;
   private AggregateDefinitionMapping aggregateMapping;
-  private AmbariEventPublisher eventPublisher;
 
   /**
    *
@@ -90,18 +82,7 @@ public class AlertEventPublisherTest {
     injector = Guice.createInjector(new InMemoryDefaultTestModule());
     injector.getInstance(GuiceJpaInitializer.class);
 
-    eventPublisher = injector.getInstance(AmbariEventPublisher.class);
-    EventBus synchronizedBus = new EventBus();
-
-    // force singleton init via Guice so the listener registers with the bus
-    synchronizedBus.register(injector.getInstance(AlertLifecycleListener.class));
-    synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class));
-    synchronizedBus.register(injector.getInstance(AlertServiceStateListener.class));
-
-    // !!! need a synchronous op for testing
-    Field field = AmbariEventPublisher.class.getDeclaredField("m_eventBus");
-    field.setAccessible(true);
-    field.set(eventPublisher, synchronizedBus);
+    EventBusSynchronizer.synchronizeAmbariEventPublisher(injector);
 
     dispatchDao = injector.getInstance(AlertDispatchDAO.class);
     definitionDao = injector.getInstance(AlertDefinitionDAO.class);
@@ -111,8 +92,6 @@ public class AlertEventPublisherTest {
     ormHelper = injector.getInstance(OrmTestHelper.class);
     aggregateMapping = injector.getInstance(AggregateDefinitionMapping.class);
 
-    metaInfo = injector.getInstance(AmbariMetaInfo.class);
-
     clusterName = "foo";
     clusters.addCluster(clusterName);
     cluster = clusters.getCluster(clusterName);

http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java
index b64afed..7144625 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ambari.server.state.alerts;
 
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashSet;
@@ -25,8 +24,6 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.ambari.server.events.AlertStateChangeEvent;
-import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener;
 import org.apache.ambari.server.events.publishers.AlertEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -38,12 +35,12 @@ import org.apache.ambari.server.orm.entities.AlertNoticeEntity;
 import org.apache.ambari.server.orm.entities.AlertTargetEntity;
 import org.apache.ambari.server.state.Alert;
 import org.apache.ambari.server.state.AlertState;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.eventbus.EventBus;
 import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -71,20 +68,13 @@ public class AlertStateChangedEventTest {
 
     injector.getInstance(GuiceJpaInitializer.class);
 
-    // force singleton init via Guice so the listener registers with the bus
-    injector.getInstance(AlertServiceStateListener.class);
-    injector.getInstance(AlertStateChangedListener.class);
-
     dispatchDao = injector.getInstance(AlertDispatchDAO.class);
-    eventPublisher = injector.getInstance(AlertEventPublisher.class);
-
-    EventBus synchronizedBus = new EventBus();
-    synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class));
 
     // !!! need a synchronous op for testing
-    Field field = AlertEventPublisher.class.getDeclaredField("m_eventBus");
-    field.setAccessible(true);
-    field.set(eventPublisher, synchronizedBus);
+    EventBusSynchronizer.synchronizeAlertEventPublisher(injector);
+    EventBusSynchronizer.synchronizeAmbariEventPublisher(injector);
+
+    eventPublisher = injector.getInstance(AlertEventPublisher.class);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/InitialAlertEventTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/InitialAlertEventTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/InitialAlertEventTest.java
index 4e55c49..73bf6c4 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/InitialAlertEventTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/InitialAlertEventTest.java
@@ -17,17 +17,11 @@
  */
 package org.apache.ambari.server.state.alerts;
 
-import java.lang.reflect.Field;
-
 import junit.framework.Assert;
 
-import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.events.AlertReceivedEvent;
 import org.apache.ambari.server.events.InitialAlertEvent;
 import org.apache.ambari.server.events.MockEventListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertLifecycleListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
 import org.apache.ambari.server.events.publishers.AlertEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -41,6 +35,7 @@ import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceFactory;
 import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -68,7 +63,6 @@ public class InitialAlertEventTest {
   private Cluster m_cluster;
   private String m_clusterName;
   private ServiceFactory m_serviceFactory;
-  private AmbariMetaInfo m_metaInfo;
 
   /**
    *
@@ -83,28 +77,18 @@ public class InitialAlertEventTest {
     // get a mock listener
     m_listener = m_injector.getInstance(MockEventListener.class);
 
-    m_alertsDao = m_injector.getInstance(AlertsDAO.class);
-
     // create the publisher and mock listener
     m_eventPublisher = m_injector.getInstance(AlertEventPublisher.class);
-    EventBus synchronizedBus = new EventBus();
 
     // register listeners needed
+    EventBus synchronizedBus = EventBusSynchronizer.synchronizeAlertEventPublisher(m_injector);
     synchronizedBus.register(m_listener);
-    synchronizedBus.register(m_injector.getInstance(AlertLifecycleListener.class));
-    synchronizedBus.register(m_injector.getInstance(AlertServiceStateListener.class));
-    synchronizedBus.register(m_injector.getInstance(AlertReceivedListener.class));
-
-    // !!! need a synchronous op for testing
-    Field field = AlertEventPublisher.class.getDeclaredField("m_eventBus");
-    field.setAccessible(true);
-    field.set(m_eventPublisher, synchronizedBus);
 
     m_definitionDao = m_injector.getInstance(AlertDefinitionDAO.class);
     m_clusters = m_injector.getInstance(Clusters.class);
     m_serviceFactory = m_injector.getInstance(ServiceFactory.class);
 
-    m_metaInfo = m_injector.getInstance(AmbariMetaInfo.class);
+    m_alertsDao = m_injector.getInstance(AlertsDAO.class);
 
     m_clusterName = "c1";
     m_clusters.addCluster(m_clusterName);
@@ -190,12 +174,14 @@ public class InitialAlertEventTest {
   /**
    *
    */
-  private class MockModule implements Module {
+  private static class MockModule implements Module {
     /**
      * {@inheritDoc}
      */
     @Override
     public void configure(Binder binder) {
+      // sychronize on the ambari event bus for this test to work properly
+      EventBusSynchronizer.synchronizeAmbariEventPublisher(binder);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
index acf7911..c289bcc 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
@@ -65,11 +65,11 @@ import org.apache.ambari.server.state.alert.Reporting.ReportTemplate;
 import org.apache.ambari.server.state.alert.Scope;
 import org.apache.ambari.server.state.alert.Source;
 import org.apache.ambari.server.state.alert.SourceType;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
 import com.google.gson.Gson;
 import com.google.inject.Guice;
@@ -396,12 +396,9 @@ public class AlertDataManagerTest {
       m_dao.merge(current);
     }
 
-    AlertEventPublisher publisher = m_injector.getInstance(AlertEventPublisher.class);
-
     // !!! need a synchronous op for testing
-    field = AlertEventPublisher.class.getDeclaredField("m_eventBus");
-    field.setAccessible(true);
-    field.set(publisher, new EventBus());
+    AlertEventPublisher publisher = m_injector.getInstance(AlertEventPublisher.class);
+    EventBusSynchronizer.synchronizeAlertEventPublisher(m_injector);
 
     final AtomicReference<Alert> ref = new AtomicReference<Alert>();
     publisher.register(new TestListener() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
index 766105d..ff039a9 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.ServiceComponentNotFoundException;
 import org.apache.ambari.server.ServiceNotFoundException;
-import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.OrmTestHelper;
@@ -80,9 +79,6 @@ public class ClusterDeadlockTest {
   private ServiceComponentHostFactory serviceComponentHostFactory;
 
   @Inject
-  private AmbariMetaInfo metaInfo;
-
-  @Inject
   private OrmTestHelper helper;
 
   private StackId stackId = new StackId("HDP-0.1");

http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
index 839b25f..d771eba 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
@@ -26,7 +26,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
 
-import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.ServiceComponentNotFoundException;
+import org.apache.ambari.server.ServiceNotFoundException;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.OrmTestHelper;
@@ -34,7 +36,14 @@ import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.RepositoryVersionState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentFactory;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceComponentHostFactory;
+import org.apache.ambari.server.state.ServiceFactory;
 import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.State;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -55,6 +64,8 @@ public class ClustersDeadlockTest {
 
   private final AtomicInteger hostNameCounter = new AtomicInteger(0);
 
+  private final StackId stackId = new StackId("HDP-0.1");
+
   @Inject
   private Injector injector;
 
@@ -62,7 +73,13 @@ public class ClustersDeadlockTest {
   private Clusters clusters;
 
   @Inject
-  private AmbariMetaInfo metaInfo;
+  private ServiceFactory serviceFactory;
+
+  @Inject
+  private ServiceComponentFactory serviceComponentFactory;
+
+  @Inject
+  private ServiceComponentHostFactory serviceComponentHostFactory;
 
   @Inject
   private OrmTestHelper helper;
@@ -81,6 +98,9 @@ public class ClustersDeadlockTest {
     cluster.setDesiredStackVersion(stackId);
     helper.getOrCreateRepositoryVersion(stackId.getStackName(), stackId.getStackVersion());
     cluster.createClusterVersion(stackId.getStackName(), stackId.getStackVersion(), "admin", RepositoryVersionState.UPGRADING);
+
+    // install HDFS
+    installService("HDFS");
   }
 
   @After
@@ -89,7 +109,7 @@ public class ClustersDeadlockTest {
   }
 
   /**
-   * Tests that no deadlock exists when adding hosts from reading from the
+   * Tests that no deadlock exists when adding hosts while reading from the
    * cluster.
    *
    * @throws Exception
@@ -117,7 +137,34 @@ public class ClustersDeadlockTest {
   }
 
   /**
-   * Tests that no deadlock exists when adding hosts from reading from the
+   * Tests that no deadlock exists when adding hosts while reading from the
+   * cluster. This test ensures that there are service components installed on
+   * the hosts so that the cluster health report does some more work.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 35000)
+  public void testDeadlockWhileMappingHostsWithExistingServices()
+      throws Exception {
+    List<Thread> threads = new ArrayList<Thread>();
+    for (int i = 0; i < NUMBER_OF_THREADS; i++) {
+      ClusterReaderThread readerThread = new ClusterReaderThread();
+      ClustersHostAndComponentMapperThread writerThread = new ClustersHostAndComponentMapperThread();
+
+      threads.add(readerThread);
+      threads.add(writerThread);
+
+      readerThread.start();
+      writerThread.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+  }
+
+  /**
+   * Tests that no deadlock exists when adding hosts while reading from the
    * cluster.
    *
    * @throws Exception
@@ -194,6 +241,38 @@ public class ClustersDeadlockTest {
   }
 
   /**
+   * The {@link ClustersHostAndComponentMapperThread} is used to map hosts to a
+   * cluster over and over. This will also add components to the hosts that are
+   * being mapped to further exercise the cluster health report concurrency.
+   */
+  private final class ClustersHostAndComponentMapperThread extends Thread {
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void run() {
+      try {
+        for (int i = 0; i < NUMBER_OF_HOSTS; i++) {
+          String hostName = "c64-" + hostNameCounter.getAndIncrement();
+          clusters.addHost(hostName);
+          setOsFamily(clusters.getHost(hostName), "redhat", "6.4");
+          clusters.getHost(hostName).persist();
+          clusters.mapHostToCluster(hostName, CLUSTER_NAME);
+
+          // create DATANODE on this host so that we end up exercising the
+          // cluster health report since we need a service component host
+          createNewServiceComponentHost("HDFS", "DATANODE", hostName);
+
+          Thread.sleep(10);
+        }
+      } catch (Exception exception) {
+        throw new RuntimeException(exception);
+      }
+    }
+  }
+
+  /**
    * The {@link ClustersHostUnMapperThread} is used to unmap hosts to a cluster
    * over and over.
    */
@@ -235,4 +314,53 @@ public class ClustersDeadlockTest {
     hostAttributes.put("os_release_version", osVersion);
     host.setHostAttributes(hostAttributes);
   }
+
+  private Service installService(String serviceName) throws AmbariException {
+    Service service = null;
+
+    try {
+      service = cluster.getService(serviceName);
+    } catch (ServiceNotFoundException e) {
+      service = serviceFactory.createNew(cluster, serviceName);
+      cluster.addService(service);
+      service.persist();
+    }
+
+    return service;
+  }
+
+  private ServiceComponent addServiceComponent(Service service,
+      String componentName) throws AmbariException {
+    ServiceComponent serviceComponent = null;
+    try {
+      serviceComponent = service.getServiceComponent(componentName);
+    } catch (ServiceComponentNotFoundException e) {
+      serviceComponent = serviceComponentFactory.createNew(service,
+          componentName);
+      service.addServiceComponent(serviceComponent);
+      serviceComponent.setDesiredState(State.INSTALLED);
+      serviceComponent.persist();
+    }
+
+    return serviceComponent;
+  }
+
+  private ServiceComponentHost createNewServiceComponentHost(String svc,
+      String svcComponent, String hostName) throws AmbariException {
+    Assert.assertNotNull(cluster.getConfigGroups());
+    Service s = installService(svc);
+    ServiceComponent sc = addServiceComponent(s, svcComponent);
+
+    ServiceComponentHost sch = serviceComponentHostFactory.createNew(sc,
+        hostName);
+
+    sc.addServiceComponentHost(sch);
+    sch.setDesiredState(State.INSTALLED);
+    sch.setState(State.INSTALLED);
+    sch.setDesiredStackVersion(stackId);
+    sch.setStackVersion(stackId);
+
+    sch.persist();
+    return sch;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
new file mode 100644
index 0000000..4b0c031
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+import java.lang.reflect.Field;
+
+import org.apache.ambari.server.events.listeners.alerts.AlertAggregateListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertLifecycleListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertMaintenanceModeListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener;
+import org.apache.ambari.server.events.listeners.upgrade.DistributeRepositoriesActionListener;
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+
+import com.google.common.eventbus.AsyncEventBus;
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Binder;
+import com.google.inject.Injector;
+
+/**
+ * The {@link EventBusSynchronizer} is used to replace the {@link AsyncEventBus}
+ * used by Guava with a synchronous, serial {@link EventBus} instance. This
+ * enables testing that relies on testing the outcome of asynchronous events by
+ * executing the events on the current thread serially.
+ */
+public class EventBusSynchronizer {
+
+  /**
+   * Force the {@link EventBus} from {@link AmbariEventPublisher} to be serial
+   * and synchronous.
+   *
+   * @param binder
+   */
+  public static void synchronizeAmbariEventPublisher(Binder binder) {
+    EventBus synchronizedBus = new EventBus();
+    AmbariEventPublisher ambariEventPublisher = new AmbariEventPublisher();
+
+    replaceEventBus(AmbariEventPublisher.class, ambariEventPublisher,
+        synchronizedBus);
+
+    binder.bind(AmbariEventPublisher.class).toInstance(ambariEventPublisher);
+  }
+
+  /**
+   * Force the {@link EventBus} from {@link AlertEventPublisher} to be serial
+   * and synchronous. Also register the known listeners. Registering known
+   * listeners is necessary since the event bus was replaced.
+   *
+   * @param injector
+   */
+  public static EventBus synchronizeAmbariEventPublisher(Injector injector) {
+    EventBus synchronizedBus = new EventBus();
+    AmbariEventPublisher publisher = injector.getInstance(AmbariEventPublisher.class);
+
+    replaceEventBus(AmbariEventPublisher.class, publisher, synchronizedBus);
+
+    // register common ambari event listeners
+    registerAmbariListeners(injector, synchronizedBus);
+
+    return synchronizedBus;
+  }
+
+  /**
+   * Force the {@link EventBus} from {@link AlertEventPublisher} to be serial
+   * and synchronous. Also register the known listeners. Registering known
+   * listeners is necessary since the event bus was replaced.
+   *
+   * @param injector
+   */
+  public static EventBus synchronizeAlertEventPublisher(Injector injector) {
+    EventBus synchronizedBus = new EventBus();
+    AlertEventPublisher publisher = injector.getInstance(AlertEventPublisher.class);
+
+    replaceEventBus(AlertEventPublisher.class, publisher, synchronizedBus);
+
+    // register common alert event listeners
+    registerAlertListeners(injector, synchronizedBus);
+
+    return synchronizedBus;
+  }
+
+  /**
+   * Register the normal listeners with the replaced synchronous bus.
+   *
+   * @param injector
+   * @param synchronizedBus
+   */
+  private static void registerAmbariListeners(Injector injector,
+      EventBus synchronizedBus) {
+    synchronizedBus.register(injector.getInstance(AlertMaintenanceModeListener.class));
+    synchronizedBus.register(injector.getInstance(AlertLifecycleListener.class));
+    synchronizedBus.register(injector.getInstance(AlertServiceStateListener.class));
+    synchronizedBus.register(injector.getInstance(DistributeRepositoriesActionListener.class));
+  }
+
+  /**
+   * Register the normal listeners with the replaced synchronous bus.
+   *
+   * @param injector
+   * @param synchronizedBus
+   */
+  private static void registerAlertListeners(Injector injector,
+      EventBus synchronizedBus) {
+    synchronizedBus.register(injector.getInstance(AlertAggregateListener.class));
+    synchronizedBus.register(injector.getInstance(AlertReceivedListener.class));
+    synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class));
+  }
+
+  private static void replaceEventBus(Class<?> eventPublisherClass,
+      Object instance, EventBus eventBus) {
+
+    try {
+      Field field = eventPublisherClass.getDeclaredField("m_eventBus");
+      field.setAccessible(true);
+      field.set(instance, eventBus);
+    } catch (Exception exception) {
+      throw new RuntimeException(exception);
+    }
+  }
+}


Mime
View raw message