ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jonathanhur...@apache.org
Subject git commit: AMBARI-7508 - Alerts: Reschedule Individual Alerts on Agents (jonathanhurley)
Date Fri, 26 Sep 2014 16:47:39 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-alerts-dev 24483521f -> 7db82c923


AMBARI-7508 - Alerts: Reschedule Individual Alerts on Agents (jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7db82c92
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7db82c92
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7db82c92

Branch: refs/heads/branch-alerts-dev
Commit: 7db82c923c2903a2e2fab3d302fb9337423da08b
Parents: 2448352
Author: Jonathan Hurley <jhurley@hortonworks.com>
Authored: Thu Sep 25 22:19:49 2014 -0700
Committer: Jonathan Hurley <jhurley@hortonworks.com>
Committed: Fri Sep 26 09:38:10 2014 -0700

----------------------------------------------------------------------
 .../ambari_agent/AlertSchedulerHandler.py       |  98 ++++++-
 .../python/ambari_agent/alerts/base_alert.py    |  37 ++-
 .../python/ambari_agent/alerts/collector.py     |  21 +-
 .../src/test/python/ambari_agent/TestAlerts.py  |  50 ++++
 .../ambari_agent/dummy_files/definitions.json   |   1 +
 ambari-project/pom.xml                          |   7 +-
 ambari-server/pom.xml                           |  12 +-
 .../internal/AlertGroupResourceProvider.java    |  33 ++-
 .../events/AlertDefinitionDeleteEvent.java      |  55 ++++
 .../ambari/server/events/AmbariEvent.java       |   7 +-
 .../listeners/AlertLifecycleListener.java       |  54 +++-
 .../events/listeners/AlertReceivedListener.java |   8 +
 .../server/orm/dao/AlertDefinitionDAO.java      |  13 +
 .../apache/ambari/server/orm/dao/AlertsDAO.java |   1 +
 .../orm/entities/AlertDefinitionEntity.java     |  15 +-
 .../server/orm/entities/AlertGroupEntity.java   |   5 +-
 .../state/alert/AggregateDefinitionMapping.java |  31 +-
 .../server/state/alert/AlertDefinition.java     |  19 ++
 .../state/alert/AlertDefinitionFactory.java     |   1 +
 .../server/state/alert/AlertDefinitionHash.java | 293 +++++++++++--------
 .../AlertGroupResourceProviderTest.java         |  87 ++++++
 .../server/orm/dao/AlertDefinitionDAOTest.java  |   6 +-
 .../alerts/AlertDefinitionEqualityTest.java     |   1 +
 .../state/alerts/AlertEventPublisherTest.java   |  68 ++++-
 .../alerts/AlertStateChangedEventTest.java      |  12 +-
 .../state/cluster/AlertDataManagerTest.java     |   2 +-
 26 files changed, 755 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
index 7cce533..8dcce50 100644
--- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
+++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
@@ -61,22 +61,25 @@ class AlertSchedulerHandler():
         logger.critical("Could not create the cache directory {0}".format(cachedir))
         pass
 
+    self._collector = AlertCollector()
     self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG)
     self.__in_minutes = in_minutes
-    self.__collector = AlertCollector()
     self.__config_maps = {}
+
           
-  def update_definitions(self, alert_commands, refresh_jobs=False):
+  def update_definitions(self, alert_commands, reschedule_jobs=False):
     ''' updates the persisted definitions and restarts the scheduler '''
     
     with open(os.path.join(self.cachedir, self.FILENAME), 'w') as f:
       json.dump(alert_commands, f, indent=2)
     
-    if refresh_jobs:
-      self.start()
+    if reschedule_jobs:
+      self.reschedule()
+
       
   def __make_function(self, alert_def):
     return lambda: alert_def.collect()
+
     
   def start(self):
     ''' loads definitions from file and starts the scheduler '''
@@ -90,26 +93,72 @@ class AlertSchedulerHandler():
 
     alert_callables = self.__load_definitions()
       
+    # schedule each definition
     for _callable in alert_callables:
-      if self.__in_minutes:
-        self.__scheduler.add_interval_job(self.__make_function(_callable),
-          minutes=_callable.interval())
-      else:
-        self.__scheduler.add_interval_job(self.__make_function(_callable),
-          seconds=_callable.interval())
+      self.schedule_definition(_callable)
       
     logger.debug("Starting scheduler {0}; currently running: {1}".format(
       str(self.__scheduler), str(self.__scheduler.running)))
+
     self.__scheduler.start()
+
     
   def stop(self):
     if not self.__scheduler is None:
       self.__scheduler.shutdown(wait=False)
       self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG)
+
+
+  def reschedule(self):
+    '''
+    Removes jobs that are scheduled where their UUID no longer is valid. 
+    Schedules jobs where the definition UUID is not currently scheduled.
+    '''
+    jobs_scheduled = 0
+    jobs_removed = 0
+    
+    definitions = self.__load_definitions()
+    scheduled_jobs = self.__scheduler.get_jobs()
+    
+    # for every scheduled job, see if its UUID is still valid
+    for scheduled_job in scheduled_jobs:
+      uuid_valid = False
+      
+      for definition in definitions:
+        definition_uuid = definition.definition_uuid()
+        if scheduled_job.name == definition_uuid:
+          uuid_valid = True
+          break
+      
+      # jobs without valid UUIDs should be unscheduled
+      if uuid_valid == False:
+        jobs_removed += 1
+        logger.info("Unscheduling {0}".format(scheduled_job.name))
+        self._collector.remove_by_uuid(scheduled_job.name)
+        self.__scheduler.unschedule_job(scheduled_job)
       
+    # for every definition, determine if there is a scheduled job
+    for definition in definitions:
+      definition_scheduled = False
+      for scheduled_job in scheduled_jobs:
+        definition_uuid = definition.definition_uuid()
+        if definition_uuid == scheduled_job.name:
+          definition_scheduled = True
+          break
+      
+      # if no jobs are found with the definitions UUID, schedule it
+      if definition_scheduled == False:
+        jobs_scheduled += 1
+        self.schedule_definition(definition)
+  
+    logger.info("Alert Reschedule Summary: {0} rescheduled, {1} unscheduled".format(
+        str(jobs_scheduled), str(jobs_removed)))
+
+
   def collector(self):
     ''' gets the collector for reporting to the server '''
-    return self.__collector
+    return self._collector
+  
       
   def __load_definitions(self):
     ''' loads all alert commands from the file.  all clusters are stored in one file '''
@@ -147,7 +196,7 @@ class AlertSchedulerHandler():
         vals = self.__find_config_values(configmap, obj.get_lookup_keys())
         self.__config_maps[clusterName].update(vals)
 
-        obj.set_helpers(self.__collector, self.__config_maps[clusterName])
+        obj.set_helpers(self._collector, self.__config_maps[clusterName])
 
         definitions.append(obj)
       
@@ -208,7 +257,30 @@ class AlertSchedulerHandler():
         configmap = command['configurations']
         keylist = self.__config_maps[clusterName].keys()
         vals = self.__find_config_values(configmap, keylist)
