Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 75A699AB4 for ; Fri, 7 Dec 2012 00:58:49 +0000 (UTC) Received: (qmail 65150 invoked by uid 500); 7 Dec 2012 00:58:48 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 65084 invoked by uid 500); 7 Dec 2012 00:58:48 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 65029 invoked by uid 99); 7 Dec 2012 00:58:48 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Dec 2012 00:58:48 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Dec 2012 00:58:46 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id B2A362388906 for ; Fri, 7 Dec 2012 00:58:26 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1418154 - in /hadoop/common/branches/branch-1-win: ./ src/core/org/apache/hadoop/metrics2/ src/core/org/apache/hadoop/metrics2/impl/ src/core/org/apache/hadoop/metrics2/lib/ src/test/org/apache/hadoop/metrics2/impl/ Date: Fri, 07 Dec 2012 00:58:26 -0000 To: common-commits@hadoop.apache.org From: suresh@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121207005826.B2A362388906@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: suresh Date: Fri Dec 7 00:58:25 2012 New Revision: 1418154 URL: http://svn.apache.org/viewvc?rev=1418154&view=rev Log: HADOOP-9090. Merging change r1418101 from branch-1 Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/MetricsSystem.java hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt?rev=1418154&r1=1418153&r2=1418154&view=diff ============================================================================== --- hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt (original) +++ hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt Fri Dec 7 00:58:25 2012 @@ -243,3 +243,8 @@ Branch-hadoop-1-win - unreleased HADOOP-8645. HADOOP_HOME and -Dhadoop.home (from hadoop wrapper script) are not uniformly handled. (John Gordon via suresh) + + Merged from branch-1 + + HADOOP-9090. Support on-demand publish of metrics. (Mostafa Elhemali via + suresh) Modified: hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/MetricsSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/MetricsSystem.java?rev=1418154&r1=1418153&r2=1418154&view=diff ============================================================================== --- hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/MetricsSystem.java (original) +++ hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/MetricsSystem.java Fri Dec 7 00:58:25 2012 @@ -52,6 +52,17 @@ public interface MetricsSystem extends M void register(Callback callback); /** + * Requests an immediate publish of all metrics from sources to sinks. + * + * This is a "soft" request: the expectation is that a best effort will be + * done to synchronously snapshot the metrics from all the sources and put + * them in all the sinks (including flushing the sinks) before returning to + * the caller. If this can't be accomplished in reasonable time it's OK to + * return to the caller before everything is done. + */ + public abstract void publishMetricsNow(); + + /** * Shutdown the metrics system completely (usually during server shutdown.) * The MetricsSystemMXBean will be unregistered. */ Modified: hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java?rev=1418154&r1=1418153&r2=1418154&view=diff ============================================================================== --- hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java (original) +++ hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java Fri Dec 7 00:58:25 2012 @@ -19,6 +19,7 @@ package org.apache.hadoop.metrics2.impl; import java.util.Random; +import java.util.concurrent.*; import org.apache.hadoop.metrics2.lib.MetricMutableGaugeInt; import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt; @@ -28,6 +29,7 @@ import org.apache.hadoop.metrics2.util.C import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.Metric; import org.apache.hadoop.metrics2.MetricsFilter; import org.apache.hadoop.metrics2.MetricsSink; @@ -45,6 +47,7 @@ class MetricsSinkAdapter { private volatile boolean stopping = false; private volatile boolean inError = false; private final int period, firstRetryDelay, retryCount; + private final long oobPutTimeout; private final float retryBackoff; private final MetricsRegistry registry = new MetricsRegistry("sinkadapter"); private final MetricMutableStat latency; @@ -75,6 +78,8 @@ class MetricsSinkAdapter { Contracts.checkArg(retryDelay, retryDelay > 0, "retry delay"); this.retryBackoff = Contracts.checkArg(retryBackoff, retryBackoff > 1, "backoff factor"); + oobPutTimeout = (long) + (firstRetryDelay * Math.pow(retryBackoff, retryCount) * 1000); this.retryCount = retryCount; this.queue = new SinkQueue( Contracts.checkArg(queueCapacity, queueCapacity > 0, "queue capacity")); @@ -93,6 +98,23 @@ class MetricsSinkAdapter { sinkThread.setDaemon(true); } + public boolean putMetricsImmediate(MetricsBuffer buffer) { + WaitableMetricsBuffer waitableBuffer = + new WaitableMetricsBuffer(buffer); + if (!queue.enqueue(waitableBuffer)) { + LOG.warn(name + " has a full queue and can't consume the given metrics."); + dropped.incr(); + return false; + } + if (!waitableBuffer.waitTillNotified(oobPutTimeout)) { + LOG.warn(name + + " couldn't fulfill an immediate putMetrics request in time." + + " Abandoning."); + return false; + } + return true; + } + boolean putMetrics(MetricsBuffer buffer, long logicalTime) { if (logicalTime % period == 0) { LOG.debug("enqueue, logicalTime="+ logicalTime); @@ -167,6 +189,9 @@ class MetricsSinkAdapter { sink.flush(); latency.add(System.currentTimeMillis() - ts); } + if (buffer instanceof WaitableMetricsBuffer) { + ((WaitableMetricsBuffer)buffer).notifyAnyWaiters(); + } LOG.debug("Done"); } @@ -202,4 +227,26 @@ class MetricsSinkAdapter { return sink; } + static class WaitableMetricsBuffer extends MetricsBuffer { + private final Semaphore notificationSemaphore = + new Semaphore(0); + + public WaitableMetricsBuffer(MetricsBuffer metricsBuffer) { + super(metricsBuffer); + } + + public boolean waitTillNotified(long millisecondsToWait) { + try { + return notificationSemaphore.tryAcquire(millisecondsToWait, + TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + return false; + } + } + + public void notifyAnyWaiters() { + notificationSemaphore.release(); + } + } + } Modified: hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java?rev=1418154&r1=1418153&r2=1418154&view=diff ============================================================================== --- hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java (original) +++ hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java Fri Dec 7 00:58:25 2012 @@ -306,9 +306,19 @@ public class MetricsSystemImpl implement synchronized void onTimerEvent() { logicalTime += period; if (sinks.size() > 0) { - publishMetrics(snapshotMetrics()); + publishMetrics(snapshotMetrics(), false); } } + + /** + * Requests an immediate publish of all metrics from sources to sinks. + */ + @Override + public void publishMetricsNow() { + if (sinks.size() > 0) { + publishMetrics(snapshotMetrics(), true); + } + } /** * snapshot all the sources for a snapshot of metrics/tags @@ -342,12 +352,20 @@ public class MetricsSystemImpl implement /** * Publish a metrics snapshot to all the sinks * @param buffer the metrics snapshot to publish + * @param immediate indicates that we should publish metrics immediately + * instead of using a separate thread. */ - synchronized void publishMetrics(MetricsBuffer buffer) { + synchronized void publishMetrics(MetricsBuffer buffer, boolean immediate) { int dropped = 0; for (MetricsSinkAdapter sa : sinks.values()) { long startTime = System.currentTimeMillis(); - dropped += sa.putMetrics(buffer, logicalTime) ? 0 : 1; + boolean result; + if (immediate) { + result = sa.putMetricsImmediate(buffer); + } else { + result = sa.putMetrics(buffer, logicalTime); + } + dropped += result ? 0 : 1; publishStat.add(System.currentTimeMillis() - startTime); } dropStat.incr(dropped); Modified: hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java?rev=1418154&r1=1418153&r2=1418154&view=diff ============================================================================== --- hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java (original) +++ hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java Fri Dec 7 00:58:25 2012 @@ -76,6 +76,11 @@ public enum DefaultMetricsSystem impleme impl.register(callback); } + @Override + public void publishMetricsNow() { + impl.publishMetricsNow(); + } + public void start() { impl.start(); } @@ -95,5 +100,4 @@ public enum DefaultMetricsSystem impleme public void shutdown() { impl.shutdown(); } - } Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java?rev=1418154&r1=1418153&r2=1418154&view=diff ============================================================================== --- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java (original) +++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java Fri Dec 7 00:58:25 2012 @@ -23,7 +23,10 @@ import org.apache.hadoop.metrics2.lib.Me import org.apache.hadoop.metrics2.lib.MetricMutableStat; import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong; import org.apache.hadoop.metrics2.lib.AbstractMetricsSource; -import java.util.List; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.Test; @@ -52,7 +55,7 @@ public class TestMetricsSystemImpl { private static String hostname = MetricsSystemImpl.getHostname(); @Test public void testInitFirst() throws Exception { - ConfigBuilder cb = new ConfigBuilder().add("default.period", 8) + new ConfigBuilder().add("default.period", 8) .add("source.filter.class", "org.apache.hadoop.metrics2.filter.GlobFilter") .add("test.*.source.filter.class", "${source.filter.class}") @@ -74,8 +77,9 @@ public class TestMetricsSystemImpl { ms.register("sink1", "sink1 desc", sink1); ms.register("sink2", "sink2 desc", sink2); ms.register("sink3", "sink3 desc", sink3); - ms.onTimerEvent(); // trigger something interesting + ms.publishMetricsNow(); // publish the metrics ms.stop(); + ms.shutdown(); verify(sink1, times(3)).putMetrics(r1.capture()); // 2 + 1 sys source List mr1 = r1.getAllValues(); @@ -88,6 +92,178 @@ public class TestMetricsSystemImpl { checkMetricsRecords(mr3, "s3rec"); } + @Test public void testMultiThreadedPublish() throws Exception { + new ConfigBuilder().add("*.period", 80) + .add("test.sink.Collector.queue.capacity", "20") + .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test")); + final MetricsSystemImpl ms = new MetricsSystemImpl("Test"); + ms.start(); + final int numThreads = 10; + final CollectingSink sink = new CollectingSink(numThreads); + ms.registerSink("Collector", + "Collector of values from all threads.", sink); + final TestSource[] sources = new TestSource[numThreads]; + final Thread[] threads = new Thread[numThreads]; + final String[] results = new String[numThreads]; + final CyclicBarrier barrier1 = new CyclicBarrier(numThreads), + barrier2 = new CyclicBarrier(numThreads); + for (int i = 0; i < numThreads; i++) { + sources[i] = ms.register("threadSource" + i, + "A source of my threaded goodness.", + new TestSource("threadSourceRec" + i)); + threads[i] = new Thread(new Runnable() { + private boolean safeAwait(int mySource, CyclicBarrier barrier) { + try { + barrier1.await(2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + results[mySource] = "Interrupted"; + return false; + } catch (BrokenBarrierException e) { + results[mySource] = "Broken Barrier"; + return false; + } catch (TimeoutException e) { + results[mySource] = "Timed out on barrier"; + return false; + } + return true; + } + + @Override + public void run() { + int mySource = Integer.parseInt(Thread.currentThread().getName()); + if (sink.collected[mySource].get() != 0L) { + results[mySource] = "Someone else collected my metric!"; + return; + } + // There is a race between setting the source value and + // which thread takes a snapshot first. Set the value here + // before any thread starts publishing so they all start + // with the right value. + sources[mySource].g1.set(230); + // Wait for all the threads to come here so we can hammer + // the system at the same time + if (!safeAwait(mySource, barrier1)) return; + ms.publishMetricsNow(); + // Since some other thread may have snatched my metric, + // I need to wait for the threads to finish before checking. + if (!safeAwait(mySource, barrier2)) return; + if (sink.collected[mySource].get() != 230L) { + results[mySource] = "Metric not collected!"; + return; + } + results[mySource] = "Passed"; + } + }, "" + i); + } + for (Thread t : threads) + t.start(); + for (Thread t : threads) + t.join(); + boolean pass = true; + String allResults = ""; + for (String r : results) { + allResults += r + "\n"; + pass = pass && r.equalsIgnoreCase("Passed"); + } + assertTrue(allResults, pass); + ms.stop(); + ms.shutdown(); + } + + private static class CollectingSink implements MetricsSink { + private final AtomicLong[] collected; + + public CollectingSink(int capacity) { + collected = new AtomicLong[capacity]; + for (int i = 0; i < capacity; i++) { + collected[i] = new AtomicLong(); + } + } + + @Override + public void init(SubsetConfiguration conf) { + } + + @Override + public void putMetrics(MetricsRecord record) { + final String prefix = "threadSourceRec"; + if (record.name().startsWith(prefix)) { + final int recordNumber = Integer.parseInt( + record.name().substring(prefix.length())); + ArrayList names = new ArrayList(); + for (Metric m : record.metrics()) { + if (m.name().equalsIgnoreCase("g1")) { + collected[recordNumber].set(m.value().longValue()); + return; + } + names.add(m.name()); + } + } + } + + @Override + public void flush() { + } + } + + @Test public void testHangingSink() { + new ConfigBuilder().add("*.period", 8) + .add("test.sink.hanging.retry.delay", "1") + .add("test.sink.hanging.retry.backoff", "1.01") + .add("test.sink.hanging.retry.count", "0") + .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test")); + MetricsSystemImpl ms = new MetricsSystemImpl("Test"); + ms.start(); + TestSource s = ms.register("s3", "s3 desc", new TestSource("s3rec")); + s.c1.incr(); + HangingSink hanging = new HangingSink(); + ms.registerSink("hanging", "Hang the sink!", hanging); + ms.publishMetricsNow(); + assertFalse(hanging.getInterrupted()); + ms.stop(); + ms.shutdown(); + assertTrue(hanging.getInterrupted()); + assertTrue("The sink didn't get called after its first hang " + + "for subsequent records.", hanging.getGotCalledSecondTime()); + } + + private static class HangingSink implements MetricsSink { + private volatile boolean interrupted; + private boolean gotCalledSecondTime; + private boolean firstTime = true; + + public boolean getGotCalledSecondTime() { + return gotCalledSecondTime; + } + + public boolean getInterrupted() { + return interrupted; + } + + @Override + public void init(SubsetConfiguration conf) { + } + + @Override + public void putMetrics(MetricsRecord record) { + // No need to hang every time, just the first record. + if (!firstTime) { + gotCalledSecondTime = true; + return; + } + firstTime = false; + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException ex) { + interrupted = true; + } + } + + @Override + public void flush() { + } + } + static void checkMetricsRecords(List recs, String expected) { LOG.debug(recs); MetricsRecord r = recs.get(0);