ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stoa...@apache.org
Subject ambari git commit: AMBARI-17248. Reduce the idle time before first command from next stage is executed on a host. (stoader)
Date Wed, 22 Jun 2016 16:04:14 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 32448be8f -> f1c414564


AMBARI-17248. Reduce the idle time before first command from next stage is executed on a host.
(stoader)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f1c41456
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f1c41456
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f1c41456

Branch: refs/heads/branch-2.4
Commit: f1c414564f09f83d19891b1a33a1551fd8faf7cf
Parents: 32448be
Author: Toader, Sebastian <stoader@hortonworks.com>
Authored: Wed Jun 22 15:14:00 2016 +0200
Committer: Toader, Sebastian <stoader@hortonworks.com>
Committed: Wed Jun 22 18:03:54 2016 +0200

----------------------------------------------------------------------
 ambari-agent/conf/unix/ambari-agent.ini         |  5 +-
 ambari-agent/conf/unix/upgrade_agent_configs.py |  3 +-
 ambari-agent/conf/windows/ambari-agent.ini      |  4 +-
 .../main/python/ambari_agent/AmbariConfig.py    |  3 +
 .../src/main/python/ambari_agent/Controller.py  | 65 +++++++++++++++++---
 .../src/main/python/ambari_agent/Heartbeat.py   | 19 +++---
 .../src/main/python/ambari_agent/NetUtil.py     | 31 +++++++++-
 .../test/python/ambari_agent/TestHeartbeat.py   |  4 +-
 .../src/test/python/ambari_agent/TestNetUtil.py | 21 +++++++
 .../ambari_agent/examples/ControllerTester.py   |  2 +-
 .../ambari/server/agent/HeartBeatHandler.java   |  3 +
 .../ambari/server/agent/HeartBeatResponse.java  |  7 +++
 .../org/apache/ambari/server/state/Cluster.java |  6 ++
 .../apache/ambari/server/state/Clusters.java    |  8 +++
 .../server/state/cluster/ClusterImpl.java       | 11 ++++
 .../server/state/cluster/ClustersImpl.java      | 22 +++++++
 .../server/state/cluster/ClusterImplTest.java   | 29 +++++++++
 17 files changed, 218 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/f1c41456/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index 8f2ab1b..914e09a 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -47,13 +47,16 @@ ssl_verify_cert=0
 pidLookupPath=/var/run/
 
 [heartbeat]
-state_interval=6
+state_interval_seconds=60
 dirs=/etc/hadoop,/etc/hadoop/conf,/etc/hbase,/etc/hcatalog,/etc/hive,/etc/oozie,
   /etc/sqoop,/etc/ganglia,
   /var/run/hadoop,/var/run/zookeeper,/var/run/hbase,/var/run/templeton,/var/run/oozie,
   /var/log/hadoop,/var/log/zookeeper,/var/log/hbase,/var/run/templeton,/var/log/hive
 ; 0 - unlimited
 log_lines_count=300
