ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject ambari git commit: Revert "AMBARI-16821 Improve TimelineMetricsCache eviction/flush logic using a cache library (dsen)"
Date Tue, 31 May 2016 06:11:34 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk f8e83d365 -> f210586ad


Revert "AMBARI-16821 Improve TimelineMetricsCache eviction/flush logic using a cache library
(dsen)"

This reverts commit 46b2fcdef0b0fa9d741951cbb6f9c97b53d09be4.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f210586a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f210586a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f210586a

Branch: refs/heads/trunk
Commit: f210586ad57ff60d467c3e84e7f4783d34435660
Parents: f8e83d3
Author: Dmytro Sen <dsen@apache.org>
Authored: Tue May 31 09:09:01 2016 +0300
Committer: Dmytro Sen <dsen@apache.org>
Committed: Tue May 31 09:09:01 2016 +0300

----------------------------------------------------------------------
 ambari-metrics/ambari-metrics-common/pom.xml    |  28 +---
 .../timeline/cache/TimelineMetricsCache.java    | 161 +++++++++++++------
 .../cache/TimelineMetricsCacheTest.java         |  20 +--
 .../timeline/HadoopTimelineMetricsSinkTest.java |  11 +-
 4 files changed, 122 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/f210586a/ambari-metrics/ambari-metrics-common/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml
index feaee22..41ba62e 100644
--- a/ambari-metrics/ambari-metrics-common/pom.xml
+++ b/ambari-metrics/ambari-metrics-common/pom.xml
@@ -70,32 +70,6 @@
           </execution>
         </executions>
       </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <version>2.3</version>
-        <executions>
-          <!-- Run shade goal on package phase -->
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-            <configuration>
-              <artifactSet>
-                <includes>
-                  <include>commons-io:*</include>
-                  <include>com.google.code.gson:*</include>
-                  <include>com.google.guava:*</include>
-                  <include>org.apache.curator:*</include>
-                </includes>
-              </artifactSet>
-              <minimizeJar>true</minimizeJar>
-              <createDependencyReducedPom>false</createDependencyReducedPom>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
     </plugins>
   </build>
 
@@ -105,6 +79,7 @@
       <artifactId>commons-logging</artifactId>
       <version>1.1.1</version>
     </dependency>
+    <!-- TODO: Need to add these as shaded dependencies -->
     <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
@@ -125,6 +100,7 @@
       <artifactId>curator-framework</artifactId>
       <version>2.7.1</version>
     </dependency>
+    <!--  END TODO -->
     <dependency>
       <groupId>org.codehaus.jackson</groupId>
       <artifactId>jackson-xc</artifactId>

