ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject [1/2] ambari git commit: AMBARI-21458 Provide ability to shard Cluster second aggregation across appId. (dsen)
Date Thu, 31 Aug 2017 10:00:23 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-ams b0099d9a4 -> 8cad9eb1a


http://git-wip-us.apache.org/repos/asf/ambari/blob/8cad9eb1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
index bd508c4..f9ad773 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
@@ -213,9 +213,9 @@ public class TimelineMetricMetadataManager {
    */
   public void putIfModifiedHostedAppsMetadata(String hostname, String appId) {
     TimelineMetricHostMetadata timelineMetricHostMetadata = HOSTED_APPS_MAP.get(hostname);
-    Set<String> apps = (timelineMetricHostMetadata != null) ? timelineMetricHostMetadata.getHostedApps() : null;
+    ConcurrentHashMap<String, String> apps = (timelineMetricHostMetadata != null) ? timelineMetricHostMetadata.getHostedApps() : null;
     if (apps == null) {
-      apps = new HashSet<>();
+      apps = new ConcurrentHashMap<>();
       if (timelineMetricHostMetadata == null) {
         HOSTED_APPS_MAP.put(hostname, new TimelineMetricHostMetadata(apps));
       } else {
@@ -223,8 +223,8 @@ public class TimelineMetricMetadataManager {
       }
     }
 
-    if (!apps.contains(appId)) {
-      apps.add(appId);
+    if (!apps.containsKey(appId)) {
+      apps.put(appId, appId);
       SYNC_HOSTED_APPS_METADATA.set(true);
     }
   }
@@ -362,8 +362,9 @@ public class TimelineMetricMetadataManager {
 
     String uuidStr = new String(uuid);
     if (uuidHostMap.containsKey(uuidStr)) {
+      //TODO fix the collisions
       LOG.error("Duplicate key computed for " + hostname +", Collides with  " + uuidHostMap.get(uuidStr));
-      return null;
+      return uuid;
     }
 
     if (timelineMetricHostMetadata == null) {
@@ -398,8 +399,15 @@ public class TimelineMetricMetadataManager {
     String uuidStr = new String(uuid);
     if (uuidKeyMap.containsKey(uuidStr) && !uuidKeyMap.get(uuidStr).equals(key)) {
       TimelineMetricMetadataKey collidingKey = (TimelineMetricMetadataKey)uuidKeyMap.get(uuidStr);
+      //TODO fix the collisions
+      /**
+       * 2017-08-23 14:12:35,922 ERROR org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager:
+       * Duplicate key [52, 50, 51, 53, 50, 53, 53, 53, 49, 54, 57, 50, 50, 54, 0, 0]([B@278a93f9) computed for
+       * TimelineClusterMetric{metricName='sdisk_dm-11_write_count', appId='hbase', instanceId='', timestamp=1503497400000}, Collides with
+       * TimelineMetricMetadataKey{metricName='sdisk_dm-20_write_count', appId='hbase', instanceId=''}
+       */
       LOG.error("Duplicate key " + Arrays.toString(uuid) + "(" + uuid +  ") computed for " + timelineClusterMetric.toString() + ", Collides with  " + collidingKey.toString());
-      return null;
+      return uuid;
     }
 
     if (timelineMetricMetadata == null) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cad9eb1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java
index f808cd7..96af877 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java
@@ -134,7 +134,7 @@ public class TimelineMetricMetadataSync implements Runnable {
           // No persistence / stale data in store
           if (persistedData == null || persistedData.isEmpty() ||
             !persistedData.containsKey(cacheEntry.getKey()) ||
-            !persistedData.get(cacheEntry.getKey()).getHostedApps().containsAll(cacheEntry.getValue().getHostedApps())) {
+            !persistedData.get(cacheEntry.getKey()).getHostedApps().keySet().containsAll(cacheEntry.getValue().getHostedApps().keySet())) {
             dataToSync.put(cacheEntry.getKey(), cacheEntry.getValue());
           }
         }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cad9eb1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java
index f35c23a..10e9c61 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java
@@ -192,6 +192,10 @@ public class HashBasedUuidGenStrategy implements MetricUuidGenStrategy {
       }
     }
 
+    if (numericValue != 0) {
+      seed+=numericValue;
+    }
+
     String seedStr = String.valueOf(seed);
     if (seedStr.length() < maxLength) {
       return null;

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cad9eb1/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
index c60554c..57f9796 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
@@ -202,7 +202,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
-        hdb, new Configuration(), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
+        hdb, new Configuration(), new TimelineMetricMetadataManager(new Configuration(), hdb), null, null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime + 1;
@@ -242,7 +242,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        new Configuration(), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
+        new Configuration(), new TimelineMetricMetadataManager(new Configuration(), hdb), null, null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime + 1;

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cad9eb1/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java
new file mode 100644
index 0000000..d3c6061
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TimelineMetricConfiguration.class)
+
+@PowerMockIgnore("javax.management.*")
+public class TimelineMetricsIgniteCacheTest {
+  private static TimelineMetricsIgniteCache timelineMetricsIgniteCache;
+  @BeforeClass
+  public static void setupConf() throws Exception {
+    TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new
+      Configuration(), new Configuration());
+    mockStatic(TimelineMetricConfiguration.class);
+    expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();
+    conf.getMetricsConf().set(CLUSTER_AGGREGATOR_APP_IDS, "appIdForHostsAggr");
+    conf.getMetricsConf().set(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES, "localhost");
+    replayAll();
+
+    timelineMetricsIgniteCache = new TimelineMetricsIgniteCache();
+  }
+
+  @Test
+  public void putEvictMetricsFromCacheSlicesMerging() throws Exception {
+    long cacheSliceIntervalMillis = 30000L;
+
+    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new byte[16]).once();
+    replay(metricMetadataManagerMock);
+
+    long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis);
+
+    long seconds = 1000;
+    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+    /*
+
+    0        +30s      +60s
+    |         |         |
+     (1)(2)(3) (4)(5)(6)  h1
+
+    */
+    // Case 1 : data points are distributed equally, no values are lost, single host.
+    metricValues.put(startTime + 4*seconds, 1.0);
+    metricValues.put(startTime + 14*seconds, 2.0);
+    metricValues.put(startTime + 24*seconds, 3.0);
+    metricValues.put(startTime + 34*seconds, 4.0);
+    metricValues.put(startTime + 44*seconds, 5.0);
+    metricValues.put(startTime + 54*seconds, 6.0);
+
+    TimelineMetric timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+    timelineMetric.setStartTime(metricValues.firstKey());
+    timelineMetric.addMetricValues(metricValues);
+
+    Collection<TimelineMetric> timelineMetrics = new ArrayList<>();
+    timelineMetrics.add(timelineMetric);
+    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
+
+    Assert.assertEquals(aggregateMap.size(), 2);
+    TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
+      timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds);
+
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(2.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    timelineClusterMetric.setTimestamp(startTime + 2*30*seconds);
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(5.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    metricValues.clear();
+    timelineMetrics.clear();
+
+    /*
+
+    0        +30s      +60s
+    |         |         |
+     (1)(2)(3) (4)(5)(6)   h1, h2
+
+    */
+    // Case 2 : data points are distributed equally, no values are lost, two hosts.
+    metricValues.put(startTime + 4*seconds, 1.0);
+    metricValues.put(startTime + 14*seconds, 2.0);
+    metricValues.put(startTime + 24*seconds, 3.0);
+    metricValues.put(startTime + 34*seconds, 4.0);
+    metricValues.put(startTime + 44*seconds, 5.0);
+    metricValues.put(startTime + 54*seconds, 6.0);
+
+    timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+    timelineMetric.setMetricValues(metricValues);
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 5*seconds, 2.0);
+    metricValues.put(startTime + 15*seconds, 4.0);
+    metricValues.put(startTime + 25*seconds, 6.0);
+    metricValues.put(startTime + 35*seconds, 8.0);
+    metricValues.put(startTime + 45*seconds, 10.0);
+    metricValues.put(startTime + 55*seconds, 12.0);
+    TimelineMetric timelineMetric2 = new TimelineMetric("metric1", "host2", "app1", "instance1");
+    timelineMetric2.setMetricValues(metricValues);
+
+    timelineMetrics = new ArrayList<>();
+    timelineMetrics.add(timelineMetric);
+    timelineMetrics.add(timelineMetric2);
+    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+    aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
+
+    Assert.assertEquals(aggregateMap.size(), 2);
+    timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
+      timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds);
+
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(6.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    timelineClusterMetric.setTimestamp(startTime + 2*30*seconds);
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(15.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    metricValues.clear();
+    timelineMetrics.clear();
+
+    /*
+
+    0      +30s    +60s    +90s
+    |       |       |       |
+     (1)      (2)                h1
+                (3)       (4)    h2
+                 (5)      (6)    h1
+
+    */
+    // Case 3 : merging host data points, ignore (2) for h1 as it will conflict with (5), two hosts.
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 15*seconds, 1.0);
+    metricValues.put(startTime + 45*seconds, 2.0);
+    timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 45*seconds, 3.0);
+    metricValues.put(startTime + 85*seconds, 4.0);
+    timelineMetric = new TimelineMetric("metric1", "host2", "app1", "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 55*seconds, 5.0);
+    metricValues.put(startTime + 85*seconds, 6.0);
+    timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+
+    aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
+
+    Assert.assertEquals(aggregateMap.size(), 3);
+    timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
+      timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds);
+
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(1.0, aggregateMap.get(timelineClusterMetric).getSum());
+    Assert.assertEquals(1, aggregateMap.get(timelineClusterMetric).getNumberOfHosts());
+
+    timelineClusterMetric.setTimestamp(startTime + 2*30*seconds);
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(8.0, aggregateMap.get(timelineClusterMetric).getSum());
+    Assert.assertEquals(2, aggregateMap.get(timelineClusterMetric).getNumberOfHosts());
+
+    timelineClusterMetric.setTimestamp(startTime + 3*30*seconds);
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(10.0, aggregateMap.get(timelineClusterMetric).getSum());
+    Assert.assertEquals(2, aggregateMap.get(timelineClusterMetric).getNumberOfHosts());
+
+    metricValues.clear();
+    timelineMetrics.clear();
+
+    Assert.assertEquals(0d, timelineMetricsIgniteCache.getPointInTimeCacheMetrics().get("Cluster_KeySize"));
+  }
+
+  @Test
+  public void updateAppAggregatesFromHostMetricTest() {
+    //make sure hosts metrics are aggregated for appIds from "timeline.metrics.service.cluster.aggregator.appIds"
+
+    long cacheSliceIntervalMillis = 30000L;
+
+    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new byte[16]).once();
+    expect(metricMetadataManagerMock.getHostedAppsCache()).andReturn(new HashMap<>()).anyTimes();
+    replay(metricMetadataManagerMock);
+
+    long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis);
+
+    long seconds = 1000;
+
+    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+    List<TimelineMetric> timelineMetrics = new ArrayList<>();
+    TimelineMetric timelineMetric;
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 15*seconds, 1.0);
+    metricValues.put(startTime + 55*seconds, 2.0);
+    timelineMetric = new TimelineMetric("host_metric", "host1", HOST_APP_ID, "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 45*seconds, 3.0);
+    metricValues.put(startTime + 85*seconds, 4.0);
+    timelineMetric = new TimelineMetric("app_metric", "host1", "appIdForHostsAggr", "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 85*seconds, 5.0);
+    timelineMetric = new TimelineMetric("host_metric", "host1", HOST_APP_ID, "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 85*seconds, 6.0);
+    timelineMetric = new TimelineMetric("host_metric", "host2", HOST_APP_ID, "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
+    Assert.assertEquals(aggregateMap.size(), 6);
+    TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
+        timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 90*seconds);
+
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(11.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    timelineClusterMetric = new TimelineClusterMetric("app_metric",
+        "appIdForHostsAggr", "instance1", startTime + 90*seconds);
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(4.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    timelineClusterMetric = new TimelineClusterMetric("host_metric",
+        "appIdForHostsAggr", "instance1", startTime + 90*seconds);
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(5.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    Assert.assertEquals(0d, timelineMetricsIgniteCache.getPointInTimeCacheMetrics().get("Cluster_KeySize"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cad9eb1/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
index ea947d0..b4d0f0a 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
@@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import static junit.framework.Assert.assertEquals;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedAggregateTimeMillis;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
 
 public class AbstractTimelineAggregatorTest {
 
@@ -114,7 +116,7 @@ public class AbstractTimelineAggregatorTest {
   public void testDoWorkOnZeroDelay() throws Exception {
 
     long currentTime = System.currentTimeMillis();
-    long roundedOffAggregatorTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime,
+    long roundedOffAggregatorTime = getRoundedCheckPointTimeMillis(currentTime,
       sleepIntervalMillis);
 
     //Test first run of aggregator with no checkpoint
@@ -138,7 +140,7 @@ public class AbstractTimelineAggregatorTest {
     currentTime = System.currentTimeMillis();
     checkPoint.set(currentTime - 16*60*1000); //Old checkpoint
     agg.runOnce(sleepIntervalMillis);
-    long checkPointTime = AbstractTimelineAggregator.getRoundedAggregateTimeMillis(sleepIntervalMillis);
+    long checkPointTime = getRoundedAggregateTimeMillis(sleepIntervalMillis);
     assertEquals("startTime should be zero", checkPointTime - sleepIntervalMillis, startTimeInDoWork.get());
     assertEquals("endTime  should be zero", checkPointTime, endTimeInDoWork.get());
     assertEquals(roundedOffAggregatorTime, checkPoint.get());
@@ -147,10 +149,10 @@ public class AbstractTimelineAggregatorTest {
 
 //    //Test first run with perfect checkpoint (sleepIntervalMillis back)
     currentTime = System.currentTimeMillis();
-    roundedOffAggregatorTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime,
+    roundedOffAggregatorTime = getRoundedCheckPointTimeMillis(currentTime,
       sleepIntervalMillis);
     checkPointTime = roundedOffAggregatorTime - sleepIntervalMillis;
-    long expectedCheckPoint = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis);
+    long expectedCheckPoint = getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis);
     checkPoint.set(checkPointTime);
     agg.runOnce(sleepIntervalMillis);
     assertEquals("startTime should the lower rounded time of the checkpoint time",
@@ -165,7 +167,7 @@ public class AbstractTimelineAggregatorTest {
     currentTime = System.currentTimeMillis();
     checkPoint.set(currentTime - 2*sleepIntervalMillis + 5000);
     agg.runOnce(sleepIntervalMillis);
-    long expectedStartTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime - 2*sleepIntervalMillis + 5000, sleepIntervalMillis);
+    long expectedStartTime = getRoundedCheckPointTimeMillis(currentTime - 2*sleepIntervalMillis + 5000, sleepIntervalMillis);
     assertEquals("startTime should the lower rounded time of the checkpoint time",
       expectedStartTime, startTimeInDoWork.get());
     assertEquals("startTime should the lower rounded time of the checkpoint time + sleepIntervalMillis",

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cad9eb1/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
index a9f2b4d..1c5f41f 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
@@ -69,7 +69,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null, null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -121,7 +121,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null, null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -196,7 +196,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null, null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // here we put some metrics tha will be aggregated
@@ -479,7 +479,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        conf, new TimelineMetricMetadataManager(new Configuration(), hdb), null);
+        conf, new TimelineMetricMetadataManager(new Configuration(), hdb), null, null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -529,7 +529,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testClusterAggregateMetricNormalization() throws Exception {
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null, null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // Sample data

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cad9eb1/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
index 937dd80..eb38625 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
@@ -18,12 +18,14 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedAggregateTimeMillis;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getTimeSlices;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
-import static org.easymock.EasyMock.anyBoolean;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
 
 import java.sql.ResultSet;
@@ -51,23 +53,16 @@ public class TimelineMetricClusterAggregatorSecondTest {
     long sliceInterval = 30000l;
     long metricInterval = 10000l;
 
-    Configuration configuration = new Configuration();
     TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
     expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new byte[16]).once();
     replay(metricMetadataManagerMock);
 
-    TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond(
-      METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null,
-      configuration, null, aggregatorInterval, 2, "false", "", "",
-      aggregatorInterval, sliceInterval, null);
-
-    secondAggregator.timeSliceIntervalMillis = sliceInterval;
-    long roundedEndTime = AbstractTimelineAggregator.getRoundedAggregateTimeMillis(aggregatorInterval);
+    long roundedEndTime = getRoundedAggregateTimeMillis(aggregatorInterval);
     long roundedStartTime = roundedEndTime - aggregatorInterval;
-    List<Long[]> timeSlices = secondAggregator.getTimeSlices(roundedStartTime ,
-      roundedEndTime);
+    List<Long[]> timeSlices = getTimeSlices(roundedStartTime ,
+      roundedEndTime, sliceInterval);
 
-    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+    TreeMap<Long, Double> metricValues = new TreeMap<>();
 
     long startTime = roundedEndTime - aggregatorInterval;
 
@@ -85,7 +80,7 @@ public class TimelineMetricClusterAggregatorSecondTest {
     counterMetric.setMetricValues(metricValues);
     counterMetric.setType("COUNTER");
 
-    Map<TimelineClusterMetric, Double> timelineClusterMetricMap = secondAggregator.sliceFromTimelineMetric(counterMetric, timeSlices);
+    Map<TimelineClusterMetric, Double> timelineClusterMetricMap = sliceFromTimelineMetric(counterMetric, timeSlices, true);
 
     TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(counterMetric.getMetricName(), counterMetric.getAppId(),
       counterMetric.getInstanceId(), 0l);
@@ -104,7 +99,7 @@ public class TimelineMetricClusterAggregatorSecondTest {
     metric.setAppId("TestAppId");
     metric.setMetricValues(metricValues);
 
-    timelineClusterMetricMap = secondAggregator.sliceFromTimelineMetric(metric, timeSlices);
+    timelineClusterMetricMap = sliceFromTimelineMetric(metric, timeSlices, true);
 
     timelineClusterMetric = new TimelineClusterMetric(metric.getMetricName(), metric.getAppId(),
       metric.getInstanceId(), 0l);
@@ -116,7 +111,6 @@ public class TimelineMetricClusterAggregatorSecondTest {
     timelineClusterMetric.setTimestamp(roundedStartTime + 4*sliceInterval);
     Assert.assertTrue(timelineClusterMetricMap.containsKey(timelineClusterMetric));
     Assert.assertEquals(timelineClusterMetricMap.get(timelineClusterMetric), 7.5);
-
   }
 
   @Test
@@ -137,8 +131,8 @@ public class TimelineMetricClusterAggregatorSecondTest {
       aggregatorInterval, 2, "false", "", "", aggregatorInterval, sliceInterval, null
     );
 
-    long startTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(System.currentTimeMillis(),aggregatorInterval);
-    List<Long[]> timeslices = secondAggregator.getTimeSlices(startTime, startTime + aggregatorInterval);
+    long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(),aggregatorInterval);
+    List<Long[]> timeslices = getTimeSlices(startTime, startTime + aggregatorInterval, sliceInterval);
 
     Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>();
     long seconds = 1000;
@@ -367,7 +361,7 @@ public class TimelineMetricClusterAggregatorSecondTest {
     long now = System.currentTimeMillis();
     long startTime = now - 120000;
     long seconds = 1000;
-    List<Long[]> slices = secondAggregator.getTimeSlices(startTime, now);
+    List<Long[]> slices = getTimeSlices(startTime, now, sliceInterval);
     ResultSet rs = createNiceMock(ResultSet.class);
 
     TreeMap<Long, Double> metricValues = new TreeMap<>();

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cad9eb1/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
new file mode 100644
index 0000000..7cddb00
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsIgniteCache;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getTimeSlices;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TimelineMetricConfiguration.class)
+
+@PowerMockIgnore("javax.management.*")
+public class TimelineMetricClusterAggregatorSecondWithCacheSourceTest {
+
+  private static TimelineMetricsIgniteCache timelineMetricsIgniteCache;
+  @BeforeClass
+  public static void setupConf() throws Exception {
+    TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new
+        Configuration(), new Configuration());
+    mockStatic(TimelineMetricConfiguration.class);
+    expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();
+    conf.getMetricsConf().set(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES, "localhost");
+    replayAll();
+
+    timelineMetricsIgniteCache = new TimelineMetricsIgniteCache();
+  }
+
+  @Test
+  public void testLiveHostCounterMetrics() throws Exception {
+    long aggregatorInterval = 120000;
+    long sliceInterval = 30000;
+
+    Configuration configuration = new Configuration();
+
+    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+    expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())).andReturn(null).anyTimes();
+    replay(metricMetadataManagerMock);
+
+    TimelineMetricClusterAggregatorSecondWithCacheSource secondAggregator = new TimelineMetricClusterAggregatorSecondWithCacheSource(
+        METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null,
+        aggregatorInterval, 2, "false", "", "", aggregatorInterval,
+        sliceInterval, null, timelineMetricsIgniteCache, 30L);
+
+    long now = System.currentTimeMillis();
+    long startTime = now - 120000;
+    long seconds = 1000;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache = new HashMap<>();
+    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 15 * seconds),
+        new MetricClusterAggregate(1.0, 2, 1.0, 1.0, 1.0));
+    metricsFromCache.put(new TimelineClusterMetric("m2", "a2", "i1",startTime + 18 * seconds),
+        new MetricClusterAggregate(1.0, 5, 1.0, 1.0, 1.0));
+
+    List<Long[]> timeslices = getTimeSlices(startTime, startTime + 120*seconds, 30*seconds);
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregates = secondAggregator.aggregateMetricsFromMetricClusterAggregates(metricsFromCache, timeslices);
+
+    Assert.assertNotNull(aggregates);
+
+    MetricClusterAggregate a1 = null, a2 = null;
+
+    for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> m : aggregates.entrySet()) {
+      if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a1")) {
+        a1 = m.getValue();
+      }
+      if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a2")) {
+        a2 = m.getValue();
+      }
+    }
+
+    Assert.assertNotNull(a1);
+    Assert.assertNotNull(a2);
+    Assert.assertEquals(2d, a1.getSum());
+    Assert.assertEquals(5d, a2.getSum());
+  }
+
+  @Test
+  public void testSlicesRecalculation() throws Exception {
+    long aggregatorInterval = 120000;
+    long sliceInterval = 30000;
+
+    Configuration configuration = new Configuration();
+
+    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+    expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())).andReturn(null).anyTimes();
+    replay(metricMetadataManagerMock);
+
+    TimelineMetricClusterAggregatorSecondWithCacheSource secondAggregator = new TimelineMetricClusterAggregatorSecondWithCacheSource(
+        METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null,
+        aggregatorInterval, 2, "false", "", "", aggregatorInterval,
+        sliceInterval, null, timelineMetricsIgniteCache, 30L);
+
+    long seconds = 1000;
+    long now = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), 120*seconds);
+    long startTime = now - 120*seconds;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache = new HashMap<>();
+    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 5 * seconds),
+        new MetricClusterAggregate(1.0, 2, 1.0, 1.0, 1.0));
+    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 25 * seconds),
+        new MetricClusterAggregate(2.0, 2, 1.0, 2.0, 2.0));
+    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 45 * seconds),
+        new MetricClusterAggregate(3.0, 2, 1.0, 1.0, 1.0));
+    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 65 * seconds),
+        new MetricClusterAggregate(4.0, 2, 1.0, 4.0, 4.0));
+    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 85 * seconds),
+        new MetricClusterAggregate(5.0, 2, 1.0, 5.0, 5.0));
+
+    List<Long[]> timeslices = getTimeSlices(startTime, startTime + 120*seconds, 30*seconds);
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregates = secondAggregator.aggregateMetricsFromMetricClusterAggregates(metricsFromCache, timeslices);
+
+    Assert.assertNotNull(aggregates);
+    Assert.assertEquals(4, aggregates.size());
+
+    TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric("m1", "a1", "i1", startTime + 30*seconds);
+    MetricClusterAggregate metricClusterAggregate = aggregates.get(timelineClusterMetric);
+    Assert.assertNotNull(metricClusterAggregate);
+    Assert.assertEquals(1.5, metricClusterAggregate.getSum());
+    Assert.assertEquals(1d, metricClusterAggregate.getMin());
+    Assert.assertEquals(2d, metricClusterAggregate.getMax());
+    Assert.assertEquals(2, metricClusterAggregate.getNumberOfHosts());
+
+    timelineClusterMetric.setTimestamp(startTime + 60*seconds);
+    metricClusterAggregate = aggregates.get(timelineClusterMetric);
+    Assert.assertNotNull(metricClusterAggregate);
+    Assert.assertEquals(3d, metricClusterAggregate.getSum());
+
+    timelineClusterMetric.setTimestamp(startTime + 90*seconds);
+    metricClusterAggregate = aggregates.get(timelineClusterMetric);
+    Assert.assertNotNull(metricClusterAggregate);
+    Assert.assertEquals(4.5d, metricClusterAggregate.getSum());
+
+    timelineClusterMetric = new TimelineClusterMetric("live_hosts", "a1", null, startTime + 120*seconds);
+    metricClusterAggregate = aggregates.get(timelineClusterMetric);
+    Assert.assertNotNull(metricClusterAggregate);
+    Assert.assertEquals(2d, metricClusterAggregate.getSum());
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cad9eb1/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java
index a0bc77f..e14d069 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java
@@ -52,6 +52,7 @@ public class MetricCollectorHAControllerTest extends AbstractMiniHBaseClusterTes
 
     expect(configuration.getClusterZKClientPort()).andReturn(port);
     expect(configuration.getClusterZKQuorum()).andReturn(quorum);
