ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From swa...@apache.org
Subject [1/3] ambari git commit: AMBARI-12248. Metrics API result set contains a lot of data outside the requested time range. (swagle)
Date Tue, 07 Jul 2015 00:22:22 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk 9b991a4a6 -> b234664cd


AMBARI-12248. Metrics API result set contains a lot of data outside the requested time range. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7763a47e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7763a47e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7763a47e

Branch: refs/heads/trunk
Commit: 7763a47eb1a7f23aa015de490fd04f976cdce6c8
Parents: 9b991a4
Author: Siddharth Wagle <swagle@hortonworks.com>
Authored: Wed Jul 1 17:48:56 2015 -0700
Committer: Siddharth Wagle <swagle@hortonworks.com>
Committed: Mon Jul 6 16:49:39 2015 -0700

----------------------------------------------------------------------
 .../metrics/timeline/PhoenixHBaseAccessor.java  |  30 ++++-
 .../timeline/TimelineMetricConfiguration.java   |   3 +
 .../timeline/query/PhoenixTransactSQL.java      |  72 +++++------
 .../timeline/AbstractMiniHBaseClusterTest.java  |   3 +
 .../internal/AbstractPropertyProvider.java      |  12 +-
 .../controller/internal/TemporalInfoImpl.java   |  30 ++++-
 .../metrics/MetricReportingAdapter.java         |   5 +-
 .../metrics/MetricsDownsamplingMethod.java      |  28 +++-
 .../MetricsDownsamplingMethodFactory.java       |  49 ++++++-
 .../metrics/timeline/AMSPropertyProvider.java   |   9 +-
 .../timeline/AMSReportPropertyProvider.java     |   2 +-
 .../server/controller/spi/TemporalInfo.java     |  14 ++
 .../metrics/ganglia/TestStreamProvider.java     |  13 +-
 .../timeline/AMSPropertyProviderTest.java       | 127 +++++++++++++------
 .../timeline/AMSReportPropertyProviderTest.java |  14 +-
 .../timeline/MetricsPaddingMethodTest.java      |  10 ++
 .../ams/single_host_component_metrics.json      |  14 +-
 17 files changed, 316 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/7763a47e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index d018f29..1bd20a3 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
+
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -51,6 +52,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
@@ -63,6 +65,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_DAILY_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL;
@@ -87,23 +90,28 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
  * Provides a facade over the Phoenix API to access HBase schema
  */
 public class PhoenixHBaseAccessor {
+  private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
 
   static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000;
-  private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
-  private static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper();
+  // Default stale data allowance set to 3 minutes, 2 minutes more than time
+  // it was collected. Also 2 minutes is the default aggregation interval at
+  // cluster and host levels.
+  static final long DEFAULT_OUT_OF_BAND_TIME_ALLOWANCE = 300000;
   /**
    * 4 metrics/min * 60 * 24: Retrieve data for 1 day.
    */
   private static final int METRICS_PER_MINUTE = 4;
-  public static int RESULTSET_LIMIT = (int)TimeUnit.DAYS.toMinutes(1) *
-    METRICS_PER_MINUTE;
+  public static int RESULTSET_LIMIT = (int)TimeUnit.DAYS.toMinutes(1) * METRICS_PER_MINUTE;
+
+  private static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper();
   private static ObjectMapper mapper = new ObjectMapper();
-  private static TypeReference<Map<Long, Double>> metricValuesTypeRef =
-    new TypeReference<Map<Long, Double>>() {};
+  private static TypeReference<Map<Long, Double>> metricValuesTypeRef = new TypeReference<Map<Long, Double>>() {};
+
   private final Configuration hbaseConf;
   private final Configuration metricsConf;
   private final RetryCounterFactory retryCounterFactory;
   private final ConnectionProvider dataSource;
+  private final long outOfBandTimeAllowance;
 
   public PhoenixHBaseAccessor(Configuration hbaseConf,
                               Configuration metricsConf){
@@ -126,6 +134,8 @@ public class PhoenixHBaseAccessor {
     this.retryCounterFactory = new RetryCounterFactory(
       metricsConf.getInt(GLOBAL_MAX_RETRIES, 10),
       (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 5)));
+    this.outOfBandTimeAllowance = metricsConf.getLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE,
+      DEFAULT_OUT_OF_BAND_TIME_ALLOWANCE);
   }
 
   private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs)
