ambari-commits mailing list archives

From: avija...@apache.org
Subject: [07/28] ambari git commit: AMBARI-22740 : Fix integration test for HBase in branch-3.0-ams due to UUID changes. (avijayan)
Date: Mon, 08 Jan 2018 18:19:33 GMT
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcherTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcherTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcherTest.java
new file mode 100644
index 0000000..de0236c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcherTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExitUtil.class)
+public class TimelineMetricStoreWatcherTest {
+
+  @Test
+  public void testRunPositive() throws Exception {
+    TimelineMetricStore metricStore = createNiceMock(TimelineMetricStore.class);
+
+    expect(metricStore.putMetrics(anyObject(TimelineMetrics.class)))
+      .andReturn(new TimelinePutResponse());
+
+    // metric found
+    expect(metricStore.getTimelineMetrics(EasyMock.<List<String>>anyObject(),
+      EasyMock.<List<String>>anyObject(), anyObject(String.class),
+      anyObject(String.class), anyObject(Long.class), anyObject(Long.class),
+      eq(Precision.SECONDS), eq(1), eq(true), anyObject(TopNConfig.class), anyString()))
+      .andReturn(null).anyTimes();
+
+    mockStatic(ExitUtil.class);
+
+    replay(metricStore);
+
+    TimelineMetricStoreWatcher timelineMetricStoreWatcher =
+      new TimelineMetricStoreWatcher(metricStore, TimelineMetricConfiguration.getInstance());
+    timelineMetricStoreWatcher.run();
+    timelineMetricStoreWatcher.run();
+    timelineMetricStoreWatcher.run();
+    verify(metricStore);
+
+  }
+
+  @Test
+  public void testRunNegative() throws Exception {
+    TimelineMetricStore metricStore = createNiceMock(TimelineMetricStore.class);
+
+    expect(metricStore.putMetrics(anyObject(TimelineMetrics.class)))
+      .andReturn(new TimelinePutResponse());
+
+    // no metrics found
+    expect(metricStore.getTimelineMetrics(EasyMock.<List<String>>anyObject(),
+      EasyMock.<List<String>>anyObject(), anyObject(String.class),
+      anyObject(String.class), anyObject(Long.class), anyObject(Long.class),
+      eq(Precision.SECONDS), eq(1), eq(true), anyObject(TopNConfig.class), anyString()))
+      .andReturn(null).anyTimes();
+
+    String msg = "Error getting metrics from TimelineMetricStore. " +
+      "Shutting down by TimelineMetricStoreWatcher.";
+    mockStatic(ExitUtil.class);
+    ExitUtil.terminate(-1, msg);
+    expectLastCall().anyTimes();
+
+    replayAll();
+
+    TimelineMetricStoreWatcher timelineMetricStoreWatcher =
+      new TimelineMetricStoreWatcher(metricStore, TimelineMetricConfiguration.getInstance());
+    timelineMetricStoreWatcher.run();
+    timelineMetricStoreWatcher.run();
+    timelineMetricStoreWatcher.run();
+
+    verifyAll();
+
+  }
+
+}
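
A note on the pattern these watcher tests exercise: TimelineMetricStoreWatcher acts as a watchdog that probes the metric store on each run and shuts the daemon down via ExitUtil.terminate(-1, msg) once probes keep failing, which is why testRunNegative has to mock ExitUtil statically. A minimal, self-contained sketch of that watchdog shape follows; the probe interface and failure threshold are illustrative assumptions, not the committed implementation:

    import java.util.concurrent.atomic.AtomicInteger;

    // Watchdog sketch: probe the store each run, count consecutive failures,
    // and shut down past a threshold. Probe API and threshold are assumptions.
    public class StoreWatcherSketch implements Runnable {

      interface MetricStoreProbe {
        boolean putAndReadBack() throws Exception; // write a probe metric, read it back
      }

      private static final int FAILURE_THRESHOLD = 3; // illustrative value
      private final MetricStoreProbe probe;
      private final AtomicInteger consecutiveFailures = new AtomicInteger(0);

      public StoreWatcherSketch(MetricStoreProbe probe) {
        this.probe = probe;
      }

      @Override
      public void run() {
        boolean healthy;
        try {
          healthy = probe.putAndReadBack();
        } catch (Exception e) {
          healthy = false;
        }
        if (healthy) {
          consecutiveFailures.set(0);   // any successful probe resets the count
        } else if (consecutiveFailures.incrementAndGet() >= FAILURE_THRESHOLD) {
          // The committed watcher calls ExitUtil.terminate(-1, msg) here.
          System.exit(-1);
        }
      }
    }

Stubbing the terminate call instead of letting the JVM exit is what allows testRunNegative to invoke run() three times and still verify its mocks afterwards.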

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsAggregatorMemorySink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsAggregatorMemorySink.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsAggregatorMemorySink.java
new file mode 100644
index 0000000..11a01d9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsAggregatorMemorySink.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Aggregator Memory sink implementation to perform test
+ */
+public class TimelineMetricsAggregatorMemorySink
+    implements TimelineMetricsAggregatorSink {
+
+  private static Map<Precision, Map<TimelineMetric, MetricHostAggregate>> hostAggregateRecords =
+      new HashMap<>();
+  private static Map<Precision, Map<TimelineClusterMetric, MetricHostAggregate>> clusterTimeAggregateRecords =
+      new HashMap<>();
+  private static Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateRecords =
+      new HashMap<>();
+
+  @Override
+  public void saveHostAggregateRecords(
+      Map<TimelineMetric, MetricHostAggregate> hostAggregateMap,
+      Precision precision) {
+    if (hostAggregateMap == null || hostAggregateMap.size() == 0) {
+      return;
+    }
+
+    Map<TimelineMetric, MetricHostAggregate> aggregatedValue = null;
+    if (hostAggregateRecords.containsKey(precision)) {
+      aggregatedValue = hostAggregateRecords.get(precision);
+    } else {
+      aggregatedValue = new HashMap<>();
+      hostAggregateRecords.put(precision, aggregatedValue);
+    }
+
+    for (Entry<TimelineMetric, MetricHostAggregate> entry : hostAggregateMap
+        .entrySet()) {
+      TimelineMetric timelineMetricClone = new TimelineMetric(entry.getKey());
+      MetricHostAggregate hostAggregate = entry.getValue();
+      MetricHostAggregate hostAggregateClone = new MetricHostAggregate(
+          hostAggregate.getSum(), (int) hostAggregate.getNumberOfSamples(),
+          hostAggregate.getDeviation(), hostAggregate.getMax(),
+          hostAggregate.getMin());
+      aggregatedValue.put(timelineMetricClone, hostAggregateClone);
+    }
+  }
+
+  @Override
+  public void saveClusterTimeAggregateRecords(
+      Map<TimelineClusterMetric, MetricHostAggregate> clusterTimeAggregateMap,
+      Precision precision) {
+    if (clusterTimeAggregateMap == null
+        || clusterTimeAggregateMap.size() == 0) {
+      return;
+    }
+
+    Map<TimelineClusterMetric, MetricHostAggregate> aggregatedValue = null;
+    if (clusterTimeAggregateRecords.containsKey(precision)) {
+      aggregatedValue = clusterTimeAggregateRecords.get(precision);
+    } else {
+      aggregatedValue = new HashMap<>();
+      clusterTimeAggregateRecords.put(precision, aggregatedValue);
+    }
+
+    for (Entry<TimelineClusterMetric, MetricHostAggregate> entry : clusterTimeAggregateMap
+        .entrySet()) {
+      TimelineClusterMetric clusterMetric = entry.getKey();
+      TimelineClusterMetric clusterMetricClone =
+          new TimelineClusterMetric(clusterMetric.getMetricName(),
+              clusterMetric.getAppId(), clusterMetric.getInstanceId(),
+              clusterMetric.getTimestamp());
+      MetricHostAggregate hostAggregate = entry.getValue();
+      MetricHostAggregate hostAggregateClone = new MetricHostAggregate(
+          hostAggregate.getSum(), (int) hostAggregate.getNumberOfSamples(),
+          hostAggregate.getDeviation(), hostAggregate.getMax(),
+          hostAggregate.getMin());
+      aggregatedValue.put(clusterMetricClone, hostAggregateClone);
+    }
+  }
+
+  @Override
+  public void saveClusterAggregateRecords(
+      Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateMaps) {
+
+    if (clusterAggregateMaps == null || clusterAggregateMaps.size() == 0) {
+      return;
+    }
+
+    for (Entry<TimelineClusterMetric, MetricClusterAggregate> entry : clusterAggregateMaps
+        .entrySet()) {
+      TimelineClusterMetric clusterMetric = entry.getKey();
+      TimelineClusterMetric clusterMetricClone =
+          new TimelineClusterMetric(clusterMetric.getMetricName(),
+              clusterMetric.getAppId(), clusterMetric.getInstanceId(),
+              clusterMetric.getTimestamp());
+      MetricClusterAggregate clusterAggregate = entry.getValue();
+      MetricClusterAggregate clusterAggregateClone = new MetricClusterAggregate(
+          clusterAggregate.getSum(), (int) clusterAggregate.getNumberOfHosts(),
+          clusterAggregate.getDeviation(), clusterAggregate.getMax(),
+          clusterAggregate.getMin());
+      clusterAggregateRecords.put(clusterMetricClone, clusterAggregateClone);
+    }
+  }
+
+  public Map<Precision, Map<TimelineMetric, MetricHostAggregate>> getHostAggregateRecords() {
+    return Collections.unmodifiableMap(hostAggregateRecords);
+  }
+
+  public Map<Precision, Map<TimelineClusterMetric, MetricHostAggregate>> getClusterTimeAggregateRecords() {
+    return Collections.unmodifiableMap(clusterTimeAggregateRecords);
+  }
+
+  public Map<TimelineClusterMetric, MetricClusterAggregate> getClusterAggregateRecords() {
+    return Collections.unmodifiableMap(clusterAggregateRecords);
+  }
+
+}
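
One detail of this in-memory sink worth calling out: saveHostAggregateRecords and its siblings store copies of every key and value, so an aggregator that later mutates its own aggregate objects cannot silently alter what the test captured, and the getters hand out unmodifiable views for the same reason. The same clone-on-save idea reduced to a generic sketch (names and types here are illustrative, not part of the sink):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.UnaryOperator;

    // Clone-on-save: record copies rather than references, so callers that
    // keep mutating their objects cannot corrupt the captured snapshot.
    final class SnapshotSink<K, V> {
      private final Map<K, V> records = new HashMap<>();
      private final UnaryOperator<K> keyCopier;
      private final UnaryOperator<V> valueCopier;

      SnapshotSink(UnaryOperator<K> keyCopier, UnaryOperator<V> valueCopier) {
        this.keyCopier = keyCopier;
        this.valueCopier = valueCopier;
      }

      void save(Map<K, V> batch) {
        if (batch == null || batch.isEmpty()) {
          return; // mirror the sink's early return on null/empty input
        }
        for (Map.Entry<K, V> e : batch.entrySet()) {
          records.put(keyCopier.apply(e.getKey()), valueCopier.apply(e.getValue()));
        }
      }

      Map<K, V> snapshot() {
        return Collections.unmodifiableMap(records); // read-only view, as above
      }
    }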

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsFilterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsFilterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsFilterTest.java
new file mode 100644
index 0000000..d64bf7c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsFilterTest.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Set;
+
+public class TimelineMetricsFilterTest {
+
+  @Test
+  public void testAppBlacklisting() throws Exception {
+
+    Configuration metricsConf = new Configuration();
+    metricsConf.set("timeline.metrics.apps.blacklist", "hbase,datanode,nimbus");
+    TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class);
+    expect(configuration.getMetricsConf()).andReturn(metricsConf).once();
+    replay(configuration);
+
+    TimelineMetricsFilter.initializeMetricFilter(configuration);
+
+    TimelineMetric timelineMetric = new TimelineMetric();
+
+    timelineMetric.setAppId("hbase");
+    Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+    timelineMetric.setAppId("namenode");
+    Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+    timelineMetric.setAppId("nimbus");
+    Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric));
+  }
+
+  @Test
+  public void testMetricWhitelisting() throws Exception {
+
+    Configuration metricsConf = new Configuration();
+    TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class);
+    expect(configuration.getMetricsConf()).andReturn(metricsConf).once();
+    expect(configuration.isWhitelistingEnabled()).andReturn(true).anyTimes();
+    replay(configuration);
+
+    URL fileUrl = ClassLoader.getSystemResource("test_data/metric_whitelist.dat");
+
+    metricsConf.set("timeline.metrics.whitelist.file", fileUrl.getPath());
+    TimelineMetricsFilter.initializeMetricFilter(configuration);
+
+    TimelineMetric timelineMetric = new TimelineMetric();
+
+    timelineMetric.setMetricName("cpu_system");
+    Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+    timelineMetric.setMetricName("cpu_system1");
+    Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+    timelineMetric.setMetricName("jvm.JvmMetrics.MemHeapUsedM");
+    Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+    timelineMetric.setMetricName("dfs.FSNamesystem.TotalFiles");
+    Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric));
+  }
+
+  @Test
+  public void testTogether() throws Exception {
+
+    Configuration metricsConf = new Configuration();
+    metricsConf.set("timeline.metrics.apps.blacklist", "hbase,datanode,nimbus");
+    TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class);
+    expect(configuration.getMetricsConf()).andReturn(metricsConf).once();
+    replay(configuration);
+
+    URL fileUrl = ClassLoader.getSystemResource("test_data/metric_whitelist.dat");
+    metricsConf.set("timeline.metrics.whitelist.file", fileUrl.getPath());
+
+    TimelineMetricsFilter.initializeMetricFilter(configuration);
+
+    TimelineMetric timelineMetric = new TimelineMetric();
+
+    timelineMetric.setMetricName("cpu_system");
+    timelineMetric.setAppId("hbase");
+    Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+    timelineMetric.setMetricName("cpu_system");
+    timelineMetric.setAppId("HOST");
+    Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+    timelineMetric.setMetricName("jvm.JvmMetrics.MemHeapUsedM");
+    Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+    timelineMetric.setMetricName("dfs.FSNamesystem.TotalFiles");
+    Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric));
+  }
+
+  @Test
+  public void testAmshbaseWhitelisting() throws Exception {
+
+    TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class);
+
+    Configuration metricsConf = new Configuration();
+    expect(configuration.getMetricsConf()).andReturn(metricsConf).once();
+
+    Set<String> whitelist = new HashSet<>();
+    whitelist.add("regionserver.Server.Delete_99th_percentile");
+    whitelist.add("regionserver.Server.Delete_max");
+    whitelist.add("regionserver.Server.Delete_mean");
+    expect(configuration.getAmshbaseWhitelist()).andReturn(whitelist).once();
+
+    replay(configuration);
+
+    TimelineMetricsFilter.initializeMetricFilter(configuration);
+
+    TimelineMetric timelineMetric = new TimelineMetric();
+
+    timelineMetric.setMetricName("regionserver.Server.Delete_max");
+    timelineMetric.setAppId("ams-hbase");
+    Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+    timelineMetric.setMetricName("regionserver.Server.Delete_min3333");
+    timelineMetric.setAppId("ams-hbase");
+    Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+    timelineMetric.setMetricName("jvm.JvmMetrics.MemHeapUsedM");
+    timelineMetric.setAppId("hbase");
+    Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric));
+  }
+
+  @Test
+  public void testHybridFilter() throws Exception {
+
+    // Whitelist Apps - namenode, nimbus
+    // Blacklist Apps - datanode, kafka_broker
+    // Accept ams-hbase whitelisting.
+    // Reject non whitelisted metrics from non whitelisted Apps (Say hbase)
+
+    TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class);
+
+    Configuration metricsConf = new Configuration();
+    metricsConf.set("timeline.metrics.apps.whitelist", "namenode,nimbus");
+    metricsConf.set("timeline.metrics.apps.blacklist", "datanode,kafka_broker");
+    URL fileUrl = ClassLoader.getSystemResource("test_data/metric_whitelist.dat");
+    metricsConf.set("timeline.metrics.whitelist.file", fileUrl.getPath());
+    expect(configuration.getMetricsConf()).andReturn(metricsConf).once();
+
+    Set<String> whitelist = new HashSet<>();
+    whitelist.add("regionserver.Server.Delete_99th_percentile");
+    whitelist.add("regionserver.Server.Delete_max");
+    whitelist.add("regionserver.Server.Delete_mean");
+    expect(configuration.getAmshbaseWhitelist()).andReturn(whitelist).once();
+
+    expect(configuration.isWhitelistingEnabled()).andReturn(true).anyTimes();
+
+    replay(configuration);
+
+    TimelineMetricsFilter.initializeMetricFilter(configuration);
+
+    TimelineMetric timelineMetric = new TimelineMetric();
+
+    //Test App Whitelisting
+    timelineMetric.setMetricName("metric.a.b.c");
+    timelineMetric.setAppId("namenode");
+    Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+    timelineMetric.setMetricName("metric.d.e.f");
+    timelineMetric.setAppId("nimbus");
+    Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+    //Test App Blacklisting
+    timelineMetric.setMetricName("metric.d.e.f");
+    timelineMetric.setAppId("datanode");
+    Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+    timelineMetric.setMetricName("metric.d.e.f");
+    timelineMetric.setAppId("kafka_broker");
+    Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+
+    //Test ams-hbase Whitelisting
+    timelineMetric.setMetricName("regionserver.Server.Delete_max");
+    timelineMetric.setAppId("ams-hbase");
+    Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+    timelineMetric.setMetricName("regionserver.Server.Delete_min3333");
+    timelineMetric.setAppId("ams-hbase");
+    Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+    timelineMetric.setMetricName("regionserver.Server.Delete_mean");
+    timelineMetric.setAppId("ams-hbase");
+    Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+    //Test Metric Whitelisting
+    timelineMetric.setMetricName("regionserver.WAL.SyncTime_max");
+    timelineMetric.setAppId("hbase");
+    Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric));
+
+    timelineMetric.setMetricName("regionserver.WAL.metric.not.needed");
+    timelineMetric.setAppId("hbase");
+    Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric));
+  }
+}
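
Read together, these tests imply a precedence order for acceptMetric: an app on the blacklist is rejected outright, an app on the whitelist is accepted wholesale, the dedicated ams-hbase whitelist governs that one app, and only then does the metric-name whitelist decide. A condensed sketch of that decision order, inferred from the assertions above rather than copied from TimelineMetricsFilter:

    import java.util.Set;

    // Decision order inferred from TimelineMetricsFilterTest; the committed
    // TimelineMetricsFilter may differ in detail (e.g. wildcard handling).
    final class FilterSketch {
      private final Set<String> appBlacklist;
      private final Set<String> appWhitelist;
      private final Set<String> amsHbaseWhitelist; // per-app override for "ams-hbase"
      private final Set<String> metricWhitelist;

      FilterSketch(Set<String> appBlacklist, Set<String> appWhitelist,
                   Set<String> amsHbaseWhitelist, Set<String> metricWhitelist) {
        this.appBlacklist = appBlacklist;
        this.appWhitelist = appWhitelist;
        this.amsHbaseWhitelist = amsHbaseWhitelist;
        this.metricWhitelist = metricWhitelist;
      }

      boolean accept(String appId, String metricName) {
        if (appBlacklist.contains(appId)) {
          return false;                                  // blacklist rejects first
        }
        if (appWhitelist.contains(appId)) {
          return true;                                   // whole app admitted
        }
        if ("ams-hbase".equals(appId)) {
          return amsHbaseWhitelist.contains(metricName); // per-app whitelist
        }
        return metricWhitelist.contains(metricName);     // metric names decide last
      }
    }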

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCacheTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCacheTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCacheTest.java
new file mode 100644
index 0000000..5b47545
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCacheTest.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline;
+
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_APP_ID;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TimelineMetricConfiguration.class)
+
+@PowerMockIgnore("javax.management.*")
+public class TimelineMetricsIgniteCacheTest {
+  private static TimelineMetricsIgniteCache timelineMetricsIgniteCache;
+  @BeforeClass
+  public static void setupConf() throws Exception {
+    TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new
+      Configuration(), new Configuration());
+    mockStatic(TimelineMetricConfiguration.class);
+    expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();
+    conf.getMetricsConf().set(CLUSTER_AGGREGATOR_APP_IDS, "appIdForHostsAggr");
+    conf.getMetricsConf().set(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES, "localhost");
+    replayAll();
+
+    timelineMetricsIgniteCache = new TimelineMetricsIgniteCache();
+  }
+
+  @Test
+  public void putEvictMetricsFromCacheSlicesMerging() throws Exception {
+    long cacheSliceIntervalMillis = 30000L;
+
+    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new byte[16]).once();
+    replay(metricMetadataManagerMock);
+
+    long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis);
+
+    long seconds = 1000;
+    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+    /*
+
+    0        +30s      +60s
+    |         |         |
+     (1)(2)(3) (4)(5)(6)  h1
+
+    */
+    // Case 1 : data points are distributed equally, no values are lost, single host.
+    metricValues.put(startTime + 4*seconds, 1.0);
+    metricValues.put(startTime + 14*seconds, 2.0);
+    metricValues.put(startTime + 24*seconds, 3.0);
+    metricValues.put(startTime + 34*seconds, 4.0);
+    metricValues.put(startTime + 44*seconds, 5.0);
+    metricValues.put(startTime + 54*seconds, 6.0);
+
+    TimelineMetric timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+    timelineMetric.setStartTime(metricValues.firstKey());
+    timelineMetric.addMetricValues(metricValues);
+
+    Collection<TimelineMetric> timelineMetrics = new ArrayList<>();
+    timelineMetrics.add(timelineMetric);
+    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
+
+    Assert.assertEquals(2, aggregateMap.size());
+    TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
+      timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds);
+
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(2.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    timelineClusterMetric.setTimestamp(startTime + 2*30*seconds);
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(5.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    metricValues.clear();
+    timelineMetrics.clear();
+
+    /*
+
+    0        +30s      +60s
+    |         |         |
+     (1)(2)(3) (4)(5)(6)   h1, h2
+
+    */
+    // Case 2 : data points are distributed equally, no values are lost, two hosts.
+    metricValues.put(startTime + 4*seconds, 1.0);
+    metricValues.put(startTime + 14*seconds, 2.0);
+    metricValues.put(startTime + 24*seconds, 3.0);
+    metricValues.put(startTime + 34*seconds, 4.0);
+    metricValues.put(startTime + 44*seconds, 5.0);
+    metricValues.put(startTime + 54*seconds, 6.0);
+
+    timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+    timelineMetric.setMetricValues(metricValues);
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 5*seconds, 2.0);
+    metricValues.put(startTime + 15*seconds, 4.0);
+    metricValues.put(startTime + 25*seconds, 6.0);
+    metricValues.put(startTime + 35*seconds, 8.0);
+    metricValues.put(startTime + 45*seconds, 10.0);
+    metricValues.put(startTime + 55*seconds, 12.0);
+    TimelineMetric timelineMetric2 = new TimelineMetric("metric1", "host2", "app1", "instance1");
+    timelineMetric2.setMetricValues(metricValues);
+
+    timelineMetrics = new ArrayList<>();
+    timelineMetrics.add(timelineMetric);
+    timelineMetrics.add(timelineMetric2);
+    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+    aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
+
+    Assert.assertEquals(2, aggregateMap.size());
+    timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
+      timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds);
+
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(6.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    timelineClusterMetric.setTimestamp(startTime + 2*30*seconds);
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(15.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    metricValues.clear();
+    timelineMetrics.clear();
+
+    Assert.assertEquals(0d, timelineMetricsIgniteCache.getPointInTimeCacheMetrics().get("Cluster_KeySize"));
+  }
+
+  @Test
+  public void updateAppAggregatesFromHostMetricTest() {
+    //make sure hosts metrics are aggregated for appIds from "timeline.metrics.service.cluster.aggregator.appIds"
+
+    long cacheSliceIntervalMillis = 30000L;
+
+    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new byte[16]).once();
+    expect(metricMetadataManagerMock.getHostedAppsCache()).andReturn(new HashMap<>()).anyTimes();
+    replay(metricMetadataManagerMock);
+
+    long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis);
+
+    long seconds = 1000;
+
+    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+    List<TimelineMetric> timelineMetrics = new ArrayList<>();
+    TimelineMetric timelineMetric;
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 15*seconds, 1.0);
+    metricValues.put(startTime + 55*seconds, 2.0);
+    timelineMetric = new TimelineMetric("host_metric", "host1", HOST_APP_ID, "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 45*seconds, 3.0);
+    metricValues.put(startTime + 85*seconds, 4.0);
+    timelineMetric = new TimelineMetric("app_metric", "host1", "appIdForHostsAggr", "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 85*seconds, 5.0);
+    timelineMetric = new TimelineMetric("host_metric", "host1", HOST_APP_ID, "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 85*seconds, 6.0);
+    timelineMetric = new TimelineMetric("host_metric", "host2", HOST_APP_ID, "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
+    Assert.assertEquals(6, aggregateMap.size());
+    TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
+        timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 90*seconds);
+
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(11.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    timelineClusterMetric = new TimelineClusterMetric("app_metric",
+        "appIdForHostsAggr", "instance1", startTime + 90*seconds);
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(4.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    timelineClusterMetric = new TimelineClusterMetric("host_metric",
+        "appIdForHostsAggr", "instance1", startTime + 90*seconds);
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(5.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    Assert.assertEquals(0d, timelineMetricsIgniteCache.getPointInTimeCacheMetrics().get("Cluster_KeySize"));
+  }
+}
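
The expected sums in putEvictMetricsFromCacheSlicesMerging come from a two-step reduction: each host's raw points inside a 30-second slice are first averaged, and the per-host averages are then summed across hosts into that slice's cluster aggregate. In case 2's first slice, for example, avg(1,2,3) + avg(2,4,6) = 2.0 + 4.0 = 6.0, which is exactly what the test asserts. A compact sketch of the reduction under that assumption (illustrative, not the cache's code):

    import java.util.TreeMap;

    // Two-step slice reduction matching the expected values above: average
    // each host's points within a slice, then sum the averages across hosts.
    final class SliceReductionSketch {

      static double hostSliceAverage(TreeMap<Long, Double> points,
                                     long sliceStart, long sliceEnd) {
        return points.subMap(sliceStart, sliceEnd).values().stream()
            .mapToDouble(Double::doubleValue).average().orElse(0.0);
      }

      public static void main(String[] args) {
        TreeMap<Long, Double> host1 = new TreeMap<>();
        host1.put(4_000L, 1.0); host1.put(14_000L, 2.0); host1.put(24_000L, 3.0);
        TreeMap<Long, Double> host2 = new TreeMap<>();
        host2.put(5_000L, 2.0); host2.put(15_000L, 4.0); host2.put(25_000L, 6.0);

        double clusterSum = hostSliceAverage(host1, 0, 30_000)
            + hostSliceAverage(host2, 0, 30_000);
        System.out.println(clusterSum); // 6.0, as asserted for the first slice
      }
    }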

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TopNConditionTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TopNConditionTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TopNConditionTest.java
new file mode 100644
index 0000000..5f1c470
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TopNConditionTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline;
+
+import junit.framework.Assert;
+import org.apache.ambari.metrics.core.timeline.query.TopNCondition;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TopNConditionTest {
+
+
+  @Test
+  public void testTopNCondition() {
+
+    List<String> metricNames = new ArrayList<>();
+    List<String> hostnames = new ArrayList<>();
+
+    //Valid Cases
+
+    // "H" hosts and 1 Metric
+    hostnames.add("h1");
+    hostnames.add("h2");
+    metricNames.add("m1");
+    Assert.assertTrue(TopNCondition.isTopNHostCondition(metricNames, hostnames));
+    Assert.assertFalse(TopNCondition.isTopNMetricCondition(metricNames, hostnames));
+    hostnames.clear();
+
+    // Host(s) with wildcard & 1 Metric
+    hostnames.add("h%");
+    hostnames.add("g1");
+    Assert.assertTrue(TopNCondition.isTopNHostCondition(metricNames, hostnames));
+    Assert.assertFalse(TopNCondition.isTopNMetricCondition(metricNames, hostnames));
+    hostnames.clear();
+
+    // M Metrics and No host
+    metricNames.add("m2");
+    metricNames.add("m3");
+    Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames));
+    Assert.assertTrue(TopNCondition.isTopNMetricCondition(metricNames, hostnames));
+
+    // M Metrics with wildcard and No host
+    metricNames.add("m2");
+    metricNames.add("m%");
+    Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames));
+    Assert.assertTrue(TopNCondition.isTopNMetricCondition(metricNames, hostnames));
+
+    // M Metrics with wildcard and 1 host
+    metricNames.add("m2");
+    metricNames.add("m%");
+    hostnames.add("h1");
+    Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames));
+    Assert.assertTrue(TopNCondition.isTopNMetricCondition(metricNames, hostnames));
+
+    metricNames.clear();
+    hostnames.clear();
+
+    //Invalid Cases
+    // M metrics and H hosts
+    metricNames.add("m1");
+    metricNames.add("m2");
+    hostnames.add("h1");
+    hostnames.add("h2");
+    Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames));
+    Assert.assertFalse(TopNCondition.isTopNMetricCondition(metricNames, hostnames));
+
+    metricNames.clear();
+    hostnames.clear();
+
+    // Wildcard in 1 and multiple in other
+    metricNames.add("m1");
+    metricNames.add("m2");
+    hostnames.add("%");
+    Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames));
+    Assert.assertFalse(TopNCondition.isTopNMetricCondition(metricNames, hostnames));
+
+    metricNames.clear();
+    hostnames.clear();
+
+    //Wildcard in both
+    metricNames.add("m%");
+    hostnames.add("%");
+    Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames));
+    Assert.assertFalse(TopNCondition.isTopNMetricCondition(metricNames, hostnames));
+
+  }
+}
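
The valid and invalid cases above pin down the two predicate shapes: a top-N-hosts query needs exactly one concrete metric with multiple hosts (or a host wildcard), while a top-N-metrics query needs multiple metrics (or a metric wildcard) with at most one concrete host; anything else is rejected by both. A sketch of those shapes, inferred from the assertions rather than from TopNCondition itself:

    import java.util.List;

    // Predicate shapes inferred from TopNConditionTest; the committed
    // TopNCondition may implement the checks differently.
    final class TopNShapesSketch {

      static boolean hasWildcard(List<String> names) {
        return names.stream().anyMatch(n -> n.contains("%"));
      }

      // Top N hosts for one concrete metric: several hosts (or a host
      // wildcard), exactly one metric, and no metric wildcard.
      static boolean isTopNHostCondition(List<String> metrics, List<String> hosts) {
        return metrics.size() == 1 && !hasWildcard(metrics)
            && (hosts.size() > 1 || hasWildcard(hosts));
      }

      // Top N metrics for at most one concrete host: several metrics (or a
      // metric wildcard), at most one host, and no host wildcard.
      static boolean isTopNMetricCondition(List<String> metrics, List<String> hosts) {
        return (metrics.size() > 1 || hasWildcard(metrics))
            && hosts.size() <= 1 && !hasWildcard(hosts);
      }
    }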

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/AbstractTimelineAggregatorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/AbstractTimelineAggregatorTest.java
new file mode 100644
index 0000000..d67ec7f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/AbstractTimelineAggregatorTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+import org.junit.Before;
+import org.junit.Test;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static junit.framework.Assert.assertEquals;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedAggregateTimeMillis;
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
+
+public class AbstractTimelineAggregatorTest {
+
+  private AbstractTimelineAggregator agg;
+
+  AtomicLong startTimeInDoWork;
+  AtomicLong endTimeInDoWork;
+  AtomicLong checkPoint;
+  int actualRuns;
+
+  long sleepIntervalMillis;
+  int checkpointCutOffMultiplier;
+
+  @Before
+  public void setUp() throws Exception {
+    sleepIntervalMillis = 5 * 60 * 1000L; // 5 minutes
+    checkpointCutOffMultiplier = 2;
+
+    Configuration metricsConf = new Configuration();
+    metricsConf.setInt(AGGREGATOR_CHECKPOINT_DELAY, 0);
+    metricsConf.setInt(RESULTSET_FETCH_SIZE, 2000);
+
+    startTimeInDoWork = new AtomicLong(0);
+    endTimeInDoWork = new AtomicLong(0);
+    checkPoint = new AtomicLong(-1);
+    actualRuns = 0;
+
+    agg = new AbstractTimelineAggregator(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND, null, metricsConf) {
+      @Override
+      public boolean doWork(long startTime, long endTime) {
+        startTimeInDoWork.set(startTime);
+        endTimeInDoWork.set(endTime);
+        actualRuns++;
+
+        return true;
+      }
+
+      @Override
+      protected Condition
+      prepareMetricQueryCondition(long startTime, long endTime) {
+        return null;
+      }
+
+      @Override
+      protected void aggregate(ResultSet rs, long startTime,
+                               long endTime) throws IOException, SQLException {
+      }
+
+      @Override
+      public Long getSleepIntervalMillis() {
+        return sleepIntervalMillis;
+      }
+
+      @Override
+      protected Integer getCheckpointCutOffMultiplier() {
+        return checkpointCutOffMultiplier;
+      }
+
+      @Override
+      public boolean isDisabled() {
+        return false;
+      }
+
+      @Override
+      protected String getCheckpointLocation() {
+        return "dummy_ckptFile";
+      }
+
+      protected long readCheckPoint() {
+        return checkPoint.get();
+      }
+
+      @Override
+      protected void saveCheckPoint(long checkpointTime) throws IOException {
+        checkPoint.set(checkpointTime);
+      }
+    };
+
+  }
+
+  @Test
+  public void testDoWorkOnZeroDelay() throws Exception {
+
+    long currentTime = System.currentTimeMillis();
+    long roundedOffAggregatorTime = getRoundedCheckPointTimeMillis(currentTime,
+      sleepIntervalMillis);
+
+    //Test first run of aggregator with no checkpoint
+    checkPoint.set(-1);
+    agg.runOnce(sleepIntervalMillis);
+    assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
+    assertEquals("endTime  should be zero", 0, endTimeInDoWork.get());
+    assertEquals(roundedOffAggregatorTime, checkPoint.get());
+    assertEquals("Do not aggregate on first run", 0, actualRuns);
+
+    // Test first run with a too "recent" checkpoint
+    currentTime = System.currentTimeMillis();
+    checkPoint.set(currentTime);
+    agg.setSleepIntervalMillis(sleepIntervalMillis);
+    agg.runOnce(sleepIntervalMillis);
+    assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
+    assertEquals("endTime  should be zero", 0, endTimeInDoWork.get());
+    assertEquals("Do not aggregate on first run", 0, actualRuns);
+
+    // Test first run with a too-old checkpoint
+    currentTime = System.currentTimeMillis();
+    checkPoint.set(currentTime - 16*60*1000); //Old checkpoint
+    agg.runOnce(sleepIntervalMillis);
+    long checkPointTime = getRoundedAggregateTimeMillis(sleepIntervalMillis);
+    assertEquals("startTime should be zero", checkPointTime - sleepIntervalMillis, startTimeInDoWork.get());
+    assertEquals("endTime  should be zero", checkPointTime, endTimeInDoWork.get());
+    assertEquals(roundedOffAggregatorTime, checkPoint.get());
+    assertEquals("Do not aggregate on first run", 1, actualRuns);
+
+
+    // Test first run with a perfect checkpoint (sleepIntervalMillis back)
+    currentTime = System.currentTimeMillis();
+    roundedOffAggregatorTime = getRoundedCheckPointTimeMillis(currentTime,
+      sleepIntervalMillis);
+    checkPointTime = roundedOffAggregatorTime - sleepIntervalMillis;
+    long expectedCheckPoint = getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis);
+    checkPoint.set(checkPointTime);
+    agg.runOnce(sleepIntervalMillis);
+    assertEquals("startTime should the lower rounded time of the checkpoint time",
+      expectedCheckPoint, startTimeInDoWork.get());
+    assertEquals("endTime should the lower rounded time of the checkpoint time + sleepIntervalMillis",
+      expectedCheckPoint + sleepIntervalMillis, endTimeInDoWork.get());
+    assertEquals(expectedCheckPoint + sleepIntervalMillis,
+      checkPoint.get());
+    assertEquals("Aggregate on first run", 2, actualRuns);
+
+    //Test edge case for checkpoint (2 x sleepIntervalMillis)
+    currentTime = System.currentTimeMillis();
+    checkPoint.set(currentTime - 2*sleepIntervalMillis + 5000);
+    agg.runOnce(sleepIntervalMillis);
+    long expectedStartTime = getRoundedCheckPointTimeMillis(currentTime - 2*sleepIntervalMillis + 5000, sleepIntervalMillis);
+    assertEquals("startTime should the lower rounded time of the checkpoint time",
+      expectedStartTime, startTimeInDoWork.get());
+    assertEquals("startTime should the lower rounded time of the checkpoint time + sleepIntervalMillis",
+      expectedStartTime + sleepIntervalMillis, endTimeInDoWork.get());
+    assertEquals(expectedStartTime + sleepIntervalMillis,
+      checkPoint.get());
+    assertEquals("Aggregate on second run", 3, actualRuns);
+
+
+  }
+}
\ No newline at end of file
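
The four scenarios in testDoWorkOnZeroDelay pin down how runOnce derives its aggregation window from the saved checkpoint: with no checkpoint, or one more recent than a full interval, it only persists a fresh rounded checkpoint and skips aggregation; a checkpoint older than the cutoff (checkpointCutOffMultiplier x interval) is discarded in favor of the latest completed interval; anything in between resumes from the rounded checkpoint. A sketch of that window selection, inferred from the assertions (the committed runOnce handles more cases than this):

    // Window selection inferred from testDoWorkOnZeroDelay; illustrative only.
    final class CheckpointWindowSketch {

      static long roundDown(long t, long interval) {
        return t - (t % interval);
      }

      // Returns {start, end} of the window to aggregate, or null to skip the
      // run and only save a fresh rounded checkpoint (the first-run behavior).
      static long[] windowFor(long checkpoint, long now, long interval,
                              int cutoffMultiplier) {
        if (checkpoint == -1 || now - checkpoint < interval) {
          return null;
        }
        long start = (now - checkpoint > cutoffMultiplier * interval)
            ? roundDown(now, interval) - interval  // stale: catch up on the last window
            : roundDown(checkpoint, interval);     // otherwise resume from checkpoint
        return new long[] { start, start + interval };
      }
    }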

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/DownSamplerTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/DownSamplerTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/DownSamplerTest.java
new file mode 100644
index 0000000..f216b13
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/DownSamplerTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+public class DownSamplerTest {
+
+  @Test
+  public void testGetDownsampleMetricPatterns() throws Exception {
+
+    Configuration configuration = new Configuration();
+    configuration.setIfUnset("timeline.metrics.downsampler.topn.metric.patterns", "pattern1,pattern2");
+    configuration.setIfUnset("timeline.metrics.downsampler.lastvalue.metric.patterns", "pattern3");
+
+    List<String> patterns = DownSamplerUtils.getDownsampleMetricPatterns(configuration);
+    Assert.assertEquals(3, patterns.size());
+    Assert.assertTrue(patterns.contains("pattern1"));
+    Assert.assertTrue(patterns.contains("pattern2"));
+    Assert.assertTrue(patterns.contains("pattern3"));
+
+    Configuration configuration2 = new Configuration();
+    patterns = DownSamplerUtils.getDownsampleMetricPatterns(configuration2);
+    Assert.assertEquals(0, patterns.size());
+  }
+
+  @Test
+  public void testGetDownSamplers() throws Exception {
+
+    Configuration configuration = new Configuration();
+    configuration.setIfUnset("timeline.metrics.downsampler.topn.metric.patterns", "pattern1,pattern2");
+    configuration.setIfUnset("timeline.metrics.downsampler.test.metric.patterns", "pattern3");
+
+    List<CustomDownSampler> downSamplers = DownSamplerUtils.getDownSamplers(configuration);
+    Assert.assertEquals(1, downSamplers.size());
+    Assert.assertTrue(downSamplers.get(0) instanceof TopNDownSampler);
+  }
+
+  @Ignore
+  @Test
+  public void testPrepareTopNDownSamplingStatement() throws Exception {
+    Configuration configuration = new Configuration();
+    configuration.setIfUnset("timeline.metrics.downsampler.topn.metric.patterns", "pattern1,pattern2");
+    configuration.setIfUnset("timeline.metrics.downsampler.topn.value", "3");
+
+    Map<String, String> conf = configuration.getValByRegex(DownSamplerUtils.downSamplerConfigPrefix);
+
+    TopNDownSampler topNDownSampler = TopNDownSampler.fromConfig(conf);
+    List<String> stmts = topNDownSampler.prepareDownSamplingStatement(14000000L, 14100000L, "METRIC_RECORD");
+    Assert.assertEquals(2, stmts.size());
+    Assert.assertTrue(stmts.contains("SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, 14100000 AS SERVER_TIME, UNITS, " +
+      "MAX(METRIC_MAX), 1, MAX(METRIC_MAX), MAX(METRIC_MAX) FROM METRIC_RECORD WHERE " +
+      "METRIC_NAME LIKE 'pattern1' AND SERVER_TIME > 14000000 AND SERVER_TIME <= 14100000 " +
+      "GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, UNITS ORDER BY MAX(METRIC_MAX) DESC LIMIT 3"));
+
+    Assert.assertTrue(stmts.contains("SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, 14100000 AS SERVER_TIME, UNITS, " +
+      "MAX(METRIC_MAX), 1, MAX(METRIC_MAX), MAX(METRIC_MAX) FROM METRIC_RECORD WHERE " +
+      "METRIC_NAME LIKE 'pattern2' AND SERVER_TIME > 14000000 AND SERVER_TIME <= 14100000 " +
+      "GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, UNITS ORDER BY MAX(METRIC_MAX) DESC LIMIT 3"));
+
+    configuration.clear();
+    configuration.setIfUnset("timeline.metrics.downsampler.topn.metric.patterns", "pattern1");
+    configuration.setIfUnset("timeline.metrics.downsampler.topn.value", "4");
+    configuration.setIfUnset("timeline.metrics.downsampler.topn.function", "sum");
+    conf = configuration.getValByRegex(DownSamplerUtils.downSamplerConfigPrefix);
+    topNDownSampler = TopNDownSampler.fromConfig(conf);
+    stmts = topNDownSampler.prepareDownSamplingStatement(14000000L, 14100000L, "METRIC_AGGREGATE_MINUTE");
+    Assert.assertEquals(1, stmts.size());
+
+    Assert.assertTrue(stmts.contains("SELECT METRIC_NAME, APP_ID, INSTANCE_ID, 14100000 AS SERVER_TIME, UNITS, " +
+      "SUM(METRIC_SUM), 1, SUM(METRIC_SUM), SUM(METRIC_SUM) FROM METRIC_AGGREGATE_MINUTE WHERE " +
+      "METRIC_NAME LIKE 'pattern1' AND SERVER_TIME > 14000000 AND SERVER_TIME <= 14100000 " +
+      "GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS ORDER BY SUM(METRIC_SUM) DESC LIMIT 4"));
+  }
+}
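
DownSamplerUtils, as exercised here, keys everything off configuration names of the form timeline.metrics.downsampler.<name>.metric.patterns: patterns are gathered across all such keys, while only recognized <name> values (topn, lastvalue) yield an actual CustomDownSampler, which is presumably why the unknown "test" downsampler above contributes no sampler. A small sketch of the pattern-collection step under that assumption (illustrative, not DownSamplerUtils itself):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    // Pattern collection in the spirit of getDownsampleMetricPatterns: gather
    // comma-separated values from every downsampler patterns key. Illustrative.
    final class DownsamplePatternSketch {

      static List<String> collectPatterns(Map<String, String> conf) {
        List<String> patterns = new ArrayList<>();
        for (Map.Entry<String, String> e : conf.entrySet()) {
          if (e.getKey().startsWith("timeline.metrics.downsampler.")
              && e.getKey().endsWith(".metric.patterns")) {
            patterns.addAll(Arrays.asList(e.getValue().split(",")));
          }
        }
        return patterns;
      }

      public static void main(String[] args) {
        Map<String, String> conf = new TreeMap<>();
        conf.put("timeline.metrics.downsampler.topn.metric.patterns", "pattern1,pattern2");
        conf.put("timeline.metrics.downsampler.lastvalue.metric.patterns", "pattern3");
        System.out.println(collectPatterns(conf)); // three patterns, as the test expects
      }
    }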

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITClusterAggregator.java
new file mode 100644
index 0000000..e01f3b1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITClusterAggregator.java
@@ -0,0 +1,711 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+import static org.apache.ambari.metrics.core.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
+import static org.apache.ambari.metrics.core.timeline.MetricTestHelper.prepareSingleTimelineMetric;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.ambari.metrics.core.timeline.AbstractMiniHBaseClusterTest;
+import org.apache.ambari.metrics.core.timeline.MetricTestHelper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
+import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
+  private final TimelineMetricReadHelper metricReader = new TimelineMetricReadHelper(metadataManager, false);
+
+  private Configuration getConfigurationForTest(boolean useGroupByAggregators) {
+    Configuration configuration = new Configuration();
+    configuration.set("timeline.metrics.service.use.groupBy.aggregators", String.valueOf(useGroupByAggregators));
+    return configuration;
+  }
+
+  @Test
+  public void testShouldAggregateClusterProperly() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
+        getConfigurationForTest(false), metadataManager, null, null);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
+      "disk_free", 1));
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2",
+      "disk_free", 2));
+    ctime += 2*minute;
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
+      "disk_free", 2));
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2",
+      "disk_free", 1));
+
+    // WHEN
+    long endTime = ctime + minute + 1;
+    boolean success = agg.doWork(startTime, endTime);
+
+    //THEN
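+    // Read the second-level aggregates back with the stock
+    // GET_CLUSTER_AGGREGATE_SQL template; the Condition carries only the
+    // [startTime, endTime] window, so every metric aggregated in that window
+    // is returned.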
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    int recordCount = 0;
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
+      MetricClusterAggregate currentHostAggregate =
+        metricReader.getMetricClusterAggregateFromResultSet(rs);
+
+      if ("disk_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2, currentHostAggregate.getNumberOfHosts());
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(1.0, currentHostAggregate.getMin());
+        assertEquals(3.0, currentHostAggregate.getSum());
+        recordCount++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+
+    // Guard against a vacuously-passing test: the loop above never ran if the
+    // result set was empty, and the aggregator's return value was unchecked.
+    assertTrue("Aggregator run should succeed", success);
+    assertTrue("Expected at least one disk_free aggregate record", recordCount > 0);
+  }
+
+  @Test
+  public void testShouldAggregateClusterIgnoringInstance() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
+        getConfigurationForTest(false), metadataManager, null, null);
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(metadataManager, false);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000 * 2;
+
+    /*
+     * Here we have two nodes with two instances each (initial values):
+     *              | local1 | local2 |
+     *  instance i1 |   1    |   2    |
+     *  instance i2 |   3    |   4    |
+     */
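+    // The cluster aggregator is expected to ignore the instance dimension, so
+    // local1 and local2 should fold into a single disk_free aggregate per
+    // time slice with numberOfHosts == 2.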
+    // local1 / instance i1: value 1 at ctime - 100
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime - 100, "local1",
+      "i1", "disk_free", 1), true);
+    // local2 / instance i1: value 2 at ctime - 100 (different host)
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime - 100, "local2",
+      "i1", "disk_free", 2), true);
+    // Instance i2 rows are shifted by +100 ms so they do not overwrite the rows above
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime + 100, "local1",
+      "i2", "disk_free", 3), true);
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime + 100, "local2",
+      "i2", "disk_free", 4), true);
+
+    ctime += minute;
+
+    // Two minutes later: local1 / instance i1 reports 1 again
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime - 100, "local1",
+      "i1", "disk_free", 1), true);
+    // ... and local2 / instance i1 reports 3 (different host)
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime - 100, "local2",
+      "i1", "disk_free", 3), true);
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime + 100, "local1",
+      "i2", "disk_free", 2), true);
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime + 100, "local2",
+      "i2", "disk_free", 4), true);
+    // WHEN
+    long endTime = ctime + minute;
+    boolean success = agg.doWork(startTime - 1000, endTime + 1000);
+
+    //THEN
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    int recordCount = 0;
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs);
+      MetricClusterAggregate currentHostAggregate =
+        readHelper.getMetricClusterAggregateFromResultSet(rs);
+
+      if ("disk_free".equals(currentMetric.getMetricName())) {
+        System.out.println("OUTPUT: " + currentMetric + " - " + currentHostAggregate);
+        assertEquals(2, currentHostAggregate.getNumberOfHosts());
+        assertEquals(5.0, Math.floor(currentHostAggregate.getSum()));
+        recordCount++;
+      } else {
+        if (!currentMetric.getMetricName().equals("live_hosts")) {
+          fail("Unexpected entry");
+        }
+      }
+    }
+
+    Assert.assertEquals(6, recordCount); //Interpolation adds 1 record.
+  }
+
+  @Test
+  public void testShouldAggregateDifferentMetricsOnClusterProperly() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
+        getConfigurationForTest(false), metadataManager, null, null);
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(metadataManager, false);
+
+    // here we put some metrics that will be aggregated
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
+      "disk_free", 1), true);
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2",
+      "disk_free", 2), true);
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
+      "disk_used", 1), true);
+
+    ctime += 2*minute;
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
+      "disk_free", 2), true);
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2",
+      "disk_free", 1), true);
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
+      "disk_used", 1), true);
+
+    // WHEN
+    long endTime = ctime + minute;
+    boolean success = agg.doWork(startTime, endTime);
+
+    //THEN
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    int recordCount = 0;
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs);
+      MetricClusterAggregate currentHostAggregate =
+        readHelper.getMetricClusterAggregateFromResultSet(rs);
+
+      if ("disk_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2, currentHostAggregate.getNumberOfHosts());
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(1.0, currentHostAggregate.getMin());
+        assertEquals(3.0, currentHostAggregate.getSum());
+        recordCount++;
+      } else if ("disk_used".equals(currentMetric.getMetricName())) {
+        assertEquals(1, currentHostAggregate.getNumberOfHosts());
+        assertEquals(1.0, currentHostAggregate.getMax());
+        assertEquals(1.0, currentHostAggregate.getMin());
+        assertEquals(1.0, currentHostAggregate.getSum());
+        recordCount++;
+      } else {
+        if (!currentMetric.getMetricName().equals("live_hosts")) {
+          fail("Unexpected entry");
+        }
+      }
+    }
+
+    assertTrue("Aggregator run should succeed", success);
+    assertTrue("Expected aggregates for both disk_free and disk_used", recordCount >= 2);
+  }
+
+  @Test
+  public void testAggregateDailyClusterMetrics() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, getConfigurationForTest(
+        false),
+        metadataManager,
+        null);
+
+    // TODO: this timestamp could be virtualized, i.e. made independent of the real clock
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long hour = 3600 * 1000;
+
+    Map<TimelineClusterMetric, MetricHostAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricHostAggregate>();
+
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime),
+      MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += hour),
+      MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += hour),
+      MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += hour),
+      MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
+
+    hdb.saveClusterAggregateRecordsSecond(records, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
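+    // Four hourly aggregates (sum 4.0, count 2 each) are seeded straight into
+    // the hourly table; the daily aggregator should roll them up into a single
+    // row with METRIC_SUM 16.0 and METRIC_COUNT 8.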
+
+    // WHEN
+    agg.doWork(startTime, ctime + hour + 1000);
+
+    // THEN
+    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_DAILY");
+    int count = 0;
+    while (rs.next()) {
+      TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID"));
+      assertEquals("METRIC_NAME", "disk_used", metric.getMetricName());
+      assertEquals("APP_ID", "test_app", metric.getAppId());
+      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+      assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+      assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+      count++;
+    }
+
+    assertEquals("Day aggregated row expected ", 1, count);
+  }
+
+  @Test
+  public void testShouldAggregateClusterOnMinuteProperly() throws Exception {
+
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(
+        hdb,
+        getConfigurationForTest(false),
+        metadataManager,
+        null);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long second = 1000;
+    long minute = 60*second;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+
+    hdb.saveClusterAggregateRecords(records);
+    agg.doWork(startTime, ctime + second);
+    long oldCtime = ctime + second;
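+    // The second batch lands one minute after the first; aggregating each
+    // window separately should produce two rows whose SERVER_TIME values
+    // differ by exactly one minute (verified below).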
+
+    //Next minute
+    ctime = startTime + minute;
+
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+
+    hdb.saveClusterAggregateRecords(records);
+    agg.doWork(oldCtime, ctime + second);
+
+    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_MINUTE");
+    int count = 0;
+    long diff = 0;
+    while (rs.next()) {
+      TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID"));
+      assertEquals("METRIC_NAME", "disk_used", metric.getMetricName());
+      assertEquals("APP_ID", "test_app", metric.getAppId());
+      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+      assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+      assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+      // Capture the first row's SERVER_TIME, then check that the second row
+      // is exactly one minute apart.
+      if (count == 0) {
+        diff += rs.getLong("SERVER_TIME");
+      } else {
+        diff -= rs.getLong("SERVER_TIME");
+        diff = Math.abs(diff);
+        assertEquals(minute, diff);
+      }
+      count++;
+    }
+
+    assertEquals("One hourly aggregated row expected ", 2, count);
+  }
+
+  @Test
+  public void testShouldAggregateClusterOnHourProperly() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
+        hdb,
+        getConfigurationForTest(false),
+        metadataManager,
+        null);
+
+    // TODO: this timestamp could be virtualized, i.e. made independent of the real clock
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+
+    hdb.saveClusterAggregateRecords(records);
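+    // Four per-minute aggregates (sum 4.0, count 2 each) should collapse into
+    // a single hourly row: sum 16.0, count 8, max 4.0, min 0.0.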
+
+    // WHEN
+    agg.doWork(startTime, ctime + minute);
+
+    // THEN
+    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
+    int count = 0;
+    while (rs.next()) {
+      TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID"));
+      assertEquals("METRIC_NAME", "disk_used", metric.getMetricName());
+      assertEquals("APP_ID", "test_app", metric.getAppId());
+      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+      assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+      assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+      count++;
+    }
+
+    assertEquals("One hourly aggregated row expected ", 1, count);
+  }
+
+  @Test
+  public void testShouldAggregateDifferentMetricsOnHourProperly() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
+        hdb,
+        getConfigurationForTest(false),
+        metadataManager,
+        null);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    hdb.saveClusterAggregateRecords(records);
+
+    // WHEN
+    agg.doWork(startTime, ctime + minute);
+
+    // THEN
+    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
+    int count = 0;
+    while (rs.next()) {
+      TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID"));
+      if ("disk_used".equals(metric.getMetricName())) {
+        assertEquals("APP_ID", "test_app", metric.getAppId());
+        assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
+        assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT"));
+        assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+        assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+      } else if ("disk_free".equals(metric.getMetricName())) {
+        assertEquals("APP_ID", "test_app", metric.getAppId());
+        assertEquals("METRIC_SUM", 1.0, rs.getDouble("METRIC_SUM"));
+        assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT"));
+        assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX"));
+        assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN"));
+      }
+
+      count++;
+    }
+
+    assertEquals("Two hourly aggregated row expected ", 2, count);
+  }
+
+  @Test
+  public void testAppLevelHostMetricAggregates() throws Exception {
+    Configuration conf = getConfigurationForTest(false);
+    conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
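+    // CLUSTER_AGGREGATOR_APP_IDS whitelists app ids whose host-level metrics
+    // are additionally rolled up per application; with "app1" listed, the
+    // host metric cpu_user should also appear aggregated under appId "app1".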
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
+        hdb,
+        conf,
+        metadataManager,
+        null,
+        null);
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(metadataManager, false);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric((ctime), "local1",
+      "app1", null, "app_metric_random", 1), true);
+    ctime += 10;
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
+      "cpu_user", 1), true);
+    ctime += 10;
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2",
+      "cpu_user", 2), true);
+
+    // WHEN
+    long endTime = ctime + minute;
+    boolean success = agg.doWork(startTime, endTime);
+
+    //THEN
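+    // With the UUID-keyed schema, reads resolve (metric, appId, instance,
+    // host) to UUIDs first and query by UUID, so fetch the cpu_user/app1
+    // UUIDs from the metadata manager and hand them to the Condition.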
+    List<byte[]> uuids = metadataManager.getUuids(new ArrayList<String>() {{ add("cpu_user"); }},
+      null,
+      "app1", null);
+
+    Condition condition = new DefaultCondition(uuids,
+      new ArrayList<String>() {{ add("cpu_user"); }}, null, "app1", null,
+      startTime - 90000, endTime, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    int recordCount = 0;
+    TimelineClusterMetric currentMetric = null;
+    MetricClusterAggregate currentHostAggregate = null;
+    while (rs.next()) {
+      currentMetric = readHelper.fromResultSet(rs);
+      currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs);
+      recordCount++;
+    }
+    assertEquals(3, recordCount);
+    assertNotNull(currentMetric);
+    assertEquals("cpu_user", currentMetric.getMetricName());
+    assertEquals("app1", currentMetric.getAppId());
+    assertNotNull(currentHostAggregate);
+    assertEquals(1, currentHostAggregate.getNumberOfHosts());
+    assertEquals(1.0d, currentHostAggregate.getSum());
+  }
+
+  @Test
+  public void testClusterAggregateMetricNormalization() throws Exception {
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
+        hdb,
+        getConfigurationForTest(false),
+        metadataManager,
+        null,
+        null);
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(metadataManager, false);
+
+    long currentTime = System.currentTimeMillis();
+    // Sample data
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("yarn.ClusterMetrics.NumActiveNMs");
+    metric1.setAppId("resourcemanager");
+    metric1.setHostName("h1");
+    metric1.setStartTime(currentTime);
+    metric1.setMetricValues(new TreeMap<Long, Double>() {{
+      put(currentTime + 10000, 1.0);
+      put(currentTime + 20000, 1.0);
+      put(currentTime + 30000, 1.0);
+      put(currentTime + 40000, 1.0);
+      put(currentTime + 50000, 1.0);
+      put(currentTime + 60000, 1.0);
+      put(currentTime + 70000, 1.0);
+    }});
+
+    TimelineMetric metric2 = new TimelineMetric();
+    metric2.setMetricName("yarn.ClusterMetrics.NumActiveNMs");
+    metric2.setAppId("resourcemanager");
+    metric2.setHostName("h1");
+    metric2.setStartTime(currentTime + 70000);
+    metric2.setMetricValues(new TreeMap<Long, Double>() {{
+      put(currentTime + 70000, 1.0);
+      put(currentTime + 80000, 1.0);
+      put(currentTime + 90000, 1.0);
+      put(currentTime + 100000, 1.0);
+      put(currentTime + 110000, 1.0);
+      put(currentTime + 120000, 1.0);
+      put(currentTime + 130000, 1.0);
+    }});
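+    // metric1 and metric2 intentionally overlap at currentTime + 70000; the
+    // aggregator's normalization should de-duplicate that boundary point so
+    // every slice still reports sum == 1.0 for the single host h1.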
+
+    TimelineMetrics metrics = new TimelineMetrics();
+    metrics.setMetrics(Collections.singletonList(metric1));
+    insertMetricRecords(conn, metrics);
+
+    metrics.setMetrics(Collections.singletonList(metric2));
+    insertMetricRecords(conn, metrics);
+
+    long startTime = currentTime - 3*60*1000;
+    long endTime = currentTime + 3*60*1000;
+
+    agg.doWork(startTime, endTime);
+
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    int recordCount = 0;
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs);
+      MetricClusterAggregate currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs);
+
+      if ("yarn.ClusterMetrics.NumActiveNMs".equals(currentMetric.getMetricName())) {
+        assertEquals(1, currentHostAggregate.getNumberOfHosts());
+        assertEquals(1.0, currentHostAggregate.getMax());
+        assertEquals(1.0, currentHostAggregate.getMin());
+        assertEquals(1.0, currentHostAggregate.getSum());
+        recordCount++;
+      } else {
+        if (!currentMetric.getMetricName().equals("live_hosts")) {
+          fail("Unexpected entry");
+        }
+      }
+    }
+    Assert.assertEquals(10, recordCount); //With interpolation.
+  }
+
+  @Test
+  public void testAggregationUsingGroupByQuery() throws Exception {
+    // GIVEN
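+    // Same fixture and expectations as
+    // testShouldAggregateDifferentMetricsOnHourProperly, but with
+    // useGroupByAggregators=true so the hourly rollup runs through the
+    // GROUP BY code path instead of the default one.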
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
+        hdb,
+        getConfigurationForTest(true),
+        metadataManager,
+        null);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    hdb.saveClusterAggregateRecords(records);
+
+    // WHEN
+    agg.doWork(startTime, ctime + minute);
+
+    // THEN
+    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
+    int count = 0;
+    while (rs.next()) {
+      TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID"));
+      if ("disk_used".equals(metric.getMetricName())) {
+        assertEquals("APP_ID", "test_app", metric.getAppId());
+        assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
+        assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT"));
+        assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+        assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+      } else if ("disk_free".equals(metric.getMetricName())) {
+        assertEquals("APP_ID", "test_app", metric.getAppId());
+        assertEquals("METRIC_SUM", 1.0, rs.getDouble("METRIC_SUM"));
+        assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT"));
+        assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX"));
+        assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN"));
+      }
+      count++;
+    }
+    assertEquals("Two hourly aggregated row expected ", 2, count);
+  }
+
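+  // Test-only helper: runs a raw query against the mini cluster's Phoenix
+  // connection. The Connection and Statement are not closed here, which is
+  // tolerable in a short-lived integration test, though try-with-resources
+  // would be tidier.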
+  private ResultSet executeQuery(String query) throws SQLException {
+    Connection conn = getConnection(getUrl());
+    Statement stmt = conn.createStatement();
+    return stmt.executeQuery(query);
+  }
+}

