ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject ambari git commit: AMBARI-13954 Enable auto-start with alerting for AMS (dsen)
Date Fri, 20 Nov 2015 09:37:02 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-2.1 b1dd0f224 -> dfc0a0a7e


AMBARI-13954 Enable auto-start with alerting for AMS (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/dfc0a0a7
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/dfc0a0a7
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/dfc0a0a7

Branch: refs/heads/branch-2.1
Commit: dfc0a0a7e01b4920ca8a43f288450afc5df33420
Parents: b1dd0f2
Author: Dmytro Sen <dsen@apache.org>
Authored: Fri Nov 20 11:33:00 2015 +0200
Committer: Dmytro Sen <dsen@apache.org>
Committed: Fri Nov 20 11:35:22 2015 +0200

----------------------------------------------------------------------
 .../ambari_agent/AlertSchedulerHandler.py       |   8 +-
 .../src/main/python/ambari_agent/Controller.py  |  11 +-
 .../src/main/python/ambari_agent/FileCache.py   |   3 +
 .../main/python/ambari_agent/RecoveryManager.py |  76 +++++++++++--
 .../ambari_agent/alerts/recovery_alert.py       | 103 ++++++++++++++++++
 .../test/python/ambari_agent/TestActionQueue.py |   6 +-
 .../ambari_agent/TestAlertSchedulerHandler.py   |  22 ++--
 .../src/test/python/ambari_agent/TestAlerts.py  | 109 ++++++++++++++++++-
 .../test/python/ambari_agent/TestHeartbeat.py   |   7 +-
 .../python/ambari_agent/TestRecoveryManager.py  |  32 +++---
 ambari-server/conf/unix/ambari.properties       |   4 +
 .../state/alert/AlertDefinitionFactory.java     |   4 +
 .../server/state/alert/RecoverySource.java      |  32 ++++++
 .../ambari/server/state/alert/SourceType.java   |   5 +
 .../AMBARI_METRICS/0.1.0/alerts.json            |  24 ++++
 15 files changed, 395 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/dfc0a0a7/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
index d3aab87..3e89437 100644
--- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
+++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
@@ -33,6 +33,7 @@ from alerts.metric_alert import MetricAlert
 from alerts.port_alert import PortAlert
 from alerts.script_alert import ScriptAlert
 from alerts.web_alert import WebAlert
+from alerts.recovery_alert import RecoveryAlert
 from ambari_agent.ExitHelper import ExitHelper
 logger = logging.getLogger(__name__)
 
@@ -42,9 +43,11 @@ class AlertSchedulerHandler():
   TYPE_METRIC = 'METRIC'
   TYPE_SCRIPT = 'SCRIPT'
   TYPE_WEB = 'WEB'
+  TYPE_RECOVERY = 'RECOVERY'
 
   def __init__(self, cachedir, stacks_dir, common_services_dir, host_scripts_dir,
-      alert_grace_period, cluster_configuration, config, in_minutes=True):
+      alert_grace_period, cluster_configuration, config, recovery_manager,
+      in_minutes=True):
 
     self.cachedir = cachedir
     self.stacks_dir = stacks_dir
@@ -70,6 +73,7 @@ class AlertSchedulerHandler():
     self.__scheduler = Scheduler(self.APS_CONFIG)
     self.__in_minutes = in_minutes
     self.config = config
+    self.recovery_manger = recovery_manager
 
     # register python exit handler
     ExitHelper().register(self.exit_handler)
@@ -282,6 +286,8 @@ class AlertSchedulerHandler():
         alert = ScriptAlert(json_definition, source, self.config)
       elif source_type == AlertSchedulerHandler.TYPE_WEB:
         alert = WebAlert(json_definition, source, self.config)
+      elif source_type == AlertSchedulerHandler.TYPE_RECOVERY:
+        alert = RecoveryAlert(json_definition, source, self.recovery_manger)
 
       if alert is not None:
         alert.set_cluster(clusterName, hostName)

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfc0a0a7/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 4ba5c45..a066d08 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -84,7 +84,6 @@ class Controller(threading.Thread):
     self.heartbeat_stop_callback = heartbeat_stop_callback
     # List of callbacks that are called at agent registration
     self.registration_listeners = []
-    self.recovery_manager = RecoveryManager()
 
     # pull config directory out of config
     cache_dir = config.get('agent', 'cache_dir')
@@ -94,8 +93,11 @@ class Controller(threading.Thread):
     stacks_cache_dir = os.path.join(cache_dir, FileCache.STACKS_CACHE_DIRECTORY)
     common_services_cache_dir = os.path.join(cache_dir, FileCache.COMMON_SERVICES_DIRECTORY)
     host_scripts_cache_dir = os.path.join(cache_dir, FileCache.HOST_SCRIPTS_CACHE_DIRECTORY)