+    expect(configuration.getZkConnectionUrl(port, quorum)).andReturn(quorum + ":" + port);
 
     replay(configuration);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cad9eb1/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
index f9a1036..3e3b91f 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
@@ -120,10 +120,10 @@ public class TestMetadataManager extends AbstractMiniHBaseClusterTest {
     Map<String, TimelineMetricHostMetadata> cachedHostData = metadataManager.getHostedAppsCache();
     Map<String, TimelineMetricHostMetadata> savedHostData = metadataManager.getHostedAppsFromStore();
     Assert.assertEquals(cachedData.size(), savedData.size());
-    Assert.assertEquals("dummy_app1", cachedHostData.get("dummy_host1").getHostedApps().iterator().next());
-    Assert.assertEquals("dummy_app2", cachedHostData.get("dummy_host2").getHostedApps().iterator().next());
-    Assert.assertEquals("dummy_app1", savedHostData.get("dummy_host1").getHostedApps().iterator().next());
-    Assert.assertEquals("dummy_app2", savedHostData.get("dummy_host2").getHostedApps().iterator().next());
+    Assert.assertEquals("dummy_app1", cachedHostData.get("dummy_host1").getHostedApps().keySet().iterator().next());
+    Assert.assertEquals("dummy_app2", cachedHostData.get("dummy_host2").getHostedApps().keySet().iterator().next());
+    Assert.assertEquals("dummy_app1", savedHostData.get("dummy_host1").getHostedApps().keySet().iterator().next());
+    Assert.assertEquals("dummy_app2", savedHostData.get("dummy_host2").getHostedApps().keySet().iterator().next());
 
     Map<String, Set<String>> cachedHostInstanceData = metadataManager.getHostedInstanceCache();
     Map<String, Set<String>> savedHostInstanceData = metadataManager.getHostedInstancesFromStore();

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cad9eb1/ambari-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml
index 3864526..5e57755 100644
--- a/ambari-metrics/pom.xml
+++ b/ambari-metrics/pom.xml
@@ -182,8 +182,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.2</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
       <plugin>


Mime
View raw message