Return-Path: X-Original-To: apmail-ambari-commits-archive@www.apache.org Delivered-To: apmail-ambari-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EC923178B2 for ; Tue, 14 Apr 2015 18:00:16 +0000 (UTC) Received: (qmail 49941 invoked by uid 500); 14 Apr 2015 18:00:10 -0000 Delivered-To: apmail-ambari-commits-archive@ambari.apache.org Received: (qmail 49917 invoked by uid 500); 14 Apr 2015 18:00:10 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 49907 invoked by uid 99); 14 Apr 2015 18:00:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Apr 2015 18:00:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 39127E0416; Tue, 14 Apr 2015 18:00:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jonathanhurley@apache.org To: commits@ambari.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ambari git commit: AMBARI-10456 - Ambari Server Deadlock When Mapping Hosts (jonathanhurley) Date: Tue, 14 Apr 2015 18:00:10 +0000 (UTC) Repository: ambari Updated Branches: refs/heads/trunk ceb196b83 -> f11e2f063 AMBARI-10456 - Ambari Server Deadlock When Mapping Hosts (jonathanhurley) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f11e2f06 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f11e2f06 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f11e2f06 Branch: refs/heads/trunk Commit: f11e2f0630f922cf7102fe4291d5c80230b4cb8c Parents: ceb196b Author: Jonathan Hurley Authored: Tue Apr 14 10:59:57 2015 -0400 Committer: Jonathan Hurley Committed: Tue Apr 14 13:58:27 2015 -0400 ---------------------------------------------------------------------- .../events/publishers/AmbariEventPublisher.java | 10 +- .../server/state/cluster/ClustersImpl.java | 101 ++++---------- .../services/AmbariServerAlertService.java | 28 ++-- .../server/agent/TestHeartbeatHandler.java | 4 + .../apache/ambari/server/events/EventsTest.java | 13 +- .../server/orm/dao/AlertDispatchDAOTest.java | 19 +-- .../ambari/server/orm/dao/AlertsDAOTest.java | 15 +- .../state/alerts/AlertEventPublisherTest.java | 25 +--- .../alerts/AlertStateChangedEventTest.java | 20 +-- .../state/alerts/InitialAlertEventTest.java | 26 +--- .../state/cluster/AlertDataManagerTest.java | 9 +- .../state/cluster/ClusterDeadlockTest.java | 4 - .../state/cluster/ClustersDeadlockTest.java | 136 +++++++++++++++++- .../server/utils/EventBusSynchronizer.java | 137 +++++++++++++++++++ 14 files changed, 350 insertions(+), 197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java index 96e66a62..05194e4 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java @@ -17,16 +17,19 @@ */ package org.apache.ambari.server.events.publishers; +import java.util.concurrent.Executors; + import org.apache.ambari.server.events.AmbariEvent; +import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; import com.google.inject.Singleton; /** * The {@link AmbariEventPublisher} is used to publish instances of - * {@link AmbariEvent} to any {@link Subscribe} interested. It uses a - * single-threaded, serial {@link EventBus}. + * {@link AmbariEvent} to any {@link Subscribe} methods interested. It uses a + * single-threaded {@link AsyncEventBus}. */ @Singleton public class AmbariEventPublisher { @@ -40,7 +43,8 @@ public class AmbariEventPublisher { * Constructor. */ public AmbariEventPublisher() { - m_eventBus = new EventBus("ambari-event-bus"); + m_eventBus = new AsyncEventBus("ambari-event-bus", + Executors.newSingleThreadExecutor()); } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java index 9cf1f5a..c7a8ddb 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.persistence.RollbackException; @@ -230,29 +229,25 @@ public class ClustersImpl implements Clusters { public Cluster getCluster(String clusterName) throws AmbariException { checkLoaded(); - r.lock(); - try { - if (!clusters.containsKey(clusterName)) { - throw new ClusterNotFoundException(clusterName); - } - return clusters.get(clusterName); - } finally { - r.unlock(); + + Cluster cluster = clusters.get(clusterName); + if (null == cluster) { + throw new ClusterNotFoundException(clusterName); } + + return cluster; } @Override public Cluster getClusterById(long id) throws AmbariException { checkLoaded(); - r.lock(); - try { - if (!clustersById.containsKey(id)) { - throw new ClusterNotFoundException("clusterID=" + id); - } - return clustersById.get(id); - } finally { - r.unlock(); + + Cluster cluster = clustersById.get(id); + if (null == cluster) { + throw new ClusterNotFoundException("clusterID=" + id); } + + return clustersById.get(id); } @Override @@ -281,15 +276,8 @@ public class ClustersImpl implements Clusters { @Override public List getHosts() { checkLoaded(); - r.lock(); - try { - List hostList = new ArrayList(hosts.size()); - hostList.addAll(hosts.values()); - return hostList; - } finally { - r.unlock(); - } + return new ArrayList(hosts.values()); } @Override @@ -315,15 +303,12 @@ public class ClustersImpl implements Clusters { @Override public Host getHost(String hostname) throws AmbariException { checkLoaded(); - r.lock(); - try { - if (!hosts.containsKey(hostname)) { - throw new HostNotFoundException(hostname); - } - return hosts.get(hostname); - } finally { - r.unlock(); + + if (!hosts.containsKey(hostname)) { + throw new HostNotFoundException(hostname); } + + return hosts.get(hostname); } /** @@ -421,41 +406,19 @@ public class ClustersImpl implements Clusters { private Map getHostsMap(Collection hostSet) throws HostNotFoundException { checkLoaded(); + Map hostMap = new HashMap(); - r.lock(); - try { - for (String host : hostSet) { - if (!hosts.containsKey(host)) { - throw new HostNotFoundException(host); - } else { - hostMap.put(host, hosts.get(host)); - } - } - } finally { - r.unlock(); - } - return hostMap; - } - private Map getClustersMap(Collection clusterSet) throws - ClusterNotFoundException { - checkLoaded(); - Map clusterMap = new HashMap(); - r.lock(); - try { - for (String c : clusterSet) { - if (c != null) { - if (!clusters.containsKey(c)) { - throw new ClusterNotFoundException(c); - } else { - clusterMap.put(c, clusters.get(c)); - } - } + for (String hostName : hostSet) { + Host host = hosts.get(hostName); + if (null == hostName) { + throw new HostNotFoundException(hostName); } - } finally { - r.unlock(); + + hostMap.put(hostName, host); } - return clusterMap; + + return hostMap; } /** @@ -533,14 +496,8 @@ public class ClustersImpl implements Clusters { w.unlock(); } - ReadWriteLock clusterLock = cluster.getClusterGlobalLock(); - clusterLock.writeLock().lock(); - try { - host.refresh(); - cluster.refresh(); - } finally { - clusterLock.writeLock().unlock(); - } + cluster.refresh(); + host.refresh(); } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/main/java/org/apache/ambari/server/state/services/AmbariServerAlertService.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/services/AmbariServerAlertService.java b/ambari-server/src/main/java/org/apache/ambari/server/state/services/AmbariServerAlertService.java index 89f9656..6669e7b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/services/AmbariServerAlertService.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/services/AmbariServerAlertService.java @@ -146,7 +146,7 @@ public class AmbariServerAlertService extends AbstractScheduledService { protected void runOneIteration() throws Exception { Map clusterMap = m_clustersProvider.get().getClusters(); for (Cluster cluster : clusterMap.values()) { - // get all of the cluster alerts for the server + // get all of the cluster alerts for AMBARI/AMBARI_SERVER List entities = m_dao.findByServiceComponent( cluster.getClusterId(), Services.AMBARI.name(), Components.AMBARI_SERVER.name()); @@ -155,17 +155,25 @@ public class AmbariServerAlertService extends AbstractScheduledService { for (AlertDefinitionEntity entity : entities) { String definitionName = entity.getDefinitionName(); ScheduledAlert scheduledAlert = m_futureMap.get(definitionName); - ScheduledFuture scheduledFuture = scheduledAlert.getScheduledFuture(); + + // disabled or new alerts may not have anything mapped yet + ScheduledFuture scheduledFuture = null; + if (null != scheduledAlert) { + scheduledFuture = scheduledAlert.getScheduledFuture(); + } // if the definition is not enabled, ensure it's not scheduled and // then continue to the next one if (!entity.getEnabled()) { - unschedule(definitionName, scheduledFuture); + if (null != scheduledFuture) { + unschedule(definitionName, scheduledFuture); + } + continue; } - // if there is no future, then schedule it - if (null == scheduledFuture) { + // if the definition hasn't been scheduled, then schedule it + if (null == scheduledAlert || null == scheduledFuture) { scheduleRunnable(entity); continue; } @@ -190,12 +198,14 @@ public class AmbariServerAlertService extends AbstractScheduledService { * * @param scheduledFuture */ - private void unschedule(String definitionName, - ScheduledFuture scheduledFuture) { - scheduledFuture.cancel(true); + private void unschedule(String definitionName, ScheduledFuture scheduledFuture) { + m_futureMap.remove(definitionName); - LOG.info("Unscheduled server alert {}", definitionName); + if (null != scheduledFuture) { + scheduledFuture.cancel(true); + LOG.info("Unscheduled server alert {}", definitionName); + } } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java index fe4ba60..9cf7a99 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java @@ -105,6 +105,7 @@ import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent; +import org.apache.ambari.server.utils.EventBusSynchronizer; import org.apache.ambari.server.utils.StageUtils; import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.digest.DigestUtils; @@ -2423,6 +2424,9 @@ public class TestHeartbeatHandler { @Test public void testInstallPackagesWithVersion() throws Exception { + // required since this test method checks the DAO result of handling a + // heartbeat which performs some async tasks + EventBusSynchronizer.synchronizeAmbariEventPublisher(injector); final HostRoleCommand command = new HostRoleCommand(DummyHostname1, Role.DATANODE, null, null); http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java index 09c335a..6073677 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java @@ -17,14 +17,12 @@ */ package org.apache.ambari.server.events; -import java.lang.reflect.Field; import java.util.HashMap; import java.util.List; import java.util.Map; import junit.framework.Assert; -import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; import org.apache.ambari.server.orm.OrmTestHelper; @@ -42,6 +40,7 @@ import org.apache.ambari.server.state.ServiceComponentHostFactory; import org.apache.ambari.server.state.ServiceFactory; import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.state.State; +import org.apache.ambari.server.utils.EventBusSynchronizer; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -66,7 +65,6 @@ public class EventsTest { private ServiceFactory m_serviceFactory; private ServiceComponentFactory m_componentFactory; private ServiceComponentHostFactory m_schFactory; - private AmbariEventPublisher m_eventPublisher; private MockEventListener m_listener; private OrmTestHelper m_helper; @@ -78,20 +76,13 @@ public class EventsTest { m_injector = Guice.createInjector(new InMemoryDefaultTestModule()); m_injector.getInstance(GuiceJpaInitializer.class); - m_eventPublisher = m_injector.getInstance(AmbariEventPublisher.class); - EventBus synchronizedBus = new EventBus(); - m_helper = m_injector.getInstance(OrmTestHelper.class); // register mock listener + EventBus synchronizedBus = EventBusSynchronizer.synchronizeAmbariEventPublisher(m_injector); m_listener = m_injector.getInstance(MockEventListener.class); synchronizedBus.register(m_listener); - // !!! need a synchronous op for testing - Field field = AmbariEventPublisher.class.getDeclaredField("m_eventBus"); - field.setAccessible(true); - field.set(m_eventPublisher, synchronizedBus); - m_clusters = m_injector.getInstance(Clusters.class); m_serviceFactory = m_injector.getInstance(ServiceFactory.class); m_componentFactory = m_injector.getInstance(ServiceComponentFactory.class); http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java index 8768ffc..92866d7 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -44,8 +43,6 @@ import org.apache.ambari.server.controller.spi.SortRequest; import org.apache.ambari.server.controller.spi.SortRequest.Order; import org.apache.ambari.server.controller.spi.SortRequestProperty; import org.apache.ambari.server.controller.utilities.PredicateBuilder; -import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener; -import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.AlertDaoHelper; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; @@ -64,11 +61,11 @@ import org.apache.ambari.server.state.ServiceComponentHostFactory; import org.apache.ambari.server.state.ServiceFactory; import org.apache.ambari.server.state.alert.Scope; import org.apache.ambari.server.state.alert.SourceType; +import org.apache.ambari.server.utils.EventBusSynchronizer; import org.junit.After; import org.junit.Before; import org.junit.Test; -import com.google.common.eventbus.EventBus; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.persist.PersistService; @@ -92,8 +89,6 @@ public class AlertDispatchDAOTest { private ServiceComponentFactory m_componentFactory; private ServiceComponentHostFactory m_schFactory; private AlertDaoHelper m_alertHelper; - private AmbariEventPublisher m_eventPublisher; - private EventBus m_synchronizedBus; /** * @@ -111,13 +106,9 @@ public class AlertDispatchDAOTest { m_schFactory = m_injector.getInstance(ServiceComponentHostFactory.class); m_clusters = m_injector.getInstance(Clusters.class); m_alertHelper = m_injector.getInstance(AlertDaoHelper.class); - m_eventPublisher = m_injector.getInstance(AmbariEventPublisher.class); // !!! need a synchronous op for testing - m_synchronizedBus = new EventBus(); - Field field = AmbariEventPublisher.class.getDeclaredField("m_eventBus"); - field.setAccessible(true); - field.set(m_eventPublisher, m_synchronizedBus); + EventBusSynchronizer.synchronizeAmbariEventPublisher(m_injector); m_cluster = m_clusters.getClusterById(m_helper.createCluster()); m_helper.initializeClusterWithStack(m_cluster); @@ -844,8 +835,6 @@ public class AlertDispatchDAOTest { */ @Test public void testFindDefaultGroup() throws Exception { - m_synchronizedBus.register(m_injector.getInstance(AlertServiceStateListener.class)); - List groups = m_dao.findAllGroups(); assertNotNull(groups); assertEquals(10, groups.size()); @@ -870,8 +859,6 @@ public class AlertDispatchDAOTest { */ @Test public void testDefaultGroupAutomaticCreation() throws Exception { - m_synchronizedBus.register(m_injector.getInstance(AlertServiceStateListener.class)); - List groups = m_dao.findAllGroups(); assertNotNull(groups); assertEquals(10, groups.size()); @@ -916,8 +903,6 @@ public class AlertDispatchDAOTest { */ @Test(expected = AmbariException.class) public void testDefaultGroupInvalidServiceNoCreation() throws Exception { - m_synchronizedBus.register(m_injector.getInstance(AlertServiceStateListener.class)); - List groups = m_dao.findAllGroups(); assertNotNull(groups); assertEquals(10, groups.size()); http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java index 9c8ea7d..e6a95ae 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Calendar; import java.util.Collections; @@ -45,8 +44,6 @@ import org.apache.ambari.server.controller.spi.SortRequest; import org.apache.ambari.server.controller.spi.SortRequest.Order; import org.apache.ambari.server.controller.spi.SortRequestProperty; import org.apache.ambari.server.controller.utilities.PredicateBuilder; -import org.apache.ambari.server.events.listeners.alerts.AlertMaintenanceModeListener; -import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.AlertDaoHelper; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; @@ -66,11 +63,11 @@ import org.apache.ambari.server.state.ServiceComponentHostFactory; import org.apache.ambari.server.state.ServiceFactory; import org.apache.ambari.server.state.alert.Scope; import org.apache.ambari.server.state.alert.SourceType; +import org.apache.ambari.server.utils.EventBusSynchronizer; import org.junit.After; import org.junit.Before; import org.junit.Test; -import com.google.common.eventbus.EventBus; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.persist.PersistService; @@ -93,8 +90,6 @@ public class AlertsDAOTest { private ServiceFactory m_serviceFactory; private ServiceComponentFactory m_componentFactory; private ServiceComponentHostFactory m_schFactory; - private AmbariEventPublisher m_eventPublisher; - private EventBus m_synchronizedBus; private AlertDaoHelper m_alertHelper; @@ -111,15 +106,11 @@ public class AlertsDAOTest { m_serviceFactory = m_injector.getInstance(ServiceFactory.class); m_componentFactory = m_injector.getInstance(ServiceComponentFactory.class); m_schFactory = m_injector.getInstance(ServiceComponentHostFactory.class); - m_eventPublisher = m_injector.getInstance(AmbariEventPublisher.class); m_clusters = m_injector.getInstance(Clusters.class); m_alertHelper = m_injector.getInstance(AlertDaoHelper.class); // !!! need a synchronous op for testing - m_synchronizedBus = new EventBus(); - Field field = AmbariEventPublisher.class.getDeclaredField("m_eventBus"); - field.setAccessible(true); - field.set(m_eventPublisher, m_synchronizedBus); + EventBusSynchronizer.synchronizeAmbariEventPublisher(m_injector); // install YARN so there is at least 1 service installed and no // unexpected alerts since the test YARN service doesn't have any alerts @@ -908,8 +899,6 @@ public class AlertsDAOTest { */ @Test public void testMaintenanceMode() throws Exception { - m_synchronizedBus.register(m_injector.getInstance(AlertMaintenanceModeListener.class)); - m_helper.installHdfsService(m_cluster, m_serviceFactory, m_componentFactory, m_schFactory, HOSTNAME); http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java index 19b5d46..1c4567f 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java @@ -17,19 +17,13 @@ */ package org.apache.ambari.server.state.alerts; -import java.lang.reflect.Field; import java.util.UUID; import junit.framework.Assert; -import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.events.AlertDefinitionChangedEvent; import org.apache.ambari.server.events.AlertDefinitionDeleteEvent; import org.apache.ambari.server.events.AmbariEvent; -import org.apache.ambari.server.events.listeners.alerts.AlertLifecycleListener; -import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener; -import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener; -import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; import org.apache.ambari.server.orm.OrmTestHelper; @@ -53,11 +47,11 @@ import org.apache.ambari.server.state.alert.Reporting; import org.apache.ambari.server.state.alert.Reporting.ReportTemplate; import org.apache.ambari.server.state.alert.Scope; import org.apache.ambari.server.state.alert.SourceType; +import org.apache.ambari.server.utils.EventBusSynchronizer; import org.junit.After; import org.junit.Before; import org.junit.Test; -import com.google.common.eventbus.EventBus; import com.google.gson.Gson; import com.google.inject.Guice; import com.google.inject.Injector; @@ -77,10 +71,8 @@ public class AlertEventPublisherTest { private String clusterName; private Injector injector; private ServiceFactory serviceFactory; - private AmbariMetaInfo metaInfo; private OrmTestHelper ormHelper; private AggregateDefinitionMapping aggregateMapping; - private AmbariEventPublisher eventPublisher; /** * @@ -90,18 +82,7 @@ public class AlertEventPublisherTest { injector = Guice.createInjector(new InMemoryDefaultTestModule()); injector.getInstance(GuiceJpaInitializer.class); - eventPublisher = injector.getInstance(AmbariEventPublisher.class); - EventBus synchronizedBus = new EventBus(); - - // force singleton init via Guice so the listener registers with the bus - synchronizedBus.register(injector.getInstance(AlertLifecycleListener.class)); - synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class)); - synchronizedBus.register(injector.getInstance(AlertServiceStateListener.class)); - - // !!! need a synchronous op for testing - Field field = AmbariEventPublisher.class.getDeclaredField("m_eventBus"); - field.setAccessible(true); - field.set(eventPublisher, synchronizedBus); + EventBusSynchronizer.synchronizeAmbariEventPublisher(injector); dispatchDao = injector.getInstance(AlertDispatchDAO.class); definitionDao = injector.getInstance(AlertDefinitionDAO.class); @@ -111,8 +92,6 @@ public class AlertEventPublisherTest { ormHelper = injector.getInstance(OrmTestHelper.class); aggregateMapping = injector.getInstance(AggregateDefinitionMapping.class); - metaInfo = injector.getInstance(AmbariMetaInfo.class); - clusterName = "foo"; clusters.addCluster(clusterName); cluster = clusters.getCluster(clusterName); http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java index b64afed..7144625 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java @@ -17,7 +17,6 @@ */ package org.apache.ambari.server.state.alerts; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.EnumSet; import java.util.HashSet; @@ -25,8 +24,6 @@ import java.util.List; import java.util.Set; import org.apache.ambari.server.events.AlertStateChangeEvent; -import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener; -import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener; import org.apache.ambari.server.events.publishers.AlertEventPublisher; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; @@ -38,12 +35,12 @@ import org.apache.ambari.server.orm.entities.AlertNoticeEntity; import org.apache.ambari.server.orm.entities.AlertTargetEntity; import org.apache.ambari.server.state.Alert; import org.apache.ambari.server.state.AlertState; +import org.apache.ambari.server.utils.EventBusSynchronizer; import org.easymock.EasyMock; import org.junit.After; import org.junit.Before; import org.junit.Test; -import com.google.common.eventbus.EventBus; import com.google.inject.Binder; import com.google.inject.Guice; import com.google.inject.Injector; @@ -71,20 +68,13 @@ public class AlertStateChangedEventTest { injector.getInstance(GuiceJpaInitializer.class); - // force singleton init via Guice so the listener registers with the bus - injector.getInstance(AlertServiceStateListener.class); - injector.getInstance(AlertStateChangedListener.class); - dispatchDao = injector.getInstance(AlertDispatchDAO.class); - eventPublisher = injector.getInstance(AlertEventPublisher.class); - - EventBus synchronizedBus = new EventBus(); - synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class)); // !!! need a synchronous op for testing - Field field = AlertEventPublisher.class.getDeclaredField("m_eventBus"); - field.setAccessible(true); - field.set(eventPublisher, synchronizedBus); + EventBusSynchronizer.synchronizeAlertEventPublisher(injector); + EventBusSynchronizer.synchronizeAmbariEventPublisher(injector); + + eventPublisher = injector.getInstance(AlertEventPublisher.class); } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/InitialAlertEventTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/InitialAlertEventTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/InitialAlertEventTest.java index 4e55c49..73bf6c4 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/InitialAlertEventTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/InitialAlertEventTest.java @@ -17,17 +17,11 @@ */ package org.apache.ambari.server.state.alerts; -import java.lang.reflect.Field; - import junit.framework.Assert; -import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.events.AlertReceivedEvent; import org.apache.ambari.server.events.InitialAlertEvent; import org.apache.ambari.server.events.MockEventListener; -import org.apache.ambari.server.events.listeners.alerts.AlertLifecycleListener; -import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener; -import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener; import org.apache.ambari.server.events.publishers.AlertEventPublisher; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; @@ -41,6 +35,7 @@ import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Service; import org.apache.ambari.server.state.ServiceFactory; import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.utils.EventBusSynchronizer; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -68,7 +63,6 @@ public class InitialAlertEventTest { private Cluster m_cluster; private String m_clusterName; private ServiceFactory m_serviceFactory; - private AmbariMetaInfo m_metaInfo; /** * @@ -83,28 +77,18 @@ public class InitialAlertEventTest { // get a mock listener m_listener = m_injector.getInstance(MockEventListener.class); - m_alertsDao = m_injector.getInstance(AlertsDAO.class); - // create the publisher and mock listener m_eventPublisher = m_injector.getInstance(AlertEventPublisher.class); - EventBus synchronizedBus = new EventBus(); // register listeners needed + EventBus synchronizedBus = EventBusSynchronizer.synchronizeAlertEventPublisher(m_injector); synchronizedBus.register(m_listener); - synchronizedBus.register(m_injector.getInstance(AlertLifecycleListener.class)); - synchronizedBus.register(m_injector.getInstance(AlertServiceStateListener.class)); - synchronizedBus.register(m_injector.getInstance(AlertReceivedListener.class)); - - // !!! need a synchronous op for testing - Field field = AlertEventPublisher.class.getDeclaredField("m_eventBus"); - field.setAccessible(true); - field.set(m_eventPublisher, synchronizedBus); m_definitionDao = m_injector.getInstance(AlertDefinitionDAO.class); m_clusters = m_injector.getInstance(Clusters.class); m_serviceFactory = m_injector.getInstance(ServiceFactory.class); - m_metaInfo = m_injector.getInstance(AmbariMetaInfo.class); + m_alertsDao = m_injector.getInstance(AlertsDAO.class); m_clusterName = "c1"; m_clusters.addCluster(m_clusterName); @@ -190,12 +174,14 @@ public class InitialAlertEventTest { /** * */ - private class MockModule implements Module { + private static class MockModule implements Module { /** * {@inheritDoc} */ @Override public void configure(Binder binder) { + // sychronize on the ambari event bus for this test to work properly + EventBusSynchronizer.synchronizeAmbariEventPublisher(binder); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java index acf7911..c289bcc 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java @@ -65,11 +65,11 @@ import org.apache.ambari.server.state.alert.Reporting.ReportTemplate; import org.apache.ambari.server.state.alert.Scope; import org.apache.ambari.server.state.alert.Source; import org.apache.ambari.server.state.alert.SourceType; +import org.apache.ambari.server.utils.EventBusSynchronizer; import org.junit.After; import org.junit.Before; import org.junit.Test; -import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; import com.google.gson.Gson; import com.google.inject.Guice; @@ -396,12 +396,9 @@ public class AlertDataManagerTest { m_dao.merge(current); } - AlertEventPublisher publisher = m_injector.getInstance(AlertEventPublisher.class); - // !!! need a synchronous op for testing - field = AlertEventPublisher.class.getDeclaredField("m_eventBus"); - field.setAccessible(true); - field.set(publisher, new EventBus()); + AlertEventPublisher publisher = m_injector.getInstance(AlertEventPublisher.class); + EventBusSynchronizer.synchronizeAlertEventPublisher(m_injector); final AtomicReference ref = new AtomicReference(); publisher.register(new TestListener() { http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java index 766105d..ff039a9 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java @@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.ServiceComponentNotFoundException; import org.apache.ambari.server.ServiceNotFoundException; -import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; import org.apache.ambari.server.orm.OrmTestHelper; @@ -80,9 +79,6 @@ public class ClusterDeadlockTest { private ServiceComponentHostFactory serviceComponentHostFactory; @Inject - private AmbariMetaInfo metaInfo; - - @Inject private OrmTestHelper helper; private StackId stackId = new StackId("HDP-0.1"); http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java index 839b25f..d771eba 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java @@ -26,7 +26,9 @@ import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Assert; -import org.apache.ambari.server.api.services.AmbariMetaInfo; +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.ServiceComponentNotFoundException; +import org.apache.ambari.server.ServiceNotFoundException; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; import org.apache.ambari.server.orm.OrmTestHelper; @@ -34,7 +36,14 @@ import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Host; import org.apache.ambari.server.state.RepositoryVersionState; +import org.apache.ambari.server.state.Service; +import org.apache.ambari.server.state.ServiceComponent; +import org.apache.ambari.server.state.ServiceComponentFactory; +import org.apache.ambari.server.state.ServiceComponentHost; +import org.apache.ambari.server.state.ServiceComponentHostFactory; +import org.apache.ambari.server.state.ServiceFactory; import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.state.State; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -55,6 +64,8 @@ public class ClustersDeadlockTest { private final AtomicInteger hostNameCounter = new AtomicInteger(0); + private final StackId stackId = new StackId("HDP-0.1"); + @Inject private Injector injector; @@ -62,7 +73,13 @@ public class ClustersDeadlockTest { private Clusters clusters; @Inject - private AmbariMetaInfo metaInfo; + private ServiceFactory serviceFactory; + + @Inject + private ServiceComponentFactory serviceComponentFactory; + + @Inject + private ServiceComponentHostFactory serviceComponentHostFactory; @Inject private OrmTestHelper helper; @@ -81,6 +98,9 @@ public class ClustersDeadlockTest { cluster.setDesiredStackVersion(stackId); helper.getOrCreateRepositoryVersion(stackId.getStackName(), stackId.getStackVersion()); cluster.createClusterVersion(stackId.getStackName(), stackId.getStackVersion(), "admin", RepositoryVersionState.UPGRADING); + + // install HDFS + installService("HDFS"); } @After @@ -89,7 +109,7 @@ public class ClustersDeadlockTest { } /** - * Tests that no deadlock exists when adding hosts from reading from the + * Tests that no deadlock exists when adding hosts while reading from the * cluster. * * @throws Exception @@ -117,7 +137,34 @@ public class ClustersDeadlockTest { } /** - * Tests that no deadlock exists when adding hosts from reading from the + * Tests that no deadlock exists when adding hosts while reading from the + * cluster. This test ensures that there are service components installed on + * the hosts so that the cluster health report does some more work. + * + * @throws Exception + */ + @Test(timeout = 35000) + public void testDeadlockWhileMappingHostsWithExistingServices() + throws Exception { + List threads = new ArrayList(); + for (int i = 0; i < NUMBER_OF_THREADS; i++) { + ClusterReaderThread readerThread = new ClusterReaderThread(); + ClustersHostAndComponentMapperThread writerThread = new ClustersHostAndComponentMapperThread(); + + threads.add(readerThread); + threads.add(writerThread); + + readerThread.start(); + writerThread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + } + + /** + * Tests that no deadlock exists when adding hosts while reading from the * cluster. * * @throws Exception @@ -194,6 +241,38 @@ public class ClustersDeadlockTest { } /** + * The {@link ClustersHostAndComponentMapperThread} is used to map hosts to a + * cluster over and over. This will also add components to the hosts that are + * being mapped to further exercise the cluster health report concurrency. + */ + private final class ClustersHostAndComponentMapperThread extends Thread { + + /** + * {@inheritDoc} + */ + @Override + public void run() { + try { + for (int i = 0; i < NUMBER_OF_HOSTS; i++) { + String hostName = "c64-" + hostNameCounter.getAndIncrement(); + clusters.addHost(hostName); + setOsFamily(clusters.getHost(hostName), "redhat", "6.4"); + clusters.getHost(hostName).persist(); + clusters.mapHostToCluster(hostName, CLUSTER_NAME); + + // create DATANODE on this host so that we end up exercising the + // cluster health report since we need a service component host + createNewServiceComponentHost("HDFS", "DATANODE", hostName); + + Thread.sleep(10); + } + } catch (Exception exception) { + throw new RuntimeException(exception); + } + } + } + + /** * The {@link ClustersHostUnMapperThread} is used to unmap hosts to a cluster * over and over. */ @@ -235,4 +314,53 @@ public class ClustersDeadlockTest { hostAttributes.put("os_release_version", osVersion); host.setHostAttributes(hostAttributes); } + + private Service installService(String serviceName) throws AmbariException { + Service service = null; + + try { + service = cluster.getService(serviceName); + } catch (ServiceNotFoundException e) { + service = serviceFactory.createNew(cluster, serviceName); + cluster.addService(service); + service.persist(); + } + + return service; + } + + private ServiceComponent addServiceComponent(Service service, + String componentName) throws AmbariException { + ServiceComponent serviceComponent = null; + try { + serviceComponent = service.getServiceComponent(componentName); + } catch (ServiceComponentNotFoundException e) { + serviceComponent = serviceComponentFactory.createNew(service, + componentName); + service.addServiceComponent(serviceComponent); + serviceComponent.setDesiredState(State.INSTALLED); + serviceComponent.persist(); + } + + return serviceComponent; + } + + private ServiceComponentHost createNewServiceComponentHost(String svc, + String svcComponent, String hostName) throws AmbariException { + Assert.assertNotNull(cluster.getConfigGroups()); + Service s = installService(svc); + ServiceComponent sc = addServiceComponent(s, svcComponent); + + ServiceComponentHost sch = serviceComponentHostFactory.createNew(sc, + hostName); + + sc.addServiceComponentHost(sch); + sch.setDesiredState(State.INSTALLED); + sch.setState(State.INSTALLED); + sch.setDesiredStackVersion(stackId); + sch.setStackVersion(stackId); + + sch.persist(); + return sch; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java new file mode 100644 index 0000000..4b0c031 --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.utils; + +import java.lang.reflect.Field; + +import org.apache.ambari.server.events.listeners.alerts.AlertAggregateListener; +import org.apache.ambari.server.events.listeners.alerts.AlertLifecycleListener; +import org.apache.ambari.server.events.listeners.alerts.AlertMaintenanceModeListener; +import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener; +import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener; +import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener; +import org.apache.ambari.server.events.listeners.upgrade.DistributeRepositoriesActionListener; +import org.apache.ambari.server.events.publishers.AlertEventPublisher; +import org.apache.ambari.server.events.publishers.AmbariEventPublisher; + +import com.google.common.eventbus.AsyncEventBus; +import com.google.common.eventbus.EventBus; +import com.google.inject.Binder; +import com.google.inject.Injector; + +/** + * The {@link EventBusSynchronizer} is used to replace the {@link AsyncEventBus} + * used by Guava with a synchronous, serial {@link EventBus} instance. This + * enables testing that relies on testing the outcome of asynchronous events by + * executing the events on the current thread serially. + */ +public class EventBusSynchronizer { + + /** + * Force the {@link EventBus} from {@link AmbariEventPublisher} to be serial + * and synchronous. + * + * @param binder + */ + public static void synchronizeAmbariEventPublisher(Binder binder) { + EventBus synchronizedBus = new EventBus(); + AmbariEventPublisher ambariEventPublisher = new AmbariEventPublisher(); + + replaceEventBus(AmbariEventPublisher.class, ambariEventPublisher, + synchronizedBus); + + binder.bind(AmbariEventPublisher.class).toInstance(ambariEventPublisher); + } + + /** + * Force the {@link EventBus} from {@link AlertEventPublisher} to be serial + * and synchronous. Also register the known listeners. Registering known + * listeners is necessary since the event bus was replaced. + * + * @param injector + */ + public static EventBus synchronizeAmbariEventPublisher(Injector injector) { + EventBus synchronizedBus = new EventBus(); + AmbariEventPublisher publisher = injector.getInstance(AmbariEventPublisher.class); + + replaceEventBus(AmbariEventPublisher.class, publisher, synchronizedBus); + + // register common ambari event listeners + registerAmbariListeners(injector, synchronizedBus); + + return synchronizedBus; + } + + /** + * Force the {@link EventBus} from {@link AlertEventPublisher} to be serial + * and synchronous. Also register the known listeners. Registering known + * listeners is necessary since the event bus was replaced. + * + * @param injector + */ + public static EventBus synchronizeAlertEventPublisher(Injector injector) { + EventBus synchronizedBus = new EventBus(); + AlertEventPublisher publisher = injector.getInstance(AlertEventPublisher.class); + + replaceEventBus(AlertEventPublisher.class, publisher, synchronizedBus); + + // register common alert event listeners + registerAlertListeners(injector, synchronizedBus); + + return synchronizedBus; + } + + /** + * Register the normal listeners with the replaced synchronous bus. + * + * @param injector + * @param synchronizedBus + */ + private static void registerAmbariListeners(Injector injector, + EventBus synchronizedBus) { + synchronizedBus.register(injector.getInstance(AlertMaintenanceModeListener.class)); + synchronizedBus.register(injector.getInstance(AlertLifecycleListener.class)); + synchronizedBus.register(injector.getInstance(AlertServiceStateListener.class)); + synchronizedBus.register(injector.getInstance(DistributeRepositoriesActionListener.class)); + } + + /** + * Register the normal listeners with the replaced synchronous bus. + * + * @param injector + * @param synchronizedBus + */ + private static void registerAlertListeners(Injector injector, + EventBus synchronizedBus) { + synchronizedBus.register(injector.getInstance(AlertAggregateListener.class)); + synchronizedBus.register(injector.getInstance(AlertReceivedListener.class)); + synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class)); + } + + private static void replaceEventBus(Class eventPublisherClass, + Object instance, EventBus eventBus) { + + try { + Field field = eventPublisherClass.getDeclaredField("m_eventBus"); + field.setAccessible(true); + field.set(instance, eventBus); + } catch (Exception exception) { + throw new RuntimeException(exception); + } + } +}