-    alerts_cache_dir = os.path.join(cache_dir, 'alerts')
-    cluster_config_cache_dir = os.path.join(cache_dir, 'cluster_configuration')
+    alerts_cache_dir = os.path.join(cache_dir, FileCache.ALERTS_CACHE_DIRECTORY)
+    cluster_config_cache_dir = os.path.join(cache_dir, FileCache.CLUSTER_CONFIGURATION_CACHE_DIRECTORY)
+    recovery_cache_dir = os.path.join(cache_dir, FileCache.RECOVERY_CACHE_DIRECTORY)
+
+    self.recovery_manager = RecoveryManager(recovery_cache_dir)
 
     self.cluster_configuration = ClusterConfiguration(cluster_config_cache_dir)
 
@@ -105,7 +107,8 @@ class Controller(threading.Thread):
 
     self.alert_scheduler_handler = AlertSchedulerHandler(alerts_cache_dir, 
       stacks_cache_dir, common_services_cache_dir, host_scripts_cache_dir,
-      self.alert_grace_period, self.cluster_configuration, config)
+      self.alert_grace_period, self.cluster_configuration, config,
+      self.recovery_manager)
 
     self.alert_scheduler_handler.start()
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfc0a0a7/ambari-agent/src/main/python/ambari_agent/FileCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/FileCache.py b/ambari-agent/src/main/python/ambari_agent/FileCache.py
index 1a683a1..0bcd1bd 100644
--- a/ambari-agent/src/main/python/ambari_agent/FileCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/FileCache.py
@@ -39,6 +39,9 @@ class FileCache():
   downloads relevant files from the server.
   """
 
+  CLUSTER_CONFIGURATION_CACHE_DIRECTORY="cluster_configuration"
+  ALERTS_CACHE_DIRECTORY="alerts"
+  RECOVERY_CACHE_DIRECTORY="recovery"
   STACKS_CACHE_DIRECTORY="stacks"
   COMMON_SERVICES_DIRECTORY="common-services"
   CUSTOM_ACTIONS_CACHE_DIRECTORY="custom_actions"

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfc0a0a7/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
index cab81f5..cd8e839 100644
--- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -15,9 +15,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
+import json
 import logging
 import copy
+import os
 import time
 import threading
 import pprint
@@ -56,6 +57,8 @@ class RecoveryManager:
   COMPONENT_UPDATE_KEY_FORMAT = "{0}_UPDATE_TIME"
   COMMAND_REFRESH_DELAY_SEC = 600 #10 minutes
 
+  FILENAME = "recovery.json"
+
   default_action_counter = {
     "lastAttempt": 0,
     "count": 0,
@@ -72,8 +75,7 @@ class RecoveryManager:
     "stale_config": False
   }
 
-
-  def __init__(self, recovery_enabled=False, auto_start_only=False):
+  def __init__(self, cache_dir, recovery_enabled=False, auto_start_only=False):
     self.recovery_enabled = recovery_enabled
     self.auto_start_only = auto_start_only
     self.max_count = 6
@@ -87,14 +89,24 @@ class RecoveryManager:
     self.allowed_current_states = [self.INIT, self.INSTALLED]
     self.enabled_components = []
     self.disabled_components = []
-    self.actions = {}
     self.statuses = {}
     self.__status_lock = threading.RLock()
     self.__command_lock = threading.RLock()
     self.__active_command_lock = threading.RLock()
+    self.__cache_lock = threading.RLock()
     self.active_command_count = 0
     self.paused = False
 
+    if not os.path.exists(cache_dir):
+      try:
+        os.makedirs(cache_dir)
+      except:
+        logger.critical("[RecoveryManager] Could not create the cache directory {0}".format(cache_dir))
+
+    self.__actions_json_file = os.path.join(cache_dir, self.FILENAME)
+
+    self.actions = self._load_actions()
+
     self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, "", "")
 
     pass
@@ -366,6 +378,7 @@ class RecoveryManager:
     """
     action_counter = self.actions[action_name]
     now = self._now_()
+    executed = False
     seconds_since_last_attempt = now - action_counter["lastAttempt"]
     if action_counter["lifetimeCount"] < self.max_lifetime_count:
       if action_counter["count"] < self.max_count:
@@ -377,7 +390,7 @@ class RecoveryManager:
           action_counter["warnedLastAttempt"] = False
           if action_counter["count"] == 1:
             action_counter["lastReset"] = now
-          return True
+          executed = True
         else:
           if action_counter["warnedLastAttempt"] == False:
             action_counter["warnedLastAttempt"] = True