http://git-wip-us.apache.org/repos/asf/ambari/blob/f210586a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
index 57f1437..0bed7d0 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline.cache;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -27,18 +25,18 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class TimelineMetricsCache {
 
-  private final Cache<String, TimelineMetricWrapper> timelineMetricCache;
+  private final TimelineMetricHolder timelineMetricCache = new TimelineMetricHolder();
   private static final Log LOG = LogFactory.getLog(TimelineMetric.class);
   public static final int MAX_RECS_PER_NAME_DEFAULT = 10000;
   public static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min
@@ -56,81 +54,144 @@ public class TimelineMetricsCache {
     this.maxRecsPerName = maxRecsPerName;
     this.maxEvictionTimeInMillis = maxEvictionTimeInMillis;
     this.skipCounterTransform = skipCounterTransform;
-    this.timelineMetricCache = CacheBuilder.newBuilder().expireAfterWrite(maxEvictionTimeInMillis * 2, TimeUnit.MILLISECONDS).build();
   }
 
   class TimelineMetricWrapper {
-    private Cache<Long, Double> dataPointsCache;
+    private long timeDiff = -1;
+    private long oldestTimestamp = -1;
     private TimelineMetric timelineMetric;
-    private Long oldestTimeStamp;
-    private Long newestTimeStamp;
 
     TimelineMetricWrapper(TimelineMetric timelineMetric) {
       this.timelineMetric = timelineMetric;
-      dataPointsCache = CacheBuilder.newBuilder().
-              maximumSize(maxRecsPerName).expireAfterWrite(maxEvictionTimeInMillis * 2, TimeUnit.MILLISECONDS).build();
+      this.oldestTimestamp = timelineMetric.getStartTime();
+    }
 
-      putMetric(timelineMetric);
+    private void updateTimeDiff(long timestamp) {
+      if (oldestTimestamp != -1 && timestamp > oldestTimestamp) {
+        timeDiff = timestamp - oldestTimestamp;
+      } else {
+        oldestTimestamp = timestamp;
+      }
     }
 
     public synchronized void putMetric(TimelineMetric metric) {
-      if (dataPointsCache.size() == 0) {
-        oldestTimeStamp = metric.getStartTime();
-        newestTimeStamp = metric.getStartTime();
+      TreeMap<Long, Double> metricValues = this.timelineMetric.getMetricValues();
+      if (metricValues.size() > maxRecsPerName) {
+        // remove values for eldest maxEvictionTimeInMillis
+        long newEldestTimestamp = oldestTimestamp + maxEvictionTimeInMillis;
+        TreeMap<Long, Double> metricsSubSet =
+          new TreeMap<>(metricValues.tailMap(newEldestTimestamp));
+        if (metricsSubSet.isEmpty()) {
+          oldestTimestamp = metric.getStartTime();
+          this.timelineMetric.setStartTime(metric.getStartTime());
+        } else {
+          Long newStartTime = metricsSubSet.firstKey();
+          oldestTimestamp = newStartTime;
+          this.timelineMetric.setStartTime(newStartTime);
+        }
+        this.timelineMetric.setMetricValues(metricsSubSet);
+        LOG.warn("Metrics cache overflow. Values for metric " +
+          metric.getMetricName() + " older than " + newEldestTimestamp +
+          " were removed to clean up the cache.");
       }
-      TreeMap<Long, Double> metricValues = metric.getMetricValues();
-      for (Map.Entry<Long, Double> entry : metricValues.entrySet()) {
-        Long key = entry.getKey();
-        dataPointsCache.put(key, entry.getValue());
-      }
-      oldestTimeStamp = Math.min(oldestTimeStamp, metric.getStartTime());
-      newestTimeStamp = Math.max(newestTimeStamp, metric.getStartTime());
+      this.timelineMetric.addMetricValues(metric.getMetricValues());
+      updateTimeDiff(metric.getStartTime());
+    }
+
+    public synchronized long getTimeDiff() {
+      return timeDiff;
     }
 
     public synchronized TimelineMetric getTimelineMetric() {
-      TreeMap<Long, Double> metricValues = new TreeMap<>(dataPointsCache.asMap());
-      if (metricValues.isEmpty() || newestTimeStamp - oldestTimeStamp < maxEvictionTimeInMillis) {
+      return timelineMetric;
+    }
+  }
+
+  // TODO: Add weighted eviction
+  class TimelineMetricHolder extends ConcurrentSkipListMap<String, TimelineMetricWrapper> {
+    private static final long serialVersionUID = 2L;
+    // To avoid duplication at the end of the buffer and beginning of the next
+    // segment of values
+    private Map<String, Long> endOfBufferTimestamps = new HashMap<String, Long>();
+
+    public TimelineMetric evict(String metricName) {
+      TimelineMetricWrapper metricWrapper = this.get(metricName);
+
+      if (metricWrapper == null
+        || metricWrapper.getTimeDiff() < getMaxEvictionTimeInMillis()) {
         return null;
       }
-      dataPointsCache.invalidateAll();
-      timelineMetric.setStartTime(metricValues.firstKey());
-      timelineMetric.setMetricValues(metricValues);
-      return new TimelineMetric(timelineMetric);
+
+      TimelineMetric timelineMetric = metricWrapper.getTimelineMetric();
+      this.remove(metricName);
+
+      return timelineMetric;
+    }
+
+    public TimelineMetrics evictAll() {
+      List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+
+    for (Iterator<Map.Entry<String, TimelineMetricWrapper>> it = this.entrySet().iterator(); it.hasNext();) {
+        Map.Entry<String, TimelineMetricWrapper> cacheEntry = it.next();
+        TimelineMetricWrapper metricWrapper = cacheEntry.getValue();
+        if (metricWrapper != null) {
+          TimelineMetric timelineMetric = cacheEntry.getValue().getTimelineMetric();
+          metricList.add(timelineMetric);
+        }
+        it.remove();
+      }
+      TimelineMetrics timelineMetrics = new TimelineMetrics();
+      timelineMetrics.setMetrics(metricList);
+      return timelineMetrics;
+    }
+
+    public void put(String metricName, TimelineMetric timelineMetric) {
+      if (isDuplicate(timelineMetric)) {
+        return;
+      }
+      TimelineMetricWrapper metric = this.get(metricName);
+      if (metric == null) {
+        this.put(metricName, new TimelineMetricWrapper(timelineMetric));
+      } else {
+        metric.putMetric(timelineMetric);
+      }
+      // Buffer last ts value
+      endOfBufferTimestamps.put(metricName, timelineMetric.getStartTime());
+    }
+
+    /**
+     * Test whether last buffered timestamp is same as the newly received.
+     * @param timelineMetric @TimelineMetric
+     * @return true/false
+     */
+    private boolean isDuplicate(TimelineMetric timelineMetric) {
+      return endOfBufferTimestamps.containsKey(timelineMetric.getMetricName())
+        && endOfBufferTimestamps.get(timelineMetric.getMetricName()).equals(timelineMetric.getStartTime());
     }
   }
 
   public TimelineMetric getTimelineMetric(String metricName) {
-    TimelineMetricWrapper timelineMetricWrapper = timelineMetricCache.getIfPresent(metricName);
-    if (timelineMetricWrapper != null) {
-      return timelineMetricWrapper.getTimelineMetric();
+    if (timelineMetricCache.containsKey(metricName)) {
+      return timelineMetricCache.evict(metricName);
     }
+
     return null;
   }
 
   public TimelineMetrics getAllMetrics() {
-    TimelineMetrics timelineMetrics = new TimelineMetrics();
-    Collection<TimelineMetricWrapper> timelineMetricWrapperCollection = timelineMetricCache.asMap().values();
-    List<TimelineMetric> timelineMetricList =
-            new ArrayList<>(timelineMetricWrapperCollection.size());
-
-    for (TimelineMetricWrapper timelineMetricWrapper : timelineMetricWrapperCollection) {
-      timelineMetricList.add(timelineMetricWrapper.getTimelineMetric());
-    }
-
-    timelineMetrics.setMetrics(timelineMetricList);
-    return timelineMetrics;
+    return timelineMetricCache.evictAll();
   }
 
+  /**
+   * Getter method to help testing eviction
+   * @return @int
+   */
+  public int getMaxEvictionTimeInMillis() {
+    return maxEvictionTimeInMillis;
+  }
 
   public void putTimelineMetric(TimelineMetric timelineMetric) {
-    String metricName = timelineMetric.getMetricName();
-    TimelineMetricWrapper timelineMetricWrapper = timelineMetricCache.getIfPresent(metricName);
-
-    if (timelineMetricWrapper != null) {
-      timelineMetricWrapper.putMetric(timelineMetric);
-    } else {
-      timelineMetricCache.put(metricName, new TimelineMetricWrapper(timelineMetric));
-    }
+    timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric);
   }
 
   private void transformMetricValuesToDerivative(TimelineMetric timelineMetric) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/f210586a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
index 87c848b..18d973c 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
@@ -26,7 +26,6 @@ import java.util.TreeMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 public class TimelineMetricsCacheTest {
 
@@ -76,8 +75,8 @@ public class TimelineMetricsCacheTest {
   }
 
   @Test
-  public void testMaxRecsPerNameForTimelineMetricWrapperCache() throws Exception {
-    int maxRecsPerName = 3;
+  public void testMaxRecsPerName() throws Exception {
+    int maxRecsPerName = 2;
     int maxEvictionTime = TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS ;
     TimelineMetricsCache timelineMetricsCache =
       new TimelineMetricsCache(maxRecsPerName, maxEvictionTime);
@@ -125,21 +124,6 @@ public class TimelineMetricsCacheTest {
     assertEquals(DEFAULT_START_TIME + maxEvictionTime * 2, cachedMetric.getStartTime());
   }
 
-  @Test
-  public void testEvictionTimeForTimelineMetricWrapperCache() {
-    int maxEvictionTime = 10;
-    TimelineMetricsCache timelineMetricsCache =
-            new TimelineMetricsCache(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT, maxEvictionTime);
-    int numberOfMetricsInserted = 1000;
-    for (int i = 0; i < numberOfMetricsInserted; i++) {
-      timelineMetricsCache.putTimelineMetric(
-              createTimelineMetricSingleValue(DEFAULT_START_TIME + maxEvictionTime * i));
-    }
-    TimelineMetric cachedMetric = timelineMetricsCache.getTimelineMetric(METRIC_NAME);
-    assertNotNull(cachedMetric);
-    assertTrue("Some metric values should have been removed", cachedMetric.getMetricValues().size() < numberOfMetricsInserted);
-  }
-
   private TimelineMetric createTimelineMetricSingleValue(final long startTime) {
     TreeMap<Long, Double> values = new TreeMap<Long, Double>();
     values.put(startTime, 0.0);

http://git-wip-us.apache.org/repos/asf/ambari/blob/f210586a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
index aeeee4f..3d4a929 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
@@ -180,7 +180,7 @@ public class HadoopTimelineMetricsSinkTest {
     expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
     // Return eviction time smaller than time diff for first 3 entries
     // Third entry will result in eviction
-    expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(90).anyTimes();
+    expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes();
 
     expect(sink.findLiveCollectorHostsFromKnownCollector("localhost", "6188"))
       .andReturn(Collections.singletonList("localhost")).anyTimes();
@@ -212,6 +212,7 @@ public class HadoopTimelineMetricsSinkTest {
     expect(metric.value()).andReturn(3.0).once();
     expect(metric.value()).andReturn(4.0).once();
     expect(metric.value()).andReturn(5.0).once();
+    expect(metric.value()).andReturn(6.0).once();
 
     MetricsRecord record = createNiceMock(MetricsRecord.class);
     expect(record.name()).andReturn("testName").anyTimes();
@@ -228,7 +229,7 @@ public class HadoopTimelineMetricsSinkTest {
     final Long now = System.currentTimeMillis();
     // TODO: Current implementation of cache needs > 1 elements to evict any
     expect(record.timestamp()).andReturn(now).times(2);
-    expect(record.timestamp()).andReturn(now + 100l).once();
+    expect(record.timestamp()).andReturn(now + 100l).times(2);
     expect(record.timestamp()).andReturn(now + 200l).once();
     expect(record.timestamp()).andReturn(now + 300l).once();
 
@@ -259,6 +260,8 @@ public class HadoopTimelineMetricsSinkTest {
     sink.putMetrics(record);
     // time = t3
     sink.putMetrics(record);
+    // time = t4
+    sink.putMetrics(record);
 
     verify(conf, sink, record, metric);
 
@@ -272,7 +275,7 @@ public class HadoopTimelineMetricsSinkTest {
     Assert.assertEquals(now, timestamps.next());
     Assert.assertEquals(new Long(now + 100l), timestamps.next());
     Iterator<Double> values = timelineMetric1.getMetricValues().values().iterator();
-    Assert.assertEquals(new Double(2.0), values.next());
+    Assert.assertEquals(new Double(1.0), values.next());
     Assert.assertEquals(new Double(3.0), values.next());
     // t3, t4
     TimelineMetric timelineMetric2 = metricsIterator.next().getMetrics().get(0);
@@ -281,8 +284,8 @@ public class HadoopTimelineMetricsSinkTest {
     Assert.assertEquals(new Long(now + 200l), timestamps.next());
     Assert.assertEquals(new Long(now + 300l), timestamps.next());
     values = timelineMetric2.getMetricValues().values().iterator();
-    Assert.assertEquals(new Double(4.0), values.next());
     Assert.assertEquals(new Double(5.0), values.next());
+    Assert.assertEquals(new Double(6.0), values.next());
   }
 
   @Test


Mime
View raw message