ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject ambari git commit: AMBARI-16821 Improve TimelineMetricsCache eviction/flush logic using a cache library (dsen)
Date Mon, 30 May 2016 15:39:41 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk baa8ae916 -> 46b2fcdef


AMBARI-16821 Improve TimelineMetricsCache eviction/flush logic using a cache library (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/46b2fcde
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/46b2fcde
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/46b2fcde

Branch: refs/heads/trunk
Commit: 46b2fcdef0b0fa9d741951cbb6f9c97b53d09be4
Parents: baa8ae9
Author: Dmytro Sen <dsen@apache.org>
Authored: Mon May 30 18:39:28 2016 +0300
Committer: Dmytro Sen <dsen@apache.org>
Committed: Mon May 30 18:39:28 2016 +0300

----------------------------------------------------------------------
 ambari-metrics/ambari-metrics-common/pom.xml    |  28 +++-
 .../timeline/cache/TimelineMetricsCache.java    | 161 ++++++-------------
 .../cache/TimelineMetricsCacheTest.java         |  20 ++-
 .../timeline/HadoopTimelineMetricsSinkTest.java |  11 +-
 4 files changed, 98 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/46b2fcde/ambari-metrics/ambari-metrics-common/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml
index 41ba62e..feaee22 100644
--- a/ambari-metrics/ambari-metrics-common/pom.xml
+++ b/ambari-metrics/ambari-metrics-common/pom.xml
@@ -70,6 +70,32 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.3</version>
+        <executions>
+          <!-- Run shade goal on package phase -->
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <artifactSet>
+                <includes>
+                  <include>commons-io:*</include>
+                  <include>com.google.code.gson:*</include>
+                  <include>com.google.guava:*</include>
+                  <include>org.apache.curator:*</include>
+                </includes>
+              </artifactSet>
+              <minimizeJar>true</minimizeJar>
+              <createDependencyReducedPom>false</createDependencyReducedPom>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 
@@ -79,7 +105,6 @@
       <artifactId>commons-logging</artifactId>
       <version>1.1.1</version>
     </dependency>
-    <!-- TODO: Need to add these as shaded dependencies -->
     <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
@@ -100,7 +125,6 @@
       <artifactId>curator-framework</artifactId>
       <version>2.7.1</version>
     </dependency>
-    <!--  END TODO -->
     <dependency>
       <groupId>org.codehaus.jackson</groupId>
       <artifactId>jackson-xc</artifactId>

http://git-wip-us.apache.org/repos/asf/ambari/blob/46b2fcde/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
index 0bed7d0..57f1437 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline.cache;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -25,18 +27,18 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
 
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class TimelineMetricsCache {
 
-  private final TimelineMetricHolder timelineMetricCache = new TimelineMetricHolder();
+  private final Cache<String, TimelineMetricWrapper> timelineMetricCache;
   private static final Log LOG = LogFactory.getLog(TimelineMetric.class);
   public static final int MAX_RECS_PER_NAME_DEFAULT = 10000;
   public static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min
@@ -54,144 +56,81 @@ public class TimelineMetricsCache {
     this.maxRecsPerName = maxRecsPerName;
     this.maxEvictionTimeInMillis = maxEvictionTimeInMillis;
     this.skipCounterTransform = skipCounterTransform;
+    this.timelineMetricCache = CacheBuilder.newBuilder().expireAfterWrite(maxEvictionTimeInMillis
* 2, TimeUnit.MILLISECONDS).build();
   }
 
   class TimelineMetricWrapper {
-    private long timeDiff = -1;
-    private long oldestTimestamp = -1;
+    private Cache<Long, Double> dataPointsCache;
     private TimelineMetric timelineMetric;
+    private Long oldestTimeStamp;
+    private Long newestTimeStamp;
 
     TimelineMetricWrapper(TimelineMetric timelineMetric) {
       this.timelineMetric = timelineMetric;
-      this.oldestTimestamp = timelineMetric.getStartTime();
-    }
+      dataPointsCache = CacheBuilder.newBuilder().
+              maximumSize(maxRecsPerName).expireAfterWrite(maxEvictionTimeInMillis * 2, TimeUnit.MILLISECONDS).build();
 
-    private void updateTimeDiff(long timestamp) {
-      if (oldestTimestamp != -1 && timestamp > oldestTimestamp) {
-        timeDiff = timestamp - oldestTimestamp;
-      } else {
-        oldestTimestamp = timestamp;
-      }
+      putMetric(timelineMetric);
     }
 
     public synchronized void putMetric(TimelineMetric metric) {
-      TreeMap<Long, Double> metricValues = this.timelineMetric.getMetricValues();
-      if (metricValues.size() > maxRecsPerName) {
-        // remove values for eldest maxEvictionTimeInMillis
-        long newEldestTimestamp = oldestTimestamp + maxEvictionTimeInMillis;
-        TreeMap<Long, Double> metricsSubSet =
-          new TreeMap<>(metricValues.tailMap(newEldestTimestamp));
-        if (metricsSubSet.isEmpty()) {
-          oldestTimestamp = metric.getStartTime();
-          this.timelineMetric.setStartTime(metric.getStartTime());
-        } else {
-          Long newStartTime = metricsSubSet.firstKey();
-          oldestTimestamp = newStartTime;
-          this.timelineMetric.setStartTime(newStartTime);
-        }
-        this.timelineMetric.setMetricValues(metricsSubSet);
-        LOG.warn("Metrics cache overflow. Values for metric " +
-          metric.getMetricName() + " older than " + newEldestTimestamp +
-          " were removed to clean up the cache.");
+      if (dataPointsCache.size() == 0) {
+        oldestTimeStamp = metric.getStartTime();
+        newestTimeStamp = metric.getStartTime();
       }
-      this.timelineMetric.addMetricValues(metric.getMetricValues());
-      updateTimeDiff(metric.getStartTime());
-    }
-
-    public synchronized long getTimeDiff() {
-      return timeDiff;
+      TreeMap<Long, Double> metricValues = metric.getMetricValues();
+      for (Map.Entry<Long, Double> entry : metricValues.entrySet()) {
+        Long key = entry.getKey();
+        dataPointsCache.put(key, entry.getValue());
+      }
+      oldestTimeStamp = Math.min(oldestTimeStamp, metric.getStartTime());
+      newestTimeStamp = Math.max(newestTimeStamp, metric.getStartTime());
     }
 
     public synchronized TimelineMetric getTimelineMetric() {
-      return timelineMetric;
-    }
-  }
-
-  // TODO: Add weighted eviction
-  class TimelineMetricHolder extends ConcurrentSkipListMap<String, TimelineMetricWrapper>
{
-    private static final long serialVersionUID = 2L;
-    // To avoid duplication at the end of the buffer and beginning of the next
-    // segment of values
-    private Map<String, Long> endOfBufferTimestamps = new HashMap<String, Long>();
-
-    public TimelineMetric evict(String metricName) {
-      TimelineMetricWrapper metricWrapper = this.get(metricName);
-
-      if (metricWrapper == null
-        || metricWrapper.getTimeDiff() < getMaxEvictionTimeInMillis()) {
+      TreeMap<Long, Double> metricValues = new TreeMap<>(dataPointsCache.asMap());
+      if (metricValues.isEmpty() || newestTimeStamp - oldestTimeStamp < maxEvictionTimeInMillis)
{
         return null;
       }
-
-      TimelineMetric timelineMetric = metricWrapper.getTimelineMetric();
-      this.remove(metricName);
-
-      return timelineMetric;
-    }
-
-    public TimelineMetrics evictAll() {
-      List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
-
-      for (Iterator<Map.Entry<String, TimelineMetricWrapper>> it = this.entrySet().iterator();
it.hasNext();) {
-        Map.Entry<String, TimelineMetricWrapper> cacheEntry = it.next();
-        TimelineMetricWrapper metricWrapper = cacheEntry.getValue();
-        if (metricWrapper != null) {
-          TimelineMetric timelineMetric = cacheEntry.getValue().getTimelineMetric();
-          metricList.add(timelineMetric);
-        }
-        it.remove();
-      }
-      TimelineMetrics timelineMetrics = new TimelineMetrics();
-      timelineMetrics.setMetrics(metricList);
-      return timelineMetrics;
-    }
-
-    public void put(String metricName, TimelineMetric timelineMetric) {
-      if (isDuplicate(timelineMetric)) {
-        return;
-      }
-      TimelineMetricWrapper metric = this.get(metricName);
-      if (metric == null) {
-        this.put(metricName, new TimelineMetricWrapper(timelineMetric));
-      } else {
-        metric.putMetric(timelineMetric);
-      }
-      // Buffer last ts value
-      endOfBufferTimestamps.put(metricName, timelineMetric.getStartTime());
-    }
-
-    /**
-     * Test whether last buffered timestamp is same as the newly received.
-     * @param timelineMetric @TimelineMetric
-     * @return true/false
-     */
-    private boolean isDuplicate(TimelineMetric timelineMetric) {
-      return endOfBufferTimestamps.containsKey(timelineMetric.getMetricName())
-        && endOfBufferTimestamps.get(timelineMetric.getMetricName()).equals(timelineMetric.getStartTime());
+      dataPointsCache.invalidateAll();
+      timelineMetric.setStartTime(metricValues.firstKey());
+      timelineMetric.setMetricValues(metricValues);
+      return new TimelineMetric(timelineMetric);
     }
   }
 
   public TimelineMetric getTimelineMetric(String metricName) {
-    if (timelineMetricCache.containsKey(metricName)) {
-      return timelineMetricCache.evict(metricName);
+    TimelineMetricWrapper timelineMetricWrapper = timelineMetricCache.getIfPresent(metricName);
+    if (timelineMetricWrapper != null) {
+      return timelineMetricWrapper.getTimelineMetric();
     }
-
     return null;
   }
 
   public TimelineMetrics getAllMetrics() {
-    return timelineMetricCache.evictAll();
-  }
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    Collection<TimelineMetricWrapper> timelineMetricWrapperCollection = timelineMetricCache.asMap().values();
+    List<TimelineMetric> timelineMetricList =
+            new ArrayList<>(timelineMetricWrapperCollection.size());
+
+    for (TimelineMetricWrapper timelineMetricWrapper : timelineMetricWrapperCollection) {
+      timelineMetricList.add(timelineMetricWrapper.getTimelineMetric());
+    }
 
-  /**
-   * Getter method to help testing eviction
-   * @return @int
-   */
-  public int getMaxEvictionTimeInMillis() {
-    return maxEvictionTimeInMillis;
+    timelineMetrics.setMetrics(timelineMetricList);
+    return timelineMetrics;
   }
 
+
   public void putTimelineMetric(TimelineMetric timelineMetric) {
-    timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric);
+    String metricName = timelineMetric.getMetricName();
+    TimelineMetricWrapper timelineMetricWrapper = timelineMetricCache.getIfPresent(metricName);
+
+    if (timelineMetricWrapper != null) {
+      timelineMetricWrapper.putMetric(timelineMetric);
+    } else {
+      timelineMetricCache.put(metricName, new TimelineMetricWrapper(timelineMetric));
+    }
   }
 
   private void transformMetricValuesToDerivative(TimelineMetric timelineMetric) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/46b2fcde/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
index 18d973c..87c848b 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
@@ -26,6 +26,7 @@ import java.util.TreeMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class TimelineMetricsCacheTest {
 
@@ -75,8 +76,8 @@ public class TimelineMetricsCacheTest {
   }
 
   @Test
-  public void testMaxRecsPerName() throws Exception {
-    int maxRecsPerName = 2;
+  public void testMaxRecsPerNameForTimelineMetricWrapperCache() throws Exception {
+    int maxRecsPerName = 3;
     int maxEvictionTime = TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS ;
     TimelineMetricsCache timelineMetricsCache =
       new TimelineMetricsCache(maxRecsPerName, maxEvictionTime);
@@ -124,6 +125,21 @@ public class TimelineMetricsCacheTest {
     assertEquals(DEFAULT_START_TIME + maxEvictionTime * 2, cachedMetric.getStartTime());
   }
 
+  @Test
+  public void testEvictionTimeForTimelineMetricWrapperCache() {
+    int maxEvictionTime = 10;
+    TimelineMetricsCache timelineMetricsCache =
+            new TimelineMetricsCache(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT, maxEvictionTime);
+    int numberOfMetricsInserted = 1000;
+    for (int i = 0; i < numberOfMetricsInserted; i++) {
+      timelineMetricsCache.putTimelineMetric(
+              createTimelineMetricSingleValue(DEFAULT_START_TIME + maxEvictionTime * i));
+    }
+    TimelineMetric cachedMetric = timelineMetricsCache.getTimelineMetric(METRIC_NAME);
+    assertNotNull(cachedMetric);
+    assertTrue("Some metric values should have been removed", cachedMetric.getMetricValues().size()
< numberOfMetricsInserted);
+  }
+
   private TimelineMetric createTimelineMetricSingleValue(final long startTime) {
     TreeMap<Long, Double> values = new TreeMap<Long, Double>();
     values.put(startTime, 0.0);

http://git-wip-us.apache.org/repos/asf/ambari/blob/46b2fcde/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
index 3d4a929..aeeee4f 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
@@ -180,7 +180,7 @@ public class HadoopTimelineMetricsSinkTest {
     expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
     // Return eviction time smaller than time diff for first 3 entries
     // Third entry will result in eviction
-    expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes();
+    expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(90).anyTimes();
 
     expect(sink.findLiveCollectorHostsFromKnownCollector("localhost", "6188"))
       .andReturn(Collections.singletonList("localhost")).anyTimes();
@@ -212,7 +212,6 @@ public class HadoopTimelineMetricsSinkTest {
     expect(metric.value()).andReturn(3.0).once();
     expect(metric.value()).andReturn(4.0).once();
     expect(metric.value()).andReturn(5.0).once();
-    expect(metric.value()).andReturn(6.0).once();
 
     MetricsRecord record = createNiceMock(MetricsRecord.class);
     expect(record.name()).andReturn("testName").anyTimes();
@@ -229,7 +228,7 @@ public class HadoopTimelineMetricsSinkTest {
     final Long now = System.currentTimeMillis();
     // TODO: Current implementation of cache needs > 1 elements to evict any
     expect(record.timestamp()).andReturn(now).times(2);
-    expect(record.timestamp()).andReturn(now + 100l).times(2);
+    expect(record.timestamp()).andReturn(now + 100l).once();
     expect(record.timestamp()).andReturn(now + 200l).once();
     expect(record.timestamp()).andReturn(now + 300l).once();
 
@@ -260,8 +259,6 @@ public class HadoopTimelineMetricsSinkTest {
     sink.putMetrics(record);
     // time = t3
     sink.putMetrics(record);
-    // time = t4
-    sink.putMetrics(record);
 
     verify(conf, sink, record, metric);
 
@@ -275,7 +272,7 @@ public class HadoopTimelineMetricsSinkTest {
     Assert.assertEquals(now, timestamps.next());
     Assert.assertEquals(new Long(now + 100l), timestamps.next());
     Iterator<Double> values = timelineMetric1.getMetricValues().values().iterator();
-    Assert.assertEquals(new Double(1.0), values.next());
+    Assert.assertEquals(new Double(2.0), values.next());
     Assert.assertEquals(new Double(3.0), values.next());
     // t3, t4
     TimelineMetric timelineMetric2 = metricsIterator.next().getMetrics().get(0);
@@ -284,8 +281,8 @@ public class HadoopTimelineMetricsSinkTest {
     Assert.assertEquals(new Long(now + 200l), timestamps.next());
     Assert.assertEquals(new Long(now + 300l), timestamps.next());
     values = timelineMetric2.getMetricValues().values().iterator();
+    Assert.assertEquals(new Double(4.0), values.next());
     Assert.assertEquals(new Double(5.0), values.next());
-    Assert.assertEquals(new Double(6.0), values.next());
   }
 
   @Test


Mime
View raw message