@@ -398,7 +411,7 @@ class RecoveryManager:
             action_counter["lastAttempt"] = now
           action_counter["lastReset"] = now
           action_counter["warnedLastReset"] = False
-          return True
+          executed = True
         else:
           if action_counter["warnedLastReset"] == False:
             action_counter["warnedLastReset"] = True
@@ -417,7 +430,54 @@ class RecoveryManager:
       else:
         logger.debug("%s occurrences in agent life time reached the limit for %s",
                      action_counter["lifetimeCount"], action_name)
-    return False
+    self._dump_actions()
+    return executed
+    pass
+
+
+  def _dump_actions(self):
+    """
+    Dump recovery actions to FS
+    """
+    self.__cache_lock.acquire()
+    try:
+      with open(self.__actions_json_file, 'w') as f:
+        json.dump(self.actions, f, indent=2)
+    except Exception, exception:
+      logger.exception("Unable to dump actions to {0}".format(self.__actions_json_file))
+      return False
+    finally:
+      self.__cache_lock.release()
+
+    return True
+    pass
+
+
+  def _load_actions(self):
+    """
+    Loads recovery actions from FS
+    """
+    self.__cache_lock.acquire()
+
+    try:
+      if os.path.isfile(self.__actions_json_file):
+        with open(self.__actions_json_file, 'r') as fp:
+          return json.load(fp)
+    except Exception, exception:
+      logger.warning("Unable to load recovery actions from {0}.".format(self.__actions_json_file))
+    finally:
+      self.__cache_lock.release()
+
+    return {}
+    pass
+
+
+  def get_actions_copy(self):
+    """
+    Loads recovery actions from FS
+    :return:  recovery actions copy
+    """
+    return self._load_actions()
     pass
 
 
@@ -775,7 +835,7 @@ class RecoveryManager:
 
 
 def main(argv=None):
-  cmd_mgr = RecoveryManager()
+  cmd_mgr = RecoveryManager('/tmp')
   pass
 
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfc0a0a7/ambari-agent/src/main/python/ambari_agent/alerts/recovery_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/recovery_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/recovery_alert.py
new file mode 100644
index 0000000..66b4b24
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/recovery_alert.py
@@ -0,0 +1,103 @@
+#!/usr/bin/env python
+
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import logging
+import datetime
+from alerts.base_alert import BaseAlert
+logger = logging.getLogger()
+
+# default recoveries counts
+DEFAULT_WARNING_RECOVERIES_COUNT = 1
+DEFAULT_CRITICAL_RECOVERIES_COUNT = 5
+
+UNKNOWN_COMPONENT = 'UNKNOWN_COMPONENT'
+class RecoveryAlert(BaseAlert):
+
+  def __init__(self, alert_meta, alert_source_meta, recovery_manager):
+    super(RecoveryAlert, self).__init__(alert_meta, alert_source_meta)
+
+    self.recovery_manager = recovery_manager
+    self.warning_count = DEFAULT_WARNING_RECOVERIES_COUNT
+    self.critical_count = DEFAULT_CRITICAL_RECOVERIES_COUNT
+
+    if 'reporting' in alert_source_meta:
+      reporting = alert_source_meta['reporting']
+      reporting_state_warning = self.RESULT_WARNING.lower()
+      reporting_state_critical = self.RESULT_CRITICAL.lower()
+
+      if reporting_state_warning in reporting and \
+          'count' in reporting[reporting_state_warning]:
+        self.warning_count = reporting[reporting_state_warning]['count']
+
+      if reporting_state_critical in reporting and \
+          'count' in reporting[reporting_state_critical]:
+        self.critical_count = reporting[reporting_state_critical]['count']
+    if self.critical_count <= self.warning_count:
+      if logger.isEnabledFor(logging.DEBUG):
+        logger.debug("[Alert][{0}] The CRITICAL value of {1} must be greater than the WARNING value of {2}".format(
+          self.get_name(), self.critical_count, self.warning_count))
+
+  def _collect(self):
+
+    component = UNKNOWN_COMPONENT
+    if 'componentName' in self.alert_meta:
+      component = self.alert_meta['componentName']
+
+    if logger.isEnabledFor(logging.DEBUG):
+      logger.debug("[Alert][{0}] Checking recovery operations for {1}".format(
+        self.get_name(), component))
+
+    recovery_action_info = {}
+    recovery_actions = self.recovery_manager.get_actions_copy()
+    if component in recovery_actions:
+      recovery_action_info = recovery_actions[component]
+
+    recovered_times = 0
+    if 'count' in recovery_action_info:
+      recovered_times = recovery_action_info['count']
+    lastResetText = ""
+    if 'lastReset' in recovery_action_info:
+      lastResetText = " since " + str(datetime.datetime.fromtimestamp(recovery_action_info['lastReset']))
+    warned_threshold_reached = False
+    if 'warnedThresholdReached' in recovery_action_info:
+      warned_threshold_reached = recovery_action_info['warnedThresholdReached']
+
+    if recovered_times >= self.critical_count or warned_threshold_reached:
+      result = self.RESULT_CRITICAL
+    elif recovered_times >= self.warning_count:
+      result = self.RESULT_WARNING
+    elif recovered_times < self.warning_count and \
+        recovered_times < self.critical_count:
+      result = self.RESULT_OK
+    else:
+      result = self.RESULT_UNKNOWN
+
+    return (result, [lastResetText, recovered_times, component])
+
+  def _get_reporting_text(self, state):
+    '''
+    Gets the default reporting text to use when the alert definition does not
+    contain any.
+    :param state: the state of the alert in uppercase (such as OK, WARNING, etc)
+    :return:  the parametrized text
+    '''
+    if state == self.RESULT_OK:
+      return 'No recovery operations executed for {2}{0}.'
+    return '{1} recovery operations executed for {2}{0}.'

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfc0a0a7/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 8a9d7d4..802760b 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -341,7 +341,7 @@ class TestActionQueue(TestCase):
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
     dummy_controller = MagicMock()
