Return-Path: X-Original-To: apmail-ambari-commits-archive@www.apache.org Delivered-To: apmail-ambari-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1B80B17B50 for ; Fri, 26 Sep 2014 16:47:40 +0000 (UTC) Received: (qmail 75952 invoked by uid 500); 26 Sep 2014 16:47:40 -0000 Delivered-To: apmail-ambari-commits-archive@ambari.apache.org Received: (qmail 75920 invoked by uid 500); 26 Sep 2014 16:47:39 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 75911 invoked by uid 99); 26 Sep 2014 16:47:39 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Sep 2014 16:47:39 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9FFC79A7DAE; Fri, 26 Sep 2014 16:47:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jonathanhurley@apache.org To: commits@ambari.apache.org Message-Id: <4916e4d8ff624acab0f0cf81493f4815@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: AMBARI-7508 - Alerts: Reschedule Individual Alerts on Agents (jonathanhurley) Date: Fri, 26 Sep 2014 16:47:39 +0000 (UTC) Repository: ambari Updated Branches: refs/heads/branch-alerts-dev 24483521f -> 7db82c923 AMBARI-7508 - Alerts: Reschedule Individual Alerts on Agents (jonathanhurley) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7db82c92 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7db82c92 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7db82c92 Branch: refs/heads/branch-alerts-dev Commit: 7db82c923c2903a2e2fab3d302fb9337423da08b Parents: 2448352 
Author: Jonathan Hurley Authored: Thu Sep 25 22:19:49 2014 -0700 Committer: Jonathan Hurley Committed: Fri Sep 26 09:38:10 2014 -0700 ---------------------------------------------------------------------- .../ambari_agent/AlertSchedulerHandler.py | 98 ++++++- .../python/ambari_agent/alerts/base_alert.py | 37 ++- .../python/ambari_agent/alerts/collector.py | 21 +- .../src/test/python/ambari_agent/TestAlerts.py | 50 ++++ .../ambari_agent/dummy_files/definitions.json | 1 + ambari-project/pom.xml | 7 +- ambari-server/pom.xml | 12 +- .../internal/AlertGroupResourceProvider.java | 33 ++- .../events/AlertDefinitionDeleteEvent.java | 55 ++++ .../ambari/server/events/AmbariEvent.java | 7 +- .../listeners/AlertLifecycleListener.java | 54 +++- .../events/listeners/AlertReceivedListener.java | 8 + .../server/orm/dao/AlertDefinitionDAO.java | 13 + .../apache/ambari/server/orm/dao/AlertsDAO.java | 1 + .../orm/entities/AlertDefinitionEntity.java | 15 +- .../server/orm/entities/AlertGroupEntity.java | 5 +- .../state/alert/AggregateDefinitionMapping.java | 31 +- .../server/state/alert/AlertDefinition.java | 19 ++ .../state/alert/AlertDefinitionFactory.java | 1 + .../server/state/alert/AlertDefinitionHash.java | 293 +++++++++++-------- .../AlertGroupResourceProviderTest.java | 87 ++++++ .../server/orm/dao/AlertDefinitionDAOTest.java | 6 +- .../alerts/AlertDefinitionEqualityTest.java | 1 + .../state/alerts/AlertEventPublisherTest.java | 68 ++++- .../alerts/AlertStateChangedEventTest.java | 12 +- .../state/cluster/AlertDataManagerTest.java | 2 +- 26 files changed, 755 insertions(+), 182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py 
b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py index 7cce533..8dcce50 100644 --- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py +++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py @@ -61,22 +61,25 @@ class AlertSchedulerHandler(): logger.critical("Could not create the cache directory {0}".format(cachedir)) pass + self._collector = AlertCollector() self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG) self.__in_minutes = in_minutes - self.__collector = AlertCollector() self.__config_maps = {} + - def update_definitions(self, alert_commands, refresh_jobs=False): + def update_definitions(self, alert_commands, reschedule_jobs=False): ''' updates the persisted definitions and restarts the scheduler ''' with open(os.path.join(self.cachedir, self.FILENAME), 'w') as f: json.dump(alert_commands, f, indent=2) - if refresh_jobs: - self.start() + if reschedule_jobs: + self.reschedule() + def __make_function(self, alert_def): return lambda: alert_def.collect() + def start(self): ''' loads definitions from file and starts the scheduler ''' @@ -90,26 +93,72 @@ class AlertSchedulerHandler(): alert_callables = self.__load_definitions() + # schedule each definition for _callable in alert_callables: - if self.__in_minutes: - self.__scheduler.add_interval_job(self.__make_function(_callable), - minutes=_callable.interval()) - else: - self.__scheduler.add_interval_job(self.__make_function(_callable), - seconds=_callable.interval()) + self.schedule_definition(_callable) logger.debug("Starting scheduler {0}; currently running: {1}".format( str(self.__scheduler), str(self.__scheduler.running))) + self.__scheduler.start() + def stop(self): if not self.__scheduler is None: self.__scheduler.shutdown(wait=False) self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG) + + + def reschedule(self): + ''' + Removes jobs that are scheduled where their UUID no longer is valid. 
+ Schedules jobs where the definition UUID is not currently scheduled. + ''' + jobs_scheduled = 0 + jobs_removed = 0 + + definitions = self.__load_definitions() + scheduled_jobs = self.__scheduler.get_jobs() + + # for every scheduled job, see if its UUID is still valid + for scheduled_job in scheduled_jobs: + uuid_valid = False + + for definition in definitions: + definition_uuid = definition.definition_uuid() + if scheduled_job.name == definition_uuid: + uuid_valid = True + break + + # jobs without valid UUIDs should be unscheduled + if uuid_valid == False: + jobs_removed += 1 + logger.info("Unscheduling {0}".format(scheduled_job.name)) + self._collector.remove_by_uuid(scheduled_job.name) + self.__scheduler.unschedule_job(scheduled_job) + # for every definition, determine if there is a scheduled job + for definition in definitions: + definition_scheduled = False + for scheduled_job in scheduled_jobs: + definition_uuid = definition.definition_uuid() + if definition_uuid == scheduled_job.name: + definition_scheduled = True + break + + # if no jobs are found with the definitions UUID, schedule it + if definition_scheduled == False: + jobs_scheduled += 1 + self.schedule_definition(definition) + + logger.info("Alert Reschedule Summary: {0} rescheduled, {1} unscheduled".format( + str(jobs_scheduled), str(jobs_removed))) + + def collector(self): ''' gets the collector for reporting to the server ''' - return self.__collector + return self._collector + def __load_definitions(self): ''' loads all alert commands from the file. 
all clusters are stored in one file ''' @@ -147,7 +196,7 @@ class AlertSchedulerHandler(): vals = self.__find_config_values(configmap, obj.get_lookup_keys()) self.__config_maps[clusterName].update(vals) - obj.set_helpers(self.__collector, self.__config_maps[clusterName]) + obj.set_helpers(self._collector, self.__config_maps[clusterName]) definitions.append(obj) @@ -208,7 +257,30 @@ class AlertSchedulerHandler(): configmap = command['configurations'] keylist = self.__config_maps[clusterName].keys() vals = self.__find_config_values(configmap, keylist) - self.__config_maps[clusterName].update(vals) + self.__config_maps[clusterName].update(vals) + + + def schedule_definition(self,definition): + ''' + Schedule a definition (callable). Scheduled jobs are given the UUID + as their name so that they can be identified later on. + ''' + job = None + + if self.__in_minutes: + job = self.__scheduler.add_interval_job(self.__make_function(definition), + minutes=definition.interval()) + else: + job = self.__scheduler.add_interval_job(self.__make_function(definition), + seconds=definition.interval()) + + # although the documentation states that Job(kwargs) takes a name + # key/value pair, it does not actually set the name; do it manually + if job is not None: + job.name = definition.definition_uuid() + + logger.info("Scheduling {0} with UUID {1}".format( + definition.definition_name(), definition.definition_uuid())) def main(): args = list(sys.argv) http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py index b22938d..10dcff8 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py @@ -46,17 +46,34 @@ class 
BaseAlert(object): else: interval = self.alert_meta['interval'] return 1 if interval < 1 else interval - + + + def definition_name(self): + ''' + gets the unique name of the alert definition + ''' + return self.alert_meta['name'] + + + def definition_uuid(self): + ''' + gets the unique has of the alert definition + ''' + return self.alert_meta['uuid'] + + def set_helpers(self, collector, value_dict): ''' sets helper objects for alerts without having to use them in a constructor ''' self.collector = collector self.config_value_dict = value_dict - + + def set_cluster(self, cluster, host): ''' sets cluster information for the alert ''' self.cluster = cluster self.host_name = host - + + def collect(self): ''' method used for collection. defers to _collect() ''' @@ -83,12 +100,14 @@ class BaseAlert(object): data['service'] = self._find_value('serviceName') data['component'] = self._find_value('componentName') data['timestamp'] = long(time.time() * 1000) + data['uuid'] = self._find_value('uuid') if logger.isEnabledFor(logging.DEBUG): logger.debug("debug alert text: {0}".format(data['text'])) self.collector.put(self.cluster, data) - + + def _find_value(self, meta_key): ''' safe way to get a value when outputting result json. will not throw an exception ''' if self.alert_meta.has_key(meta_key): @@ -96,10 +115,12 @@ class BaseAlert(object): else: return None + def get_lookup_keys(self): ''' returns a list of lookup keys found for this alert ''' return self._lookup_keys - + + def _find_lookup_property(self, key): ''' check if the supplied key is parameterized @@ -112,7 +133,8 @@ class BaseAlert(object): return keys[0] return key - + + def _lookup_property_value(self, key): ''' in the case of specifying a configuration path, lookup that path's value @@ -124,7 +146,8 @@ class BaseAlert(object): return self.config_value_dict[key] else: return None - + + def _collect(self): ''' Low level function to collect alert data. 
The result is a tuple as: http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-agent/src/main/python/ambari_agent/alerts/collector.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/collector.py b/ambari-agent/src/main/python/ambari_agent/alerts/collector.py index 7249449..9943211 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/collector.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/collector.py @@ -33,8 +33,27 @@ class AlertCollector(): if not cluster in self.__buckets: self.__buckets[cluster] = {} - self.__buckets[cluster][alert['name']] = alert + self.__buckets[cluster][alert['name']] = alert + def remove(self, cluster, alert_name): + ''' + Removes the alert with the specified name if it exists in the dictionary + ''' + if not cluster in self.__buckets: + return + + del self.__buckets[cluster][alert_name] + + def remove_by_uuid(self, alert_uuid): + ''' + Removes the alert with the specified uuid if it exists in the dictionary + ''' + for cluster,alert_map in self.__buckets.iteritems(): + for alert_name in alert_map.keys(): + alert = alert_map[alert_name] + if alert['uuid'] == alert_uuid: + self.remove(cluster, alert_name) + def alerts(self): alerts = [] for clustermap in self.__buckets.values()[:]: http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-agent/src/test/python/ambari_agent/TestAlerts.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py index 5dc45b5..662d8ee 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py @@ -56,6 +56,7 @@ class TestAlerts(TestCase): "label": "NameNode process", "interval": 6, "scope": "host", + "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1", "source": { "type": "PORT", 
"uri": "{{hdfs-site/my-key}}", @@ -86,6 +87,7 @@ class TestAlerts(TestCase): "label": "NameNode process", "interval": 6, "scope": "host", + "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1", "source": { "type": "PORT", "uri": "http://c6401.ambari.apache.org", @@ -115,6 +117,7 @@ class TestAlerts(TestCase): "label": "NameNode process", "interval": 6, "scope": "host", + "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1", "source": { "type": "SCRIPT", "path": "test_script.py", @@ -152,6 +155,7 @@ class TestAlerts(TestCase): "label": "NameNode process", "interval": 6, "scope": "host", + "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1", "source": { "type": "METRIC", "uri": "http://myurl:8633", @@ -197,3 +201,49 @@ class TestAlerts(TestCase): self.assertEquals('OK', collector.alerts()[0]['state']) self.assertEquals('ok_arr: 1 3 None', collector.alerts()[0]['text']) + def test_reschedule(self): + test_file_path = os.path.join('ambari_agent', 'dummy_files') + test_stack_path = os.path.join('ambari_agent', 'dummy_files') + + ash = AlertSchedulerHandler(test_file_path, test_stack_path) + ash.start() + ash.reschedule() + + + def test_alert_collector_purge(self): + json = { "name": "namenode_process", + "service": "HDFS", + "component": "NAMENODE", + "label": "NameNode process", + "interval": 6, + "scope": "host", + "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1", + "source": { + "type": "PORT", + "uri": "{{hdfs-site/my-key}}", + "default_port": 50070, + "reporting": { + "ok": { + "text": "TCP OK - {0:.4f} response time on port {1}" + }, + "critical": { + "text": "Could not load process info: {0}" + } + } + } + } + + collector = AlertCollector() + + pa = PortAlert(json, json['source']) + pa.set_helpers(collector, {'hdfs-site/my-key': 'value1'}) + self.assertEquals(6, pa.interval()) + + res = pa.collect() + + self.assertIsNotNone(collector.alerts()[0]) + self.assertEquals('CRITICAL', collector.alerts()[0]['state']) + + collector.remove_by_uuid('c1f73191-4481-4435-8dae-fd380e4c0be1') + 
self.assertEquals(0,len(collector.alerts())) + http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-agent/src/test/python/ambari_agent/dummy_files/definitions.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/definitions.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/definitions.json index ad52abc..30973c2 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/definitions.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/definitions.json @@ -16,6 +16,7 @@ "label": "NameNode process", "interval": 6, "scope": "host", + "uuid": "3f82ae27-fa6a-465b-b77d-67963ac55d2f", "source": { "type": "PORT", "uri": "{{hdfs-site/dfs.namenode.http-address}}", http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-project/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-project/pom.xml b/ambari-project/pom.xml index 58e2e8f..edba1dc 100644 --- a/ambari-project/pom.xml +++ b/ambari-project/pom.xml @@ -55,8 +55,9 @@ - EclipseLink - http://download.eclipse.org/rt/eclipselink/maven.repo + oss.sonatype.org + OSS Sonatype Staging + https://oss.sonatype.org/content/groups/staging spring-milestones @@ -211,7 +212,7 @@ org.eclipse.persistence eclipselink - 2.4.0 + 2.4.2 org.postgresql http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-server/pom.xml b/ambari-server/pom.xml index 5ac76f9..f37a177 100644 --- a/ambari-server/pom.xml +++ b/ambari-server/pom.xml @@ -191,7 +191,7 @@ org.eclipse.persistence eclipselink - 2.4.0 + 2.4.2 @@ -1451,14 +1451,12 @@ 1.5.2 - - + - EclipseLink - http://download.eclipse.org/rt/eclipselink/maven.repo + oss.sonatype.org + OSS Sonatype Staging + https://oss.sonatype.org/content/groups/staging 
http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProvider.java index fc51ddd..8c55aa7 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProvider.java @@ -314,10 +314,6 @@ public class AlertGroupResourceProvider extends Collection targetIds = (Collection) requestMap.get(ALERT_GROUP_TARGETS); Collection definitionIds = (Collection) requestMap.get(ALERT_GROUP_DEFINITIONS); - if (!StringUtils.isBlank(name)) { - entity.setGroupName(name); - } - // if targets were supplied, replace existing Set targets = new HashSet(); if (null != targetIds && targetIds.size() > 0) { @@ -331,17 +327,26 @@ public class AlertGroupResourceProvider extends entity.setAlertTargets(targets); } - // if definitions were supplied, replace existing - Set definitions = new HashSet(); - if (null != definitionIds && definitionIds.size() > 0) { - List ids = new ArrayList(definitionIds.size()); - ids.addAll(definitionIds); - definitions.addAll(s_definitionDao.findByIds(ids)); + // only the targets should be updatable on default groups; everything + // else is valid only on regular groups + if (!entity.isDefault()) { + // set the name if supplied + if (!StringUtils.isBlank(name)) { + entity.setGroupName(name); + } - entity.setAlertDefinitions(definitions); - } else if (definitionIds.size() == 0) { - // empty array supplied, clear out existing definitions - entity.setAlertDefinitions(definitions); + // if definitions were supplied, replace existing + 
Set definitions = new HashSet(); + if (null != definitionIds && definitionIds.size() > 0) { + List ids = new ArrayList(definitionIds.size()); + ids.addAll(definitionIds); + definitions.addAll(s_definitionDao.findByIds(ids)); + + entity.setAlertDefinitions(definitions); + } else { + // empty array supplied, clear out existing definitions + entity.setAlertDefinitions(definitions); + } } s_dao.merge(entity); http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionDeleteEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionDeleteEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionDeleteEvent.java new file mode 100644 index 0000000..c8fd36b --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionDeleteEvent.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.server.events; + +import org.apache.ambari.server.state.alert.AlertDefinition; + +/** + * The {@link AlertDefinitionDeleteEvent} is used to represent that an + * {@link AlertDefinition} has been removed from the system. + */ +public class AlertDefinitionDeleteEvent extends ClusterEvent { + + /** + * The removed alert defintiion + */ + private final AlertDefinition m_definition; + + /** + * Constructor. + * + * @param clusterId + * the ID of the cluster that the definition is in. + * @param definition + * the alert definition being registered. + */ + public AlertDefinitionDeleteEvent( + long clusterId, AlertDefinition definition) { + super(AmbariEventType.ALERT_DEFINITION_REMOVAL, clusterId); + m_definition = definition; + } + + /** + * Get the registered alert definition. + * + * @return the alert definition (not {@code null}). + */ + public AlertDefinition getDefinition() { + return m_definition; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java index 6cf752e..3dae676 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java @@ -35,7 +35,12 @@ public abstract class AmbariEvent { /** * An alert definition is registered with the system. */ - ALERT_DEFINITION_REGISTRATION; + ALERT_DEFINITION_REGISTRATION, + + /** + * An alert definition is removed from the system. 
+ */ + ALERT_DEFINITION_REMOVAL; } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java index 43d4b35..9d2f4c6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java @@ -17,15 +17,22 @@ */ package org.apache.ambari.server.events.listeners; +import java.util.Set; + +import org.apache.ambari.server.events.AlertDefinitionDeleteEvent; import org.apache.ambari.server.events.AlertDefinitionRegistrationEvent; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.state.alert.AggregateDefinitionMapping; import org.apache.ambari.server.state.alert.AlertDefinition; +import org.apache.ambari.server.state.alert.AlertDefinitionHash; import org.apache.ambari.server.state.alert.SourceType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import com.google.inject.Inject; +import com.google.inject.Provider; import com.google.inject.Singleton; /** @@ -34,6 +41,10 @@ import com.google.inject.Singleton; */ @Singleton public class AlertLifecycleListener { + /** + * Logger. + */ + private static Logger LOG = LoggerFactory.getLogger(AlertLifecycleListener.class); /** * Used for quick lookups of aggregate alerts. 
@@ -42,6 +53,13 @@ public class AlertLifecycleListener { private AggregateDefinitionMapping m_aggregateMapping; /** + * Invalidates hosts so that they can receive updated alert definition + * commands. + */ + @Inject + private Provider m_alertDefinitionHash; + + /** * Constructor. * * @param publisher @@ -66,8 +84,42 @@ public class AlertLifecycleListener { public void onAmbariEvent(AlertDefinitionRegistrationEvent event) { AlertDefinition definition = event.getDefinition(); + LOG.debug("Registering alert definition {}", definition); + if (definition.getSource().getType() == SourceType.AGGREGATE) { - m_aggregateMapping.addAggregateType(event.getClusterId(), definition); + m_aggregateMapping.registerAggregate(event.getClusterId(), definition); + } + } + + /** + * Handles {@link AlertDefinitionDeleteEvent} by performing the following + * tasks: + *
<ul>
+ * <li>Removal from with {@link AggregateDefinitionMapping}</li>
+ * <li>{@link AlertDefinitionHash} invalidation</li>
+ * </ul>
+ * + * @param event + * the event being handled. + */ + @Subscribe + @AllowConcurrentEvents + public void onAmbariEvent(AlertDefinitionDeleteEvent event) { + AlertDefinition definition = event.getDefinition(); + + LOG.debug("Removing alert definition {}", definition); + + if (null == definition) { + return; } + + m_aggregateMapping.removeAssociatedAggregate(event.getClusterId(), + definition.getName()); + + AlertDefinitionHash hashHelper = m_alertDefinitionHash.get(); + Set invalidatedHosts = hashHelper.invalidateHosts(definition); + + hashHelper.enqueueAgentCommands(definition.getClusterId(), + invalidatedHosts); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java index e87ba7d..78d8e66 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java @@ -101,6 +101,14 @@ public class AlertReceivedListener { AlertDefinitionEntity definition = m_definitionDao.findByName(clusterId, alert.getName()); + if (null == definition) { + LOG.warn( + "Received an alert for {} which is a definition that does not exist anymore", + alert.getName()); + + return; + } + AlertHistoryEntity history = createHistory(clusterId, definition, alert); current = new AlertCurrentEntity(); http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java ---------------------------------------------------------------------- diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java index 075ee04..8e8c808 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java @@ -25,6 +25,7 @@ import javax.persistence.EntityManager; import javax.persistence.TypedQuery; import org.apache.ambari.server.controller.RootServiceResponseFactory; +import org.apache.ambari.server.events.AlertDefinitionDeleteEvent; import org.apache.ambari.server.events.AlertDefinitionRegistrationEvent; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.entities.AlertDefinitionEntity; @@ -364,6 +365,18 @@ public class AlertDefinitionDAO { if (null != alertDefinition) { entityManager.remove(alertDefinition); } + + // publish the alert definition removal + AlertDefinition coerced = alertDefinitionFactory.coerce(alertDefinition); + if (null != coerced) { + AlertDefinitionDeleteEvent event = new AlertDefinitionDeleteEvent( + alertDefinition.getClusterId(), coerced); + + eventPublisher.publish(event); + } else { + LOG.warn("Unable to broadcast alert removal event for {}", + alertDefinition.getDefinitionName()); + } } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java index a28b448..f2161f3 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java @@ -515,6 +515,7 @@ public class AlertsDAO { 
query.setParameter("clusterId", Long.valueOf(clusterId)); query.setParameter("definitionName", alertName); + query = setQueryRefreshHint(query); return daoUtils.selectOne(query); } http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertDefinitionEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertDefinitionEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertDefinitionEntity.java index 8548fda..4ab2c80 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertDefinitionEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertDefinitionEntity.java @@ -17,7 +17,9 @@ */ package org.apache.ambari.server.orm.entities; +import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.Set; import javax.persistence.Basic; @@ -363,7 +365,7 @@ public class AlertDefinitionEntity { * @return the groups, or {@code null} if none. 
*/ public Set getAlertGroups() { - return alertGroups; + return Collections.unmodifiableSet(alertGroups); } /** @@ -418,7 +420,7 @@ public class AlertDefinitionEntity { * @param alertGroup */ protected void removeAlertGroup(AlertGroupEntity alertGroup) { - if (null != alertGroups) { + if (null != alertGroups && alertGroups.contains(alertGroup)) { alertGroups.remove(alertGroup); } } @@ -429,12 +431,15 @@ public class AlertDefinitionEntity { */ @PreRemove public void preRemove() { - Set groups = getAlertGroups(); - if (null == groups || groups.size() == 0) { + if (null == alertGroups || alertGroups.size() == 0) { return; } - for (AlertGroupEntity group : groups) { + Iterator iterator = alertGroups.iterator(); + while (iterator.hasNext()) { + AlertGroupEntity group = iterator.next(); + iterator.remove(); + group.removeAlertDefinition(this); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java index 1cc9bcc..ac3586d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import javax.persistence.CascadeType; import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.GeneratedValue; @@ -73,14 +74,14 @@ public class AlertGroupEntity { /** * Bi-directional many-to-many association to {@link AlertDefinitionEntity} */ - @ManyToMany + @ManyToMany(cascade = CascadeType.MERGE) @JoinTable(name = "alert_grouping", joinColumns = { @JoinColumn(name = "group_id", 
nullable = false) }, inverseJoinColumns = { @JoinColumn(name = "definition_id", nullable = false) }) private Set alertDefinitions; /** * Unidirectional many-to-many association to {@link AlertTargetEntity} */ - @ManyToMany + @ManyToMany(cascade = CascadeType.MERGE) @JoinTable(name = "alert_group_target", joinColumns = { @JoinColumn(name = "group_id", nullable = false) }, inverseJoinColumns = { @JoinColumn(name = "target_id", nullable = false) }) private Set alertTargets; http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java index 04f20f9..c63c0d4 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java @@ -19,6 +19,7 @@ package org.apache.ambari.server.state.alert; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import com.google.inject.Singleton; @@ -33,7 +34,8 @@ public final class AggregateDefinitionMapping { * In-memory mapping of cluster ID to definition name / aggregate definition. * This is used for fast lookups when receiving events. */ - private Map> m_aggregateMap = new HashMap>(); + private Map> m_aggregateMap = + new ConcurrentHashMap>(); /** * Constructor. @@ -69,10 +71,10 @@ public final class AggregateDefinitionMapping { * * @param clusterId * the ID of the cluster that the definition is bound to. - * @param name - * the unique name of the definition. + * @param definition + * the aggregate definition to register (not {@code null}). 
*/ - public void addAggregateType(long clusterId, AlertDefinition definition) { + public void registerAggregate(long clusterId, AlertDefinition definition) { Long id = Long.valueOf(clusterId); if (!m_aggregateMap.containsKey(id)) { @@ -85,4 +87,25 @@ public final class AggregateDefinitionMapping { map.put(as.getAlertName(), definition); } + + /** + * Removes the associated aggregate for the specified aggregated definition. + * + * @param clusterId + * the ID of the cluster that the definition is bound to. + * @param name + * the unique name of the definition for which aggregates should be + * unassociated (not {@code null}). + */ + public void removeAssociatedAggregate(long clusterId, + String aggregatedDefinitonName) { + Long id = Long.valueOf(clusterId); + + if (!m_aggregateMap.containsKey(id)) { + return; + } + + Map map = m_aggregateMap.get(id); + map.remove(aggregatedDefinitonName); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java index 5058e91..961fb66 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java @@ -34,6 +34,7 @@ import java.util.HashSet; */ public class AlertDefinition { + private long clusterId; private String serviceName = null; private String componentName = null; @@ -46,6 +47,24 @@ public class AlertDefinition { private String uuid = null; /** + * Gets the cluster ID for this definition. + * + * @return + */ + public long getClusterId() { + return clusterId; + } + + /** + * Sets the cluster ID for this definition. 
+ * + * @param clusterId + */ + public void setClusterId(long clusterId) { + this.clusterId = clusterId; + } + + /** * @return the service name */ public String getServiceName() { http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java index 4f6a9a3..9c72b6a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java @@ -125,6 +125,7 @@ public class AlertDefinitionFactory { } AlertDefinition definition = new AlertDefinition(); + definition.setClusterId(entity.getClusterId()); definition.setComponentName(entity.getComponentName()); definition.setEnabled(entity.getEnabled()); definition.setInterval(entity.getScheduleInterval()); http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java index 9ea039c..35d7742 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java @@ -262,85 +262,19 @@ public class AlertDefinitionHash { /** - * Gets the alert definition entities for the specified host. This will include the - * following types of alert definitions: - *
    - *
  • Service/Component alerts
  • - *
  • Service alerts where the host is a MASTER
  • - *
  • Host alerts that are not bound to a service
  • - *
+ * Invalidate the hashes of any host that would be affected by the specified + * definition. * - * @param clusterName - * the cluster name (not {@code null}). - * @param hostName - * the host name (not {@code null}). - * @return the alert definitions for the host, or an empty set (never + * @param definition + * the definition to use to find the hosts to invlidate (not + * {@code null}). + * @return the hosts that were invalidated, or an empty set (never * {@code null}). */ - private Set getAlertDefinitionEntities( - String clusterName, - String hostName) { - Set definitions = new HashSet(); - - try { - Cluster cluster = m_clusters.getCluster(clusterName); - if (null == cluster) { - LOG.warn("Unable to get alert definitions for the missing cluster {}", - clusterName); - - return Collections.emptySet(); - } - - long clusterId = cluster.getClusterId(); - List serviceComponents = cluster.getServiceComponentHosts(hostName); - if (null == serviceComponents || serviceComponents.size() == 0) { - LOG.warn( - "Unable to get alert definitions for {} since there are no service components defined", - hostName); - - return Collections.emptySet(); - } - - for (ServiceComponentHost serviceComponent : serviceComponents) { - String serviceName = serviceComponent.getServiceName(); - String componentName = serviceComponent.getServiceComponentName(); - - // add all alerts for this service/component pair - definitions.addAll(m_definitionDao.findByServiceComponent( - clusterId, serviceName, componentName)); - } - - // for every service, get the master components and see if the host - // is a master - Set services = new HashSet(); - for (Entry entry : cluster.getServices().entrySet()) { - Service service = entry.getValue(); - Map components = service.getServiceComponents(); - for (Entry component : components.entrySet()) { - if (component.getValue().isMasterComponent()) { - Map hosts = component.getValue().getServiceComponentHosts(); - - if( hosts.containsKey( hostName ) ){ - 
services.add(service.getName()); - } - } - } - } - - // add all service scoped alerts - if( services.size() > 0 ){ - definitions.addAll(m_definitionDao.findByServiceMaster(clusterId, - services)); - } - - // add any alerts not bound to a service (host level alerts) - definitions.addAll(m_definitionDao.findAgentScoped(clusterId)); - } catch (AmbariException ambariException) { - LOG.error("Unable to get alert definitions", ambariException); - return Collections.emptySet(); - } - - return definitions; + public Set invalidateHosts(AlertDefinitionEntity definition) { + return invalidateHosts(definition.getClusterId(), + definition.getDefinitionName(), definition.getServiceName(), + definition.getComponentName()); } /** @@ -353,8 +287,28 @@ public class AlertDefinitionHash { * @return the hosts that were invalidated, or an empty set (never * {@code null}). */ - public Set invalidateHosts(AlertDefinitionEntity definition) { - long clusterId = definition.getClusterId(); + public Set invalidateHosts(AlertDefinition definition) { + return invalidateHosts(definition.getClusterId(), definition.getName(), + definition.getServiceName(), definition.getComponentName()); + } + + /** + * Invalidate the hashes of any host that would be affected by the specified + * definition. + * + * @param clusterId + * the cluster ID + * @param definitionName + * the definition unique name. + * @param definitionServiceName + * the definition's service name. + * @param definitionComponentName + * the definition's component name. + * @return the hosts that were invalidated, or an empty set (never + * {@code null}). 
+ */ + public Set invalidateHosts(long clusterId, String definitionName, + String definitionServiceName, String definitionComponentName) { Set invalidatedHosts = new HashSet(); Cluster cluster = null; @@ -379,8 +333,6 @@ public class AlertDefinitionHash { } // intercept host agent alerts; they affect all hosts - String definitionServiceName = definition.getServiceName(); - String definitionComponentName = definition.getComponentName(); if (Services.AMBARI.equals(definitionServiceName) && Components.AMBARI_AGENT.equals(definitionComponentName)) { @@ -413,7 +365,7 @@ public class AlertDefinitionHash { Service service = services.get(definitionServiceName); if (null == service) { LOG.warn("The alert definition {} has an unknown service of {}", - definition.getDefinitionName(), definitionServiceName); + definitionName, definitionServiceName); return invalidatedHosts; } @@ -439,6 +391,82 @@ public class AlertDefinitionHash { } /** + * Enqueue {@link AlertDefinitionCommand}s for every host specified so that + * they will receive a payload of alert definitions that they should be + * running. + *

+ * This method is typically called after + * {@link #invalidateHosts(AlertDefinitionEntity)} has caused a cache + * invalidation of the alert definition hash. + * + * @param clusterName + * the name of the cluster (not {@code null}). + * @param hosts + * the hosts to push {@link AlertDefinitionCommand}s for. + */ + public void enqueueAgentCommands(long clusterId, Set hosts) { + String clusterName = null; + + try { + Cluster cluster = m_clusters.getClusterById(clusterId); + clusterName = cluster.getClusterName(); + } catch (AmbariException ae) { + LOG.error("Unable to lookup cluster for alert definition commands", ae); + } + + enqueueAgentCommands(clusterName, hosts); + } + + /** + * Enqueue {@link AlertDefinitionCommand}s for every host specified so that + * they will receive a payload of alert definitions that they should be + * running. + *

+ * This method is typically called after + * {@link #invalidateHosts(AlertDefinitionEntity)} has caused a cache + * invalidation of the alert definition hash. + * + * @param clusterName + * the name of the cluster (not {@code null}). + * @param hosts + * the hosts to push {@link AlertDefinitionCommand}s for. + */ + public void enqueueAgentCommands(String clusterName, Set hosts) { + if (null == clusterName) { + LOG.warn("Unable to create alert definition agent commands because of a null cluster name"); + return; + } + + if (null == hosts || hosts.size() == 0) { + return; + } + + for (String hostName : hosts) { + List definitions = getAlertDefinitions(clusterName, + hostName); + + String hash = getHash(clusterName, hostName); + + AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName, + hostName, hash, definitions); + + try { + Cluster cluster = m_clusters.getCluster(clusterName); + command.addConfigs(m_configHelper.get(), cluster); + } catch (AmbariException ae) { + LOG.warn("Unable to add configurations to alert definition command", ae); + } + + // unlike other commands, the alert definitions commands are really + // designed to be 1:1 per change; if multiple invalidations happened + // before the next heartbeat, there would be several commands that would + // force the agents to reschedule their alerts more than once + m_actionQueue.dequeue(hostName, AgentCommandType.ALERT_DEFINITION_COMMAND); + m_actionQueue.enqueue(hostName, command); + } + } + + /** * Calculates a unique hash value representing all of the alert definitions * that should be scheduled to run on a given host. Alerts of type * {@link SourceType#AGGREGATE} are not included in the hash since they are @@ -492,51 +520,84 @@ public class AlertDefinitionHash { } /** - * Enqueue {@link AlertDefinitionCommand}s for every host specified so that - * they will receive a payload of alert definitions that they should be - * running. - *

- * This method is typically called after - * {@link #invalidateHosts(AlertDefinitionEntity)} has caused a cache - * invalidation of the alert definition hash. + * Gets the alert definition entities for the specified host. This will include the + * following types of alert definitions: + *

    + *
  • Service/Component alerts
  • + *
  • Service alerts where the host is a MASTER
  • + *
  • Host alerts that are not bound to a service
  • + *
* * @param clusterName - * the name of the cluster (not {@code null}). - * @param hosts - * the hosts to push {@link AlertDefinitionCommand}s for. + * the cluster name (not {@code null}). + * @param hostName + * the host name (not {@code null}). + * @return the alert definitions for the host, or an empty set (never + * {@code null}). */ - public void enqueueAgentCommands(String clusterName, Set hosts) { - if (null == clusterName) { - LOG.warn("Unable to create alert definition agent commands because of a null cluster name"); - return; - } + private Set getAlertDefinitionEntities( + String clusterName, + String hostName) { + Set definitions = new HashSet(); - if (null == hosts || hosts.size() == 0) { - return; - } + try { + Cluster cluster = m_clusters.getCluster(clusterName); + if (null == cluster) { + LOG.warn("Unable to get alert definitions for the missing cluster {}", + clusterName); - for (String hostName : hosts) { - List definitions = getAlertDefinitions(clusterName, - hostName); + return Collections.emptySet(); + } - String hash = getHash(clusterName, hostName); + long clusterId = cluster.getClusterId(); + List serviceComponents = cluster.getServiceComponentHosts(hostName); + if (null == serviceComponents || serviceComponents.size() == 0) { + LOG.warn( + "Unable to get alert definitions for {} since there are no service components defined", + hostName); - AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName, - hostName, hash, definitions); + return Collections.emptySet(); + } - try { - Cluster cluster = m_clusters.getCluster(clusterName); - command.addConfigs(m_configHelper.get(), cluster); - } catch (AmbariException ae) { - LOG.warn("Unable to add configurations to alert definition command", ae); + for (ServiceComponentHost serviceComponent : serviceComponents) { + String serviceName = serviceComponent.getServiceName(); + String componentName = serviceComponent.getServiceComponentName(); + + // add all alerts for this service/component 
pair + definitions.addAll(m_definitionDao.findByServiceComponent( + clusterId, serviceName, componentName)); } - // unlike other commands, the alert definitions commands are really - // designed to be 1:1 per change; if multiple invalidations happened - // before the next heartbeat, there would be several commands that would - // force the agents to reschedule their alerts more than once - m_actionQueue.dequeue(hostName, AgentCommandType.ALERT_DEFINITION_COMMAND); - m_actionQueue.enqueue(hostName, command); + // for every service, get the master components and see if the host + // is a master + Set services = new HashSet(); + for (Entry entry : cluster.getServices().entrySet()) { + Service service = entry.getValue(); + Map components = service.getServiceComponents(); + for (Entry component : components.entrySet()) { + if (component.getValue().isMasterComponent()) { + Map hosts = component.getValue().getServiceComponentHosts(); + + if( hosts.containsKey( hostName ) ){ + services.add(service.getName()); + } + } + } + } + + // add all service scoped alerts + if( services.size() > 0 ){ + definitions.addAll(m_definitionDao.findByServiceMaster(clusterId, + services)); + } + + // add any alerts not bound to a service (host level alerts) + definitions.addAll(m_definitionDao.findAgentScoped(clusterId)); + } catch (AmbariException ambariException) { + LOG.error("Unable to get alert definitions", ambariException); + return Collections.emptySet(); } + + return definitions; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java index 5583617..771bf8a 100644 --- 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java @@ -383,6 +383,93 @@ public class AlertGroupResourceProviderTest { } /** + * Tests that updating a default group doesn't change read-only properties + * + * @throws Exception + */ + @Test + @SuppressWarnings("unchecked") + public void testUpdateDefaultGroup() throws Exception { + Capture entityCapture = new Capture(); + + // the definition IDs to associate with the group + List definitionIds = new ArrayList(); + definitionIds.add(ALERT_DEF_ID); + + // the target IDs to associate with the group + List targetIds = new ArrayList(); + targetIds.add(ALERT_TARGET_ID); + + // definition entities to return from DAO + List definitionEntities = new ArrayList(); + definitionEntities.addAll(getMockDefinitions()); + + // target entities to return from DAO + List newTargetEntities = new ArrayList(); + newTargetEntities.addAll(getMockTargets()); + + Set mockTargets2 = getMockTargets(); + AlertTargetEntity target2 = mockTargets2.iterator().next(); + target2.setTargetId(29L); + + newTargetEntities.add(target2); + + AlertGroupEntity group = new AlertGroupEntity(); + group.setDefault(true); + group.setGroupName(ALERT_GROUP_NAME); + group.setAlertDefinitions(getMockDefinitions()); + group.setAlertTargets(getMockTargets()); + + expect(m_dao.findGroupById(ALERT_GROUP_ID)).andReturn(group).times(1); + expect(m_dao.merge(capture(entityCapture))).andReturn(group).once(); + + // expect target entity lookup for association + List newTargets = Arrays.asList(28L, 29L); + expect(m_dao.findTargetsById(EasyMock.eq(newTargets))).andReturn( + newTargetEntities).once(); + + replay(m_dao, m_definitionDao); + + AlertGroupResourceProvider provider = createProvider(m_amc); + + // create new properties, and include the ID since we're not going through + // a service layer which 
would add it for us automatically + Map requestProps = new HashMap(); + requestProps.put(AlertGroupResourceProvider.ALERT_GROUP_ID, + ALERT_GROUP_ID.toString()); + + // try to change the name (it should not work) + String newName = ALERT_GROUP_NAME + " Foo"; + requestProps.put(AlertGroupResourceProvider.ALERT_GROUP_NAME, newName); + + // try to change the definitions (it should not work) + requestProps.put(AlertGroupResourceProvider.ALERT_GROUP_DEFINITIONS, + new ArrayList()); + + // try to change the targets (it should work) + requestProps.put(AlertGroupResourceProvider.ALERT_GROUP_TARGETS, + newTargets); + + Predicate predicate = new PredicateBuilder().property( + AlertGroupResourceProvider.ALERT_GROUP_CLUSTER_NAME).equals( + ALERT_GROUP_CLUSTER_NAME).and().property( + AlertGroupResourceProvider.ALERT_GROUP_ID).equals( + ALERT_GROUP_ID.toString()).toPredicate(); + + Request request = PropertyHelper.getUpdateRequest(requestProps, null); + provider.updateResources(request, predicate); + + assertTrue(entityCapture.hasCaptured()); + + AlertGroupEntity entity = entityCapture.getValue(); + assertEquals(ALERT_GROUP_NAME, entity.getGroupName()); + assertEquals(2, entity.getAlertTargets().size()); + assertEquals(1, entity.getAlertDefinitions().size()); + + verify(m_dao, m_definitionDao); + } + + /** * @throws Exception */ @Test http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java index 4d182cc..a1cd063 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java @@ -118,7 +118,7 @@ 
public class AlertDefinitionDAOTest { definition.setHash(UUID.randomUUID().toString()); definition.setScheduleInterval(60); definition.setScope(Scope.SERVICE); - definition.setSource("Source " + i); + definition.setSource("{\"type\" : \"SCRIPT\"}"); definition.setSourceType(SourceType.SCRIPT); dao.create(definition); } @@ -133,7 +133,7 @@ public class AlertDefinitionDAOTest { definition.setHash(UUID.randomUUID().toString()); definition.setScheduleInterval(60); definition.setScope(Scope.HOST); - definition.setSource("Source " + i); + definition.setSource("{\"type\" : \"SCRIPT\"}"); definition.setSourceType(SourceType.SCRIPT); dao.create(definition); } @@ -148,7 +148,7 @@ public class AlertDefinitionDAOTest { definition.setHash(UUID.randomUUID().toString()); definition.setScheduleInterval(60); definition.setScope(Scope.HOST); - definition.setSource("Source " + i); + definition.setSource("{\"type\" : \"SCRIPT\"}"); definition.setSourceType(SourceType.SCRIPT); dao.create(definition); } http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionEqualityTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionEqualityTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionEqualityTest.java index fd18134..1ee1224 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionEqualityTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionEqualityTest.java @@ -72,6 +72,7 @@ public class AlertDefinitionEqualityTest extends TestCase { */ private AlertDefinition getAlertDefinition(SourceType sourceType) { AlertDefinition definition = new AlertDefinition(); + definition.setClusterId(1); definition.setComponentName("component"); definition.setEnabled(true); 
definition.setInterval(1); http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java index 482b5ef..7309851 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java @@ -17,13 +17,20 @@ */ package org.apache.ambari.server.state.alerts; +import java.lang.reflect.Field; + import junit.framework.Assert; import org.apache.ambari.server.api.services.AmbariMetaInfo; +import org.apache.ambari.server.events.AlertDefinitionDeleteEvent; import org.apache.ambari.server.events.AmbariEvent; +import org.apache.ambari.server.events.listeners.AlertLifecycleListener; import org.apache.ambari.server.events.listeners.AlertServiceStateListener; +import org.apache.ambari.server.events.listeners.AlertStateChangedListener; +import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; +import org.apache.ambari.server.orm.OrmTestHelper; import org.apache.ambari.server.orm.dao.AlertDefinitionDAO; import org.apache.ambari.server.orm.dao.AlertDispatchDAO; import org.apache.ambari.server.orm.entities.AlertDefinitionEntity; @@ -33,10 +40,15 @@ import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Service; import org.apache.ambari.server.state.ServiceFactory; import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.state.alert.AggregateDefinitionMapping; +import 
org.apache.ambari.server.state.alert.AggregateSource; +import org.apache.ambari.server.state.alert.AlertDefinition; +import org.apache.ambari.server.state.alert.Scope; import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.google.common.eventbus.EventBus; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.persist.PersistService; @@ -55,6 +67,9 @@ public class AlertEventPublisherTest { private Injector injector; private ServiceFactory serviceFactory; private AmbariMetaInfo metaInfo; + private OrmTestHelper ormHelper; + private AggregateDefinitionMapping aggregateMapping; + private AmbariEventPublisher eventPublisher; /** * @@ -64,13 +79,25 @@ public class AlertEventPublisherTest { injector = Guice.createInjector(new InMemoryDefaultTestModule()); injector.getInstance(GuiceJpaInitializer.class); + eventPublisher = injector.getInstance(AmbariEventPublisher.class); + EventBus synchronizedBus = new EventBus(); + // force singleton init via Guice so the listener registers with the bus - injector.getInstance(AlertServiceStateListener.class); + synchronizedBus.register(injector.getInstance(AlertLifecycleListener.class)); + synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class)); + synchronizedBus.register(injector.getInstance(AlertServiceStateListener.class)); + + // !!! 
need a synchronous op for testing + Field field = AmbariEventPublisher.class.getDeclaredField("m_eventBus"); + field.setAccessible(true); + field.set(eventPublisher, synchronizedBus); dispatchDao = injector.getInstance(AlertDispatchDAO.class); definitionDao = injector.getInstance(AlertDefinitionDAO.class); clusters = injector.getInstance(Clusters.class); serviceFactory = injector.getInstance(ServiceFactory.class); + ormHelper = injector.getInstance(OrmTestHelper.class); + aggregateMapping = injector.getInstance(AggregateDefinitionMapping.class); metaInfo = injector.getInstance(AmbariMetaInfo.class); metaInfo.init(); @@ -107,7 +134,7 @@ public class AlertEventPublisherTest { /** * Tests that all {@link AlertDefinitionEntity} instances are created for the * installed service. - * + * * @throws Exception */ @Test @@ -118,6 +145,43 @@ public class AlertEventPublisherTest { } /** + * Tests that {@link AlertDefinitionDeleteEvent} instances are fired when a + * definition is removed. + * + * @throws Exception + */ + @Test + public void testAlertDefinitionRemoval() throws Exception { + Assert.assertEquals(0, definitionDao.findAll().size()); + AlertDefinitionEntity definition = ormHelper.createAlertDefinition(1L); + Assert.assertEquals(1, definitionDao.findAll().size()); + + AggregateSource source = new AggregateSource(); + source.setAlertName(definition.getDefinitionName()); + + AlertDefinition aggregate = new AlertDefinition(); + aggregate.setClusterId(1L); + aggregate.setComponentName("DATANODE"); + aggregate.setEnabled(true); + aggregate.setInterval(1); + aggregate.setLabel("DataNode Aggregate"); + aggregate.setName("datanode_aggregate"); + aggregate.setScope(Scope.ANY); + aggregate.setServiceName("HDFS"); + aggregate.setSource(source); + aggregate.setUuid("uuid"); + + aggregateMapping.registerAggregate(1L, aggregate); + Assert.assertNotNull(aggregateMapping.getAggregateDefinition(1L, + source.getAlertName())); + + definitionDao.remove(definition); + + 
Assert.assertNull(aggregateMapping.getAggregateDefinition(1L, + source.getAlertName())); + } + + /** * Calls {@link Service#persist()} to mock a service install. */ private void installHdfsService() throws Exception { http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java index 312f297..18b4123 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java @@ -17,6 +17,7 @@ */ package org.apache.ambari.server.state.alerts; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -39,6 +40,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.google.common.eventbus.EventBus; import com.google.inject.Binder; import com.google.inject.Guice; import com.google.inject.Injector; @@ -72,6 +74,14 @@ public class AlertStateChangedEventTest { dispatchDao = injector.getInstance(AlertDispatchDAO.class); eventPublisher = injector.getInstance(AlertEventPublisher.class); + + EventBus synchronizedBus = new EventBus(); + synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class)); + + // !!! 
need a synchronous op for testing + Field field = AlertEventPublisher.class.getDeclaredField("m_eventBus"); + field.setAccessible(true); + field.set(eventPublisher, synchronizedBus); } /** @@ -99,8 +109,6 @@ public class AlertStateChangedEventTest { // async publishing eventPublisher.publish(event); - Thread.sleep(2000); - EasyMock.verify(dispatchDao, history, event); } http://git-wip-us.apache.org/repos/asf/ambari/blob/7db82c92/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java index 85a6e9b..502c3b9 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java @@ -373,7 +373,7 @@ public class AlertDataManagerTest { AggregateDefinitionMapping aggregateMapping = injector.getInstance(AggregateDefinitionMapping.class); AlertDefinition aggregateDefinition = factory.coerce(aggDef); - aggregateMapping.addAggregateType(clusterId.longValue(), + aggregateMapping.registerAggregate(clusterId.longValue(), aggregateDefinition ); AggregateSource as = (AggregateSource) aggregateDefinition.getSource();