Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4341C200C79 for ; Fri, 5 May 2017 00:26:28 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 41F14160BC4; Thu, 4 May 2017 22:26:28 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6289D160BB0 for ; Fri, 5 May 2017 00:26:27 +0200 (CEST) Received: (qmail 48027 invoked by uid 500); 4 May 2017 22:26:26 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 48018 invoked by uid 99); 4 May 2017 22:26:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 May 2017 22:26:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4A769E01EA; Thu, 4 May 2017 22:26:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vrushali@apache.org To: common-commits@hadoop.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-6375 App level aggregation should not consider metric values reported in the previous aggregation cycle (Varun Saxena via Vrushali C) Date: Thu, 4 May 2017 22:26:26 +0000 (UTC) archived-at: Thu, 04 May 2017 22:26:28 -0000 Repository: hadoop Updated Branches: refs/heads/trunk 61858a5c3 -> 54e2b9e87 YARN-6375 App level aggregation should not consider metric values reported in the previous aggregation cycle (Varun Saxena via Vrushali C) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo 
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/54e2b9e8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/54e2b9e8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/54e2b9e8 Branch: refs/heads/trunk Commit: 54e2b9e876fd91712c14ffbc4c49cd946f305aeb Parents: 61858a5 Author: Vrushali Channapattan Authored: Thu May 4 15:25:56 2017 -0700 Committer: Vrushali Channapattan Committed: Thu May 4 15:25:56 2017 -0700 ---------------------------------------------------------------------- .../collector/TimelineCollector.java | 23 +++-- .../collector/TestTimelineCollector.java | 95 +++++++++++++++++++- 2 files changed, 108 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/54e2b9e8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index c94c505..5416b26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -310,13 +310,15 @@ public abstract class TimelineCollector extends CompositeService { // Update aggregateTable Map aggrRow = aggregateTable.get(m); if (aggrRow == 
null) { - Map<String, TimelineMetric> tempRow = new ConcurrentHashMap<>(); + Map<String, TimelineMetric> tempRow = new HashMap<>(); aggrRow = aggregateTable.putIfAbsent(m, tempRow); if (aggrRow == null) { aggrRow = tempRow; } } - aggrRow.put(entityId, m); + synchronized (aggrRow) { + aggrRow.put(entityId, m); + } } } @@ -335,14 +337,17 @@ public abstract class TimelineCollector extends CompositeService { } aggrMetric.setRealtimeAggregationOp(TimelineMetricOperation.NOP); Map status = new HashMap<>(); - for (TimelineMetric m : aggrRow.values()) { - TimelineMetric.aggregateTo(m, aggrMetric, status); - // getRealtimeAggregationOp returns an enum so we can directly - // compare with "!=". - if (m.getRealtimeAggregationOp() - != aggrMetric.getRealtimeAggregationOp()) { - aggrMetric.setRealtimeAggregationOp(m.getRealtimeAggregationOp()); + synchronized (aggrRow) { + for (TimelineMetric m : aggrRow.values()) { + TimelineMetric.aggregateTo(m, aggrMetric, status); + // getRealtimeAggregationOp returns an enum so we can directly + // compare with "!=". 
+ if (m.getRealtimeAggregationOp() + != aggrMetric.getRealtimeAggregationOp()) { + aggrMetric.setRealtimeAggregationOp(m.getRealtimeAggregationOp()); + } } + aggrRow.clear(); } Set metrics = e.getMetrics(); metrics.remove(aggrMetric); http://git-wip-us.apache.org/repos/asf/hadoop/blob/54e2b9e8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java index a55f227..0f17553 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java @@ -18,19 +18,27 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import 
org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector.AggregationStatusTable; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.junit.Test; +import com.google.common.collect.Sets; + import java.io.IOException; import java.util.HashSet; +import java.util.Map; import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; @@ -187,4 +195,89 @@ public class TestTimelineCollector { return context; } } -} + + private static TimelineEntity createEntity(String id, String type) { + TimelineEntity entity = new TimelineEntity(); + entity.setId(id); + entity.setType(type); + return entity; + } + + private static TimelineMetric createDummyMetric(long ts, Long value) { + TimelineMetric metric = new TimelineMetric(); + metric.setId("dummy_metric"); + metric.addValue(ts, value); + metric.setRealtimeAggregationOp(TimelineMetricOperation.SUM); + return metric; + } + + @Test + public void testClearPreviousEntitiesOnAggregation() throws Exception { + final long ts = System.currentTimeMillis(); + TimelineCollector collector = new TimelineCollector("") { + @Override + public TimelineCollectorContext getTimelineEntityContext() { + return new TimelineCollectorContext("cluster", "user", "flow", "1", + 1L, ApplicationId.newInstance(ts, 1).toString()); + } + }; + collector.init(new Configuration()); + collector.setWriter(mock(TimelineWriter.class)); + + // Put 5 entities with different metric values. 
+ TimelineEntities entities = new TimelineEntities(); + for (int i = 1; i <=5; i++) { + TimelineEntity entity = createEntity("e" + i, "type"); + entity.addMetric(createDummyMetric(ts + i, Long.valueOf(i * 50))); + entities.addEntity(entity); + } + collector.putEntities(entities, UserGroupInformation.getCurrentUser()); + + TimelineCollectorContext currContext = collector.getTimelineEntityContext(); + // Aggregate the entities. + Map aggregationGroups + = collector.getAggregationGroups(); + assertEquals(Sets.newHashSet("type"), aggregationGroups.keySet()); + TimelineEntity aggregatedEntity = TimelineCollector. + aggregateWithoutGroupId(aggregationGroups, currContext.getAppId(), + TimelineEntityType.YARN_APPLICATION.toString()); + TimelineMetric aggregatedMetric = + aggregatedEntity.getMetrics().iterator().next(); + assertEquals(750L, aggregatedMetric.getValues().values().iterator().next()); + assertEquals(TimelineMetricOperation.SUM, + aggregatedMetric.getRealtimeAggregationOp()); + + // Aggregate entities. + aggregatedEntity = TimelineCollector. + aggregateWithoutGroupId(aggregationGroups, currContext.getAppId(), + TimelineEntityType.YARN_APPLICATION.toString()); + aggregatedMetric = aggregatedEntity.getMetrics().iterator().next(); + // No values aggregated as no metrics put for an entity between this + // aggregation and the previous one. + assertTrue(aggregatedMetric.getValues().isEmpty()); + assertEquals(TimelineMetricOperation.NOP, + aggregatedMetric.getRealtimeAggregationOp()); + + // Put 3 entities. + entities = new TimelineEntities(); + for (int i = 1; i <=3; i++) { + TimelineEntity entity = createEntity("e" + i, "type"); + entity.addMetric(createDummyMetric(System.currentTimeMillis() + i, 50L)); + entities.addEntity(entity); + } + aggregationGroups = collector.getAggregationGroups(); + collector.putEntities(entities, UserGroupInformation.getCurrentUser()); + + // Aggregate entities. + aggregatedEntity = TimelineCollector. 
+ aggregateWithoutGroupId(aggregationGroups, currContext.getAppId(), + TimelineEntityType.YARN_APPLICATION.toString()); + // Last 3 entities picked up for aggregation. + aggregatedMetric = aggregatedEntity.getMetrics().iterator().next(); + assertEquals(150L, aggregatedMetric.getValues().values().iterator().next()); + assertEquals(TimelineMetricOperation.SUM, + aggregatedMetric.getRealtimeAggregationOp()); + + collector.close(); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org