ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From swa...@apache.org
Subject [1/2] ambari git commit: AMBARI-15623. Support distributed aggregation for multiple AMS instances. (swagle)
Date Wed, 30 Mar 2016 18:35:27 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk c479fdd96 -> dfa4454e7


http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java
new file mode 100644
index 0000000..53b9e7e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricsSystemInitializationException;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.OnlineOfflineSMD;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.helix.model.IdealState.RebalanceMode.FULL_AUTO;
+
+public class TimelineMetricHAController {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricHAController.class);
+
+  static final String CLUSTER_NAME = "ambari-metrics-cluster";
+  static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";
+  static final String STATE_MODEL_NAME = OnlineOfflineSMD.name;
+  static final String INSTANCE_NAME_DELIMITER = "_";
+
+  final String zkConnectUrl;
+  final String instanceHostname;
+  final InstanceConfig instanceConfig;
+  final AggregationTaskRunner aggregationTaskRunner;
+
+  // Cache list of known live instances
+  final List<String> liveInstanceNames = new ArrayList<>();
+
+  // Helix Admin
+  HelixAdmin admin;
+  // Helix Manager
+  HelixManager manager;
+
+  private volatile boolean isInitialized = false;
+
+  public TimelineMetricHAController(TimelineMetricConfiguration configuration) {
+    String instancePort;
+    try {
+      instanceHostname = configuration.getInstanceHostnameFromEnv();
+      instancePort = configuration.getInstancePort();
+
+    } catch (Exception e) {
+      LOG.error("Error reading configs from classpath, will resort to defaults.", e);
+      throw new MetricsSystemInitializationException(e.getMessage());
+    }
+
+    try {
+      String zkClientPort = configuration.getZKClientPort();
+      String zkQuorum = configuration.getZKQuorum();
+
+      if (StringUtils.isEmpty(zkClientPort) || StringUtils.isEmpty(zkQuorum)) {
+        throw new Exception("Unable to parse zookeeper quorum. clientPort = "
+          + zkClientPort +", quorum = " + zkQuorum);
+      }
+
+      zkConnectUrl = getZkConnectionUrl(zkClientPort, zkQuorum);
+
+    } catch (Exception e) {
+      LOG.error("Unable to load hbase-site from classpath.", e);
+      throw new MetricsSystemInitializationException(e.getMessage());
+    }
+
+    instanceConfig = new InstanceConfig(instanceHostname + INSTANCE_NAME_DELIMITER + instancePort);
+    instanceConfig.setHostName(instanceHostname);
+    instanceConfig.setPort(instancePort);
+    instanceConfig.setInstanceEnabled(true);
+    aggregationTaskRunner = new AggregationTaskRunner(instanceConfig.getInstanceName(), zkConnectUrl);
+  }
+
+  /**
+   * Initialize the instance with zookeeper via Helix
+   */
+  public void initializeHAController() throws Exception {
+    admin = new ZKHelixAdmin(zkConnectUrl);
+    // create cluster
+    LOG.info("Creating zookeeper cluster node: " + CLUSTER_NAME);
+    admin.addCluster(CLUSTER_NAME, false);
+
+    // Adding host to the cluster
+    List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME);
+    if (nodes == null || !nodes.contains(instanceConfig.getInstanceName())) {
+      LOG.info("Adding participant instance " + instanceConfig);
+      admin.addInstance(CLUSTER_NAME, instanceConfig);
+    }
+
+    // Add a state model
+    if (admin.getStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME) == null) {
+      LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
+      admin.addStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME, OnlineOfflineSMD.build());
+    }
+
+    // Add resources with 1 cluster-wide replica
+    // Since our aggregators are unbalanced in terms of work distribution we
+    // only need to distribute writes to METRIC_AGGREGATE and
+    // METRIC_RECORD_MINUTE
+    List<String> resources = admin.getResourcesInCluster(CLUSTER_NAME);
+    if (!resources.contains(METRIC_AGGREGATORS)) {
+      LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions and 1 replicas");
+      admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, 2, OnlineOfflineSMD.name, FULL_AUTO.toString());
+    }
+    // this will set up the ideal state, it calculates the preference list for
+    // each partition similar to consistent hashing
+    admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1);
+
+    // Start participant
+    startAggregators();
+
+    // Start controller
+    startController();
+
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        aggregationTaskRunner.stop();
+        manager.disconnect();
+      }
+    });
+
+    isInitialized = true;
+  }
+
+  /**
+   * Return true if HA controller is enabled.
+   */
+  public boolean isInitialized() {
+    return isInitialized;
+  }
+
+  private void startAggregators() {
+    try {
+      aggregationTaskRunner.initialize();
+
+    } catch (Exception e) {
+      LOG.error("Unable to start aggregators.", e);
+      throw new MetricsSystemInitializationException(e.getMessage());
+    }
+  }
+
+  private void startController() throws Exception {
+    manager = HelixManagerFactory.getZKHelixManager(
+      CLUSTER_NAME,
+      instanceHostname,
+      InstanceType.CONTROLLER,
+      zkConnectUrl
+    );
+
+    manager.connect();
+    HelixController controller = new HelixController();
+    manager.addLiveInstanceChangeListener(controller);
+  }
+
+  private String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
+    StringBuilder sb = new StringBuilder();
+    String[] quorumParts = zkQuorum.split(",");
+    String prefix = "";
+    for (String part : quorumParts) {
+      sb.append(prefix);
+      sb.append(part.trim());
+      if (!part.contains(":")) {
+        sb.append(":");
+        sb.append(zkClientPort);
+      }
+      prefix = ",";
+    }
+
+    return sb.toString();
+  }
+
+  public AggregationTaskRunner getAggregationTaskRunner() {
+    return aggregationTaskRunner;
+  }
+
+  public List<String> getLiveInstanceHostNames() {
+    List<String> liveInstanceHostNames = new ArrayList<>();
+
+    for (String instance : liveInstanceNames) {
+      liveInstanceHostNames.add(instance.split(INSTANCE_NAME_DELIMITER)[0]);
+    }
+
+    return liveInstanceHostNames;
+  }
+
+  public class HelixController extends GenericHelixController {
+    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+    Joiner joiner = Joiner.on(", ").skipNulls();
+
+    @Override
+    public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext
changeContext) {
+      super.onLiveInstanceChange(liveInstances, changeContext);
+
+      liveInstanceNames.clear();
+      for (LiveInstance instance : liveInstances) {
+        liveInstanceNames.add(instance.getInstanceName());
+      }
+
+      LOG.info("Detected change in liveliness of Collector instances. " +
+        "LiveIsntances = " + joiner.join(liveInstanceNames));
+      // Print HA state - after some delay
+      executorService.schedule(new Runnable() {
+        @Override
+        public void run() {
+          printClusterState();
+        }
+      }, 30, TimeUnit.SECONDS);
+
+
+    }
+  }
+
+  public void printClusterState() {
+    StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################");
+
+    ExternalView resourceExternalView = admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
+    if (resourceExternalView != null) {
+      getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb);
+    }
+    sb.append("\n##################################################");
+    LOG.info(sb.toString());
+  }
+
+  private void getPrintableResourceState(ExternalView resourceExternalView,
+                                         String resourceName,
+                                         StringBuilder sb) {
+    TreeSet<String> sortedSet = new TreeSet<>(resourceExternalView.getPartitionSet());
+    sb.append("\nCLUSTER: ");
+    sb.append(CLUSTER_NAME);
+    sb.append("\nRESOURCE: ");
+    sb.append(resourceName);
+    for (String partitionName : sortedSet) {
+      sb.append("\nPARTITION: ");
+      sb.append(partitionName).append("\t");
+      Map<String, String> states = resourceExternalView.getStateMap(partitionName);
+      for (Map.Entry<String, String> stateEntry : states.entrySet()) {
+        sb.append("\t");
+        sb.append(stateEntry.getKey());
+        sb.append("\t");
+        sb.append(stateEntry.getValue());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
index 0ea136b..c5761f7 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
@@ -32,7 +32,7 @@ import java.sql.SQLException;
 public class DefaultPhoenixDataSource implements PhoenixConnectionProvider {
 
   static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
-  private static final String ZOOKEEPER_CLIENT_PORT ="hbase.zookeeper.property.clientPort";
+  private static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort";
   private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
   private static final String ZNODE_PARENT = "zookeeper.znode.parent";
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
index 873671a..d2526a0 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
@@ -423,6 +423,18 @@ public class TimelineWebServices {
     }
   }
 
+  @GET
+  @Path("/metrics/livenodes")
+  @Produces({ MediaType.APPLICATION_JSON })
+  public List<String> getLiveCollectorNodes(
+    @Context HttpServletRequest req,
+    @Context HttpServletResponse res
+  ) {
+    init(res);
+
+    return timelineMetricStore.getLiveInstances();
+  }
+
   /**
    * Store the given entities into the timeline store, and return the errors
    * that happen during storing.

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
index 89f3fbe..f0030a1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
@@ -128,7 +128,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest
{
   public void testGetMetricRecordsMinutes() throws IOException, SQLException {
     // GIVEN
     TimelineMetricAggregator aggregatorMinute =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, new Configuration(),
null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
@@ -165,7 +165,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest
{
   public void testGetMetricRecordsHours() throws IOException, SQLException {
     // GIVEN
     TimelineMetricAggregator aggregator =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new Configuration(),
null);
 
     MetricHostAggregate expectedAggregate =
         createMetricHostAggregate(2.0, 0.0, 20, 15.0);
@@ -217,7 +217,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest
{
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
-        hdb, new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()));
+        hdb, new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()),
null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime + 1;
@@ -256,8 +256,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest
{
   public void testGetClusterMetricRecordLatestWithFunction() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond
-        (hdb, new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
+        new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()),
null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime + 1;
@@ -297,7 +297,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest
{
   public void testGetClusterMetricRecordsHours() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, new Configuration(),
null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
index 8f8067b..61d6e71 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@ -94,4 +94,9 @@ public class TestTimelineMetricStore implements TimelineMetricStore {
   public Map<String, Set<String>> getHostAppsMetadata() throws SQLException,
IOException {
     return Collections.emptyMap();
   }
+
+  @Override
+  public List<String> getLiveInstances() {
+    return Collections.emptyList();
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
index 827f399..ea947d0 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.junit.Before;
 import org.junit.Test;
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.concurrent.atomic.AtomicLong;
+
 import static junit.framework.Assert.assertEquals;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
@@ -44,7 +45,7 @@ public class AbstractTimelineAggregatorTest {
 
   @Before
   public void setUp() throws Exception {
-    sleepIntervalMillis = 5*60*1000l; //5 minutes
+    sleepIntervalMillis = 5 * 60 * 1000l; //5 minutes
     checkpointCutOffMultiplier = 2;
 
     Configuration metricsConf = new Configuration();
@@ -56,7 +57,7 @@ public class AbstractTimelineAggregatorTest {
     checkPoint = new AtomicLong(-1);
     actualRuns = 0;
 
-    agg = new AbstractTimelineAggregator("TimelineAggregatorTest", null, metricsConf) {
+    agg = new AbstractTimelineAggregator(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND,
null, metricsConf) {
       @Override
       public boolean doWork(long startTime, long endTime) {
         startTimeInDoWork.set(startTime);

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
index f201224..590f82a 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
@@ -77,7 +77,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()));
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()),
null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -130,7 +130,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest
{
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()));
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()),
null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -206,7 +206,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest
{
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()));
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()),
null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
 +    // here we put some metrics that will be aggregated
@@ -270,7 +270,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest
{
   public void testAggregateDailyClusterMetrics() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, getConfigurationForTest(false));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, getConfigurationForTest(false),
null);
 
     // this time can be virtualized! or made independent from real clock
     long startTime = System.currentTimeMillis();
@@ -315,7 +315,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest
{
   public void testShouldAggregateClusterOnMinuteProperly() throws Exception {
 
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false),
null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
@@ -382,7 +382,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest
{
   public void testShouldAggregateClusterOnHourProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false),
null);
 
     // this time can be virtualized! or made independent from real clock
     long startTime = System.currentTimeMillis();
@@ -426,7 +426,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest
{
   public void testShouldAggregateDifferentMetricsOnHourProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false),
null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
@@ -490,7 +490,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest
{
     conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        conf, new TimelineMetricMetadataManager(hdb, new Configuration()));
+        conf, new TimelineMetricMetadataManager(hdb, new Configuration()), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -542,7 +542,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest
{
   public void testClusterAggregateMetricNormalization() throws Exception {
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()));
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()),
null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // Sample data
@@ -619,7 +619,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest
{
   public void testAggregationUsingGroupByQuery() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(true));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(true),
null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
index 9c7c8fa..9873643 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
@@ -22,24 +22,13 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
-import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -95,7 +84,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator aggregatorMinute =
       TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb,
-        getConfigurationForTest(false));
+        getConfigurationForTest(false), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -156,7 +145,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator aggregator =
       TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb,
-        getConfigurationForTest(false));
+        getConfigurationForTest(false), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
     long startTime = System.currentTimeMillis();
 
@@ -219,7 +208,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator aggregator =
       TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hdb,
-        getConfigurationForTest(false));
+        getConfigurationForTest(false), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
     long startTime = System.currentTimeMillis();
 
@@ -281,7 +270,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator aggregatorMinute =
       TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb,
-        getConfigurationForTest(true));
+        getConfigurationForTest(true), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
index d2d478c..f55dda1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
@@ -24,11 +24,12 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.easymock.EasyMock;
 import org.junit.Test;
 
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+
 public class TimelineMetricClusterAggregatorSecondTest {
 
   @Test
@@ -42,9 +43,9 @@ public class TimelineMetricClusterAggregatorSecondTest {
     TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class);
 
     TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond(
-      "TimelineClusterAggregatorSecond", metricMetadataManagerMock, null, configuration,
null,
-      aggregatorInterval, 2, "false", "", "", aggregatorInterval, sliceInterval
-    );
+      METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null,
+      configuration, null, aggregatorInterval, 2, "false", "", "",
+      aggregatorInterval, sliceInterval, null);
 
     secondAggregator.timeSliceIntervalMillis = sliceInterval;
     long roundedEndTime = AbstractTimelineAggregator.getRoundedAggregateTimeMillis(aggregatorInterval);

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java
new file mode 100644
index 0000000..04e8909
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
+
+import junit.framework.Assert;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.CLUSTER_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.METRIC_AGGREGATORS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.STATE_MODEL_NAME;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+public class TimelineMetricHAControllerTest extends AbstractMiniHBaseClusterTest {
+  TimelineMetricConfiguration configuration;
+
+  @Before
+  public void setup() throws Exception {
+    configuration = createNiceMock(TimelineMetricConfiguration.class);
+
+    expect(configuration.getInstanceHostnameFromEnv()).andReturn("h1");
+    expect(configuration.getInstancePort()).andReturn("12000");
+    // jdbc:phoenix:localhost:52887:/hbase;test=true
+    String zkUrl = getUrl();
+    String port = zkUrl.split(":")[3];
+    String quorum = zkUrl.split(":")[2];
+
+    expect(configuration.getZKClientPort()).andReturn(port);
+    expect(configuration.getZKQuorum()).andReturn(quorum);
+
+    replay(configuration);
+  }
+
+  @Test(timeout = 150000)
+  public void testHAControllerDistributedAggregation() throws Exception {
+    TimelineMetricHAController haController = new TimelineMetricHAController(configuration);
+    haController.initializeHAController();
+    // Wait for task assignment
+    Thread.sleep(10000);
+
+    Assert.assertTrue(haController.isInitialized());
+    Assert.assertEquals(1, haController.getLiveInstanceHostNames().size());
+    Assert.assertTrue(haController.getAggregationTaskRunner().performsClusterAggregation());
+    Assert.assertTrue(haController.getAggregationTaskRunner().performsHostAggregation());
+
+    // Add new instance
+    InstanceConfig instanceConfig2 = new InstanceConfig("h2_12001");
+    haController.admin.addInstance(CLUSTER_NAME, instanceConfig2);
+    HelixManager manager2 = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
+      instanceConfig2.getInstanceName(),
+      InstanceType.PARTICIPANT, haController.zkConnectUrl);
+    manager2.getStateMachineEngine().registerStateModelFactory(STATE_MODEL_NAME,
+      new OnlineOfflineStateModelFactory(instanceConfig2.getInstanceName(),
+        new AggregationTaskRunner(instanceConfig2.getInstanceName(), "")));
+    manager2.connect();
+    haController.admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1);
+
+    // Wait on re-assignment of partitions
+    Thread.sleep(10000);
+    Assert.assertEquals(2, haController.getLiveInstanceHostNames().size());
+
+    ExternalView view = haController.admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
+
+    Map<String, String> partitionInstanceMap = new HashMap<>();
+
+    for (String partition : view.getPartitionSet()) {
+      Map<String, String> states = view.getStateMap(partition);
+      // (instance, state) pairs
+      for (Map.Entry<String, String> stateEntry : states.entrySet()) {
+        partitionInstanceMap.put(partition, stateEntry.getKey());
+        Assert.assertEquals("ONLINE", stateEntry.getValue());
+      }
+    }
+    // Re-assigned partitions
+    Assert.assertEquals(2, partitionInstanceMap.size());
+
+    haController.getAggregationTaskRunner().stop();
+    haController.manager.disconnect();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
index eb7d750..d8b84ec 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
@@ -23,6 +23,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -42,6 +43,7 @@ import org.apache.ambari.server.orm.entities.PermissionEntity;
 import org.apache.ambari.server.orm.entities.RoleAuthorizationEntity;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
 import org.apache.ambari.server.state.RepositoryType;
 import org.apache.ambari.server.state.State;
 import org.slf4j.Logger;
@@ -150,7 +152,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
     dbAccessor.addColumn(CLUSTER_TABLE, new DBColumnInfo(CLUSTER_UPGRADE_ID_COLUMN, Long.class,
null, null, true));
 
     dbAccessor.addFKConstraint(CLUSTER_TABLE, "FK_clusters_upgrade_id",
-        CLUSTER_UPGRADE_ID_COLUMN, UPGRADE_TABLE, "upgrade_id", false);
+      CLUSTER_UPGRADE_ID_COLUMN, UPGRADE_TABLE, "upgrade_id", false);
   }
 
   @Override
@@ -165,6 +167,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
     setRoleSortOrder();
     addSettingPermission();
     addManageUserPersistedDataPermission();
+    updateAMSConfigs();
   }
 
   private void createSettingTable() throws SQLException {
@@ -485,7 +488,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
     dbAccessor.executeUpdate(String.format(updateStatement,
         6, PermissionEntity.CLUSTER_USER_PERMISSION_NAME));
     dbAccessor.executeUpdate(String.format(updateStatement,
-        7, PermissionEntity.VIEW_USER_PERMISSION_NAME));
+      7, PermissionEntity.VIEW_USER_PERMISSION_NAME));
   }
 
   /**
@@ -671,4 +674,30 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
     dbAccessor.executeQuery("UPDATE " + HOST_ROLE_COMMAND_TABLE + " SET original_start_time=-1
WHERE original_start_time IS NULL");
     dbAccessor.setColumnNullable(HOST_ROLE_COMMAND_TABLE, columnName, false);
   }
+
+  protected void updateAMSConfigs() throws AmbariException {
+    AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
+    Clusters clusters = ambariManagementController.getClusters();
+
+    if (clusters != null) {
+      Map<String, Cluster> clusterMap = clusters.getClusters();
+
+      if (clusterMap != null && !clusterMap.isEmpty()) {
+        for (final Cluster cluster : clusterMap.values()) {
+
+          Config amsEnv = cluster.getDesiredConfigByType("ams-env");
+
+          if (amsEnv != null) {
+            String content = amsEnv.getProperties().get("content");
+            if (content != null && !content.contains("AMS_INSTANCE_NAME")) {
+              String newContent = content + "\n # AMS instance name\n" +
+                "export AMS_INSTANCE_NAME={{hostname}}\n";
+
+              updateConfigurationProperties("ams-env", Collections.singletonMap("content",
newContent), true, true);
+            }
+          }
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
index 836e159..c027939 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
@@ -80,6 +80,9 @@
     <value>
 # Set environment variables here.
 
+# AMS instance name
+export AMS_INSTANCE_NAME={{hostname}}
+
 # The java implementation to use. Java 1.6 required.
 export JAVA_HOME={{java64_home}}
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
index 59dbd84..f8131c0 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
@@ -29,7 +29,7 @@
           <name>METRICS_COLLECTOR</name>
           <displayName>Metrics Collector</displayName>
           <category>MASTER</category>
-          <cardinality>1</cardinality>
+          <cardinality>1+</cardinality>
           <versionAdvertised>false</versionAdvertised>
           <timelineAppid>AMS-HBASE</timelineAppid>
           <recovery_enabled>true</recovery_enabled>

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
index ef2a96e..5cd3bdc 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
@@ -282,6 +282,7 @@ public class UpgradeCatalog240Test {
     Method updateAlerts = UpgradeCatalog240.class.getDeclaredMethod("updateAlerts");
     Method addManageUserPersistedDataPermission = UpgradeCatalog240.class.getDeclaredMethod("addManageUserPersistedDataPermission");
     Method addSettingPermission = UpgradeCatalog240.class.getDeclaredMethod("addSettingPermission");
+    Method updateAmsConfigs = UpgradeCatalog240.class.getDeclaredMethod("updateAMSConfigs");
 
     Capture<String> capturedStatements = newCapture(CaptureType.ALL);
 
@@ -293,6 +294,7 @@ public class UpgradeCatalog240Test {
             .addMockedMethod(updateAlerts)
             .addMockedMethod(addSettingPermission)
             .addMockedMethod(addManageUserPersistedDataPermission)
+            .addMockedMethod(updateAmsConfigs)
             .createMock();
 
     Field field = AbstractUpgradeCatalog.class.getDeclaredField("dbAccessor");
@@ -302,6 +304,7 @@ public class UpgradeCatalog240Test {
     upgradeCatalog240.updateAlerts();
     upgradeCatalog240.addSettingPermission();
     upgradeCatalog240.addManageUserPersistedDataPermission();
+    upgradeCatalog240.updateAMSConfigs();
 
     replay(upgradeCatalog240, dbAccessor);
 


Mime
View raw message