-    dummy_controller.recovery_manager = RecoveryManager()
+    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
     dummy_controller.recovery_manager.update_config(5, 5, 1, 11, True, False, "", "")
 
     actionQueue = ActionQueue(config, dummy_controller)
@@ -665,7 +665,7 @@ class TestActionQueue(TestCase):
 
     build_mock.return_value = {'dummy report': '' }
 
-    dummy_controller.recovery_manager = RecoveryManager()
+    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
 
     requestComponentStatus_mock.reset_mock()
     requestComponentStatus_mock.return_value = {'exitcode': 0 }
@@ -725,7 +725,7 @@ class TestActionQueue(TestCase):
                                 get_mock, process_command_mock, gpeo_mock):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
-    dummy_controller.recovery_manager = RecoveryManager()
+    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
     config = MagicMock()
     gpeo_mock.return_value = 0
     config.get_parallel_exec_option = gpeo_mock

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfc0a0a7/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py b/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py
index a08e4bc..9fd426f 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py
@@ -42,7 +42,7 @@ class TestAlertSchedulerHandler(TestCase):
     self.assertEquals(len(definitions), 1)
 
   def test_json_to_callable_metric(self):
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     json_definition = {
       'source': {
         'type': 'METRIC'
@@ -63,7 +63,7 @@ class TestAlertSchedulerHandler(TestCase):
       }
     }
 
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', copy.deepcopy(json_definition))
 
     self.assertTrue(callable_result is not None)
@@ -79,7 +79,7 @@ class TestAlertSchedulerHandler(TestCase):
       }
     }
 
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', copy.deepcopy(json_definition))
 
     self.assertTrue(callable_result is not None)
@@ -94,7 +94,7 @@ class TestAlertSchedulerHandler(TestCase):
       }
     }
 
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', copy.deepcopy(json_definition))
 
     self.assertTrue(callable_result is None)
@@ -102,7 +102,7 @@ class TestAlertSchedulerHandler(TestCase):
   def test_execute_alert_noneScheduler(self):
     execution_commands = []
 
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     scheduler._AlertSchedulerHandler__scheduler = None
     alert_mock = Mock()
     scheduler._AlertSchedulerHandler__json_to_callable = Mock(return_value=alert_mock)
@@ -114,7 +114,7 @@ class TestAlertSchedulerHandler(TestCase):
   def test_execute_alert_noneCommands(self):
     execution_commands = None
 
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     alert_mock = Mock()
     scheduler._AlertSchedulerHandler__json_to_callable = Mock(return_value=alert_mock)
 
@@ -125,7 +125,7 @@ class TestAlertSchedulerHandler(TestCase):
   def test_execute_alert_emptyCommands(self):
     execution_commands = []
 
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     alert_mock = Mock()
     scheduler._AlertSchedulerHandler__json_to_callable = Mock(return_value=alert_mock)
 
@@ -144,7 +144,7 @@ class TestAlertSchedulerHandler(TestCase):
       }
     ]
 
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     alert_mock = MagicMock()
     alert_mock.collect = Mock()
     alert_mock.set_helpers = Mock()
@@ -159,7 +159,7 @@ class TestAlertSchedulerHandler(TestCase):
     self.assertTrue(alert_mock.collect.called)
 
   def test_load_definitions(self):
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     scheduler._AlertSchedulerHandler__config_maps = {
       'cluster': {}
     }