+idle_interval_min=1
+idle_interval_max=10
+
 
 [logging]
 syslog_enabled=0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1c41456/ambari-agent/conf/unix/upgrade_agent_configs.py
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/upgrade_agent_configs.py b/ambari-agent/conf/unix/upgrade_agent_configs.py
index 583b5aa..108b510 100644
--- a/ambari-agent/conf/unix/upgrade_agent_configs.py
+++ b/ambari-agent/conf/unix/upgrade_agent_configs.py
@@ -21,7 +21,8 @@ import os
 import ConfigParser
 
 PROPERTIES_TO_REWRITE = [
-  ('heartbeat', 'dirs')
+  ('heartbeat', 'dirs'),
+  ('heartbeat', 'state_interval')
 ]
 SECTIONS_TO_REMOVE = [
   'stack', 'puppet', 'command', 'python'

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1c41456/ambari-agent/conf/windows/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/windows/ambari-agent.ini b/ambari-agent/conf/windows/ambari-agent.ini
index df88be6..b7a0a4d 100644
--- a/ambari-agent/conf/windows/ambari-agent.ini
+++ b/ambari-agent/conf/windows/ambari-agent.ini
@@ -42,7 +42,7 @@ passphrase_env_var_name=AMBARI_PASSPHRASE
 pidLookupPath=\\var\\run\\ambari-agent
 
 [heartbeat]
-state_interval=6
+state_interval_seconds=60
 dirs=/etc/hadoop,/etc/hadoop/conf,/etc/hbase,/etc/hcatalog,/etc/hive,/etc/oozie,
   /etc/sqoop,/etc/ganglia,/etc/nagios,
   /var/run/hadoop,/var/run/zookeeper,/var/run/hbase,/var/run/templeton,/var/run/oozie,
@@ -52,3 +52,5 @@ rpms=nagios,ganglia,
   hadoop,hadoop-lzo,hbase,oozie,sqoop,pig,zookeeper,hive,libconfuse,ambari-log4j
 ; 0 - unlimited
 log_lines_count=300
+idle_interval_min=1
+idle_interval_max=10

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1c41456/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index 89a881a..ae938dc 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -62,6 +62,9 @@ passphrase_env_var_name=AMBARI_PASSPHRASE
 state_interval = 6
 dirs={ps}etc{ps}hadoop,{ps}etc{ps}hadoop{ps}conf,{ps}var{ps}run{ps}hadoop,{ps}var{ps}log{ps}hadoop
 log_lines_count=300
+iddle_interval_min=1
+iddle_interval_max=10
+
 
 [logging]
 log_command_executes = 0

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1c41456/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index e981a76..a05011a 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -29,6 +29,7 @@ import urllib2
 import pprint
 from random import randint
 import subprocess
+import functools
 
 import hostname
 import security
@@ -103,6 +104,17 @@ class Controller(threading.Thread):
     cluster_config_cache_dir = os.path.join(cache_dir, FileCache.CLUSTER_CONFIGURATION_CACHE_DIRECTORY)
     recovery_cache_dir = os.path.join(cache_dir, FileCache.RECOVERY_CACHE_DIRECTORY)
 
+    self.heartbeat_idle_interval_min = int(self.config.get('heartbeat', 'idle_interval_min')) if self.config.get('heartbeat', 'idle_interval_min') else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MIN_SEC
+    if self.heartbeat_idle_interval_min < 1:
+      self.heartbeat_idle_interval_min = 1
+
+    self.heartbeat_idle_interval_max = int(self.config.get('heartbeat', 'idle_interval_max')) if self.config.get('heartbeat', 'idle_interval_max') else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
+
+    if self.heartbeat_idle_interval_min > self.heartbeat_idle_interval_max:
+      raise Exception("Heartbeat minimum interval={0} seconds can not be greater than the maximum interval={1} seconds !".format(self.heartbeat_idle_interval_min, self.heartbeat_idle_interval_max))
+
+    self.get_heartbeat_interval = functools.partial(self.netutil.get_agent_heartbeat_idle_interval_sec, self.heartbeat_idle_interval_min, self.heartbeat_idle_interval_max)
+
     self.recovery_manager = RecoveryManager(recovery_cache_dir)
 
     self.cluster_configuration = ClusterConfiguration(cluster_config_cache_dir)
@@ -245,18 +257,38 @@ class Controller(threading.Thread):
     self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
     retry = False
     certVerifFailed = False
-    hb_interval = self.config.get('heartbeat', 'state_interval')
+    state_interval = self.config.get('heartbeat', 'state_interval_seconds', '60')
+
+    # last time when state was successfully sent to server
+    last_state_timestamp = 0.0
+
+    # in order to be able to check from logs that heartbeats processing
+    # still running we log a message. However to avoid generating too
+    # much log when the heartbeat runs at a higher rate (e.g. 1 second intervals)
+    # we log the message at the same interval as 'state interval'
+    heartbeat_running_msg_timestamp = 0.0
 
     while not self.DEBUG_STOP_HEARTBEATING:
+      heartbeat_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
+
       try:
+        crt_time = time.time()
+        if crt_time - heartbeat_running_msg_timestamp > int(state_interval):
+          logger.info("Heartbeat with server is running...")
+          heartbeat_running_msg_timestamp = crt_time
+
+        send_state = False
         if not retry:
+          if crt_time - last_state_timestamp > int(state_interval):
+            send_state = True
+
           data = json.dumps(
-              self.heartbeat.build(self.responseId, int(hb_interval), self.hasMappedComponents))
+              self.heartbeat.build(self.responseId, send_state, self.hasMappedComponents))
         else:
           self.DEBUG_HEARTBEAT_RETRIES += 1
 
