ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jonathanhur...@apache.org
Subject [23/50] [abbrv] git commit: AMBARI-7316 - Alerts: Create Alert Notices For Incoming Alert State Changes (jonathanhurley)
Date Mon, 06 Oct 2014 21:03:45 GMT
AMBARI-7316 - Alerts: Create Alert Notices For Incoming Alert State Changes (jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/760bedfe
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/760bedfe
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/760bedfe

Branch: refs/heads/trunk
Commit: 760bedfe32f2a673a14cc14e7f2d41794ca3db98
Parents: aaf0513
Author: Jonathan Hurley <jhurley@hortonworks.com>
Authored: Mon Sep 15 14:04:42 2014 -0400
Committer: Jonathan Hurley <jhurley@hortonworks.com>
Committed: Mon Sep 15 14:04:42 2014 -0400

----------------------------------------------------------------------
 .../ambari/server/agent/HeartBeatHandler.java   |  20 ++-
 .../server/controller/ControllerModule.java     |  15 ++
 .../apache/ambari/server/events/AlertEvent.java |  58 +++++++
 .../server/events/AlertReceivedEvent.java       |  50 +++++++
 .../server/events/AlertStateChangeEvent.java    |  85 +++++++++++
 .../events/listeners/AlertReceivedListener.java | 150 +++++++++++++++++++
 .../listeners/AlertStateChangedListener.java    |  98 ++++++++++++
 .../events/publishers/AlertEventPublisher.java  | 100 +++++++++++++
 .../ambari/server/orm/dao/AlertDispatchDAO.java |  22 +++
 .../server/orm/entities/AlertGroupEntity.java   |   7 +-
 .../org/apache/ambari/server/state/Alert.java   |  55 ++++---
 .../server/state/cluster/AlertDataManager.java  | 111 --------------
 .../server/orm/dao/AlertDispatchDAOTest.java    |  29 ++++
 .../state/cluster/AlertDataManagerTest.java     | 129 ++++++++++++----
 14 files changed, 753 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/760bedfe/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 492d832..a366301 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -39,6 +39,9 @@ import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.MaintenanceStateHelper;
+import org.apache.ambari.server.events.AlertEvent;
+import org.apache.ambari.server.events.AlertReceivedEvent;
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
 import org.apache.ambari.server.metadata.ActionMetadata;
 import org.apache.ambari.server.state.AgentVersion;
 import org.apache.ambari.server.state.Alert;
@@ -60,7 +63,6 @@ import org.apache.ambari.server.state.StackInfo;
 import org.apache.ambari.server.state.State;
 import org.apache.ambari.server.state.alert.AlertDefinition;
 import org.apache.ambari.server.state.alert.AlertDefinitionHash;
-import org.apache.ambari.server.state.cluster.AlertDataManager;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.apache.ambari.server.state.host.HostHealthyHeartbeatEvent;
 import org.apache.ambari.server.state.host.HostRegistrationRequestEvent;
@@ -115,9 +117,12 @@ public class HeartBeatHandler {
 
   @Inject
   private AlertDefinitionHash alertDefinitionHash;
-  
+
+  /**
+   * Publishes {@link AlertEvent} instances.
+   */
   @Inject
-  private AlertDataManager alertManager;
+  private AlertEventPublisher alertEventPublisher;
 
   private Map<String, Long> hostResponseIds = new ConcurrentHashMap<String, Long>();
 
@@ -236,13 +241,16 @@ public class HeartBeatHandler {
     if (null == hostname || null == heartbeat) {
       return;
     }
-    
+
     if (null != heartbeat.getAlerts()) {
       for (Alert alert : heartbeat.getAlerts()) {
-        if (null == alert.getHost())
+        if (null == alert.getHost()) {
           alert.setHost(hostname);
+        }
+
         Cluster cluster = clusterFsm.getCluster(alert.getCluster());
-        alertManager.add(cluster.getClusterId(), alert);
+        AlertEvent event = new AlertReceivedEvent(cluster.getClusterId(), alert);
+        alertEventPublisher.publish(event);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/760bedfe/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
index c395df6..8413762 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
@@ -51,6 +51,8 @@ import org.apache.ambari.server.controller.internal.HostResourceProvider;
 import org.apache.ambari.server.controller.internal.MemberResourceProvider;
 import org.apache.ambari.server.controller.internal.ServiceResourceProvider;
 import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.events.listeners.AlertReceivedListener;
+import org.apache.ambari.server.events.listeners.AlertStateChangedListener;
 import org.apache.ambari.server.orm.DBAccessor;
 import org.apache.ambari.server.orm.DBAccessorImpl;
 import org.apache.ambari.server.orm.PersistenceType;
@@ -222,6 +224,8 @@ public class ControllerModule extends AbstractModule {
     bind(ViewInstanceHandlerList.class).to(AmbariHandlerList.class);
 
     requestStaticInjection(ExecutionCommandWrapper.class);
+
+    bindEagerSingletons();
   }
 
 
@@ -299,4 +303,15 @@ public class ControllerModule extends AbstractModule {
     bind(HostRoleCommandFactory.class).to(HostRoleCommandFactoryImpl.class);
     bind(SecurityHelper.class).toInstance(SecurityHelperImpl.getInstance());
   }
+
+  /**
+   * Initializes all eager singletons that should be instantiated as soon as
+   * possible and not wait for injection.
+   */
+  private void bindEagerSingletons() {
+    // alert subscribers are "headless" and have no guice references; created
+    // them as eager singletons to have them register with the eventbus
+    bind(AlertReceivedListener.class).asEagerSingleton();
+    bind(AlertStateChangedListener.class).asEagerSingleton();
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/760bedfe/ambari-server/src/main/java/org/apache/ambari/server/events/AlertEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AlertEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertEvent.java
new file mode 100644
index 0000000..a75477d
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertEvent.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+import org.apache.ambari.server.state.Alert;
+
+/**
+ * The {@link AlertEvent} class the base for all events related to alerts.
+ */
+public abstract class AlertEvent {
+
+  protected long m_clusterId;
+  protected Alert m_alert;
+
+  /**
+   * Constructor.
+   *
+   * @param clusterId
+   * @param alert
+   */
+  public AlertEvent(long clusterId, Alert alert) {
+    m_clusterId = clusterId;
+    m_alert = alert;
+  }
+
+  /**
+   * Gets the cluster ID that the alert belongs to.
+   *
+   * @return the ID of the cluster.
+   */
+  public long getClusterId() {
+    return m_clusterId;
+  }
+
+  /**
+   * Gets the alert that this event is created for.
+   *
+   * @return the alert (never {@code null}).
+   */
+  public Alert getAlert(){
+    return m_alert;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/760bedfe/ambari-server/src/main/java/org/apache/ambari/server/events/AlertReceivedEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AlertReceivedEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertReceivedEvent.java
new file mode 100644
index 0000000..558c9da
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertReceivedEvent.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+import org.apache.ambari.server.state.Alert;
+
+/**
+ * The {@link AlertReceivedEvent} is fired when an {@link Alert} is received or
+ * generated.
+ */
+public final class AlertReceivedEvent extends AlertEvent {
+
+  /**
+   * Constructor.
+   *
+   * @param clusterId
+   * @param alert
+   */
+  public AlertReceivedEvent(long clusterId, Alert alert) {
+    super(clusterId, alert);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder("AlertReceivedEvent{ ");
+    buffer.append("cluserId=").append(m_clusterId);
+    buffer.append(", alert=").append(m_alert);
+
+    buffer.append("}");
+    return buffer.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/760bedfe/ambari-server/src/main/java/org/apache/ambari/server/events/AlertStateChangeEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AlertStateChangeEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertStateChangeEvent.java
new file mode 100644
index 0000000..ab2c3dd
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertStateChangeEvent.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
+import org.apache.ambari.server.state.Alert;
+import org.apache.ambari.server.state.AlertState;
+
+/**
+ * The {@link AlertStateChangeEvent} is fired when an {@link Alert} instance has
+ * its {@link AlertState} changed.
+ */
+public final class AlertStateChangeEvent extends AlertEvent {
+
+  /**
+   * The prior alert state.
+   */
+  private final AlertState m_fromState;
+
+  /**
+   * The newly created historical entry.
+   */
+  private final AlertHistoryEntity m_newEntity;
+
+  /**
+   * Constructor.
+   *
+   * @param clusterId
+   * @param alert
+   */
+  public AlertStateChangeEvent(long clusterId, Alert alert,
+      AlertHistoryEntity newEntity, AlertState fromState) {
+    super(clusterId, alert);
+
+    m_newEntity = newEntity;
+    m_fromState = fromState;
+  }
+
+  /**
+   * Gets the newly created item in alert history.
+   *
+   * @return the newly created historical item.
+   */
+  public AlertHistoryEntity getNewHistoricalEntry() {
+    return m_newEntity;
+  }
+
+  /**
+   * Gets the prior state of the alert.
+   *
+   * @return the prior state of the alert.
+   */
+  public AlertState getFromState() {
+    return m_fromState;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder("AlertStateChangeEvent{ ");
+    buffer.append("cluserId=").append(m_clusterId);
+    buffer.append(", fromState=").append(m_fromState);
+    buffer.append(", alert=").append(m_alert);
+
+    buffer.append("}");
+    return buffer.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/760bedfe/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
new file mode 100644
index 0000000..1bdd7e2
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners;
+
+import org.apache.ambari.server.events.AlertEvent;
+import org.apache.ambari.server.events.AlertReceivedEvent;
+import org.apache.ambari.server.events.AlertStateChangeEvent;
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
+import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
+import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
+import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
+import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
+import org.apache.ambari.server.state.Alert;
+import org.apache.ambari.server.state.AlertState;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link AlertReceivedListener} class handles {@link AlertReceivedEvent}
+ * and updates the appropirate DAOs. It may also fire new
+ * {@link AlertStateChangeEvent} when an {@link AlertState} change is detected.
+ */
+@Singleton
+public class AlertReceivedListener {
+  /**
+   * Logger.
+   */
+  private static Log LOG = LogFactory.getLog(AlertReceivedListener.class);
+
+  @Inject
+  private AlertsDAO m_alertsDao;
+
+  @Inject
+  private AlertDefinitionDAO m_definitionDao;
+
+  /**
+   * Receives and publishes {@link AlertEvent} instances.
+   */
+  private AlertEventPublisher m_alertEventPublisher;
+
+  /**
+   * Constructor.
+   *
+   * @param publisher
+   */
+  @Inject
+  public AlertReceivedListener(AlertEventPublisher publisher) {
+    m_alertEventPublisher = publisher;
+    m_alertEventPublisher.register(this);
+  }
+
+
+  /**
+   * Adds an alert.  Checks for a new state before creating a new history record.
+   *
+   * @param clusterId the id for the cluster
+   * @param alert the alert to add
+   */
+  @Subscribe
+  @AllowConcurrentEvents
+  public void onAlertEvent(AlertReceivedEvent event) {
+    long clusterId = event.getClusterId();
+    Alert alert = event.getAlert();
+
+    AlertCurrentEntity current = m_alertsDao.findCurrentByHostAndName(clusterId,
+        alert.getHost(), alert.getName());
+
+    if (null == current) {
+      AlertDefinitionEntity definition = m_definitionDao.findByName(clusterId,
+          alert.getName());
+
+      AlertHistoryEntity history = createHistory(clusterId, definition, alert);
+
+      current = new AlertCurrentEntity();
+      current.setAlertHistory(history);
+      current.setLatestTimestamp(Long.valueOf(alert.getTimestamp()));
+      current.setOriginalTimestamp(Long.valueOf(alert.getTimestamp()));
+
+      m_alertsDao.create(current);
+
+    } else if (alert.getState() == current.getAlertHistory().getAlertState()) {
+      current.setLatestTimestamp(Long.valueOf(alert.getTimestamp()));
+      current.setLatestText(alert.getText());
+
+      m_alertsDao.merge(current);
+    } else {
+      AlertState oldState = current.getAlertHistory().getAlertState();
+
+      // insert history, update current
+      AlertHistoryEntity history = createHistory(clusterId,
+          current.getAlertHistory().getAlertDefinition(), alert);
+
+      current.setAlertHistory(history);
+      current.setLatestTimestamp(Long.valueOf(alert.getTimestamp()));
+      current.setOriginalTimestamp(Long.valueOf(alert.getTimestamp()));
+
+      m_alertsDao.merge(current);
+
+      // broadcast the alert changed event for other subscribers
+      AlertStateChangeEvent alertChangedEvent = new AlertStateChangeEvent(
+          event.getClusterId(), event.getAlert(), history, oldState);
+
+      m_alertEventPublisher.publish(alertChangedEvent);
+    }
+  }
+
+  /**
+   * Convenience to create a new alert.
+   * @param clusterId the cluster id
+   * @param definition the definition
+   * @param alert the alert data
+   * @return the new history record
+   */
+  private AlertHistoryEntity createHistory(long clusterId, AlertDefinitionEntity definition, Alert alert) {
+    AlertHistoryEntity history = new AlertHistoryEntity();
+    history.setAlertDefinition(definition);
+    history.setAlertInstance(alert.getInstance());
+    history.setAlertLabel(alert.getLabel());
+    history.setAlertState(alert.getState());
+    history.setAlertText(alert.getText());
+    history.setAlertTimestamp(Long.valueOf(alert.getTimestamp()));
+    history.setClusterId(Long.valueOf(clusterId));
+    history.setComponentName(alert.getComponent());
+    history.setHostName(alert.getHost());
+    history.setServiceName(alert.getService());
+
+    return history;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/760bedfe/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertStateChangedListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertStateChangedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertStateChangedListener.java
new file mode 100644
index 0000000..a327110
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertStateChangedListener.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.ambari.server.events.AlertStateChangeEvent;
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
+import org.apache.ambari.server.orm.dao.AlertDispatchDAO;
+import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
+import org.apache.ambari.server.orm.entities.AlertGroupEntity;
+import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
+import org.apache.ambari.server.orm.entities.AlertNoticeEntity;
+import org.apache.ambari.server.orm.entities.AlertTargetEntity;
+import org.apache.ambari.server.state.NotificationState;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link AlertStateChangedListener} class response to
+ * {@link AlertStateChangeEvent} and updates {@link AlertNoticeEntity} instances
+ * in the database.
+ */
+@Singleton
+public class AlertStateChangedListener {
+
+  /**
+   * Logger.
+   */
+  private static Log LOG = LogFactory.getLog(AlertStateChangedListener.class);
+
+  /**
+   * Used for looking up groups and targets.
+   */
+  @Inject
+  private AlertDispatchDAO m_alertsDispatchDao;
+
+  /**
+   * Constructor.
+   *
+   * @param publisher
+   */
+  @Inject
+  public AlertStateChangedListener(AlertEventPublisher publisher) {
+    publisher.register(this);
+  }
+
+  /**
+   * Listens for when an alert's state has changed.
+   */
+  @Subscribe
+  @AllowConcurrentEvents
+  public void onAlertEvent(AlertStateChangeEvent event) {
+    AlertHistoryEntity history = event.getNewHistoricalEntry();
+    AlertDefinitionEntity definition = history.getAlertDefinition();
+
+    List<AlertGroupEntity> groups = m_alertsDispatchDao.findGroupsByDefinition(definition);
+
+    // for each group, determine if there are any targets that need to receive
+    // a notification about the alert state change event
+    for (AlertGroupEntity group : groups) {
+      Set<AlertTargetEntity> targets = group.getAlertTargets();
+      if (null == targets || targets.size() == 0) {
+        continue;
+      }
+
+      for (AlertTargetEntity target : targets) {
+        AlertNoticeEntity notice = new AlertNoticeEntity();
+        notice.setAlertTarget(target);
+        notice.setAlertHistory(event.getNewHistoricalEntry());
+        notice.setNotifyState(NotificationState.PENDING);
+
+        m_alertsDispatchDao.merge(notice);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/760bedfe/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java
new file mode 100644
index 0000000..e42e317
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.publishers;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.ambari.server.events.AlertEvent;
+
+import com.google.common.eventbus.AsyncEventBus;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link AlertEventPublisher} is used to wrap a customized instance of an
+ * {@link AsyncEventBus} that is only used for alerts. In general, Ambari should
+ * have its own application-wide event bus for application events (session
+ * information, state changes, etc), but since alerts can contain many events
+ * being published concurrently, it makes sense to encapsulate a specific alert
+ * bus in this publisher.
+ */
+@Singleton
+public final class AlertEventPublisher {
+
+  /**
+   * A multi-threaded event bus that can handle dispatching {@link AlertEvent}s.
+   */
+  private final EventBus s_eventBus;
+
+  /**
+   * Constructor.
+   */
+  public AlertEventPublisher() {
+    s_eventBus = new AsyncEventBus(Executors.newFixedThreadPool(2,
+        new AlertEventBusThreadFactory()));
+  }
+
+  /**
+   * Publishes the specified event to all registered listeners that
+   * {@link Subscribe} to any of the {@link AlertEvent} instances.
+   *
+   * @param event
+   */
+  public void publish(AlertEvent event) {
+    s_eventBus.post(event);
+  }
+
+  /**
+   * Register a listener to receive events. The listener should use the
+   * {@link Subscribe} annotation.
+   *
+   * @param object
+   *          the listener to receive events.
+   */
+  public void register(Object object) {
+    s_eventBus.register(object);
+  }
+
+  /**
+   * A custom {@link ThreadFactory} for the threads that will handle published
+   * {@link AlertEvent}. Threads created will have slightly reduced priority
+   * since {@link AlertEvent} instances are not critical to the system.
+   */
+  private static final class AlertEventBusThreadFactory implements
+      ThreadFactory {
+
+    private static final AtomicInteger s_threadIdPool = new AtomicInteger(1);
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread thread = new Thread(r, "alert-event-bus-"
+          + s_threadIdPool.getAndIncrement());
+
+      thread.setDaemon(false);
+      thread.setPriority(Thread.NORM_PRIORITY - 1);
+
+      return thread;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/760bedfe/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDispatchDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDispatchDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDispatchDAO.java
index e08c948..6d4d19b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDispatchDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDispatchDAO.java
@@ -22,9 +22,11 @@ import java.util.List;
 import javax.persistence.EntityManager;
 import javax.persistence.TypedQuery;
 
+import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.orm.entities.AlertGroupEntity;
 import org.apache.ambari.server.orm.entities.AlertNoticeEntity;
 import org.apache.ambari.server.orm.entities.AlertTargetEntity;
+import org.apache.ambari.server.state.alert.AlertGroup;
 
 import com.google.inject.Inject;
 import com.google.inject.Provider;
@@ -192,6 +194,26 @@ public class AlertDispatchDAO {
   }
 
   /**
+   * Gets all of the {@link AlertGroup} instances that include the specified
+   * alert definition.
+   *
+   * @param definitionEntity
+   *          the definition that the group must include (not {@code null}).
+   * @return all alert groups that have an association with the specified
+   *         definition or empty list if none exist (never {@code null}).
+   */
+  public List<AlertGroupEntity> findGroupsByDefinition(
+      AlertDefinitionEntity definitionEntity) {
+
+    TypedQuery<AlertGroupEntity> query = entityManagerProvider.get().createNamedQuery(
+        "AlertGroupEntity.findByAssociatedDefinition", AlertGroupEntity.class);
+
+    query.setParameter("alertDefinition", definitionEntity);
+
+    return daoUtils.selectList(query);
+  }
+
+  /**
    * Gets all alert notifications stored in the database.
    *
    * @return all alert notifications or empty list if none exist (never

http://git-wip-us.apache.org/repos/asf/ambari/blob/760bedfe/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java
index f97a0eb..e7fa9c6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java
@@ -48,7 +48,8 @@ import javax.persistence.UniqueConstraint;
     @NamedQuery(name = "AlertGroupEntity.findAll", query = "SELECT alertGroup FROM AlertGroupEntity alertGroup"),
     @NamedQuery(name = "AlertGroupEntity.findAllInCluster", query = "SELECT alertGroup FROM AlertGroupEntity alertGroup WHERE alertGroup.clusterId = :clusterId"),
     @NamedQuery(name = "AlertGroupEntity.findByName", query = "SELECT alertGroup FROM AlertGroupEntity alertGroup WHERE alertGroup.groupName = :groupName"),
-    @NamedQuery(name = "AlertGroupEntity.findByNameInCluster", query = "SELECT alertGroup FROM AlertGroupEntity alertGroup WHERE alertGroup.groupName = :groupName AND alertGroup.clusterId = :clusterId"), })
+    @NamedQuery(name = "AlertGroupEntity.findByNameInCluster", query = "SELECT alertGroup FROM AlertGroupEntity alertGroup WHERE alertGroup.groupName = :groupName AND alertGroup.clusterId = :clusterId"),
+    @NamedQuery(name = "AlertGroupEntity.findByAssociatedDefinition", query = "SELECT alertGroup FROM AlertGroupEntity alertGroup WHERE :alertDefinition MEMBER OF alertGroup.alertDefinitions"), })
 public class AlertGroupEntity {
 
   @Id
@@ -224,7 +225,7 @@ public class AlertGroupEntity {
   /**
    * Adds the specified definition to the definitions that this group will
    * dispatch to.
-   * 
+   *
    * @param definition
    *          the definition to add (not {@code null}).
    */
@@ -240,7 +241,7 @@ public class AlertGroupEntity {
   /**
    * Removes the specified definition from the definitions that this group will
    * dispatch to.
-   * 
+   *
    * @param definition
    *          the definition to remove (not {@code null}).
    */

http://git-wip-us.apache.org/repos/asf/ambari/blob/760bedfe/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java
index 7b8aabd..3675f87 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java
@@ -32,8 +32,8 @@ public class Alert {
   private String label = null;
   private String text = null;
   private long timestamp = 0L;
-  
- 
+
+
   /**
    * Constructor.
    * @param alertName the name of the alert
@@ -56,32 +56,32 @@ public class Alert {
 
   public Alert() {
   }
- 
+
   /**
    * @return the name
    */
 
-  @JsonProperty("name")     
+  @JsonProperty("name")
   public String getName() {
     return name;
   }
- 
+
   /**
    * @return the service
    */
-  @JsonProperty("service")    
+  @JsonProperty("service")
   public String getService() {
     return service;
   }
-  
+
   /**
    * @return the component
    */
-  @JsonProperty("component")  
+  @JsonProperty("component")
   public String getComponent() {
     return component;
   }
- 
+
   /**
    * @return the host
    */
@@ -89,7 +89,7 @@ public class Alert {
   public String getHost() {
     return host;
   }
- 
+
   /**
    * @return the state
    */
@@ -101,7 +101,7 @@ public class Alert {
   /**
    * @return a short descriptive label for the alert
    */
-  @JsonProperty("label")  
+  @JsonProperty("label")
   public String getLabel() {
     return label;
   }
@@ -109,32 +109,32 @@ public class Alert {
   /**
    * @param alertLabel a short descriptive label for the alert
    */
-  @JsonProperty("label")   
+  @JsonProperty("label")
   public void setLabel(String alertLabel) {
     label = alertLabel;
   }
- 
+
   /**
    * @return detail text about the alert
    */
-  @JsonProperty("text")   
+  @JsonProperty("text")
   public String getText() {
     return text;
   }
-  
+
   /**
    * @param alertText detail text about the alert
    */
-  @JsonProperty("text")   
+  @JsonProperty("text")
   public void setText(String alertText) {
     text = alertText;
   }
 
-  @JsonProperty("instance")  
+  @JsonProperty("instance")
   public String getInstance() {
     return instance;
   }
-  
+
   @JsonProperty("instance")
   public void setInstance(String instance) {
     this.instance = instance;
@@ -164,24 +164,24 @@ public class Alert {
   public void setState(AlertState state) {
     this.state = state;
   }
-  
+
   @JsonProperty("timestamp")
   public void setTimestamp(long ts) {
     timestamp = ts;
   }
-  
+
   @JsonProperty("timestamp")
   public long getTimestamp() {
     return timestamp;
   }
-  
+
   /**
    * @return
    */
   public String getCluster() {
     return cluster;
   }
-  
+
   @Override
   public int hashCode() {
     int result = alertHashCode();
@@ -197,8 +197,9 @@ public class Alert {
    */
   @Override
   public boolean equals(Object o) {
-    if (null == o || !Alert.class.isInstance(o))
+    if (null == o || !Alert.class.isInstance(o)) {
       return false;
+    }
 
     return hashCode() == o.hashCode();
   }
@@ -217,7 +218,7 @@ public class Alert {
 
   /**
    * Checks equality with another alert, not taking into account instance info
-   * 
+   *
    * @param that
    *          the other alert to compare against
    * @return <code>true</code> when the alert is equal in every way except the
@@ -226,8 +227,7 @@ public class Alert {
   public boolean almostEquals(Alert that) {
     return alertHashCode() == that.alertHashCode();
   }
-  
-  
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -242,7 +242,4 @@ public class Alert {
     sb.append('}');
     return sb.toString();
   }
-
-  
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/760bedfe/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/AlertDataManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/AlertDataManager.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/AlertDataManager.java
deleted file mode 100644
index 4a65d5a..0000000
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/AlertDataManager.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.server.state.cluster;
-
-import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
-import org.apache.ambari.server.orm.dao.AlertsDAO;
-import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
-import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
-import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
-import org.apache.ambari.server.state.Alert;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-/**
- * The AlertManager is reponsible for tracking all alerts generated for a cluster. 
- */
-@Singleton
-public class AlertDataManager {
-
-  @Inject
-  private AlertsDAO m_alertsDao;
-  @Inject
-  private AlertDefinitionDAO m_definitionDao;
-
-  
-  AlertDataManager() {
-  }
-  
-  /**
-   * Adds an alert.  Checks for a new state before creating a new history record.
-   * 
-   * @param clusterId the id for the cluster
-   * @param alert the alert to add
-   */
-  public void add(long clusterId, Alert alert) {
-    
-    AlertCurrentEntity current = m_alertsDao.findCurrentByHostAndName(clusterId,
-        alert.getHost(), alert.getName());
-    
-    if (null == current) {
-      AlertDefinitionEntity definition = m_definitionDao.findByName(clusterId,
-          alert.getName());
-      
-      AlertHistoryEntity history = createHistory(clusterId, definition, alert);
-      
-      current = new AlertCurrentEntity();
-      current.setAlertHistory(history);
-      current.setLatestTimestamp(Long.valueOf(alert.getTimestamp()));
-      current.setOriginalTimestamp(Long.valueOf(alert.getTimestamp()));
-      
-      m_alertsDao.create(current);
-      
-    } else if (alert.getState() == current.getAlertHistory().getAlertState()) {
-      current.setLatestTimestamp(Long.valueOf(alert.getTimestamp()));
-      current.setLatestText(alert.getText());
-      
-      m_alertsDao.merge(current);
-    } else {
-      // insert history, update current
-      AlertHistoryEntity history = createHistory(clusterId,
-          current.getAlertHistory().getAlertDefinition(), alert);
-      
-      current.setAlertHistory(history);
-      current.setLatestTimestamp(Long.valueOf(alert.getTimestamp()));
-      current.setOriginalTimestamp(Long.valueOf(alert.getTimestamp()));
-      
-      m_alertsDao.merge(current);
-    }
-    
-  }
-  /**
-   * Convenience to create a new alert.
-   * @param clusterId the cluster id
-   * @param definition the definition
-   * @param alert the alert data
-   * @return the new history record
-   */
-  private AlertHistoryEntity createHistory(long clusterId, AlertDefinitionEntity definition, Alert alert) {
-    AlertHistoryEntity history = new AlertHistoryEntity();
-    history.setAlertDefinition(definition);
-    history.setAlertInstance(alert.getInstance());
-    history.setAlertLabel(alert.getLabel());
-    history.setAlertState(alert.getState());
-    history.setAlertText(alert.getText());
-    history.setAlertTimestamp(Long.valueOf(alert.getTimestamp()));
-    history.setClusterId(Long.valueOf(clusterId));
-    history.setComponentName(alert.getComponent());
-    history.setHostName(alert.getHost());
-    history.setServiceName(alert.getService());
-    
-    return history;
-  }
-  
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/760bedfe/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
index 8451c9b..1103961 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
@@ -351,6 +351,35 @@ public class AlertDispatchDAOTest {
   }
 
   /**
+   * Tests finding groups by a definition ID that they are associatd with.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testFindGroupsByDefinition() throws Exception {
+    List<AlertDefinitionEntity> definitions = createDefinitions();
+    AlertGroupEntity group = helper.createAlertGroup(clusterId, null);
+
+    group = dao.findGroupById(group.getGroupId());
+    assertNotNull(group);
+
+    for (AlertDefinitionEntity definition : definitions) {
+      group.addAlertDefinition(definition);
+    }
+
+    dao.merge(group);
+
+    group = dao.findGroupByName(group.getGroupName());
+    assertEquals(definitions.size(), group.getAlertDefinitions().size());
+
+    for (AlertDefinitionEntity definition : definitions) {
+      List<AlertGroupEntity> groups = dao.findGroupsByDefinition(definition);
+      assertEquals(1, groups.size());
+      assertEquals(group.getGroupId(), groups.get(0).getGroupId());
+    }
+  }
+
+  /**
    * @return
    */
   private List<AlertDefinitionEntity> createDefinitions() throws Exception {

http://git-wip-us.apache.org/repos/asf/ambari/blob/760bedfe/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
index eae1de6..bbf7774 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
@@ -21,17 +21,28 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 
+import org.apache.ambari.server.events.AlertEvent;
+import org.apache.ambari.server.events.AlertReceivedEvent;
+import org.apache.ambari.server.events.AlertStateChangeEvent;
+import org.apache.ambari.server.events.listeners.AlertReceivedListener;
+import org.apache.ambari.server.events.listeners.AlertStateChangedListener;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.OrmTestHelper;
 import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
+import org.apache.ambari.server.orm.dao.AlertDispatchDAO;
 import org.apache.ambari.server.orm.dao.AlertsDAO;
 import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
+import org.apache.ambari.server.orm.entities.AlertGroupEntity;
 import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
+import org.apache.ambari.server.orm.entities.AlertNoticeEntity;
+import org.apache.ambari.server.orm.entities.AlertTargetEntity;
 import org.apache.ambari.server.state.Alert;
 import org.apache.ambari.server.state.AlertState;
 import org.apache.ambari.server.state.alert.Scope;
@@ -44,22 +55,24 @@ import com.google.inject.Injector;
 import com.google.inject.persist.PersistService;
 
 /**
- * Tests the alert manager.
+ * Tests the management of {@link AlertEvent}s in the system.
  */
 public class AlertDataManagerTest {
-  
+
   private static final String ALERT_DEFINITION = "Alert Definition 1";
   private static final String SERVICE = "service1";
   private static final String COMPONENT = "component1";
   private static final String HOST1 = "h1";
   private static final String HOST2 = "h2";
   private static final String ALERT_LABEL = "My Label";
-  
+
   private Long clusterId;
   private Injector injector;
   private OrmTestHelper helper;
   private AlertsDAO dao;
-  
+  private AlertDispatchDAO dispatchDao;
+  private AlertDefinitionDAO definitionDao;
+
   @Before
   public void setup() throws Exception {
     injector = Guice.createInjector(new InMemoryDefaultTestModule());
@@ -67,7 +80,8 @@ public class AlertDataManagerTest {
     helper = injector.getInstance(OrmTestHelper.class);
     clusterId = helper.createCluster();
     dao = injector.getInstance(AlertsDAO.class);
-    AlertDefinitionDAO definitionDao = injector.getInstance(AlertDefinitionDAO.class);
+    dispatchDao = injector.getInstance(AlertDispatchDAO.class);
+    definitionDao = injector.getInstance(AlertDefinitionDAO.class);
 
     // create 5 definitions
     for (int i = 0; i < 5; i++) {
@@ -83,18 +97,16 @@ public class AlertDataManagerTest {
       definition.setSourceType("SCRIPT");
       definitionDao.create(definition);
     }
-  
   }
-  
+
   @After
   public void teardown() {
     injector.getInstance(PersistService.class).stop();
     injector = null;
   }
-  
+
   @Test
   public void testAlertRecords() {
-    
     Alert alert1 = new Alert(ALERT_DEFINITION, null, SERVICE, COMPONENT, HOST1, AlertState.OK);
     alert1.setLabel(ALERT_LABEL);
     alert1.setText("Component component1 is OK");
@@ -103,18 +115,24 @@ public class AlertDataManagerTest {
     Alert alert2 = new Alert(ALERT_DEFINITION, null, SERVICE, COMPONENT, HOST2, AlertState.CRITICAL);
     alert2.setLabel(ALERT_LABEL);
     alert2.setText("Component component2 is not OK");
-    
-    AlertDataManager am = injector.getInstance(AlertDataManager.class);
-    
-    am.add(clusterId.longValue(), alert1);
-    am.add(clusterId.longValue(), alert2);
-    
+
+    AlertReceivedListener listener = injector.getInstance(AlertReceivedListener.class);
+
+    AlertReceivedEvent event1 = new AlertReceivedEvent(clusterId.longValue(),
+        alert1);
+
+    AlertReceivedEvent event2 = new AlertReceivedEvent(clusterId.longValue(),
+        alert2);
+
+    listener.onAlertEvent(event1);
+    listener.onAlertEvent(event2);
+
     List<AlertCurrentEntity> allCurrent = dao.findCurrentByService(clusterId.longValue(), SERVICE);
     assertEquals(2, allCurrent.size());
-    
+
     List<AlertHistoryEntity> allHistory = dao.findAll(clusterId.longValue());
     assertEquals(2, allHistory.size());
-    
+
     AlertCurrentEntity current = dao.findCurrentByHostAndName(clusterId.longValue(), HOST1, ALERT_DEFINITION);
     assertNotNull(current);
     assertEquals(HOST1, current.getAlertHistory().getHostName());
@@ -124,17 +142,21 @@ public class AlertDataManagerTest {
     assertEquals(current.getAlertHistory().getAlertState(), AlertState.OK);
     assertEquals(1L, current.getOriginalTimestamp().longValue());
     assertEquals(1L, current.getLatestTimestamp().longValue());
-    
+
     Long currentId = current.getAlertId();
     Long historyId = current.getAlertHistory().getAlertId();
-    
+
     // no new history since the state is the same
     Alert alert3 = new Alert(ALERT_DEFINITION, null, SERVICE, COMPONENT, HOST1, AlertState.OK);
     alert3.setLabel(ALERT_LABEL);
     alert3.setText("Component component1 is OK");
     alert3.setTimestamp(2L);
-    am.add(clusterId.longValue(), alert3);
-    
+
+    AlertReceivedEvent event3 = new AlertReceivedEvent(clusterId.longValue(),
+        alert3);
+
+    listener.onAlertEvent(event3);
+
     current = dao.findCurrentByHostAndName(clusterId.longValue(), HOST1, ALERT_DEFINITION);
     assertNotNull(current);
     assertEquals(currentId, current.getAlertId());
@@ -146,24 +168,28 @@ public class AlertDataManagerTest {
     assertEquals(current.getAlertHistory().getAlertState(), AlertState.OK);
     assertEquals(1L, current.getOriginalTimestamp().longValue());
     assertEquals(2L, current.getLatestTimestamp().longValue());
-   
+
     allCurrent = dao.findCurrentByService(clusterId.longValue(), SERVICE);
     assertEquals(2, allCurrent.size());
-    
+
     allHistory = dao.findAll(clusterId.longValue());
     assertEquals(2, allHistory.size());
-    
+
     // change to warning
     Alert alert4 = new Alert(ALERT_DEFINITION, null, SERVICE, COMPONENT, HOST1, AlertState.WARNING);
     alert4.setLabel(ALERT_LABEL);
     alert4.setText("Component component1 is about to go down");
     alert4.setTimestamp(3L);
-    am.add(clusterId.longValue(), alert4);
+
+    AlertReceivedEvent event4 = new AlertReceivedEvent(clusterId.longValue(),
+        alert4);
+
+    listener.onAlertEvent(event4);
 
     current = dao.findCurrentByHostAndName(clusterId.longValue(), HOST1, ALERT_DEFINITION);
     assertNotNull(current);
     assertEquals(current.getAlertId(), currentId);
-    assertFalse(historyId.equals(current.getAlertHistory().getAlertId()));    
+    assertFalse(historyId.equals(current.getAlertHistory().getAlertId()));
     assertEquals(HOST1, current.getAlertHistory().getHostName());
     assertEquals(ALERT_DEFINITION, current.getAlertHistory().getAlertDefinition().getDefinitionName());
     assertEquals(ALERT_LABEL, current.getAlertHistory().getAlertLabel());
@@ -174,8 +200,57 @@ public class AlertDataManagerTest {
 
     allCurrent = dao.findCurrentByService(clusterId.longValue(), SERVICE);
     assertEquals(2, allCurrent.size());
-    
+
     allHistory = dao.findAll(clusterId.longValue());
     assertEquals(3, allHistory.size());
   }
+
+  /**
+   * Tests that {@link AlertStateChangeEvent} cause an {@link AlertNoticeEntity}
+   * entry.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAlertNotices() throws Exception {
+    List<AlertNoticeEntity> notices = dispatchDao.findAllNotices();
+    assertEquals( 0, notices.size() );
+
+    List<AlertDefinitionEntity> definitions = definitionDao.findAll(clusterId);
+    AlertDefinitionEntity definition = definitions.get(0);
+
+    AlertHistoryEntity history = new AlertHistoryEntity();
+    history.setServiceName(definition.getServiceName());
+    history.setClusterId(clusterId);
+    history.setAlertDefinition(definition);
+    history.setAlertLabel(definition.getDefinitionName());
+    history.setAlertText(definition.getDefinitionName());
+    history.setAlertTimestamp(System.currentTimeMillis());
+    history.setHostName(HOST1);
+    history.setAlertState(AlertState.OK);
+    dao.create(history);
+
+    List<AlertHistoryEntity> histories = dao.findAll(clusterId);
+    assertEquals(1, histories.size());
+
+    AlertTargetEntity target = helper.createAlertTarget();
+    Set<AlertTargetEntity> targets = new HashSet<AlertTargetEntity>();
+    targets.add(target);
+
+    AlertGroupEntity group = helper.createAlertGroup(clusterId, targets);
+    group.addAlertDefinition( definitions.get(0) );
+    dispatchDao.merge(group);
+
+    Alert alert1 = new Alert(ALERT_DEFINITION, null, SERVICE, COMPONENT, HOST1,
+        AlertState.OK);
+
+    AlertStateChangeEvent event = new AlertStateChangeEvent(clusterId, alert1,
+        histories.get(0), AlertState.CRITICAL);
+
+    AlertStateChangedListener listener = injector.getInstance(AlertStateChangedListener.class);
+    listener.onAlertEvent(event);
+
+    notices = dispatchDao.findAllNotices();
+    assertEquals(1, notices.size());
+  }
 }


Mime
View raw message