@@ -170,7 +170,7 @@ class TestAlertSchedulerHandler(TestCase):
     self.assertTrue(isinstance(alert_def, PortAlert))
 
   def test_load_definitions_noFile(self):
-    scheduler = AlertSchedulerHandler('wrong_path', 'wrong_path', 'wrong_path', 'wrong_path', 5, None, None)
+    scheduler = AlertSchedulerHandler('wrong_path', 'wrong_path', 'wrong_path', 'wrong_path', 5, None, None, None)
     scheduler._AlertSchedulerHandler__config_maps = {
       'cluster': {}
     }
@@ -190,7 +190,7 @@ class TestAlertSchedulerHandler(TestCase):
       }
     ]
 
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     alert_mock = MagicMock()
     alert_mock.interval = Mock(return_value=5)
     alert_mock.collect = Mock()

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfc0a0a7/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
index 1e6da64..85d6746 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
@@ -22,14 +22,17 @@ import os
 import socket
 import sys
 import urllib2
+import tempfile
 
 from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
+from ambari_agent.RecoveryManager import RecoveryManager
 from ambari_agent.alerts.collector import AlertCollector
 from ambari_agent.alerts.base_alert import BaseAlert
 from ambari_agent.alerts.metric_alert import MetricAlert
 from ambari_agent.alerts.port_alert import PortAlert
 from ambari_agent.alerts.script_alert import ScriptAlert
 from ambari_agent.alerts.web_alert import WebAlert
+from ambari_agent.alerts.recovery_alert import RecoveryAlert
 from ambari_agent.apscheduler.scheduler import Scheduler
 from ambari_agent.ClusterConfiguration import ClusterConfiguration
 from ambari_commons.urllib_handlers import RefreshHeaderProcessor
@@ -60,7 +63,7 @@ class TestAlerts(TestCase):
 
     ash = AlertSchedulerHandler(test_file_path, test_stack_path,
       test_common_services_path, test_host_scripts_path, 5, cluster_configuration,
-      None)
+      None, None)
 
     ash.start()
 
@@ -109,6 +112,70 @@ class TestAlerts(TestCase):
     self.assertEquals(0, len(collector.alerts()))
     self.assertEquals('CRITICAL', alerts[0]['state'])
 
+  @patch.object(RecoveryManager, "get_actions_copy")
+  def test_recovery_alert(self, rm_get_actions_mock):
+    definition_json = self._get_recovery_alert_definition()
+    rm_get_actions_mock.return_value = {
+        "METRICS_COLLECTOR": {
+          "count": 0,
+          "lastAttempt": 1447860184,
+          "warnedLastReset": False,
+          "lastReset": 1447860184,
+          "warnedThresholdReached": False,
+          "lifetimeCount": 1,
+          "warnedLastAttempt": False
+        }
+      }
+
+    collector = AlertCollector()
+    cluster_configuration = self.__get_cluster_configuration()
+    self.__update_cluster_configuration(cluster_configuration, {})
+
+    rm = RecoveryManager(tempfile.mktemp(), True)
+    alert = RecoveryAlert(definition_json, definition_json['source'], rm)
+    alert.set_helpers(collector, cluster_configuration)
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
+    self.assertEquals(1, alert.interval())
+
+    #  OK - "count": 0
+    alert.collect()
+    alerts = collector.alerts()
+    self.assertEquals(0, len(collector.alerts()))
+    self.assertEquals('OK', alerts[0]['state'])
+
+    #  WARN - "count": 1
+    rm_get_actions_mock.return_value = {
+      "METRICS_COLLECTOR": {
+        "count": 1,
+        "lastAttempt": 1447860184,
+        "warnedLastReset": False,
+        "lastReset": 1447860184,
+        "warnedThresholdReached": False,
+        "lifetimeCount": 1,
+        "warnedLastAttempt": False
+      }
+    }
+    alert.collect()
+    alerts = collector.alerts()
+    self.assertEquals(0, len(collector.alerts()))
+    self.assertEquals('WARNING', alerts[0]['state'])
+
+    #  CRIT - "count": 5
+    rm_get_actions_mock.return_value = {
+      "METRICS_COLLECTOR": {
+        "count": 5,
+        "lastAttempt": 1447860184,
+        "warnedLastReset": False,
+        "lastReset": 1447860184,
+        "warnedThresholdReached": False,
+        "lifetimeCount": 1,
+        "warnedLastAttempt": False
+      }
+    }
+    alert.collect()
+    alerts = collector.alerts()
+    self.assertEquals(0, len(collector.alerts()))
+    self.assertEquals('CRITICAL', alerts[0]['state'])
 
   @patch.object(socket.socket,"connect")
   def test_port_alert_complex_uri(self, socket_connect_mock):