-        self.__config_maps[clusterName].update(vals)  
+        self.__config_maps[clusterName].update(vals)
+        
+
+  def schedule_definition(self,definition):
+    '''
+    Schedule a definition (callable). Scheduled jobs are given the UUID
+    as their name so that they can be identified later on.
+    '''
+    job = None
+
+    if self.__in_minutes:
+      job = self.__scheduler.add_interval_job(self.__make_function(definition),
+        minutes=definition.interval())
+    else:
+      job = self.__scheduler.add_interval_job(self.__make_function(definition),
+        seconds=definition.interval())
+    
+    # although the documentation states that Job(kwargs) takes a name 
+    # key/value pair, it does not actually set the name; do it manually
+    if job is not None:
+      job.name = definition.definition_uuid()
+      
+    logger.info("Scheduling {0} with UUID {1}".format(
+      definition.definition_name(), definition.definition_uuid()))
 
 def main():
   args = list(sys.argv)

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
index b22938d..10dcff8 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
@@ -46,17 +46,34 @@ class BaseAlert(object):
     else:
       interval = self.alert_meta['interval']
       return 1 if interval < 1 else interval
-      
+
+
+  def definition_name(self):
+    '''
+    gets the unique name of the alert definition
+    '''
+    return self.alert_meta['name']
+
+
+  def definition_uuid(self):
+    '''
+    gets the unique hash of the alert definition
+    '''
+    return self.alert_meta['uuid']
+
+
   def set_helpers(self, collector, value_dict):
     ''' sets helper objects for alerts without having to use them in a constructor '''
     self.collector = collector
     self.config_value_dict = value_dict
-      
+
+
   def set_cluster(self, cluster, host):
     ''' sets cluster information for the alert '''
     self.cluster = cluster
     self.host_name = host
-  
+
+
   def collect(self):
     ''' method used for collection.  defers to _collect() '''
     
@@ -83,12 +100,14 @@ class BaseAlert(object):
     data['service'] = self._find_value('serviceName')
     data['component'] = self._find_value('componentName')
     data['timestamp'] = long(time.time() * 1000)
+    data['uuid'] = self._find_value('uuid')
 
     if logger.isEnabledFor(logging.DEBUG):
       logger.debug("debug alert text: {0}".format(data['text']))
     
     self.collector.put(self.cluster, data)
-  
+
+
   def _find_value(self, meta_key):
     ''' safe way to get a value when outputting result json.  will not throw an exception '''
     if self.alert_meta.has_key(meta_key):
@@ -96,10 +115,12 @@ class BaseAlert(object):
     else:
       return None
 
+
   def get_lookup_keys(self):
     ''' returns a list of lookup keys found for this alert '''
     return self._lookup_keys
-      
+
+
   def _find_lookup_property(self, key):
     '''
     check if the supplied key is parameterized
@@ -112,7 +133,8 @@ class BaseAlert(object):
       return keys[0]
       
     return key
-    
+
+
   def _lookup_property_value(self, key):
     '''
     in the case of specifying a configuration path, lookup that path's value
@@ -124,7 +146,8 @@ class BaseAlert(object):
       return self.config_value_dict[key]
     else:
       return None