-        if logger.isEnabledFor(logging.DEBUG):
-          logger.debug("Sending Heartbeat (id = %s): %s", self.responseId, data)
+
+        logger.debug("Sending Heartbeat (id = %s): %s", self.responseId, data)
 
         response = self.sendRequest(self.heartbeatUrl, data)
         exitStatus = 0
@@ -268,14 +300,25 @@ class Controller(threading.Thread):
 
         serverId = int(response['responseId'])
 
-        logger.info('Heartbeat response received (id = %s)', serverId)
+
+        logger.debug('Heartbeat response received (id = %s)', serverId)
+
+        cluster_size = int(response['clusterSize']) if 'clusterSize' in response.keys() else -1
+
+        # TODO: this needs to be revised if hosts can be shared across multiple clusters
+        heartbeat_interval = self.get_heartbeat_interval(cluster_size) \
+          if cluster_size > 0 \
+          else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
+
+
+        logger.debug("Heartbeat interval is %s seconds", heartbeat_interval)
 
         if 'hasMappedComponents' in response.keys():
           self.hasMappedComponents = response['hasMappedComponents'] is not False
 
         if 'hasPendingTasks' in response.keys():
-          self.recovery_manager.set_paused(response['hasPendingTasks'])
-
+          has_pending_tasks = bool(response['hasPendingTasks'])
+          self.recovery_manager.set_paused(has_pending_tasks)
 
         if 'registrationCommand' in response.keys():
           # check if the registration command is None. If none skip
@@ -299,6 +342,8 @@ class Controller(threading.Thread):
           self.restartAgent()
         else:
           self.responseId = serverId
+          if send_state:
+            last_state_timestamp = time.time()
 
         # if the response contains configurations, update the in-memory and
         # disk-based configuration cache (execution and alert commands have this)
@@ -381,8 +426,8 @@ class Controller(threading.Thread):
         time.sleep(delay)
 
       # Sleep for some time
-      timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
-                - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
+      timeout = heartbeat_interval - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
+
       if 0 == self.heartbeat_stop_callback.wait(timeout, self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS):
         # Stop loop when stop event received
         logger.info("Stop event received")
@@ -427,7 +472,7 @@ class Controller(threading.Thread):
         for callback in self.registration_listeners:
           callback()
 
-        time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
+        time.sleep(self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC)
         self.heartbeatWithServer()
       else:
         logger.info("Registration response from %s didn't contain 'response' as a key".format(self.serverHostname))

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1c41456/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
index 91098e0..82ea9b6 100644
--- a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
+++ b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
@@ -40,7 +40,7 @@ class Heartbeat:
     self.reports = []
     self.collector = alert_collector
 
-  def build(self, id='-1', state_interval=-1, componentsMapped=False):
+  def build(self, id='-1', add_state=False, componentsMapped=False):
     global clusterId, clusterDefinitionRevision, firstContact
     timestamp = int(time.time()*1000)
     queueResult = self.actionQueue.result()
@@ -75,16 +75,19 @@ class Heartbeat:
     if int(id) == 0:
       componentsMapped = False
 
