Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 02E922009C6 for ; Tue, 31 May 2016 22:05:55 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 01BD7160A44; Tue, 31 May 2016 20:05:55 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CA0811609AD for ; Tue, 31 May 2016 22:05:53 +0200 (CEST) Received: (qmail 12775 invoked by uid 500); 31 May 2016 20:05:53 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 12759 invoked by uid 99); 31 May 2016 20:05:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 31 May 2016 20:05:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 95ED1DFB74; Tue, 31 May 2016 20:05:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dsen@apache.org To: commits@ambari.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ambari git commit: AMBARI-16821 Improve TimelineMetricsCache eviction/flush logic using a cache library (dsen) Date: Tue, 31 May 2016 20:05:52 +0000 (UTC) archived-at: Tue, 31 May 2016 20:05:55 -0000 Repository: ambari Updated Branches: refs/heads/branch-2.4 f3df02522 -> ecaef4142 AMBARI-16821 Improve TimelineMetricsCache eviction/flush logic using a cache library (dsen) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ecaef414 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ecaef414 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ecaef414 Branch: refs/heads/branch-2.4 Commit: ecaef41426ff67811ac4b2f5c618fb651286d160 Parents: f3df025 Author: Dmytro Sen Authored: Tue May 31 23:05:46 2016 +0300 Committer: Dmytro Sen Committed: Tue May 31 23:05:46 2016 +0300 ---------------------------------------------------------------------- ambari-metrics/ambari-metrics-common/pom.xml | 28 ++++ .../timeline/cache/TimelineMetricsCache.java | 161 ++++++------------- .../cache/TimelineMetricsCacheTest.java | 20 ++- .../timeline/HadoopTimelineMetricsSinkTest.java | 11 +- ambari-project/pom.xml | 5 - ambari-server/pom.xml | 5 + 6 files changed, 105 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/ecaef414/ambari-metrics/ambari-metrics-common/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml index af37d28..3e17f22 100644 --- a/ambari-metrics/ambari-metrics-common/pom.xml +++ b/ambari-metrics/ambari-metrics-common/pom.xml @@ -70,11 +70,39 @@ + + org.apache.maven.plugins + maven-shade-plugin + 2.3 + + + + package + + shade + + + + + com.google.guava:* + + + true + false + + + + + com.google.guava + guava + 14.0.1 + + commons-logging commons-logging 1.1.1 http://git-wip-us.apache.org/repos/asf/ambari/blob/ecaef414/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java index 0bed7d0..57f1437 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.metrics2.sink.timeline.cache; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -25,18 +27,18 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; @InterfaceAudience.Public @InterfaceStability.Evolving public class TimelineMetricsCache { - private final TimelineMetricHolder timelineMetricCache = new TimelineMetricHolder(); + private final Cache timelineMetricCache; private static final Log LOG = LogFactory.getLog(TimelineMetric.class); public static final int MAX_RECS_PER_NAME_DEFAULT = 10000; public static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min @@ -54,144 +56,81 @@ public class TimelineMetricsCache { this.maxRecsPerName = maxRecsPerName; this.maxEvictionTimeInMillis = maxEvictionTimeInMillis; this.skipCounterTransform = skipCounterTransform; + this.timelineMetricCache = CacheBuilder.newBuilder().expireAfterWrite(maxEvictionTimeInMillis * 2, TimeUnit.MILLISECONDS).build(); } class TimelineMetricWrapper { - private long timeDiff = -1; - private long oldestTimestamp = -1; + private Cache dataPointsCache; private TimelineMetric timelineMetric; + private Long oldestTimeStamp; + private Long newestTimeStamp; TimelineMetricWrapper(TimelineMetric timelineMetric) { this.timelineMetric = timelineMetric; - this.oldestTimestamp = timelineMetric.getStartTime(); - } + dataPointsCache = CacheBuilder.newBuilder(). + maximumSize(maxRecsPerName).expireAfterWrite(maxEvictionTimeInMillis * 2, TimeUnit.MILLISECONDS).build(); - private void updateTimeDiff(long timestamp) { - if (oldestTimestamp != -1 && timestamp > oldestTimestamp) { - timeDiff = timestamp - oldestTimestamp; - } else { - oldestTimestamp = timestamp; - } + putMetric(timelineMetric); } public synchronized void putMetric(TimelineMetric metric) { - TreeMap metricValues = this.timelineMetric.getMetricValues(); - if (metricValues.size() > maxRecsPerName) { - // remove values for eldest maxEvictionTimeInMillis - long newEldestTimestamp = oldestTimestamp + maxEvictionTimeInMillis; - TreeMap metricsSubSet = - new TreeMap<>(metricValues.tailMap(newEldestTimestamp)); - if (metricsSubSet.isEmpty()) { - oldestTimestamp = metric.getStartTime(); - this.timelineMetric.setStartTime(metric.getStartTime()); - } else { - Long newStartTime = metricsSubSet.firstKey(); - oldestTimestamp = newStartTime; - this.timelineMetric.setStartTime(newStartTime); - } - this.timelineMetric.setMetricValues(metricsSubSet); - LOG.warn("Metrics cache overflow. Values for metric " + - metric.getMetricName() + " older than " + newEldestTimestamp + - " were removed to clean up the cache."); + if (dataPointsCache.size() == 0) { + oldestTimeStamp = metric.getStartTime(); + newestTimeStamp = metric.getStartTime(); } - this.timelineMetric.addMetricValues(metric.getMetricValues()); - updateTimeDiff(metric.getStartTime()); - } - - public synchronized long getTimeDiff() { - return timeDiff; + TreeMap metricValues = metric.getMetricValues(); + for (Map.Entry entry : metricValues.entrySet()) { + Long key = entry.getKey(); + dataPointsCache.put(key, entry.getValue()); + } + oldestTimeStamp = Math.min(oldestTimeStamp, metric.getStartTime()); + newestTimeStamp = Math.max(newestTimeStamp, metric.getStartTime()); } public synchronized TimelineMetric getTimelineMetric() { - return timelineMetric; - } - } - - // TODO: Add weighted eviction - class TimelineMetricHolder extends ConcurrentSkipListMap { - private static final long serialVersionUID = 2L; - // To avoid duplication at the end of the buffer and beginning of the next - // segment of values - private Map endOfBufferTimestamps = new HashMap(); - - public TimelineMetric evict(String metricName) { - TimelineMetricWrapper metricWrapper = this.get(metricName); - - if (metricWrapper == null - || metricWrapper.getTimeDiff() < getMaxEvictionTimeInMillis()) { + TreeMap metricValues = new TreeMap<>(dataPointsCache.asMap()); + if (metricValues.isEmpty() || newestTimeStamp - oldestTimeStamp < maxEvictionTimeInMillis) { return null; } - - TimelineMetric timelineMetric = metricWrapper.getTimelineMetric(); - this.remove(metricName); - - return timelineMetric; - } - - public TimelineMetrics evictAll() { - List metricList = new ArrayList(); - - for (Iterator> it = this.entrySet().iterator(); it.hasNext();) { - Map.Entry cacheEntry = it.next(); - TimelineMetricWrapper metricWrapper = cacheEntry.getValue(); - if (metricWrapper != null) { - TimelineMetric timelineMetric = cacheEntry.getValue().getTimelineMetric(); - metricList.add(timelineMetric); - } - it.remove(); - } - TimelineMetrics timelineMetrics = new TimelineMetrics(); - timelineMetrics.setMetrics(metricList); - return timelineMetrics; - } - - public void put(String metricName, TimelineMetric timelineMetric) { - if (isDuplicate(timelineMetric)) { - return; - } - TimelineMetricWrapper metric = this.get(metricName); - if (metric == null) { - this.put(metricName, new TimelineMetricWrapper(timelineMetric)); - } else { - metric.putMetric(timelineMetric); - } - // Buffer last ts value - endOfBufferTimestamps.put(metricName, timelineMetric.getStartTime()); - } - - /** - * Test whether last buffered timestamp is same as the newly received. - * @param timelineMetric @TimelineMetric - * @return true/false - */ - private boolean isDuplicate(TimelineMetric timelineMetric) { - return endOfBufferTimestamps.containsKey(timelineMetric.getMetricName()) - && endOfBufferTimestamps.get(timelineMetric.getMetricName()).equals(timelineMetric.getStartTime()); + dataPointsCache.invalidateAll(); + timelineMetric.setStartTime(metricValues.firstKey()); + timelineMetric.setMetricValues(metricValues); + return new TimelineMetric(timelineMetric); } } public TimelineMetric getTimelineMetric(String metricName) { - if (timelineMetricCache.containsKey(metricName)) { - return timelineMetricCache.evict(metricName); + TimelineMetricWrapper timelineMetricWrapper = timelineMetricCache.getIfPresent(metricName); + if (timelineMetricWrapper != null) { + return timelineMetricWrapper.getTimelineMetric(); } - return null; } public TimelineMetrics getAllMetrics() { - return timelineMetricCache.evictAll(); - } + TimelineMetrics timelineMetrics = new TimelineMetrics(); + Collection timelineMetricWrapperCollection = timelineMetricCache.asMap().values(); + List timelineMetricList = + new ArrayList<>(timelineMetricWrapperCollection.size()); + + for (TimelineMetricWrapper timelineMetricWrapper : timelineMetricWrapperCollection) { + timelineMetricList.add(timelineMetricWrapper.getTimelineMetric()); + } - /** - * Getter method to help testing eviction - * @return @int - */ - public int getMaxEvictionTimeInMillis() { - return maxEvictionTimeInMillis; + timelineMetrics.setMetrics(timelineMetricList); + return timelineMetrics; } + public void putTimelineMetric(TimelineMetric timelineMetric) { - timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric); + String metricName = timelineMetric.getMetricName(); + TimelineMetricWrapper timelineMetricWrapper = timelineMetricCache.getIfPresent(metricName); + + if (timelineMetricWrapper != null) { + timelineMetricWrapper.putMetric(timelineMetric); + } else { + timelineMetricCache.put(metricName, new TimelineMetricWrapper(timelineMetric)); + } } private void transformMetricValuesToDerivative(TimelineMetric timelineMetric) { http://git-wip-us.apache.org/repos/asf/ambari/blob/ecaef414/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java index 18d973c..87c848b 100644 --- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java +++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java @@ -26,6 +26,7 @@ import java.util.TreeMap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class TimelineMetricsCacheTest { @@ -75,8 +76,8 @@ public class TimelineMetricsCacheTest { } @Test - public void testMaxRecsPerName() throws Exception { - int maxRecsPerName = 2; + public void testMaxRecsPerNameForTimelineMetricWrapperCache() throws Exception { + int maxRecsPerName = 3; int maxEvictionTime = TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS ; TimelineMetricsCache timelineMetricsCache = new TimelineMetricsCache(maxRecsPerName, maxEvictionTime); @@ -124,6 +125,21 @@ public class TimelineMetricsCacheTest { assertEquals(DEFAULT_START_TIME + maxEvictionTime * 2, cachedMetric.getStartTime()); } + @Test + public void testEvictionTimeForTimelineMetricWrapperCache() { + int maxEvictionTime = 10; + TimelineMetricsCache timelineMetricsCache = + new TimelineMetricsCache(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT, maxEvictionTime); + int numberOfMetricsInserted = 1000; + for (int i = 0; i < numberOfMetricsInserted; i++) { + timelineMetricsCache.putTimelineMetric( + createTimelineMetricSingleValue(DEFAULT_START_TIME + maxEvictionTime * i)); + } + TimelineMetric cachedMetric = timelineMetricsCache.getTimelineMetric(METRIC_NAME); + assertNotNull(cachedMetric); + assertTrue("Some metric values should have been removed", cachedMetric.getMetricValues().size() < numberOfMetricsInserted); + } + private TimelineMetric createTimelineMetricSingleValue(final long startTime) { TreeMap values = new TreeMap(); values.put(startTime, 0.0); http://git-wip-us.apache.org/repos/asf/ambari/blob/ecaef414/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java index 4a5abcc..afe0ea9 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java @@ -150,7 +150,7 @@ public class HadoopTimelineMetricsSinkTest { expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes(); // Return eviction time smaller than time diff for first 3 entries // Third entry will result in eviction - expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes(); + expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(90).anyTimes(); conf.setListDelimiter(eq(',')); expectLastCall().anyTimes(); @@ -179,7 +179,6 @@ public class HadoopTimelineMetricsSinkTest { expect(metric.value()).andReturn(3.0).once(); expect(metric.value()).andReturn(4.0).once(); expect(metric.value()).andReturn(5.0).once(); - expect(metric.value()).andReturn(6.0).once(); MetricsRecord record = createNiceMock(MetricsRecord.class); expect(record.name()).andReturn("testName").anyTimes(); @@ -196,7 +195,7 @@ public class HadoopTimelineMetricsSinkTest { final Long now = System.currentTimeMillis(); // TODO: Current implementation of cache needs > 1 elements to evict any expect(record.timestamp()).andReturn(now).times(2); - expect(record.timestamp()).andReturn(now + 100l).times(2); + expect(record.timestamp()).andReturn(now + 100l).once(); expect(record.timestamp()).andReturn(now + 200l).once(); expect(record.timestamp()).andReturn(now + 300l).once(); @@ -227,8 +226,6 @@ public class HadoopTimelineMetricsSinkTest { sink.putMetrics(record); // time = t3 sink.putMetrics(record); - // time = t4 - sink.putMetrics(record); verify(conf, sink, record, metric); @@ -242,7 +239,7 @@ public class HadoopTimelineMetricsSinkTest { Assert.assertEquals(now, timestamps.next()); Assert.assertEquals(new Long(now + 100l), timestamps.next()); Iterator values = timelineMetric1.getMetricValues().values().iterator(); - Assert.assertEquals(new Double(1.0), values.next()); + Assert.assertEquals(new Double(2.0), values.next()); Assert.assertEquals(new Double(3.0), values.next()); // t3, t4 TimelineMetric timelineMetric2 = metricsIterator.next().getMetrics().get(0); @@ -251,8 +248,8 @@ public class HadoopTimelineMetricsSinkTest { Assert.assertEquals(new Long(now + 200l), timestamps.next()); Assert.assertEquals(new Long(now + 300l), timestamps.next()); values = timelineMetric2.getMetricValues().values().iterator(); + Assert.assertEquals(new Double(4.0), values.next()); Assert.assertEquals(new Double(5.0), values.next()); - Assert.assertEquals(new Double(6.0), values.next()); } @Test http://git-wip-us.apache.org/repos/asf/ambari/blob/ecaef414/ambari-project/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-project/pom.xml b/ambari-project/pom.xml index 7fe1e6b..fdcb31b 100644 --- a/ambari-project/pom.xml +++ b/ambari-project/pom.xml @@ -218,11 +218,6 @@ 9.3-1101-jdbc4 - com.google.guava - guava - 16.0 - - com.google.code.findbugs jsr305 1.3.9 http://git-wip-us.apache.org/repos/asf/ambari/blob/ecaef414/ambari-server/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-server/pom.xml b/ambari-server/pom.xml index b488971..66cc038 100644 --- a/ambari-server/pom.xml +++ b/ambari-server/pom.xml @@ -1102,6 +1102,11 @@ jetty-server + com.google.guava + guava + 16.0 + + commons-logging commons-logging