Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9523410711 for ; Thu, 19 Feb 2015 17:39:01 +0000 (UTC) Received: (qmail 86543 invoked by uid 500); 19 Feb 2015 17:39:01 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 86477 invoked by uid 500); 19 Feb 2015 17:39:01 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 86464 invoked by uid 99); 19 Feb 2015 17:39:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Feb 2015 17:39:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3C818E0534; Thu, 19 Feb 2015 17:39:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jlowe@apache.org To: common-commits@hadoop.apache.org Message-Id: <1b4e5a239fd049a9aa040c1a8361241c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: HADOOP-9087. Queue size metric for metric sinks isn't actually maintained. Contributed by Akira AJISAKA (cherry picked from commit f0f299268625af275522f55d5bfc43118c31bdd8) Date: Thu, 19 Feb 2015 17:39:01 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/branch-2 fee29e4a4 -> b1fc4ec57 HADOOP-9087. Queue size metric for metric sinks isn't actually maintained. 
Contributed by Akira AJISAKA (cherry picked from commit f0f299268625af275522f55d5bfc43118c31bdd8) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b1fc4ec5 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b1fc4ec5 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b1fc4ec5 Branch: refs/heads/branch-2 Commit: b1fc4ec57a62dd2eae2ea811e51b32a3fcb4c754 Parents: fee29e4 Author: Jason Lowe Authored: Thu Feb 19 17:38:39 2015 +0000 Committer: Jason Lowe Committed: Thu Feb 19 17:38:39 2015 +0000 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 ++ .../metrics2/impl/MetricsSinkAdapter.java | 15 +++++- .../hadoop-common/src/site/apt/Metrics.apt.vm | 5 +- .../metrics2/impl/TestMetricsSystemImpl.java | 50 ++++++++++++++++++++ 4 files changed, 67 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b1fc4ec5/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index b93a801..3f41cbc 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -565,6 +565,9 @@ Release 2.7.0 - UNRELEASED HADOOP-11595. Add default implementation for AbstractFileSystem#truncate. (yliu) + HADOOP-9087. 
Queue size metric for metric sinks isn't actually maintained + (Akira AJISAKA via jlowe) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/b1fc4ec5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java index de39a13..478c316 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java @@ -95,7 +95,10 @@ class MetricsSinkAdapter implements SinkQueue.Consumer { boolean putMetrics(MetricsBuffer buffer, long logicalTime) { if (logicalTime % period == 0) { LOG.debug("enqueue, logicalTime="+ logicalTime); - if (queue.enqueue(buffer)) return true; + if (queue.enqueue(buffer)) { + refreshQueueSizeGauge(); + return true; + } dropped.incr(); return false; } @@ -105,7 +108,9 @@ class MetricsSinkAdapter implements SinkQueue.Consumer { public boolean putMetricsImmediate(MetricsBuffer buffer) { WaitableMetricsBuffer waitableBuffer = new WaitableMetricsBuffer(buffer); - if (!queue.enqueue(waitableBuffer)) { + if (queue.enqueue(waitableBuffer)) { + refreshQueueSizeGauge(); + } else { LOG.warn(name + " has a full queue and can't consume the given metrics."); dropped.incr(); return false; @@ -127,6 +132,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer { while (!stopping) { try { queue.consumeAll(this); + refreshQueueSizeGauge(); retryDelay = firstRetryDelay; n = retryCount; inError = false; @@ -154,12 +160,17 @@ class MetricsSinkAdapter implements SinkQueue.Consumer { "suppressing further error messages", e); } 
queue.clear(); + refreshQueueSizeGauge(); inError = true; // Don't keep complaining ad infinitum } } } } + private void refreshQueueSizeGauge() { + qsize.set(queue.size()); + } + @Override public void consume(MetricsBuffer buffer) { long ts = 0; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b1fc4ec5/hadoop-common-project/hadoop-common/src/site/apt/Metrics.apt.vm ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/site/apt/Metrics.apt.vm b/hadoop-common-project/hadoop-common/src/site/apt/Metrics.apt.vm index 02ff28b..915467e 100644 --- a/hadoop-common-project/hadoop-common/src/site/apt/Metrics.apt.vm +++ b/hadoop-common-project/hadoop-common/src/site/apt/Metrics.apt.vm @@ -843,10 +843,7 @@ metricssystem context |<<<Sink_>>><instance><<<Dropped>>> | Total number of dropped sink operations | for the <instance> *-------------------------------------+--------------------------------------+ -|<<<Sink_>>><instance><<<Qsize>>> | Current queue length of sink operations \ - | (BUT always set to 0 because nothing to - | increment this metrics, see - | {{{https://issues.apache.org/jira/browse/HADOOP-9941}HADOOP-9941}}) +|<<<Sink_>>><instance><<<Qsize>>> | Current queue length of the sink *-------------------------------------+--------------------------------------+ default context http://git-wip-us.apache.org/repos/asf/hadoop/blob/b1fc4ec5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java index 4c2ebc8..0f7b15f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java +++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java @@ -29,7 +29,9 @@ import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -434,6 +436,54 @@ public class TestMetricsSystemImpl { new MetricGaugeInt(MsInfo.NumActiveSinks, 3))); } + @Test + public void testQSize() throws Exception { + new ConfigBuilder().add("*.period", 8) + .add("test.sink.test.class", TestSink.class.getName()) + .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test")); + MetricsSystemImpl ms = new MetricsSystemImpl("Test"); + final CountDownLatch proceedSignal = new CountDownLatch(1); + final CountDownLatch reachedPutMetricSignal = new CountDownLatch(1); + ms.start(); + try { + MetricsSink slowSink = mock(MetricsSink.class); + MetricsSink dataSink = mock(MetricsSink.class); + ms.registerSink("slowSink", + "The sink that will wait on putMetric", slowSink); + ms.registerSink("dataSink", + "The sink I'll use to get info about slowSink", dataSink); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + reachedPutMetricSignal.countDown(); + proceedSignal.await(); + return null; + } + }).when(slowSink).putMetrics(any(MetricsRecord.class)); + + // trigger metric collection first time + ms.onTimerEvent(); + assertTrue(reachedPutMetricSignal.await(1, TimeUnit.SECONDS)); + // Now that the slow sink is still processing the first metric, + // its queue length should be 1 for the second collection. 
+ ms.onTimerEvent(); + verify(dataSink, timeout(500).times(2)).putMetrics(r1.capture()); + List<MetricsRecord> mr = r1.getAllValues(); + Number qSize = Iterables.find(mr.get(1).metrics(), + new Predicate<AbstractMetric>() { + @Override + public boolean apply(@Nullable AbstractMetric input) { + assert input != null; + return input.name().equals("Sink_slowSinkQsize"); + } + }).value(); + assertEquals(1, qSize); + } finally { + proceedSignal.countDown(); + ms.stop(); + } + } + @Metrics(context="test") private static class TestSource { @Metric("C1 desc") MutableCounterLong c1;