-    logger.info("Building Heartbeat: {responseId = %s, timestamp = %s, "
+
+
+    logger.debug("Building Heartbeat: {responseId = %s, timestamp = %s, "
                 "commandsInProgress = %s, componentsMapped = %s,"
                 "recoveryTimestamp = %s}",
         str(id), str(timestamp), repr(commandsInProgress), repr(componentsMapped), str(recovery_timestamp))
 
-    if logger.isEnabledFor(logging.DEBUG):
-      logger.debug("Heartbeat: %s", pformat(heartbeat))
+
+    logger.debug("Heartbeat: %s", pformat(heartbeat))
 
     hostInfo = HostInfo(self.config)
-    if (int(id) >= 0) and state_interval > 0 and (int(id) % state_interval) == 0:
+    if add_state:
+      logger.info("Adding host info/state to heartbeat message.")
       nodeInfo = { }
       # for now, just do the same work as registration
       # this must be the last step before returning heartbeat
@@ -93,9 +96,9 @@ class Heartbeat:
       mounts = Hardware.osdisks(self.config)
       heartbeat['mounts'] = mounts
 
-      if logger.isEnabledFor(logging.DEBUG):
-        logger.debug("agentEnv: %s", str(nodeInfo))
-        logger.debug("mounts: %s", str(mounts))
+
+      logger.debug("agentEnv: %s", str(nodeInfo))
+      logger.debug("mounts: %s", str(mounts))
 
     if self.collector is not None:
       heartbeat['alerts'] = self.collector.alerts()

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1c41456/ambari-agent/src/main/python/ambari_agent/NetUtil.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/NetUtil.py b/ambari-agent/src/main/python/ambari_agent/NetUtil.py
index 80bf3ae..2e9381b 100644
--- a/ambari-agent/src/main/python/ambari_agent/NetUtil.py
+++ b/ambari-agent/src/main/python/ambari_agent/NetUtil.py
@@ -31,7 +31,8 @@ logger = logging.getLogger(__name__)
 class NetUtil:
 
   CONNECT_SERVER_RETRY_INTERVAL_SEC = 10
-  HEARTBEAT_IDDLE_INTERVAL_SEC = 10
+  HEARTBEAT_IDLE_INTERVAL_DEFAULT_MIN_SEC = 1
+  HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC = 10
   MINIMUM_INTERVAL_BETWEEN_HEARTBEATS = 0.1
 
   # Url within server to request during status check. This url
@@ -120,4 +121,32 @@ class NetUtil:
         if logger is not None:
           logger.info("Stop event received")
         self.DEBUG_STOP_RETRIES_FLAG = True
+
     return retries, connected, self.DEBUG_STOP_RETRIES_FLAG
+
+  def get_agent_heartbeat_idle_interval_sec(self, heartbeat_idle_interval_min, heartbeat_idle_interval_max, cluster_size):
+    """
+    Returns the interval in seconds to be used between agent heartbeats when
+    there are pending stages which requires higher heartbeat rate to reduce the latency
+    between the completion of the last command of the current stage and the starting of first
+    command of next stage.
+
+    The heartbeat intervals for elevated heartbeats is calculated as a function of the size of the cluster.
+
+    Using a higher hearbeat rate in case of large clusters will cause agents to flood
+    the server with heartbeat messages thus the calculated heartbeat interval is restricted to
+    [heartbeat_idle_interval_min, heartbeat_idle_interval_max] range.
+
+    :param cluster_size: the number of nodes the cluster consists of
+    :return: the heartbeat interval in seconds
+    """
+
+    heartbeat_idle_interval = cluster_size // heartbeat_idle_interval_max
+
+    if heartbeat_idle_interval < heartbeat_idle_interval_min:
+      return heartbeat_idle_interval_min
+
+    if heartbeat_idle_interval > heartbeat_idle_interval_max:
+      return heartbeat_idle_interval_max
+
+    return heartbeat_idle_interval

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1c41456/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
index f113083..99ccb4c 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
@@ -100,11 +100,11 @@ class TestHeartbeat(TestCase):
     dummy_controller = MagicMock()
     actionQueue = ActionQueue(config, dummy_controller)
     heartbeat = Heartbeat(actionQueue)
-    hb = heartbeat.build(id = 10, state_interval=1, componentsMapped=True)
+    hb = heartbeat.build(id = 10, add_state=True, componentsMapped=True)
     self.assertEqual(register_mock.call_args_list[0][0][1], True)
     register_mock.reset_mock()
 
-    hb = heartbeat.build(id = 0, state_interval=1, componentsMapped=True)
+    hb = heartbeat.build(id = 0, add_state=True, componentsMapped=True)
     self.assertEqual(register_mock.call_args_list[0][0][1], False)
 
   @patch.object(ActionQueue, "result")

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1c41456/ambari-agent/src/test/python/ambari_agent/TestNetUtil.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestNetUtil.py b/ambari-agent/src/test/python/ambari_agent/TestNetUtil.py
index d72e319..251a048 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestNetUtil.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestNetUtil.py
@@ -79,3 +79,24 @@ class TestNetUtil(unittest.TestCase):
     checkURL.side_effect = None
     checkURL.return_value = False, "test"
     self.assertEqual((5, False, False), netutil.try_to_connect("url", 5))
+
+  def test_get_agent_heartbeat_idle_interval_sec(self):
+    netutil = NetUtil.NetUtil(MagicMock())
+
+    heartbeat_interval = netutil.get_agent_heartbeat_idle_interval_sec(1, 10, 32)
+
+    self.assertEqual(heartbeat_interval, 3)
+
+  def test_get_agent_heartbeat_idle_interval_sec_max(self):
+    netutil = NetUtil.NetUtil(MagicMock())
+
+    heartbeat_interval = netutil.get_agent_heartbeat_idle_interval_sec(1, 10, 1500)
+
+    self.assertEqual(heartbeat_interval, netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC)
+
+  def test_get_agent_heartbeat_idle_interval_sec_min(self):
+    netutil = NetUtil.NetUtil(MagicMock())
+
+    heartbeat_interval = netutil.get_agent_heartbeat_idle_interval_sec(1, 10, 5)
+
+    self.assertEqual(heartbeat_interval, 1)

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1c41456/ambari-agent/src/test/python/ambari_agent/examples/ControllerTester.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/examples/ControllerTester.py b/ambari-agent/src/test/python/ambari_agent/examples/ControllerTester.py
index 8103872..bc46ac6 100644
--- a/ambari-agent/src/test/python/ambari_agent/examples/ControllerTester.py
+++ b/ambari-agent/src/test/python/ambari_agent/examples/ControllerTester.py
@@ -141,7 +141,7 @@ def run_simulation():
 
   controller = Controller.Controller(config)
   controller.sendRequest = sendRequest_method
-  controller.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC = 0.1
+  controller.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC = 0.1
   controller.netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC = 0.1
   controller.range = 1
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1c41456/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 35a37e3..94a8fe0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -517,12 +517,15 @@ public class HeartBeatHandler {
    * Annotate the response with some housekeeping details.
    * hasMappedComponents - indicates if any components are mapped to the host
   * hasPendingTasks - indicates if any tasks are pending for the host (they may not be sent yet)
+   * clusterSize - indicates the number of hosts that form the cluster
    * @param hostname
    * @param response
    * @throws org.apache.ambari.server.AmbariException
    */
  private void annotateResponse(String hostname, HeartBeatResponse response) throws AmbariException {
     for (Cluster cl : clusterFsm.getClustersForHost(hostname)) {
+      response.setClusterSize(cl.getClusterSize());
+
       List<ServiceComponentHost> scHosts = cl.getServiceComponentHosts(hostname);
       if (scHosts != null && scHosts.size() > 0) {
         response.setHasMappedComponents(true);

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1c41456/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java
index 1ab7ae9..02970a1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java
@@ -72,6 +72,9 @@ public class HeartBeatResponse {
   @SerializedName("recoveryConfig")
   private RecoveryConfig recoveryConfig;
 
+  @SerializedName("clusterSize")
+  private int clusterSize = -1;
+
   public long getResponseId() {
     return responseId;
   }
@@ -210,6 +213,10 @@ public class HeartBeatResponse {
     alertExecutionCommands.add(command);
   }
 
+  public void setClusterSize(int clusterSize) {
+    this.clusterSize = clusterSize;
+  }
+
   @Override
   public String toString() {
     StringBuilder buffer = new StringBuilder("HeartBeatResponse{");

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1c41456/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
index ac0ddd2..0519123 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
@@ -735,4 +735,10 @@ public interface Cluster {
    * @return
    */
   String getClusterProperty(String propertyName, String defaultValue);
+
+  /**
+   * Returns the number of hosts that form the cluster.
+   * @return number of hosts that form the cluster
+   */
+  int  getClusterSize();
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1c41456/ambari-server/src/main/java/org/apache/ambari/server/state/Clusters.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Clusters.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Clusters.java
index bd9de13..2d859b3 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Clusters.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Clusters.java
@@ -270,4 +270,12 @@ public interface Clusters {
    * @return the map of session attributes for the cluster; never null
    */
   Map<String, Object> getSessionAttributes(String name);
+
+  /**
+   * Returns the number of hosts that form the cluster identified by the given name.
+   * @param clusterName the name that identifies the cluster
+   * @return  number of hosts that form the cluster
+   */
+  int getClusterSize(String clusterName);
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1c41456/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index 3d2388e..fdb997b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -3604,6 +3604,17 @@ public class ClusterImpl implements Cluster {
   }
 
   /**
+  * Returns the number of hosts that form the cluster.
+  *
+  *  @return number of hosts that form the cluster
+  */
+  @Override
+  public int getClusterSize() {
+    return clusters.getClusterSize(clusterName);
+  }
+
+
+  /**
    * {@inheritDoc}
    */
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1c41456/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
index c26e1e9..6318545 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
@@ -991,6 +991,28 @@ public class ClustersImpl implements Clusters {
     return cluster == null ? Collections.<String, Object>emptyMap() : cluster.getSessionAttributes();
   }
 
+  /**
+   * Returns the number of hosts that form the cluster identified by the given name.
+   *
+   * @param clusterName the name that identifies the cluster
+   * @return number of hosts that form the cluster
+   */
+  @Override
+  public int getClusterSize(String clusterName) {
+    checkLoaded();
+    r.lock();
+
+    int hostCount = 0;
+
+    if (clusterHostMap.containsKey(clusterName) && clusterHostMap.get(clusterName) != null) {
+      hostCount = clusterHostMap.get(clusterName).size();
+    }
+
+    r.unlock();
+
+    return hostCount;
+
+  }
 
   // ----- helper methods ---------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1c41456/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterImplTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterImplTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterImplTest.java
index 627ade9..aca32e4 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterImplTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterImplTest.java
@@ -306,4 +306,33 @@ public class ClusterImplTest {
 
 
   }
+
+  @Test
+  public void testGetClusterSize() throws Exception {
+    // Given
+    String clusterName = "TEST_CLUSTER_SIZE";
+    String hostName1 = "host1", hostName2 = "host2";
+    clusters.addCluster(clusterName, new StackId("HDP-2.1.1"));
+
+    Cluster cluster = clusters.getCluster(clusterName);
+    clusters.addHost(hostName1);
+    clusters.addHost(hostName2);
+
+    Host host1 = clusters.getHost(hostName1);
+    host1.setHostAttributes(ImmutableMap.of("os_family", "centos", "os_release_version", "6.0"));
+    host1.persist();
+
+    Host host2 = clusters.getHost(hostName2);
+    host2.setHostAttributes(ImmutableMap.of("os_family", "centos", "os_release_version", "6.0"));
+    host2.persist();
+
+    clusters.mapHostsToCluster(Sets.newHashSet(hostName1, hostName2), clusterName);
+
+    // When
+    int clusterSize = cluster.getClusterSize();
+
+    // Then
+    assertEquals(2, clusterSize);
+
+  }
 }
\ No newline at end of file


Mime
View raw message