@@ -330,6 +340,14 @@ public class PhoenixHBaseAccessor {
         UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
 
       for (TimelineMetric metric : timelineMetrics) {
+        if (Math.abs(currentTime - metric.getStartTime()) > outOfBandTimeAllowance) {
+          // If timeseries start time is way in the past : discard
+          LOG.debug("Discarding out of band timeseries, currentTime = "
+            + currentTime + ", startTime = " + metric.getStartTime()
+            + ", hostname = " + metric.getHostName());
+          continue;
+        }
+
         metricRecordStmt.clearParameters();
 
         if (LOG.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/7763a47e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index d1483ea..d4f919e 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -159,6 +159,9 @@ public class TimelineMetricConfiguration {
   public static final String SERVER_SIDE_TIMESIFT_ADJUSTMENT =
     "timeline.metrics.service.cluster.aggregator.timeshift.adjustment";
 
+  public static final String OUT_OFF_BAND_DATA_TIME_ALLOWANCE =
+    "timeline.metrics.service.outofband.time.allowance.millis";
+
   public static final String HOST_APP_ID = "HOST";
 
   private Configuration hbaseConf;

http://git-wip-us.apache.org/repos/asf/ambari/blob/7763a47e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
index 71f53ca..4ca1b4b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -320,51 +320,51 @@ public class PhoenixTransactSQL {
     }
     PreparedStatement stmt = null;
     try {
-    stmt = connection.prepareStatement(sb.toString());
-    int pos = 1;
-    if (condition.getMetricNames() != null) {
-      for (; pos <= condition.getMetricNames().size(); pos++) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
+      stmt = connection.prepareStatement(sb.toString());
+      int pos = 1;
+      if (condition.getMetricNames() != null) {
+        for (; pos <= condition.getMetricNames().size(); pos++) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
+          }
+          stmt.setString(pos, condition.getMetricNames().get(pos - 1));
         }
-        stmt.setString(pos, condition.getMetricNames().get(pos - 1));
       }
-    }
-    if (condition.getHostnames() != null) {
-      for (String hostname: condition.getHostnames()) {
+      if (condition.getHostnames() != null) {
+        for (String hostname : condition.getHostnames()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Setting pos: " + pos + ", value: " + hostname);
+          }
+          stmt.setString(pos++, hostname);
+        }
+      }
+      if (condition.getAppId() != null) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Setting pos: " + pos + ", value: " + hostname);
+          LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
         }
-        stmt.setString(pos++, hostname);
+        stmt.setString(pos++, condition.getAppId());
       }
-    }
-    if (condition.getAppId() != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
+      if (condition.getInstanceId() != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
+        }
+        stmt.setString(pos++, condition.getInstanceId());
       }
-      stmt.setString(pos++, condition.getAppId());
-    }
-    if (condition.getInstanceId() != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
+      if (condition.getStartTime() != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime());
+        }
+        stmt.setLong(pos++, condition.getStartTime());
       }
-      stmt.setString(pos++, condition.getInstanceId());
-    }
-    if (condition.getStartTime() != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime());
+      if (condition.getEndTime() != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime());
+        }
+        stmt.setLong(pos, condition.getEndTime());
       }
-      stmt.setLong(pos++, condition.getStartTime());
-    }
-    if (condition.getEndTime() != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime());
+      if (condition.getFetchSize() != null) {
+        stmt.setFetchSize(condition.getFetchSize());
       }