@@ -475,7 +542,7 @@ class TestAlerts(TestCase):
 
     ash = AlertSchedulerHandler(test_file_path, test_stack_path,
       test_common_services_path, test_host_scripts_path, 5, cluster_configuration,
-      None)
+      None, None)
 
     ash.start()
 
@@ -522,7 +589,7 @@ class TestAlerts(TestCase):
 
     ash = AlertSchedulerHandler(test_file_path, test_stack_path,
       test_common_services_path, test_host_scripts_path, 5, cluster_configuration,
-      None)
+      None, None)
 
     ash.start()
 
@@ -558,7 +625,7 @@ class TestAlerts(TestCase):
     cluster_configuration = self.__get_cluster_configuration()
     ash = AlertSchedulerHandler(test_file_path, test_stack_path,
       test_common_services_path, test_host_scripts_path, 5, cluster_configuration,
-      None)
+      None, None)
 
     ash.start()
 
@@ -633,6 +700,13 @@ class TestAlerts(TestCase):
     self.assertEquals(alert._get_reporting_text(alert.RESULT_WARNING), '{0}')
     self.assertEquals(alert._get_reporting_text(alert.RESULT_CRITICAL), '{0}')
 
+    rm = RecoveryManager(tempfile.mktemp())
+    definition_json['source']['type'] = 'RECOVERY'
+    alert = RecoveryAlert(definition_json, definition_json['source'], rm)
+    self.assertEquals(alert._get_reporting_text(alert.RESULT_OK), 'No recovery operations executed for {2}{0}.')
+    self.assertEquals(alert._get_reporting_text(alert.RESULT_WARNING), '{1} recovery operations executed for {2}{0}.')
+    self.assertEquals(alert._get_reporting_text(alert.RESULT_CRITICAL), '{1} recovery operations executed for {2}{0}.')
+
 
   def test_configuration_updates(self):
     definition_json = self._get_script_alert_definition()
@@ -1212,6 +1286,33 @@ class TestAlerts(TestCase):
     }
 
 