-  
+
+
   def _collect(self):
     '''
     Low level function to collect alert data.  The result is a tuple as:

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-agent/src/main/python/ambari_agent/alerts/collector.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/collector.py b/ambari-agent/src/main/python/ambari_agent/alerts/collector.py
index 7249449..9943211 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/collector.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/collector.py
@@ -33,8 +33,27 @@ class AlertCollector():
     if not cluster in self.__buckets:
       self.__buckets[cluster] = {}
       
-    self.__buckets[cluster][alert['name']] = alert 
+    self.__buckets[cluster][alert['name']] = alert
     
+  def remove(self, cluster, alert_name):
+    '''
+    Removes the alert with the specified name if it exists in the dictionary
+    '''
+    if not cluster in self.__buckets:
+      return
+    
+    del self.__buckets[cluster][alert_name]
+    
+  def remove_by_uuid(self, alert_uuid):
+    '''
+    Removes the alert with the specified uuid if it exists in the dictionary
+    '''
+    for cluster,alert_map in self.__buckets.iteritems():
+      for alert_name in alert_map.keys():
+        alert = alert_map[alert_name]
+        if alert['uuid'] == alert_uuid:
+          self.remove(cluster, alert_name)
+        
   def alerts(self):
     alerts = []
     for clustermap in self.__buckets.values()[:]:

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
index 5dc45b5..662d8ee 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
@@ -56,6 +56,7 @@ class TestAlerts(TestCase):
       "label": "NameNode process",
       "interval": 6,
       "scope": "host",
+      "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
       "source": {
         "type": "PORT",
         "uri": "{{hdfs-site/my-key}}",
@@ -86,6 +87,7 @@ class TestAlerts(TestCase):
       "label": "NameNode process",
       "interval": 6,
       "scope": "host",
+      "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
       "source": {
         "type": "PORT",
         "uri": "http://c6401.ambari.apache.org",
@@ -115,6 +117,7 @@ class TestAlerts(TestCase):
       "label": "NameNode process",
       "interval": 6,
       "scope": "host",
+      "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
       "source": {
         "type": "SCRIPT",
         "path": "test_script.py",
@@ -152,6 +155,7 @@ class TestAlerts(TestCase):
       "label": "NameNode process",
       "interval": 6,
       "scope": "host",
+      "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
       "source": {
         "type": "METRIC",
         "uri": "http://myurl:8633",
@@ -197,3 +201,49 @@ class TestAlerts(TestCase):
     self.assertEquals('OK', collector.alerts()[0]['state'])
     self.assertEquals('ok_arr: 1 3 None', collector.alerts()[0]['text'])
     
+  def test_reschedule(self):
+    test_file_path = os.path.join('ambari_agent', 'dummy_files')
+    test_stack_path = os.path.join('ambari_agent', 'dummy_files')
+
+    ash = AlertSchedulerHandler(test_file_path, test_stack_path)
+    ash.start()
+    ash.reschedule()
+        
+  
+  def test_alert_collector_purge(self):
+    json = { "name": "namenode_process",
+      "service": "HDFS",
+      "component": "NAMENODE",
+      "label": "NameNode process",
+      "interval": 6,
+      "scope": "host",
+      "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
+      "source": {
+        "type": "PORT",
+        "uri": "{{hdfs-site/my-key}}",
+        "default_port": 50070,
+        "reporting": {
+          "ok": {
+            "text": "TCP OK - {0:.4f} response time on port {1}"
+          },
+          "critical": {
+            "text": "Could not load process info: {0}"
+          }
+        }
+      }
+    }
+
+    collector = AlertCollector()
+
+    pa = PortAlert(json, json['source'])
+    pa.set_helpers(collector, {'hdfs-site/my-key': 'value1'})
+    self.assertEquals(6, pa.interval())
+
+    res = pa.collect()
+    
+    self.assertIsNotNone(collector.alerts()[0])
+    self.assertEquals('CRITICAL', collector.alerts()[0]['state'])
+    
+    collector.remove_by_uuid('c1f73191-4481-4435-8dae-fd380e4c0be1')
+    self.assertEquals(0,len(collector.alerts()))
+    

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-agent/src/test/python/ambari_agent/dummy_files/definitions.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/definitions.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/definitions.json
index ad52abc..30973c2 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/definitions.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/definitions.json
@@ -16,6 +16,7 @@
         "label": "NameNode process",
         "interval": 6,
         "scope": "host",
+        "uuid": "3f82ae27-fa6a-465b-b77d-67963ac55d2f",
         "source": {
           "type": "PORT",
           "uri": "{{hdfs-site/dfs.namenode.http-address}}",

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-project/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-project/pom.xml b/ambari-project/pom.xml
index 58e2e8f..edba1dc 100644
--- a/ambari-project/pom.xml
+++ b/ambari-project/pom.xml
@@ -55,8 +55,9 @@
   </pluginRepositories>
   <repositories>
     <repository>
-      <id>EclipseLink</id>
-      <url>http://download.eclipse.org/rt/eclipselink/maven.repo</url>
+      <id>oss.sonatype.org</id>
+      <name>OSS Sonatype Staging</name>
+      <url>https://oss.sonatype.org/content/groups/staging</url>
     </repository>
     <repository>
       <id>spring-milestones</id>
@@ -211,7 +212,7 @@
       <dependency>
         <groupId>org.eclipse.persistence</groupId>
         <artifactId>eclipselink</artifactId>
-        <version>2.4.0</version>
+        <version>2.4.2</version>
       </dependency>
       <dependency>
         <groupId>org.postgresql</groupId>

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-server/pom.xml b/ambari-server/pom.xml
index 5ac76f9..f37a177 100644
--- a/ambari-server/pom.xml
+++ b/ambari-server/pom.xml
@@ -191,7 +191,7 @@
           <dependency>
             <groupId>org.eclipse.persistence</groupId>
             <artifactId>eclipselink</artifactId>
-            <version>2.4.0</version>
+            <version>2.4.2</version>
           </dependency>
         </dependencies>
       </plugin>
@@ -1451,14 +1451,12 @@
       <version>1.5.2</version>
     </dependency>
   </dependencies>
-  <!--<reporting> <plugins> <plugin> <groupId>org.codehaus.mojo</groupId>
-    <artifactId>findbugs-maven-plugin</artifactId> <version>2.5.2</version> </plugin>
-    </plugins> </reporting> -->
-
+  
   <pluginRepositories>
     <pluginRepository>
-      <id>EclipseLink</id>
-      <url>http://download.eclipse.org/rt/eclipselink/maven.repo</url>
+      <id>oss.sonatype.org</id>
+      <name>OSS Sonatype Staging</name>
+      <url>https://oss.sonatype.org/content/groups/staging</url>
     </pluginRepository>
   </pluginRepositories>
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProvider.java
index fc51ddd..8c55aa7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProvider.java
@@ -314,10 +314,6 @@ public class AlertGroupResourceProvider extends
       Collection<Long> targetIds = (Collection<Long>) requestMap.get(ALERT_GROUP_TARGETS);
       Collection<Long> definitionIds = (Collection<Long>) requestMap.get(ALERT_GROUP_DEFINITIONS);
 
-      if (!StringUtils.isBlank(name)) {
-        entity.setGroupName(name);
-      }
-
       // if targets were supplied, replace existing
       Set<AlertTargetEntity> targets = new HashSet<AlertTargetEntity>();
       if (null != targetIds && targetIds.size() > 0) {
@@ -331,17 +327,26 @@ public class AlertGroupResourceProvider extends
         entity.setAlertTargets(targets);
       }
 
-      // if definitions were supplied, replace existing
-      Set<AlertDefinitionEntity> definitions = new HashSet<AlertDefinitionEntity>();
-      if (null != definitionIds && definitionIds.size() > 0) {
-        List<Long> ids = new ArrayList<Long>(definitionIds.size());
-        ids.addAll(definitionIds);
-        definitions.addAll(s_definitionDao.findByIds(ids));
+      // only the targets should be updatable on default groups; everything
+      // else is valid only on regular groups
+      if (!entity.isDefault()) {
+        // set the name if supplied
+        if (!StringUtils.isBlank(name)) {
+          entity.setGroupName(name);
+        }
 
-        entity.setAlertDefinitions(definitions);
-      } else if (definitionIds.size() == 0) {
-        // empty array supplied, clear out existing definitions
-        entity.setAlertDefinitions(definitions);
+        // if definitions were supplied, replace existing
+        Set<AlertDefinitionEntity> definitions = new HashSet<AlertDefinitionEntity>();
+        if (null != definitionIds && definitionIds.size() > 0) {
+          List<Long> ids = new ArrayList<Long>(definitionIds.size());
+          ids.addAll(definitionIds);
+          definitions.addAll(s_definitionDao.findByIds(ids));
+
+          entity.setAlertDefinitions(definitions);
+        } else {
+          // empty array supplied, clear out existing definitions
+          entity.setAlertDefinitions(definitions);
+        }
       }
 
       s_dao.merge(entity);

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionDeleteEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionDeleteEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionDeleteEvent.java
new file mode 100644
index 0000000..c8fd36b
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionDeleteEvent.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+import org.apache.ambari.server.state.alert.AlertDefinition;
+
+/**
+ * The {@link AlertDefinitionDeleteEvent} is used to represent that an
+ * {@link AlertDefinition} has been removed from the system.
+ */
+public class AlertDefinitionDeleteEvent extends ClusterEvent {
+
+  /**
+   * The removed alert definition
+   */
+  private final AlertDefinition m_definition;
+
+  /**
+   * Constructor.
+   *
+   * @param clusterId
+   *          the ID of the cluster that the definition is in.
+   * @param definition
+   *          the alert definition being removed.
+   */
+  public AlertDefinitionDeleteEvent(
+      long clusterId, AlertDefinition definition) {
+    super(AmbariEventType.ALERT_DEFINITION_REMOVAL, clusterId);
+    m_definition = definition;
+  }
+
+  /**
+   * Get the removed alert definition.
+   *
+   * @return the alert definition (not {@code null}).
+   */
+  public AlertDefinition getDefinition() {
+    return m_definition;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
index 6cf752e..3dae676 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
@@ -35,7 +35,12 @@ public abstract class AmbariEvent {
     /**
      * An alert definition is registered with the system.
      */
-    ALERT_DEFINITION_REGISTRATION;
+    ALERT_DEFINITION_REGISTRATION,
+
+    /**
+     * An alert definition is removed from the system.
+     */
+    ALERT_DEFINITION_REMOVAL;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java
index 43d4b35..9d2f4c6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java
@@ -17,15 +17,22 @@
  */
 package org.apache.ambari.server.events.listeners;
 
+import java.util.Set;
+
+import org.apache.ambari.server.events.AlertDefinitionDeleteEvent;
 import org.apache.ambari.server.events.AlertDefinitionRegistrationEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.state.alert.AggregateDefinitionMapping;
 import org.apache.ambari.server.state.alert.AlertDefinition;
+import org.apache.ambari.server.state.alert.AlertDefinitionHash;
 import org.apache.ambari.server.state.alert.SourceType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.google.inject.Singleton;
 
 /**
@@ -34,6 +41,10 @@ import com.google.inject.Singleton;
  */
 @Singleton
 public class AlertLifecycleListener {
+  /**
+   * Logger.
+   */
+  private static Logger LOG = LoggerFactory.getLogger(AlertLifecycleListener.class);
 
   /**
    * Used for quick lookups of aggregate alerts.
@@ -42,6 +53,13 @@ public class AlertLifecycleListener {
   private AggregateDefinitionMapping m_aggregateMapping;
 
   /**
+   * Invalidates hosts so that they can receive updated alert definition
+   * commands.
+   */
+  @Inject
+  private Provider<AlertDefinitionHash> m_alertDefinitionHash;
+
+  /**
    * Constructor.
    *
    * @param publisher
@@ -66,8 +84,42 @@ public class AlertLifecycleListener {
   public void onAmbariEvent(AlertDefinitionRegistrationEvent event) {
     AlertDefinition definition = event.getDefinition();
 
+    LOG.debug("Registering alert definition {}", definition);
+
     if (definition.getSource().getType() == SourceType.AGGREGATE) {
-      m_aggregateMapping.addAggregateType(event.getClusterId(), definition);
+      m_aggregateMapping.registerAggregate(event.getClusterId(), definition);
+    }
+  }
+
+  /**
+   * Handles {@link AlertDefinitionDeleteEvent} by performing the following
+   * tasks:
+   * <ul>
+   * <li>Removal from the {@link AggregateDefinitionMapping}</li>
+   * <li>{@link AlertDefinitionHash} invalidation</li>
+   * </ul>
+   *
+   * @param event
+   *          the event being handled.
+   */
+  @Subscribe
+  @AllowConcurrentEvents
+  public void onAmbariEvent(AlertDefinitionDeleteEvent event) {
+    AlertDefinition definition = event.getDefinition();
+
+    LOG.debug("Removing alert definition {}", definition);
+
+    if (null == definition) {
+      return;
     }
+
+    m_aggregateMapping.removeAssociatedAggregate(event.getClusterId(),
+        definition.getName());
+
+    AlertDefinitionHash hashHelper = m_alertDefinitionHash.get();
+    Set<String> invalidatedHosts = hashHelper.invalidateHosts(definition);
+
+    hashHelper.enqueueAgentCommands(definition.getClusterId(),
+        invalidatedHosts);
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
index e87ba7d..78d8e66 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
@@ -101,6 +101,14 @@ public class AlertReceivedListener {
       AlertDefinitionEntity definition = m_definitionDao.findByName(clusterId,
           alert.getName());
 
+      if (null == definition) {
+        LOG.warn(
+            "Received an alert for {} which is a definition that does not exist anymore",
+            alert.getName());
+
+        return;
+      }
+
       AlertHistoryEntity history = createHistory(clusterId, definition, alert);
 
       current = new AlertCurrentEntity();

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java
index 075ee04..8e8c808 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java
@@ -25,6 +25,7 @@ import javax.persistence.EntityManager;
 import javax.persistence.TypedQuery;
 
 import org.apache.ambari.server.controller.RootServiceResponseFactory;
+import org.apache.ambari.server.events.AlertDefinitionDeleteEvent;
 import org.apache.ambari.server.events.AlertDefinitionRegistrationEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
@@ -364,6 +365,18 @@ public class AlertDefinitionDAO {
     if (null != alertDefinition) {
       entityManager.remove(alertDefinition);
     }
+
+    // publish the alert definition removal
+    AlertDefinition coerced = alertDefinitionFactory.coerce(alertDefinition);
+    if (null != coerced) {
+      AlertDefinitionDeleteEvent event = new AlertDefinitionDeleteEvent(
+          alertDefinition.getClusterId(), coerced);
+
+      eventPublisher.publish(event);
+    } else {
+      LOG.warn("Unable to broadcast alert removal event for {}",
+          alertDefinition.getDefinitionName());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
index a28b448..f2161f3 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
@@ -515,6 +515,7 @@ public class AlertsDAO {
     query.setParameter("clusterId", Long.valueOf(clusterId));
     query.setParameter("definitionName", alertName);
 
+    query = setQueryRefreshHint(query);
     return daoUtils.selectOne(query);
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertDefinitionEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertDefinitionEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertDefinitionEntity.java
index 8548fda..4ab2c80 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertDefinitionEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertDefinitionEntity.java
@@ -17,7 +17,9 @@
  */
 package org.apache.ambari.server.orm.entities;
 
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;
 
 import javax.persistence.Basic;
@@ -363,7 +365,7 @@ public class AlertDefinitionEntity {
    * @return the groups, or {@code null} if none.
    */
   public Set<AlertGroupEntity> getAlertGroups() {
-    return alertGroups;
+    return Collections.unmodifiableSet(alertGroups);
   }
 
   /**
@@ -418,7 +420,7 @@ public class AlertDefinitionEntity {
    * @param alertGroup
    */
   protected void removeAlertGroup(AlertGroupEntity alertGroup) {
-    if (null != alertGroups) {
+    if (null != alertGroups && alertGroups.contains(alertGroup)) {
       alertGroups.remove(alertGroup);
     }
   }
@@ -429,12 +431,15 @@ public class AlertDefinitionEntity {
    */
   @PreRemove
   public void preRemove() {
-    Set<AlertGroupEntity> groups = getAlertGroups();
-    if (null == groups || groups.size() == 0) {
+    if (null == alertGroups || alertGroups.size() == 0) {
       return;
     }
 
-    for (AlertGroupEntity group : groups) {
+    Iterator<AlertGroupEntity> iterator = alertGroups.iterator();
+    while (iterator.hasNext()) {
+      AlertGroupEntity group = iterator.next();
+      iterator.remove();
+
       group.removeAlertDefinition(this);
     }
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java
index 1cc9bcc..ac3586d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
+import javax.persistence.CascadeType;
 import javax.persistence.Column;
 import javax.persistence.Entity;
 import javax.persistence.GeneratedValue;
@@ -73,14 +74,14 @@ public class AlertGroupEntity {
   /**
    * Bi-directional many-to-many association to {@link AlertDefinitionEntity}
    */
-  @ManyToMany
+  @ManyToMany(cascade = CascadeType.MERGE)
   @JoinTable(name = "alert_grouping", joinColumns = { @JoinColumn(name = "group_id", nullable = false) }, inverseJoinColumns = { @JoinColumn(name = "definition_id", nullable = false) })
   private Set<AlertDefinitionEntity> alertDefinitions;
 
   /**
    * Unidirectional many-to-many association to {@link AlertTargetEntity}
    */
-  @ManyToMany
+  @ManyToMany(cascade = CascadeType.MERGE)
   @JoinTable(name = "alert_group_target", joinColumns = { @JoinColumn(name = "group_id", nullable = false) }, inverseJoinColumns = { @JoinColumn(name = "target_id", nullable = false) })
   private Set<AlertTargetEntity> alertTargets;
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java
index 04f20f9..c63c0d4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java
@@ -19,6 +19,7 @@ package org.apache.ambari.server.state.alert;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.inject.Singleton;
 
@@ -33,7 +34,8 @@ public final class AggregateDefinitionMapping {
    * In-memory mapping of cluster ID to definition name / aggregate definition.
    * This is used for fast lookups when receiving events.
    */
-  private Map<Long, Map<String, AlertDefinition>> m_aggregateMap = new HashMap<Long, Map<String, AlertDefinition>>();
+  private Map<Long, Map<String, AlertDefinition>> m_aggregateMap =
+      new ConcurrentHashMap<Long, Map<String, AlertDefinition>>();
 
   /**
    * Constructor.
@@ -69,10 +71,10 @@ public final class AggregateDefinitionMapping {
    *
    * @param clusterId
    *          the ID of the cluster that the definition is bound to.
-   * @param name
-   *          the unique name of the definition.
+   * @param definition
+   *          the aggregate definition to register (not {@code null}).
    */
-  public void addAggregateType(long clusterId, AlertDefinition definition) {
+  public void registerAggregate(long clusterId, AlertDefinition definition) {
     Long id = Long.valueOf(clusterId);
 
     if (!m_aggregateMap.containsKey(id)) {
@@ -85,4 +87,25 @@ public final class AggregateDefinitionMapping {
 
     map.put(as.getAlertName(), definition);
   }
+
+  /**
+   * Removes the associated aggregate for the specified aggregated definition.
+   *
+   * @param clusterId
+   *          the ID of the cluster that the definition is bound to.
+   * @param aggregatedDefinitonName
+   *          the unique name of the definition for which aggregates should be
+   *          unassociated (not {@code null}).
+   */
+  public void removeAssociatedAggregate(long clusterId,
+      String aggregatedDefinitonName) {
+    Long id = Long.valueOf(clusterId);
+
+    if (!m_aggregateMap.containsKey(id)) {
+      return;
+    }
+
+    Map<String, AlertDefinition> map = m_aggregateMap.get(id);
+    map.remove(aggregatedDefinitonName);
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java
index 5058e91..961fb66 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java
@@ -34,6 +34,7 @@ import java.util.HashSet;
  */
 public class AlertDefinition {
 
+  private long clusterId;
   private String serviceName = null;
   private String componentName = null;
 
@@ -46,6 +47,24 @@ public class AlertDefinition {
   private String uuid = null;
 
   /**
+   * Gets the cluster ID for this definition.
+   *
+   * @return the cluster ID.
+   */
+  public long getClusterId() {
+    return clusterId;
+  }
+
+  /**
+   * Sets the cluster ID for this definition.
+   *
+   * @param clusterId the cluster ID.
+   */
+  public void setClusterId(long clusterId) {
+    this.clusterId = clusterId;
+  }
+
+  /**
    * @return the service name
    */
   public String getServiceName() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java
index 4f6a9a3..9c72b6a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java
@@ -125,6 +125,7 @@ public class AlertDefinitionFactory {
     }
 
     AlertDefinition definition = new AlertDefinition();
+    definition.setClusterId(entity.getClusterId());
     definition.setComponentName(entity.getComponentName());
     definition.setEnabled(entity.getEnabled());
     definition.setInterval(entity.getScheduleInterval());

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
index 9ea039c..35d7742 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
@@ -262,85 +262,19 @@ public class AlertDefinitionHash {
 
 
   /**
-   * Gets the alert definition entities for the specified host. This will include the
-   * following types of alert definitions:
-   * <ul>
-   * <li>Service/Component alerts</li>
-   * <li>Service alerts where the host is a MASTER</li>
-   * <li>Host alerts that are not bound to a service</li>
-   * </ul>
+   * Invalidate the hashes of any host that would be affected by the specified
+   * definition.
    *
-   * @param clusterName
-   *          the cluster name (not {@code null}).
-   * @param hostName
-   *          the host name (not {@code null}).
-   * @return the alert definitions for the host, or an empty set (never
+   * @param definition
+   *          the definition to use to find the hosts to invalidate (not
+   *          {@code null}).
+   * @return the hosts that were invalidated, or an empty set (never
    *         {@code null}).
    */
-  private Set<AlertDefinitionEntity> getAlertDefinitionEntities(
-      String clusterName,
-      String hostName) {
-    Set<AlertDefinitionEntity> definitions = new HashSet<AlertDefinitionEntity>();
-
-    try {
-      Cluster cluster = m_clusters.getCluster(clusterName);
-      if (null == cluster) {
-        LOG.warn("Unable to get alert definitions for the missing cluster {}",
-            clusterName);
-
-        return Collections.emptySet();
-      }
-
-      long clusterId = cluster.getClusterId();
-      List<ServiceComponentHost> serviceComponents = cluster.getServiceComponentHosts(hostName);
-      if (null == serviceComponents || serviceComponents.size() == 0) {
-        LOG.warn(
-            "Unable to get alert definitions for {} since there are no service components defined",
-            hostName);
-
-        return Collections.emptySet();
-      }
-
-      for (ServiceComponentHost serviceComponent : serviceComponents) {
-        String serviceName = serviceComponent.getServiceName();
-        String componentName = serviceComponent.getServiceComponentName();
-
-        // add all alerts for this service/component pair
-        definitions.addAll(m_definitionDao.findByServiceComponent(
-            clusterId, serviceName, componentName));
-      }
-
-      // for every service, get the master components and see if the host
-      // is a master
-      Set<String> services = new HashSet<String>();
-      for (Entry<String, Service> entry : cluster.getServices().entrySet()) {
-        Service service = entry.getValue();
-        Map<String, ServiceComponent> components = service.getServiceComponents();
-        for (Entry<String, ServiceComponent> component : components.entrySet()) {
-          if (component.getValue().isMasterComponent()) {
-            Map<String, ServiceComponentHost> hosts = component.getValue().getServiceComponentHosts();
-
-            if( hosts.containsKey( hostName ) ){
-              services.add(service.getName());
-            }
-          }
-        }
-      }
-
-      // add all service scoped alerts
-      if( services.size() > 0 ){
-        definitions.addAll(m_definitionDao.findByServiceMaster(clusterId,
-            services));
-      }
-
-      // add any alerts not bound to a service (host level alerts)
-      definitions.addAll(m_definitionDao.findAgentScoped(clusterId));
-    } catch (AmbariException ambariException) {
-      LOG.error("Unable to get alert definitions", ambariException);
-      return Collections.emptySet();
-    }
-
-    return definitions;
+  public Set<String> invalidateHosts(AlertDefinitionEntity definition) {
+    return invalidateHosts(definition.getClusterId(),
+        definition.getDefinitionName(), definition.getServiceName(),
+        definition.getComponentName());
   }
 
   /**
@@ -353,8 +287,28 @@ public class AlertDefinitionHash {
    * @return the hosts that were invalidated, or an empty set (never
    *         {@code null}).
    */
-  public Set<String> invalidateHosts(AlertDefinitionEntity definition) {
-    long clusterId = definition.getClusterId();
+  public Set<String> invalidateHosts(AlertDefinition definition) {
+    return invalidateHosts(definition.getClusterId(), definition.getName(),
+        definition.getServiceName(), definition.getComponentName());
+  }
+
+  /**
+   * Invalidate the hashes of any host that would be affected by the specified
+   * definition.
+   *
+   * @param clusterId
+   *          the cluster ID
+   * @param definitionName
+   *          the definition unique name.
+   * @param definitionServiceName
+   *          the definition's service name.
+   * @param definitionComponentName
+   *          the definition's component name.
+   * @return the hosts that were invalidated, or an empty set (never
+   *         {@code null}).
+   */
+  public Set<String> invalidateHosts(long clusterId, String definitionName,
+      String definitionServiceName, String definitionComponentName) {
     Set<String> invalidatedHosts = new HashSet<String>();
 
     Cluster cluster = null;
@@ -379,8 +333,6 @@ public class AlertDefinitionHash {
     }
 
     // intercept host agent alerts; they affect all hosts
-    String definitionServiceName = definition.getServiceName();
-    String definitionComponentName = definition.getComponentName();
     if (Services.AMBARI.equals(definitionServiceName)
         && Components.AMBARI_AGENT.equals(definitionComponentName)) {
 
@@ -413,7 +365,7 @@ public class AlertDefinitionHash {
     Service service = services.get(definitionServiceName);
     if (null == service) {
       LOG.warn("The alert definition {} has an unknown service of {}",
-          definition.getDefinitionName(), definitionServiceName);
+          definitionName, definitionServiceName);
 
       return invalidatedHosts;
     }
@@ -439,6 +391,82 @@ public class AlertDefinitionHash {
   }
 
   /**
+   * Enqueue {@link AlertDefinitionCommand}s for every host specified so that
+   * they will receive a payload of alert definitions that they should be
+   * running.
+   * <p/>
+   * This method is typically called after
+   * {@link #invalidateHosts(AlertDefinitionEntity)} has caused a cache
+   * invalidation of the alert definition hash.
+   *
+   * @param clusterId
+   *          the ID of the cluster.
+   * @param hosts
+   *          the hosts to push {@link AlertDefinitionCommand}s for.
+   */
+  public void enqueueAgentCommands(long clusterId, Set<String> hosts) {
+    String clusterName = null;
+
+    try {
+      Cluster cluster = m_clusters.getClusterById(clusterId);
+      clusterName = cluster.getClusterName();
+    } catch (AmbariException ae) {
+      LOG.error("Unable to lookup cluster for alert definition commands", ae);
+    }
+
+    enqueueAgentCommands(clusterName, hosts);
+  }
+
+  /**
+   * Enqueue {@link AlertDefinitionCommand}s for every host specified so that
+   * they will receive a payload of alert definitions that they should be
+   * running.
+   * <p/>
+   * This method is typically called after
+   * {@link #invalidateHosts(AlertDefinitionEntity)} has caused a cache
+   * invalidation of the alert definition hash.
+   *
+   * @param clusterName
+   *          the name of the cluster (not {@code null}).
+   * @param hosts
+   *          the hosts to push {@link AlertDefinitionCommand}s for.
+   */
+  public void enqueueAgentCommands(String clusterName, Set<String> hosts) {
+    if (null == clusterName) {
+      LOG.warn("Unable to create alert definition agent commands because of a null cluster name");
+      return;
+    }
+
+    if (null == hosts || hosts.size() == 0) {
+      return;
+    }
+
+    for (String hostName : hosts) {
+      List<AlertDefinition> definitions = getAlertDefinitions(clusterName,
+          hostName);
+
+      String hash = getHash(clusterName, hostName);
+
+      AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName,
+          hostName, hash, definitions);
+
+      try {
+        Cluster cluster = m_clusters.getCluster(clusterName);
+        command.addConfigs(m_configHelper.get(), cluster);
+      } catch (AmbariException ae) {
+        LOG.warn("Unable to add configurations to alert definition command", ae);
+      }
+
+      // unlike other commands, the alert definitions commands are really
+      // designed to be 1:1 per change; if multiple invalidations happened
+      // before the next heartbeat, there would be several commands that would
+      // force the agents to reschedule their alerts more than once
+      m_actionQueue.dequeue(hostName, AgentCommandType.ALERT_DEFINITION_COMMAND);
+      m_actionQueue.enqueue(hostName, command);
+    }
+  }
+
+  /**
    * Calculates a unique hash value representing all of the alert definitions
    * that should be scheduled to run on a given host. Alerts of type
    * {@link SourceType#AGGREGATE} are not included in the hash since they are
@@ -492,51 +520,84 @@ public class AlertDefinitionHash {
   }
 
   /**
-   * Enqueue {@link AlertDefinitionCommand}s for every host specified so that
-   * they will receive a payload of alert definitions that they should be
-   * running.
-   * <p/>
-   * This method is typically called after
-   * {@link #invalidateHosts(AlertDefinitionEntity)} has caused a cache
-   * invalidation of the alert definition hash.
+   * Gets the alert definition entities for the specified host. This will include the
+   * following types of alert definitions:
+   * <ul>
+   * <li>Service/Component alerts</li>
+   * <li>Service alerts where the host is a MASTER</li>
+   * <li>Host alerts that are not bound to a service</li>
+   * </ul>
    *
    * @param clusterName
-   *          the name of the cluster (not {@code null}).
-   * @param hosts
-   *          the hosts to push {@link AlertDefinitionCommand}s for.
+   *          the cluster name (not {@code null}).
+   * @param hostName
+   *          the host name (not {@code null}).
+   * @return the alert definitions for the host, or an empty set (never
+   *         {@code null}).
    */
-  public void enqueueAgentCommands(String clusterName, Set<String> hosts) {
-    if (null == clusterName) {
-      LOG.warn("Unable to create alert definition agent commands because of a null cluster name");
-      return;
-    }
+  private Set<AlertDefinitionEntity> getAlertDefinitionEntities(
+      String clusterName,
+      String hostName) {
+    Set<AlertDefinitionEntity> definitions = new HashSet<AlertDefinitionEntity>();
 
-    if (null == hosts || hosts.size() == 0) {
-      return;
-    }
+    try {
+      Cluster cluster = m_clusters.getCluster(clusterName);
+      if (null == cluster) {
+        LOG.warn("Unable to get alert definitions for the missing cluster {}",
+            clusterName);
 
-    for (String hostName : hosts) {
-      List<AlertDefinition> definitions = getAlertDefinitions(clusterName,
-          hostName);
+        return Collections.emptySet();
+      }
 
-      String hash = getHash(clusterName, hostName);
+      long clusterId = cluster.getClusterId();
+      List<ServiceComponentHost> serviceComponents = cluster.getServiceComponentHosts(hostName);
+      if (null == serviceComponents || serviceComponents.size() == 0) {
+        LOG.warn(
+            "Unable to get alert definitions for {} since there are no service components defined",
+            hostName);
 
-      AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName,
-          hostName, hash, definitions);
+        return Collections.emptySet();
+      }
 
-      try {
-        Cluster cluster = m_clusters.getCluster(clusterName);
-        command.addConfigs(m_configHelper.get(), cluster);
-      } catch (AmbariException ae) {
-        LOG.warn("Unable to add configurations to alert definition command", ae);
+      for (ServiceComponentHost serviceComponent : serviceComponents) {
+        String serviceName = serviceComponent.getServiceName();
+        String componentName = serviceComponent.getServiceComponentName();
+
+        // add all alerts for this service/component pair
+        definitions.addAll(m_definitionDao.findByServiceComponent(
+            clusterId, serviceName, componentName));
       }
 
-      // unlike other commands, the alert definitions commands are really
-      // designed to be 1:1 per change; if multiple invalidations happened
-      // before the next heartbeat, there would be several commands that would
-      // force the agents to reschedule their alerts more than once
-      m_actionQueue.dequeue(hostName, AgentCommandType.ALERT_DEFINITION_COMMAND);
-      m_actionQueue.enqueue(hostName, command);
+      // for every service, get the master components and see if the host
+      // is a master
+      Set<String> services = new HashSet<String>();
+      for (Entry<String, Service> entry : cluster.getServices().entrySet()) {
+        Service service = entry.getValue();
+        Map<String, ServiceComponent> components = service.getServiceComponents();
+        for (Entry<String, ServiceComponent> component : components.entrySet()) {
+          if (component.getValue().isMasterComponent()) {
+            Map<String, ServiceComponentHost> hosts = component.getValue().getServiceComponentHosts();
+
+            if( hosts.containsKey( hostName ) ){
+              services.add(service.getName());
+            }
+          }
+        }
+      }
+
+      // add all service scoped alerts
+      if( services.size() > 0 ){
+        definitions.addAll(m_definitionDao.findByServiceMaster(clusterId,
+            services));
+      }
+
+      // add any alerts not bound to a service (host level alerts)
+      definitions.addAll(m_definitionDao.findAgentScoped(clusterId));
+    } catch (AmbariException ambariException) {
+      LOG.error("Unable to get alert definitions", ambariException);
+      return Collections.emptySet();
     }
+
+    return definitions;
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java
index 5583617..771bf8a 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java
@@ -383,6 +383,93 @@ public class AlertGroupResourceProviderTest {
   }
 
   /**
+   * Tests that updating a default group doesn't change read-only properties
+   *
+   * @throws Exception
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testUpdateDefaultGroup() throws Exception {
+    Capture<AlertGroupEntity> entityCapture = new Capture<AlertGroupEntity>();
+
+    // the definition IDs to associate with the group
+    List<Long> definitionIds = new ArrayList<Long>();
+    definitionIds.add(ALERT_DEF_ID);
+
+    // the target IDs to associate with the group
+    List<Long> targetIds = new ArrayList<Long>();
+    targetIds.add(ALERT_TARGET_ID);
+
+    // definition entities to return from DAO
+    List<AlertDefinitionEntity> definitionEntities = new ArrayList<AlertDefinitionEntity>();
+    definitionEntities.addAll(getMockDefinitions());
+
+    // target entities to return from DAO
+    List<AlertTargetEntity> newTargetEntities = new ArrayList<AlertTargetEntity>();
+    newTargetEntities.addAll(getMockTargets());
+
+    Set<AlertTargetEntity> mockTargets2 = getMockTargets();
+    AlertTargetEntity target2 = mockTargets2.iterator().next();
+    target2.setTargetId(29L);
+
+    newTargetEntities.add(target2);
+
+    AlertGroupEntity group = new AlertGroupEntity();
+    group.setDefault(true);
+    group.setGroupName(ALERT_GROUP_NAME);
+    group.setAlertDefinitions(getMockDefinitions());
+    group.setAlertTargets(getMockTargets());
+
+    expect(m_dao.findGroupById(ALERT_GROUP_ID)).andReturn(group).times(1);
+    expect(m_dao.merge(capture(entityCapture))).andReturn(group).once();
+
+    // expect target entity lookup for association
+    List<Long> newTargets = Arrays.asList(28L, 29L);
+    expect(m_dao.findTargetsById(EasyMock.eq(newTargets))).andReturn(
+        newTargetEntities).once();
+
+    replay(m_dao, m_definitionDao);
+
+    AlertGroupResourceProvider provider = createProvider(m_amc);
+
+    // create new properties, and include the ID since we're not going through
+    // a service layer which would add it for us automatically
+    Map<String, Object> requestProps = new HashMap<String, Object>();
+    requestProps.put(AlertGroupResourceProvider.ALERT_GROUP_ID,
+        ALERT_GROUP_ID.toString());
+
+    // try to change the name (it should not work)
+    String newName = ALERT_GROUP_NAME + " Foo";
+    requestProps.put(AlertGroupResourceProvider.ALERT_GROUP_NAME, newName);
+
+    // try to change the definitions (it should not work)
+    requestProps.put(AlertGroupResourceProvider.ALERT_GROUP_DEFINITIONS,
+        new ArrayList<Long>());
+
+    // try to change the targets (it should work)
+    requestProps.put(AlertGroupResourceProvider.ALERT_GROUP_TARGETS,
+        newTargets);
+
+    Predicate predicate = new PredicateBuilder().property(
+        AlertGroupResourceProvider.ALERT_GROUP_CLUSTER_NAME).equals(
+        ALERT_GROUP_CLUSTER_NAME).and().property(
+        AlertGroupResourceProvider.ALERT_GROUP_ID).equals(
+        ALERT_GROUP_ID.toString()).toPredicate();
+
+    Request request = PropertyHelper.getUpdateRequest(requestProps, null);
+    provider.updateResources(request, predicate);
+
+    assertTrue(entityCapture.hasCaptured());
+
+    AlertGroupEntity entity = entityCapture.getValue();
+    assertEquals(ALERT_GROUP_NAME, entity.getGroupName());
+    assertEquals(2, entity.getAlertTargets().size());
+    assertEquals(1, entity.getAlertDefinitions().size());
+
+    verify(m_dao, m_definitionDao);
+  }
+
+  /**
    * @throws Exception
    */
   @Test

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java
index 4d182cc..a1cd063 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java
@@ -118,7 +118,7 @@ public class AlertDefinitionDAOTest {
       definition.setHash(UUID.randomUUID().toString());
       definition.setScheduleInterval(60);
       definition.setScope(Scope.SERVICE);
-      definition.setSource("Source " + i);
+      definition.setSource("{\"type\" : \"SCRIPT\"}");
       definition.setSourceType(SourceType.SCRIPT);
       dao.create(definition);
     }
@@ -133,7 +133,7 @@ public class AlertDefinitionDAOTest {
       definition.setHash(UUID.randomUUID().toString());
       definition.setScheduleInterval(60);
       definition.setScope(Scope.HOST);
-      definition.setSource("Source " + i);
+      definition.setSource("{\"type\" : \"SCRIPT\"}");
       definition.setSourceType(SourceType.SCRIPT);
       dao.create(definition);
     }
@@ -148,7 +148,7 @@ public class AlertDefinitionDAOTest {
       definition.setHash(UUID.randomUUID().toString());
       definition.setScheduleInterval(60);
       definition.setScope(Scope.HOST);
-      definition.setSource("Source " + i);
+      definition.setSource("{\"type\" : \"SCRIPT\"}");
       definition.setSourceType(SourceType.SCRIPT);
       dao.create(definition);
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionEqualityTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionEqualityTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionEqualityTest.java
index fd18134..1ee1224 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionEqualityTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionEqualityTest.java
@@ -72,6 +72,7 @@ public class AlertDefinitionEqualityTest extends TestCase {
    */
   private AlertDefinition getAlertDefinition(SourceType sourceType) {
     AlertDefinition definition = new AlertDefinition();
+    definition.setClusterId(1);
     definition.setComponentName("component");
     definition.setEnabled(true);
     definition.setInterval(1);

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java
index 482b5ef..7309851 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java
@@ -17,13 +17,20 @@
  */
 package org.apache.ambari.server.state.alerts;
 
+import java.lang.reflect.Field;
+
 import junit.framework.Assert;
 
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.events.AlertDefinitionDeleteEvent;
 import org.apache.ambari.server.events.AmbariEvent;
+import org.apache.ambari.server.events.listeners.AlertLifecycleListener;
 import org.apache.ambari.server.events.listeners.AlertServiceStateListener;
+import org.apache.ambari.server.events.listeners.AlertStateChangedListener;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.OrmTestHelper;
 import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
 import org.apache.ambari.server.orm.dao.AlertDispatchDAO;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
@@ -33,10 +40,15 @@ import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceFactory;
 import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.alert.AggregateDefinitionMapping;
+import org.apache.ambari.server.state.alert.AggregateSource;
+import org.apache.ambari.server.state.alert.AlertDefinition;
+import org.apache.ambari.server.state.alert.Scope;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.eventbus.EventBus;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.persist.PersistService;
@@ -55,6 +67,9 @@ public class AlertEventPublisherTest {
   private Injector injector;
   private ServiceFactory serviceFactory;
   private AmbariMetaInfo metaInfo;
+  private OrmTestHelper ormHelper;
+  private AggregateDefinitionMapping aggregateMapping;
+  private AmbariEventPublisher eventPublisher;
 
   /**
    *
@@ -64,13 +79,25 @@ public class AlertEventPublisherTest {
     injector = Guice.createInjector(new InMemoryDefaultTestModule());
     injector.getInstance(GuiceJpaInitializer.class);
 
+    eventPublisher = injector.getInstance(AmbariEventPublisher.class);
+    EventBus synchronizedBus = new EventBus();
+
     // force singleton init via Guice so the listener registers with the bus
-    injector.getInstance(AlertServiceStateListener.class);
+    synchronizedBus.register(injector.getInstance(AlertLifecycleListener.class));
+    synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class));
+    synchronizedBus.register(injector.getInstance(AlertServiceStateListener.class));
+
+    // !!! need a synchronous op for testing
+    Field field = AmbariEventPublisher.class.getDeclaredField("m_eventBus");
+    field.setAccessible(true);
+    field.set(eventPublisher, synchronizedBus);
 
     dispatchDao = injector.getInstance(AlertDispatchDAO.class);
     definitionDao = injector.getInstance(AlertDefinitionDAO.class);
     clusters = injector.getInstance(Clusters.class);
     serviceFactory = injector.getInstance(ServiceFactory.class);
+    ormHelper = injector.getInstance(OrmTestHelper.class);
+    aggregateMapping = injector.getInstance(AggregateDefinitionMapping.class);
 
     metaInfo = injector.getInstance(AmbariMetaInfo.class);
     metaInfo.init();
@@ -107,7 +134,7 @@ public class AlertEventPublisherTest {
   /**
    * Tests that all {@link AlertDefinitionEntity} instances are created for the
    * installed service.
-   * 
+   *
    * @throws Exception
    */
   @Test
@@ -118,6 +145,43 @@ public class AlertEventPublisherTest {
   }
 
   /**
+   * Tests that {@link AlertDefinitionDeleteEvent} instances are fired when a
+   * definition is removed.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAlertDefinitionRemoval() throws Exception {
+    Assert.assertEquals(0, definitionDao.findAll().size());
+    AlertDefinitionEntity definition = ormHelper.createAlertDefinition(1L);
+    Assert.assertEquals(1, definitionDao.findAll().size());
+
+    AggregateSource source = new AggregateSource();
+    source.setAlertName(definition.getDefinitionName());
+
+    AlertDefinition aggregate = new AlertDefinition();
+    aggregate.setClusterId(1L);
+    aggregate.setComponentName("DATANODE");
+    aggregate.setEnabled(true);
+    aggregate.setInterval(1);
+    aggregate.setLabel("DataNode Aggregate");
+    aggregate.setName("datanode_aggregate");
+    aggregate.setScope(Scope.ANY);
+    aggregate.setServiceName("HDFS");
+    aggregate.setSource(source);
+    aggregate.setUuid("uuid");
+
+    aggregateMapping.registerAggregate(1L, aggregate);
+    Assert.assertNotNull(aggregateMapping.getAggregateDefinition(1L,
+        source.getAlertName()));
+
+    definitionDao.remove(definition);
+
+    Assert.assertNull(aggregateMapping.getAggregateDefinition(1L,
+        source.getAlertName()));
+  }
+
+  /**
    * Calls {@link Service#persist()} to mock a service install.
    */
   private void installHdfsService() throws Exception {

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java
index 312f297..18b4123 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.state.alerts;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -39,6 +40,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.eventbus.EventBus;
 import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -72,6 +74,14 @@ public class AlertStateChangedEventTest {
 
     dispatchDao = injector.getInstance(AlertDispatchDAO.class);
     eventPublisher = injector.getInstance(AlertEventPublisher.class);
+
+    EventBus synchronizedBus = new EventBus();
+    synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class));
+
+    // !!! need a synchronous op for testing
+    Field field = AlertEventPublisher.class.getDeclaredField("m_eventBus");
+    field.setAccessible(true);
+    field.set(eventPublisher, synchronizedBus);
   }
 
   /**
@@ -99,8 +109,6 @@ public class AlertStateChangedEventTest {
 
     // async publishing
     eventPublisher.publish(event);
-    Thread.sleep(2000);
-
     EasyMock.verify(dispatchDao, history, event);
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
index 85a6e9b..502c3b9 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
@@ -373,7 +373,7 @@ public class AlertDataManagerTest {
     AggregateDefinitionMapping aggregateMapping = injector.getInstance(AggregateDefinitionMapping.class);
 
     AlertDefinition aggregateDefinition = factory.coerce(aggDef);
-    aggregateMapping.addAggregateType(clusterId.longValue(),
+    aggregateMapping.registerAggregate(clusterId.longValue(),
         aggregateDefinition );
 
     AggregateSource as = (AggregateSource) aggregateDefinition.getSource();


Mime
View raw message