-      stmt.setLong(pos, condition.getEndTime());
-    }
-    if (condition.getFetchSize() != null) {
-      stmt.setFetchSize(condition.getFetchSize());
-    }
     } catch (SQLException e) {
       if (stmt != null) {
         stmt.close();

http://git-wip-us.apache.org/repos/asf/ambari/blob/7763a47e/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
index 643e5cc..442dbf5 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
@@ -47,6 +47,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.LOG;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
@@ -127,6 +128,8 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
   protected PhoenixHBaseAccessor createTestableHBaseAccessor() {
     Configuration metricsConf = new Configuration();
     metricsConf.set(TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");
+    // Unit tests insert values into the future
+    metricsConf.setLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE, 600000);
 
     return
         new PhoenixHBaseAccessor(

http://git-wip-us.apache.org/repos/asf/ambari/blob/7763a47e/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractPropertyProvider.java
index 133c8b5..f265c4f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractPropertyProvider.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.controller.internal;
 
 import org.apache.ambari.server.controller.metrics.MetricReportingAdapter;
 import org.apache.ambari.server.controller.spi.PropertyProvider;
+import org.apache.ambari.server.controller.spi.TemporalInfo;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import java.lang.reflect.InvocationTargetException;
@@ -357,11 +358,10 @@ public abstract class AbstractPropertyProvider extends BaseProvider implements P
   }
 
   // Normalize percent values: Copied over from Ganglia Metric
-  private static Number[][] getGangliaLikeDatapoints(TimelineMetric metric) {
+  private static Number[][] getGangliaLikeDatapoints(TimelineMetric metric, TemporalInfo temporalInfo) {
     MetricReportingAdapter rpt = new MetricReportingAdapter(metric);
 
-    //TODO Don't we always need to downsample?
-    return rpt.reportMetricData(metric);
+    return rpt.reportMetricData(metric, temporalInfo);
   }
 
   /**
@@ -372,11 +372,11 @@ public abstract class AbstractPropertyProvider extends BaseProvider implements P
    *
    * @return a range of temporal data or a point in time value if not temporal
    */
-  protected static Object getValue(TimelineMetric metric, boolean isTemporal) {
-    Number[][] dataPoints = getGangliaLikeDatapoints(metric);
+  protected static Object getValue(TimelineMetric metric, TemporalInfo temporalInfo) {
+    Number[][] dataPoints = getGangliaLikeDatapoints(metric, temporalInfo);
 
     int length = dataPoints.length;
-    if (isTemporal) {
+    if (temporalInfo != null) {
       return length > 0 ? dataPoints : null;
     } else {
       // return the value of the last data point

http://git-wip-us.apache.org/repos/asf/ambari/blob/7763a47e/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/TemporalInfoImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/TemporalInfoImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/TemporalInfoImpl.java
index bce228f..2ffe984 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/TemporalInfoImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/TemporalInfoImpl.java
@@ -24,14 +24,28 @@ import org.apache.ambari.server.controller.spi.TemporalInfo;
 * Temporal query data.
 */
 public class TemporalInfoImpl implements TemporalInfo {
-  private long m_startTime;
-  private long m_endTime;
-  private long m_step;
+  private final long m_startTime;
+  private final long m_endTime;
+  private final long m_step;
+  private final long startTimeMillis;
+  private final long endTimeMillis;
 
   public TemporalInfoImpl(long startTime, long endTime, long step) {
     m_startTime = startTime;
     m_endTime = endTime;
     m_step = step;
+
+    if (startTime < 9999999999l) {
+      startTimeMillis = startTime * 1000;
+    } else {
+      startTimeMillis = startTime;
+    }
+
+    if (endTime < 9999999999l) {
+      endTimeMillis = endTime * 1000;
+    } else {
+      endTimeMillis = endTime;
+    }
   }
 
   @Override
@@ -50,6 +64,16 @@ public class TemporalInfoImpl implements TemporalInfo {
   }
 
   @Override
+  public Long getStartTimeMillis() {
+    return startTimeMillis;
+  }
+
+  @Override
+  public Long getEndTimeMillis() {
+    return endTimeMillis;
+  }
+
+  @Override
   public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;

http://git-wip-us.apache.org/repos/asf/ambari/blob/7763a47e/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricReportingAdapter.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricReportingAdapter.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricReportingAdapter.java
index d015097..ad711f1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricReportingAdapter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricReportingAdapter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.controller.metrics;
 
+import org.apache.ambari.server.controller.spi.TemporalInfo;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 
 public class MetricReportingAdapter {
@@ -28,7 +29,7 @@ public class MetricReportingAdapter {
     dataTransferMethod = MetricsDataTransferMethodFactory.detectDataTransferMethod(metricDecl);
   }
 
-  public Number[][] reportMetricData(TimelineMetric metricData) {
-    return downsamplingMethod.reportMetricData(metricData, dataTransferMethod);
+  public Number[][] reportMetricData(TimelineMetric metricData, TemporalInfo temporalInfo) {
+    return downsamplingMethod.reportMetricData(metricData, dataTransferMethod, temporalInfo);
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7763a47e/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsDownsamplingMethod.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsDownsamplingMethod.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsDownsamplingMethod.java
index 8e96a42..8589a1b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsDownsamplingMethod.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsDownsamplingMethod.java
@@ -17,8 +17,34 @@
  */
 package org.apache.ambari.server.controller.metrics;
 
+import org.apache.ambari.server.controller.spi.TemporalInfo;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class MetricsDownsamplingMethod {
-  public abstract Number[][] reportMetricData(TimelineMetric metricData, MetricsDataTransferMethod dataTransferMethod);
+  // Allow for 2 minute discrepancy to account for client side buffering,
+  // this ensures at least some data is returned in the initial few minutes.
+  private static final long OUT_OF_BAND_TIME_ALLOWANCE = 120000;
+  static Logger LOG = LoggerFactory.getLogger(MetricsDownsamplingMethod.class);
+
+  // Downsampling methods iterate over the entire metrics result to create output array.
+  // Passing down @TemporalInfo avoids re-iterating to filter out out of band data.
+  public abstract Number[][] reportMetricData(TimelineMetric metricData,
+                                              MetricsDataTransferMethod dataTransferMethod,
+                                              TemporalInfo temporalInfo);
+
+  protected boolean isWithinTemporalQueryRange(Long timestamp, TemporalInfo temporalInfo) {
+    boolean retVal = temporalInfo == null ||
+      timestamp >= (temporalInfo.getStartTimeMillis() - OUT_OF_BAND_TIME_ALLOWANCE)
+        && timestamp <= temporalInfo.getEndTimeMillis();
+
+    if (!retVal && LOG.isTraceEnabled()) {
+      LOG.trace("Ignoring out of band metric with ts: " + timestamp + ", "
+        + "temporalInfo: startTime = " + temporalInfo.getStartTimeMillis() + ","
+        + " endTime = " + temporalInfo.getEndTimeMillis());
+    }
+
+    return retVal;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7763a47e/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsDownsamplingMethodFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsDownsamplingMethodFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsDownsamplingMethodFactory.java
index f7d3457..19df14c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsDownsamplingMethodFactory.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsDownsamplingMethodFactory.java
@@ -17,7 +17,11 @@
  */
 package org.apache.ambari.server.controller.metrics;
 
+import com.google.common.collect.Iterators;
+import org.apache.ambari.server.controller.spi.TemporalInfo;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -46,15 +50,20 @@ public class MetricsDownsamplingMethodFactory {
 }
 
 class MetricNoDownsampling extends MetricsDownsamplingMethod {
+
   @Override
-  public Number[][] reportMetricData(TimelineMetric metricData, MetricsDataTransferMethod dataTransferMethod) {
+  public Number[][] reportMetricData(TimelineMetric metricData,
+                                     MetricsDataTransferMethod dataTransferMethod,
+                                     TemporalInfo temporalInfo) {
     Number[][] datapointsArray = new Number[metricData.getMetricValues().size()][2];
     int cnt = 0;
 
     for (Map.Entry<Long, Double> metricEntry : metricData.getMetricValues().entrySet()) {
-      datapointsArray[cnt][0] = dataTransferMethod.getData(metricEntry.getValue());
-      datapointsArray[cnt][1] = metricEntry.getKey();
-      cnt++;
+      if (isWithinTemporalQueryRange(metricEntry.getKey(), temporalInfo)) {
+        datapointsArray[cnt][0] = dataTransferMethod.getData(metricEntry.getValue());
+        datapointsArray[cnt][1] = metricEntry.getKey();
+        cnt++;
+      }
     }
 
     return datapointsArray;
@@ -62,6 +71,7 @@ class MetricNoDownsampling extends MetricsDownsamplingMethod {
 }
 
 class MetricsAveragePerSecondDownsampling extends MetricsDownsamplingMethod {
+
   class Accumulo {
     public long ts;
     public Double val;
@@ -71,9 +81,35 @@ class MetricsAveragePerSecondDownsampling extends MetricsDownsamplingMethod {
       this.val = v;
     }
   }
+
+  // Cache does not accept out of band data
+  class OutOfBandAccumuloFilterList<T> extends ArrayList<Accumulo> {
+    TemporalInfo temporalInfo;
+
+    OutOfBandAccumuloFilterList(TemporalInfo temporalInfo) {
+      this.temporalInfo = temporalInfo;
+    }
+
+    @Override
+    public boolean add(Accumulo accumulo) {
+      long ts = accumulo.ts;
+      if (ts < 9999999999l) {
+        ts = ts * 1000;
+      }
+      // Skip out of band data
+      if (isWithinTemporalQueryRange(ts, temporalInfo)) {
+        return super.add(accumulo);
+      }
+      return false;
+    }
+  }
+
   @Override
-  public Number[][] reportMetricData(TimelineMetric metricData, MetricsDataTransferMethod dataTransferMethod) {
-    ArrayList<Accumulo> cache = new ArrayList<Accumulo>();
+  public Number[][] reportMetricData(TimelineMetric metricData,
+                                     MetricsDataTransferMethod dataTransferMethod,
+                                     TemporalInfo temporalInfo) {
+
+    OutOfBandAccumuloFilterList<Accumulo> cache = new OutOfBandAccumuloFilterList<Accumulo>(temporalInfo);
 
     final Iterator<Map.Entry<Long, Double>> ci = metricData.getMetricValues().entrySet().iterator();
 
@@ -96,6 +132,7 @@ class MetricsAveragePerSecondDownsampling extends MetricsDownsamplingMethod {
 
       while(ci.hasNext()) {
         e0 = ci.next();
+
         // Skip null padding at the end of the series.
         if (e0.getValue() == null) {
           if (!lastNonNullEntryAdded) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/7763a47e/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
index b87118b..6667134 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
@@ -39,7 +39,6 @@ import org.codehaus.jackson.map.AnnotationIntrospector;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.ObjectReader;
 import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
-
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -54,7 +53,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import static org.apache.ambari.server.Role.HBASE_MASTER;
 import static org.apache.ambari.server.Role.HBASE_REGIONSERVER;
 import static org.apache.ambari.server.Role.METRICS_COLLECTOR;
@@ -259,7 +257,7 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
               for (TimelineMetric metric : metricsMap.get(hostname)) {
                 // Pad zeros or nulls if needed
                 metricsPaddingMethod.applyPaddingStrategy(metric, temporalInfo);
-                populateResource(resource, metric);
+                populateResource(resource, metric, temporalInfo);
               }
             }
           }
@@ -389,7 +387,8 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
       return result;
     }
 
-    private void populateResource(Resource resource, TimelineMetric metric) {
+    private void populateResource(Resource resource, TimelineMetric metric,
+                                  TemporalInfo temporalInfo) {
       String metric_name = metric.getMetricName();
       Set<String> propertyIdSet = metrics.get(metric_name);
       List<String> parameterList  = new LinkedList<String>();
@@ -423,7 +422,7 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
                     ++i;
                   }
                 }
-                Object value = getValue(metric, temporalInfo != null);
+                Object value = getValue(metric, temporalInfo);
                 if (value != null) {
                   resource.setProperty(propertyId, value);
                 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7763a47e/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
index 2dbff68..a095206 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
@@ -207,7 +207,7 @@ public class AMSReportPropertyProvider extends MetricsReportPropertyProvider {
 
             String propertyId = propertyIdMap.get(metric.getMetricName());
             if (propertyId != null) {
-              resource.setProperty(propertyId, getValue(metric, true));
+              resource.setProperty(propertyId, getValue(metric, temporalInfo));
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7763a47e/ambari-server/src/main/java/org/apache/ambari/server/controller/spi/TemporalInfo.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/spi/TemporalInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/spi/TemporalInfo.java
index dca61be..fd4fe52 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/spi/TemporalInfo.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/spi/TemporalInfo.java
@@ -45,4 +45,18 @@ public interface TemporalInfo {
    * @return the step time in seconds
    */
   Long getStep();
+
+  /**
+   * Get milliseconds time from startTime
+   *
+   * @return time in milliseconds
+   */
+  Long getStartTimeMillis();
+
+  /**
+   * Get milliseconds time from endTime
+   *
+   * @return time in milliseconds
+   */
+  Long getEndTimeMillis();
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7763a47e/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/ganglia/TestStreamProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/ganglia/TestStreamProvider.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/ganglia/TestStreamProvider.java
index 07fa7c7..770dfb6 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/ganglia/TestStreamProvider.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/ganglia/TestStreamProvider.java
@@ -22,11 +22,15 @@ import org.apache.ambari.server.controller.utilities.StreamProvider;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 public class TestStreamProvider implements StreamProvider {
   // Allow for filename to be set at runtime
   protected String fileName;
   private String lastSpec;
+  protected Set<String> specs = new HashSet<String>();
   private boolean isLastSpecUpdated;
 
   public TestStreamProvider(String fileName) {
@@ -35,10 +39,11 @@ public class TestStreamProvider implements StreamProvider {
 
   @Override
   public InputStream readFrom(String spec) throws IOException {
-    if (!isLastSpecUpdated)
+    if (!isLastSpecUpdated) {
       lastSpec = spec;
-    
+    }
     isLastSpecUpdated = false;
+    specs.add(spec);
     
     return ClassLoader.getSystemResourceAsStream(fileName);
   }
@@ -47,6 +52,10 @@ public class TestStreamProvider implements StreamProvider {
     return lastSpec;
   }
 
+  public Set<String> getAllSpecs() {
+    return specs;
+  }
+
   @Override
   public InputStream readFrom(String spec, String requestMethod, String params) throws IOException {
     lastSpec = spec + "?" + params;

http://git-wip-us.apache.org/repos/asf/ambari/blob/7763a47e/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
index 859fda4..c8007c8 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
@@ -38,6 +38,7 @@ import org.apache.ambari.server.state.ComponentInfo;
 import org.apache.ambari.server.state.StackId;
 import org.apache.http.client.utils.URIBuilder;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
@@ -47,10 +48,12 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -100,7 +103,7 @@ public class AMSPropertyProviderTest {
     resource.setProperty(CLUSTER_NAME_PROPERTY_ID, "c1");
     resource.setProperty(HOST_NAME_PROPERTY_ID, "h1");
     Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>();
-    temporalInfoMap.put(PROPERTY_ID1, new TemporalInfoImpl(1416445244701L, 1416445244901L, 1L));
+    temporalInfoMap.put(PROPERTY_ID1, new TemporalInfoImpl(1416445244800L, 1416448936474L, 15L));
     Request request = PropertyHelper.getReadRequest(Collections.singleton(PROPERTY_ID1), temporalInfoMap);
     Set<Resource> resources =
       propertyProvider.populateResources(Collections.singleton(resource), request, null);
@@ -112,8 +115,8 @@ public class AMSPropertyProviderTest {
     uriBuilder.addParameter("metricNames", "cpu_user");
     uriBuilder.addParameter("hostname", "h1");
     uriBuilder.addParameter("appId", "HOST");
-    uriBuilder.addParameter("startTime", "1416445244701");
-    uriBuilder.addParameter("endTime", "1416445244901");
+    uriBuilder.addParameter("startTime", "1416445244800");
+    uriBuilder.addParameter("endTime", "1416448936474");
     Assert.assertEquals(uriBuilder.toString(), streamProvider.getLastSpec());
     Number[][] val = (Number[][]) res.getPropertyValue(PROPERTY_ID1);
     Assert.assertNotNull("No value for property " + PROPERTY_ID1, val);
@@ -142,13 +145,11 @@ public class AMSPropertyProviderTest {
     resource.setProperty(CLUSTER_NAME_PROPERTY_ID, "c1");
     resource.setProperty(HOST_NAME_PROPERTY_ID, "h1");
     Map<String, TemporalInfo> temporalInfoMap = Collections.emptyMap();
-    Request request = PropertyHelper.getReadRequest(Collections.singleton
-      (PROPERTY_ID1), temporalInfoMap);
+    Request request = PropertyHelper.getReadRequest(Collections.singleton(PROPERTY_ID1), temporalInfoMap);
     System.out.println(request);
 
     // when
-    Set<Resource> resources =
-      propertyProvider.populateResources(Collections.singleton(resource), request, null);
+    Set<Resource> resources = propertyProvider.populateResources(Collections.singleton(resource), request, null);
 
     // then
     Assert.assertEquals(1, resources.size());
@@ -233,31 +234,38 @@ public class AMSPropertyProviderTest {
     resource.setProperty(CLUSTER_NAME_PROPERTY_ID, "c1");
     resource.setProperty(HOST_NAME_PROPERTY_ID, "h1");
     Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>();
-    temporalInfoMap.put(PROPERTY_ID1, new TemporalInfoImpl(1416445244701L, 1416445244901L, 1L));
-    temporalInfoMap.put(PROPERTY_ID2, new TemporalInfoImpl(1416445244701L, 1416445244901L, 1L));
+    temporalInfoMap.put(PROPERTY_ID1, new TemporalInfoImpl(1416445244701L, 1416448936564L, 15L));
+    temporalInfoMap.put(PROPERTY_ID2, new TemporalInfoImpl(1416445244701L, 1416448936564L, 15L));
     Request request = PropertyHelper.getReadRequest(
-      new HashSet<String>() {{ add(PROPERTY_ID1); add(PROPERTY_ID2); }}, temporalInfoMap);
+      new HashSet<String>() {{
+        add(PROPERTY_ID1);
+        add(PROPERTY_ID2);
+        add("params/padding/NONE"); // Ignore padding to match result size
+      }}, temporalInfoMap);
     Set<Resource> resources =
       propertyProvider.populateResources(Collections.singleton(resource), request, null);
     Assert.assertEquals(1, resources.size());
     Resource res = resources.iterator().next();
     Map<String, Object> properties = PropertyHelper.getProperties(resources.iterator().next());
     Assert.assertNotNull(properties);
-    URIBuilder uriBuilder = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
-    uriBuilder.addParameter("metricNames", "cpu_user,mem_free");
-    uriBuilder.addParameter("hostname", "h1");
-    uriBuilder.addParameter("appId", "HOST");
-    uriBuilder.addParameter("startTime", "1416445244701");
-    uriBuilder.addParameter("endTime", "1416445244901");
+    URIBuilder uriBuilder1 = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
+    uriBuilder1.addParameter("metricNames", "cpu_user,mem_free");
+    uriBuilder1.addParameter("hostname", "h1");
+    uriBuilder1.addParameter("appId", "HOST");
+    uriBuilder1.addParameter("startTime", "1416445244701");
+    uriBuilder1.addParameter("endTime", "1416448936564");
 
     URIBuilder uriBuilder2 = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
     uriBuilder2.addParameter("metricNames", "mem_free,cpu_user");
     uriBuilder2.addParameter("hostname", "h1");
     uriBuilder2.addParameter("appId", "HOST");
     uriBuilder2.addParameter("startTime", "1416445244701");
-    uriBuilder2.addParameter("endTime", "1416445244901");
-    Assert.assertTrue(uriBuilder.toString().equals(streamProvider.getLastSpec())
-      || uriBuilder2.toString().equals(streamProvider.getLastSpec()));
+    uriBuilder2.addParameter("endTime", "1416448936564");
+
+    List<String> allSpecs = new ArrayList<String>(streamProvider.getAllSpecs());
+    Assert.assertEquals(1, allSpecs.size());
+    Assert.assertTrue(uriBuilder1.toString().equals(allSpecs.get(0))
+      || uriBuilder2.toString().equals(allSpecs.get(0)));
     Number[][] val = (Number[][]) res.getPropertyValue(PROPERTY_ID1);
     Assert.assertEquals(111, val.length);
     val = (Number[][]) res.getPropertyValue(PROPERTY_ID2);
@@ -295,7 +303,7 @@ public class AMSPropertyProviderTest {
     resource.setProperty(HOST_NAME_PROPERTY_ID, "h1");// should be set?
     resource.setProperty(COMPONENT_NAME_PROPERTY_ID, "RESOURCEMANAGER");
     Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>();
-    temporalInfoMap.put(propertyId1, new TemporalInfoImpl(1416528819369L, 1416528819569L, 1L));
+    temporalInfoMap.put(propertyId1, new TemporalInfoImpl(1416528759233L, 1416531129231L, 1L));
     Request request = PropertyHelper.getReadRequest(
         Collections.singleton(propertyId1), temporalInfoMap);
     Set<Resource> resources =
@@ -307,8 +315,8 @@ public class AMSPropertyProviderTest {
     URIBuilder uriBuilder = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
     uriBuilder.addParameter("metricNames", "yarn.QueueMetrics.Queue=root.AvailableMB");
     uriBuilder.addParameter("appId", "RESOURCEMANAGER");
-    uriBuilder.addParameter("startTime", "1416528819369");
-    uriBuilder.addParameter("endTime", "1416528819569");
+    uriBuilder.addParameter("startTime", "1416528759233");
+    uriBuilder.addParameter("endTime", "1416531129231");
     Assert.assertEquals(uriBuilder.toString(), streamProvider.getLastSpec());
     Number[][] val = (Number[][]) res.getPropertyValue("metrics/yarn/Queue/root/AvailableMB");
     Assert.assertNotNull("No value for property metrics/yarn/Queue/root/AvailableMB", val);
@@ -340,7 +348,7 @@ public class AMSPropertyProviderTest {
     resource.setProperty(HOST_NAME_PROPERTY_ID, "h1");
     resource.setProperty(COMPONENT_NAME_PROPERTY_ID, "NAMENODE");
     Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>();
-    temporalInfoMap.put(propertyId, new TemporalInfoImpl(1416528819369L, 1416528819569L, 1L));
+    temporalInfoMap.put(propertyId, new TemporalInfoImpl(1416528759233L, 1416531129231L, 1L));
     Request request = PropertyHelper.getReadRequest(
       Collections.singleton(propertyId), temporalInfoMap);
     Set<Resource> resources =
@@ -352,8 +360,8 @@ public class AMSPropertyProviderTest {
     URIBuilder uriBuilder = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
     uriBuilder.addParameter("metricNames", "rpc.rpc.RpcQueueTimeAvgTime");
     uriBuilder.addParameter("appId", "NAMENODE");
-    uriBuilder.addParameter("startTime", "1416528819369");
-    uriBuilder.addParameter("endTime", "1416528819569");
+    uriBuilder.addParameter("startTime", "1416528759233");
+    uriBuilder.addParameter("endTime", "1416531129231");
     Assert.assertEquals(uriBuilder.toString(), streamProvider.getLastSpec());
     Number[][] val = (Number[][]) res.getPropertyValue(propertyId);
     Assert.assertNotNull("No value for property " + propertyId, val);
@@ -492,10 +500,53 @@ public class AMSPropertyProviderTest {
     Assert.assertEquals(32, val.length);
   }
 
+  @Test
+  public void testFilterOutOfBandMetricData() throws Exception {
+    setUpCommonMocks();
+    TestStreamProvider streamProvider = new TestStreamProvider(SINGLE_HOST_METRICS_FILE_PATH);
+    TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
+    ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
+
+    Map<String, Map<String, PropertyInfo>> propertyIds = PropertyHelper.getMetricPropertyIds(Resource.Type.Host);
+    AMSPropertyProvider propertyProvider = new AMSHostPropertyProvider(
+      propertyIds,
+      streamProvider,
+      sslConfiguration,
+      metricHostProvider,
+      CLUSTER_NAME_PROPERTY_ID,
+      HOST_NAME_PROPERTY_ID
+    );
+
+    Resource resource = new ResourceImpl(Resource.Type.Host);
+    resource.setProperty(CLUSTER_NAME_PROPERTY_ID, "c1");
+    resource.setProperty(HOST_NAME_PROPERTY_ID, "h1");
+    Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>();
+    // Chopped a section in the middle
+    temporalInfoMap.put(PROPERTY_ID1, new TemporalInfoImpl(1416446744801L, 1416447224801L, 1L));
+    Request request = PropertyHelper.getReadRequest(Collections.singleton(PROPERTY_ID1), temporalInfoMap);
+    Set<Resource> resources =
+      propertyProvider.populateResources(Collections.singleton(resource), request, null);
+    Assert.assertEquals(1, resources.size());
+    Resource res = resources.iterator().next();
+    Map<String, Object> properties = PropertyHelper.getProperties(resources.iterator().next());
+    Assert.assertNotNull(properties);
+    URIBuilder uriBuilder = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
+    uriBuilder.addParameter("metricNames", "cpu_user");
+    uriBuilder.addParameter("hostname", "h1");
+    uriBuilder.addParameter("appId", "HOST");
+    uriBuilder.addParameter("startTime", "1416446744801");
+    uriBuilder.addParameter("endTime", "1416447224801");
+    Assert.assertEquals(uriBuilder.toString(), streamProvider.getLastSpec());
+    Number[][] val = (Number[][]) res.getPropertyValue(PROPERTY_ID1);
+    Assert.assertNotNull("No value for property " + PROPERTY_ID1, val);
+    // 4 entries fit into the default allowance limit
+    Assert.assertEquals(25, val.length);
+  }
+
   static class TestStreamProviderForHostComponentHostMetricsTest extends TestStreamProvider {
     String hostMetricFilePath = FILE_PATH_PREFIX + "single_host_metric.json";
     String hostComponentMetricFilePath = FILE_PATH_PREFIX + "single_host_component_metrics.json";
-    Set<String> specs = new HashSet<String>();
+
 
     public TestStreamProviderForHostComponentHostMetricsTest(String fileName) {
       super(fileName);
@@ -513,10 +564,6 @@ public class AMSPropertyProviderTest {
 
       return super.readFrom(spec);
     }
-
-    public Set<String> getAllSpecs() {
-      return specs;
-    }
   }
 
   @Test
@@ -543,10 +590,16 @@ public class AMSPropertyProviderTest {
     resource.setProperty(HOST_NAME_PROPERTY_ID, "h1");
     resource.setProperty(COMPONENT_NAME_PROPERTY_ID, "DATANODE");
     Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>();
-    temporalInfoMap.put(PROPERTY_ID1, new TemporalInfoImpl(1416445244701L, 1416445251802L, 1L));
-    temporalInfoMap.put(PROPERTY_ID3, new TemporalInfoImpl(1416445244701L, 1416445251802L, 1L));
+    // Set same time ranges to make sure the query comes in as grouped and
+    // then turns into a separate query to the backend
+    temporalInfoMap.put(PROPERTY_ID1, new TemporalInfoImpl(1416445244801L, 1416448936464L, 1L));
+    temporalInfoMap.put(PROPERTY_ID3, new TemporalInfoImpl(1416445244801L, 1416448936464L, 1L));
     Request request = PropertyHelper.getReadRequest(
-      new HashSet<String>() {{ add(PROPERTY_ID1); add(PROPERTY_ID3); }},
+      new HashSet<String>() {{
+        add(PROPERTY_ID1);
+        add(PROPERTY_ID3);
+        add("params/padding/NONE"); // Ignore padding to match result size
+      }},
       temporalInfoMap);
     Set<Resource> resources =
       propertyProvider.populateResources(Collections.singleton(resource), request, null);
@@ -573,16 +626,16 @@ public class AMSPropertyProviderTest {
     uriBuilder1.addParameter("metricNames", "dfs.datanode.BlocksReplicated");
     uriBuilder1.addParameter("hostname", "h1");
     uriBuilder1.addParameter("appId", "DATANODE");
-    uriBuilder1.addParameter("startTime", "1416445244701");
-    uriBuilder1.addParameter("endTime", "1416445251802");
+    uriBuilder1.addParameter("startTime", "1416445244801");
+    uriBuilder1.addParameter("endTime", "1416448936464");
     Assert.assertEquals(uriBuilder1.toString(), hostComponentMetricsSpec);
 
     URIBuilder uriBuilder2 = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
     uriBuilder2.addParameter("metricNames", "cpu_user");
     uriBuilder2.addParameter("hostname", "h1");
     uriBuilder2.addParameter("appId", "HOST");
-    uriBuilder2.addParameter("startTime", "1416445244701");
-    uriBuilder2.addParameter("endTime", "1416445251802");
+    uriBuilder2.addParameter("startTime", "1416445244801");
+    uriBuilder2.addParameter("endTime", "1416448936464");
     Assert.assertEquals(uriBuilder2.toString(), hostMetricSpec);
 
     Number[][] val = (Number[][]) res.getPropertyValue(PROPERTY_ID1);

http://git-wip-us.apache.org/repos/asf/ambari/blob/7763a47e/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java
index c0ce419..3ee64fa 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java
@@ -66,7 +66,7 @@ public class AMSReportPropertyProviderTest {
     Resource resource = new ResourceImpl(Resource.Type.Cluster);
     resource.setProperty(CLUSTER_NAME_PROPERTY_ID, "c1");
     Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>();
-    temporalInfoMap.put(propertyId, new TemporalInfoImpl(1416445244701L, 1416445244901L, 1L));
+    temporalInfoMap.put(propertyId, new TemporalInfoImpl(1416445244800L, 1416448936474L, 1L));
     Request request = PropertyHelper.getReadRequest(
       Collections.singleton(propertyId), temporalInfoMap);
     Set<Resource> resources =
@@ -78,8 +78,8 @@ public class AMSReportPropertyProviderTest {
     URIBuilder uriBuilder = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
     uriBuilder.addParameter("metricNames", "cpu_user");
     uriBuilder.addParameter("appId", "HOST");
-    uriBuilder.addParameter("startTime", "1416445244701");
-    uriBuilder.addParameter("endTime", "1416445244901");
+    uriBuilder.addParameter("startTime", "1416445244800");
+    uriBuilder.addParameter("endTime", "1416448936474");
     Assert.assertEquals(uriBuilder.toString(), streamProvider.getLastSpec());
     Number[][] val = (Number[][]) res.getPropertyValue("metrics/cpu/User");
     Assert.assertEquals(111, val.length);
@@ -107,7 +107,7 @@ public class AMSReportPropertyProviderTest {
     Resource resource = new ResourceImpl(Resource.Type.Cluster);
     resource.setProperty(CLUSTER_NAME_PROPERTY_ID, "c1");
     Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>();
-    temporalInfoMap.put(propertyId, new TemporalInfoImpl(1432033256912L, 1432033257912L, 1L));
+    temporalInfoMap.put(propertyId, new TemporalInfoImpl(1432033257812L, 1432035927922L, 1L));
     Request request = PropertyHelper.getReadRequest(
       Collections.singleton(propertyId), temporalInfoMap);
     Set<Resource> resources =
@@ -119,10 +119,10 @@ public class AMSReportPropertyProviderTest {
     URIBuilder uriBuilder = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
     uriBuilder.addParameter("metricNames", "cpu_user._sum");
     uriBuilder.addParameter("appId", "HOST");
-    uriBuilder.addParameter("startTime", "1432033256912");
-    uriBuilder.addParameter("endTime", "1432033257912");
+    uriBuilder.addParameter("startTime", "1432033257812");
+    uriBuilder.addParameter("endTime", "1432035927922");
     Assert.assertEquals(uriBuilder.toString(), streamProvider.getLastSpec());
     Number[][] val = (Number[][]) res.getPropertyValue("metrics/cpu/User._sum");
-    Assert.assertEquals(91, val.length);
+    Assert.assertEquals(90, val.length);
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7763a47e/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
index 7665e5e..c30c5eb 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
@@ -203,6 +203,16 @@ public class MetricsPaddingMethodTest {
       public Long getStep() {
         return step;
       }
+
+      @Override
+      public Long getStartTimeMillis() {
+        return startTime;
+      }
+
+      @Override
+      public Long getEndTimeMillis() {
+        return endTime;
+      }
     };
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7763a47e/ambari-server/src/test/resources/ams/single_host_component_metrics.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/resources/ams/single_host_component_metrics.json b/ambari-server/src/test/resources/ams/single_host_component_metrics.json
index b8c4e24..1bca0f2 100644
--- a/ambari-server/src/test/resources/ams/single_host_component_metrics.json
+++ b/ambari-server/src/test/resources/ams/single_host_component_metrics.json
@@ -9,13 +9,13 @@
     "starttime": 1416445244801,
     "metrics": {
       "1416445244801": 0.0,
-      "1416445245801": 0.0,
-      "1416445246801": 0.0,
-      "1416445247801": 0.0,
-      "1416445248801": 0.0,
-      "1416445249801": 0.0,
-      "1416445250801": 0.0,
-      "1416445251801": 0.0
+      "1416445259801": 0.0,
+      "1416445274801": 0.0,
+      "1416445289801": 0.0,
+      "1416445304801": 0.0,
+      "1416445319801": 0.0,
+      "1416445364801": 0.0,
+      "1416445379801": 0.0
     }
   }
 ]}


Mime
View raw message