+  def _get_recovery_alert_definition(self):
+    return {
+      "componentName": "METRICS_COLLECTOR",
+      "name": "ams_metrics_collector_autostart",
+      "label": "Metrics Collector Recovery",
+      "description": "This alert is triggered if the Metrics Collector has been auto-started for number of times equal to threshold.",
+      "interval": 1,
+      "scope": "HOST",
+      "enabled": True,
+      "source": {
+        "type": "RECOVERY",
+        "reporting": {
+          "ok": {
+            "text": "Metrics Collector hasn't been auto-started since {0}."
+          },
+          "warning": {
+            "text": "Metrics Collector has been auto-started {1} times since {0}.",
+            "count": 1
+          },
+          "critical": {
+            "text": "Metrics Collector has been auto-started {1} times since {0}.",
+            "count": 5
+          }
+        }
+      }
+    }
+
   def _get_metric_alert_definition(self):
     return {
       "name": "DataNode CPU Check",

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfc0a0a7/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
index b2825bd..26f6286 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
@@ -20,9 +20,7 @@ limitations under the License.
 
 from unittest import TestCase
 import unittest
-import socket
-import os
-import time
+import tempfile
 from mock.mock import patch, MagicMock, call
 import StringIO
 import sys
@@ -116,7 +114,7 @@ class TestHeartbeat(TestCase):
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
     dummy_controller = MagicMock()
-    dummy_controller.recovery_manager = RecoveryManager()
+    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
     actionQueue = ActionQueue(config, dummy_controller)
     result_mock.return_value = {
       'reports': [{'status': 'IN_PROGRESS',
@@ -211,6 +209,7 @@ class TestHeartbeat(TestCase):
     config.set('agent', 'prefix', 'tmp')
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
+
     dummy_controller = MagicMock()
     actionQueue = ActionQueue(config, dummy_controller)
     statusCommand = {

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfc0a0a7/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
index e6115e3..1335dab 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
@@ -20,7 +20,7 @@ limitations under the License.
 
 from unittest import TestCase
 import copy
-
+import tempfile
 from ambari_agent.RecoveryManager import RecoveryManager
 from mock.mock import patch, MagicMock, call
 
@@ -124,7 +124,7 @@ class TestRecoveryManager(TestCase):
 
   @patch.object(RecoveryManager, "update_desired_status")
   def test_process_commands(self, mock_uds):
-    rm = RecoveryManager(True)
+    rm = RecoveryManager(tempfile.mktemp(), True)
     rm.process_status_commands(None)
     self.assertFalse(mock_uds.called)
 
@@ -154,7 +154,7 @@ class TestRecoveryManager(TestCase):
     pass
 
   def test_defaults(self):
-    rm = RecoveryManager()
+    rm = RecoveryManager(tempfile.mktemp())
     self.assertFalse(rm.enabled())
     self.assertEqual(None, rm.get_install_command("NODEMANAGER"))
     self.assertEqual(None, rm.get_start_command("NODEMANAGER"))
@@ -170,7 +170,7 @@ class TestRecoveryManager(TestCase):
       [1000, 1001, 1002, 1003, 1004, 1071, 1150, 1151, 1152, 1153, 1400, 1401,
        1500, 1571, 1572, 1653, 1900, 1971, 2300, 2301]
 
-    rm = RecoveryManager(True, False)
+    rm = RecoveryManager(tempfile.mktemp(), True, False)
     self.assertTrue(rm.enabled())
 
     rm.update_config(0, 60, 5, 12, True, False, "", "")
@@ -243,7 +243,7 @@ class TestRecoveryManager(TestCase):
     pass
 
   def test_recovery_required(self):
-    rm = RecoveryManager(True, False)
+    rm = RecoveryManager(tempfile.mktemp(), True, False)
 
     rm.update_current_status("NODEMANAGER", "INSTALLED")
     rm.update_desired_status("NODEMANAGER", "INSTALLED")
@@ -273,7 +273,7 @@ class TestRecoveryManager(TestCase):
     rm.update_desired_status("NODEMANAGER", "STARTED")
     self.assertTrue(rm.requires_recovery("NODEMANAGER"))
 
-    rm = RecoveryManager(True, True)
+    rm = RecoveryManager(tempfile.mktemp(), True, True)
 
     rm.update_current_status("NODEMANAGER", "INIT")
     rm.update_desired_status("NODEMANAGER", "INSTALLED")
@@ -291,13 +291,13 @@ class TestRecoveryManager(TestCase):
 
   def test_recovery_required2(self):
 
-    rm = RecoveryManager(True, True)
+    rm = RecoveryManager(tempfile.mktemp(), True, True)
     rm.update_config(15, 5, 1, 16, True, False, "", "")
     rm.update_current_status("NODEMANAGER", "INSTALLED")
     rm.update_desired_status("NODEMANAGER", "STARTED")
     self.assertTrue(rm.requires_recovery("NODEMANAGER"))
 
-    rm = RecoveryManager(True, True)
+    rm = RecoveryManager(tempfile.mktemp(), True, True)
     rm.update_config(15, 5, 1, 16, True, False, "NODEMANAGER", "")
     rm.update_current_status("NODEMANAGER", "INSTALLED")
     rm.update_desired_status("NODEMANAGER", "STARTED")
@@ -307,7 +307,7 @@ class TestRecoveryManager(TestCase):
     rm.update_desired_status("DATANODE", "STARTED")
     self.assertFalse(rm.requires_recovery("DATANODE"))
 
-    rm = RecoveryManager(True, True)
+    rm = RecoveryManager(tempfile.mktemp(), True, True)
     rm.update_config(15, 5, 1, 16, True, False, "", "NODEMANAGER")
     rm.update_current_status("NODEMANAGER", "INSTALLED")
     rm.update_desired_status("NODEMANAGER", "STARTED")
@@ -339,7 +339,7 @@ class TestRecoveryManager(TestCase):
 
   @patch('time.time', MagicMock(side_effects=[1]))
   def test_store_from_status_and_use(self):
-    rm = RecoveryManager(True)
+    rm = RecoveryManager(tempfile.mktemp(), True)
 
     command1 = copy.deepcopy(self.command)
 
@@ -391,7 +391,7 @@ class TestRecoveryManager(TestCase):
        4100, 4101, 4102, 4103,
        4200, 4201, 4202,
        4300, 4301, 4302]
-    rm = RecoveryManager(True)
+    rm = RecoveryManager(tempfile.mktemp(), True)
     rm.update_config(15, 5, 1, 16, True, False, "", "")
 
     command1 = copy.deepcopy(self.command)
@@ -469,7 +469,7 @@ class TestRecoveryManager(TestCase):
 
   @patch.object(RecoveryManager, "update_config")
   def test_update_rm_config(self, mock_uc):
-    rm = RecoveryManager()
+    rm = RecoveryManager(tempfile.mktemp())
     rm.update_configuration_from_registration(None)
     mock_uc.assert_has_calls([call(6, 60, 5, 12, False, True, "", "")])
 
@@ -518,7 +518,7 @@ class TestRecoveryManager(TestCase):
     time_mock.side_effect = \
       [1000, 1071, 1072, 1470, 1471, 1472, 1543, 1644, 1715]
 
-    rm = RecoveryManager()
+    rm = RecoveryManager(tempfile.mktemp())
     rec_st = rm.get_recovery_status()
     self.assertEquals(rec_st, {"summary": "DISABLED"})
 
@@ -565,7 +565,7 @@ class TestRecoveryManager(TestCase):
     time_mock.side_effect = \
       [1000, 1001, 1002, 1003, 1104, 1105, 1106, 1807, 1808, 1809, 1810, 1811, 1812]
 
-    rm = RecoveryManager(True)
+    rm = RecoveryManager(tempfile.mktemp(), True)
     rm.update_config(5, 5, 1, 11, True, False, "", "")
 
     command1 = copy.deepcopy(self.command)
@@ -594,7 +594,7 @@ class TestRecoveryManager(TestCase):
     pass
 
   def test_command_count(self):
-    rm = RecoveryManager(True)
+    rm = RecoveryManager(tempfile.mktemp(), True)
     self.assertFalse(rm.has_active_command())
     rm.start_execution_command()
     self.assertTrue(rm.has_active_command())
@@ -606,7 +606,7 @@ class TestRecoveryManager(TestCase):
     self.assertFalse(rm.has_active_command())
 
   def test_configured_for_recovery(self):
-    rm = RecoveryManager(True)
+    rm = RecoveryManager(tempfile.mktemp(), True)
     self.assertTrue(rm.configured_for_recovery("A"))
     self.assertTrue(rm.configured_for_recovery("B"))
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfc0a0a7/ambari-server/conf/unix/ambari.properties
----------------------------------------------------------------------
diff --git a/ambari-server/conf/unix/ambari.properties b/ambari-server/conf/unix/ambari.properties
index 7f0a464..ed45ffe 100644
--- a/ambari-server/conf/unix/ambari.properties
+++ b/ambari-server/conf/unix/ambari.properties
@@ -107,3 +107,7 @@ http.x-frame-options=DENY
 views.http.strict-transport-security=max-age=31536000
 views.http.x-xss-protection=1; mode=block
 views.http.x-frame-options=SAMEORIGIN
+
+# Enable Metrics Collector auto-restart
+recovery.type=AUTO_START
+recovery.enabled_components=METRICS_COLLECTOR
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfc0a0a7/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java
index 4bc25f8..3b4f5fc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java
@@ -328,6 +328,10 @@ public class AlertDefinitionFactory {
           clazz = WebSource.class;
           break;
         }
+        case RECOVERY: {
+          clazz = RecoverySource.class;
+          break;
+        }
         case SERVER:{
           clazz = ServerSource.class;
           break;

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfc0a0a7/ambari-server/src/main/java/org/apache/ambari/server/state/alert/RecoverySource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/RecoverySource.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/RecoverySource.java
new file mode 100644
index 0000000..afbb3eb
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/RecoverySource.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.alert;
+
+/**
+ * Alert when the source type is defined as {@link org.apache.ambari.server.state.alert.SourceType#RECOVERY}
+ * <p/>
+ * Equality checking for instances of this class should be executed on every
+ * member to ensure that reconciling stack differences is correct.
+ */
+public class RecoverySource extends Source {
+
+  public RecoverySource() {
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfc0a0a7/ambari-server/src/main/java/org/apache/ambari/server/state/alert/SourceType.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/SourceType.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/SourceType.java
index 6c1aa9a..357baf9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/SourceType.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/SourceType.java
@@ -52,6 +52,11 @@ public enum SourceType {
   WEB,
 
   /**
+   * Source is a component state recovery results
+   */
+  RECOVERY,
+
+  /**
    * A server-side alert.
    */
   SERVER;

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfc0a0a7/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/alerts.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/alerts.json b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/alerts.json
index 319427d..7611e01 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/alerts.json
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/alerts.json
@@ -29,6 +29,30 @@
     ],
     "METRICS_COLLECTOR": [
       {
+        "name": "ams_metrics_collector_autostart",
+        "label": "Metrics Collector Auto-Start",
+        "description": "This alert is triggered if the Metrics Collector has been auto-started for number of times equal to threshold.",
+        "interval": 1,
+        "scope": "ANY",
+        "enabled": true,
+        "source": {
+          "type": "RECOVERY",
+          "reporting": {
+            "ok": {
+              "text": "Metrics Collector hasn't been auto-started{0}."
+            },
+            "warning": {
+              "text": "Metrics Collector has been auto-started {1} times{0}.",
+              "count": 1
+            },
+            "critical": {
+              "text": "Metrics Collector has been auto-started {1} times{0}.",
+              "count": 5
+            }
+          }
+        }
+      },
+      {
         "name": "ams_metrics_collector_process",
         "label": "Metrics Collector Process",
         "description": "This alert is triggered if the Metrics Collector cannot be confirmed to be up and listening on the configured port for number of seconds equal to threshold.",


Mime
View raw message