Subject: svn commit: r1173447 - in /incubator/flume/branches/flume-728: flume-ng-core/src/main/java/org/apache/flume/ flume-ng-core/src/main/java/org/apache/flume/sink/ flume-ng-core/src/main/java/org/apache/flume/source/ flume-ng-core/src/test/java/org/apache/...
Date: Wed, 21 Sep 2011 00:57:43 -0000
To: flume-commits@incubator.apache.org
From: esammer@apache.org
X-Mailer: svnmailer-1.0.8-patched
Message-Id: <20110921005744.2D024238897D@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: esammer
Date: Wed Sep 21 00:57:43 2011
New Revision: 1173447

URL: http://svn.apache.org/viewvc?rev=1173447&view=rev
Log:
- Sources and sinks now hint as to their willingness to take more data by returning a status.
- Polling source / sink runners understand and respect a desire to backoff.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSink.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSink.java Wed Sep 21 00:57:43 2011
@@ -2,6 +2,10 @@
 package org.apache.flume;
 public interface PollableSink extends Sink {
-  public void process() throws InterruptedException, EventDeliveryException;
+  public Status process() throws EventDeliveryException;
+
+  public static enum Status {
+    READY, BACKOFF
+  }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java Wed Sep 21 00:57:43 2011
@@ -2,10 +2,10 @@
 package org.apache.flume;
 public interface PollableSource extends Source {
-  /*
-   * FIXME: Arvind removed InterruptedException from the interface in his
-   * branch.
-   */
-  public void process() throws InterruptedException, EventDeliveryException;
+  public Status process() throws EventDeliveryException;
+
+  public static enum Status {
+    READY, BACKOFF
+  }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java Wed Sep 21 00:57:43 2011
@@ -31,7 +31,7 @@ public class LoggerSink extends Abstract
       .getLogger(LoggerSink.class);
   @Override
-  public void process() throws EventDeliveryException {
+  public Status process() throws EventDeliveryException {
     Channel channel = getChannel();
     Transaction transaction = channel.getTransaction();
     Event event = null;
@@ -39,8 +39,15 @@ public class LoggerSink extends Abstract
     try {
       transaction.begin();
       event = channel.take();
-      logger.info("Event: " + event);
-      transaction.commit();
+
+      if (event != null) {
+        logger.info("Event: " + event);
+        transaction.commit();
+        return Status.READY;
+      } else {
+        transaction.rollback();
+        return Status.BACKOFF;
+      }
     } catch (Exception ex) {
       transaction.rollback();
       throw new EventDeliveryException("Failed to log event: " + event, ex);

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java Wed Sep 21 00:57:43 2011
@@ -37,7 +37,7 @@ public class NullSink extends AbstractSi
   }
   @Override
-  public void process() throws EventDeliveryException {
+  public Status process() throws EventDeliveryException {
     Channel channel = getChannel();
     Transaction transaction = channel.getTransaction();
     Event event = null;
@@ -45,8 +45,15 @@ public class NullSink extends AbstractSi
     try {
       transaction.begin();
       event = channel.take();
-      //logger.debug("Consumed the event: " + event);
       transaction.commit();
+
+      if (event != null) {
+        // logger.debug("Consumed the event: " + event);
+        counterGroup.incrementAndGet("events.successful");
+        return Status.READY;
+      } else {
+        return Status.BACKOFF;
+      }
     } catch (Exception ex) {
       transaction.rollback();
       counterGroup.incrementAndGet("events.failed");
@@ -55,7 +62,6 @@ public class NullSink extends AbstractSi
     } finally {
       transaction.close();
     }
-    counterGroup.incrementAndGet("events.successful");
   }
   @Override

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java Wed Sep 21 00:57:43 2011
@@ -91,7 +91,11 @@ public class PollableSinkRunner extends
       while (!shouldStop.get()) {
         try {
-          sink.process();
+          if (sink.process().equals(PollableSink.Status.BACKOFF)) {
+            counterGroup.incrementAndGet("runner.backoffs");
+            /* Should this be configurable? */
+            Thread.sleep(500);
+          }
         } catch (InterruptedException e) {
           logger.debug("Interrupted while processing an event. Exiting.");
           counterGroup.incrementAndGet("runner.interruptions");
@@ -101,7 +105,7 @@ public class PollableSinkRunner extends
         }
       }
-      logger.debug("Polling sink runner exiting");
+      logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
     }
   }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java Wed Sep 21 00:57:43 2011
@@ -116,7 +116,7 @@ public class RollingFileSink extends Abs
   }
   @Override
-  public void process() throws EventDeliveryException {
+  public Status process() throws EventDeliveryException {
     if (shouldRotate) {
       logger.debug("Time to rotate {}", pathController.getCurrentFile());
@@ -158,23 +158,31 @@ public class RollingFileSink extends Abs
       transaction.begin();
       event = channel.take();
-      byte[] bytes = formatter.format(event);
+      if (event != null) {
+        byte[] bytes = formatter.format(event);

-      outputStream.write(bytes);
+        outputStream.write(bytes);

-      /*
-       * FIXME: Feature: Rotate on size and time by checking bytes written and
-       * setting shouldRotate = true if we're past a threshold.
-       */
-      counterGroup.addAndGet("sink.bytesWritten", (long) bytes.length);
-
-      /*
-       * FIXME: Feature: Control flush interval based on time or number of
-       * events. For now, we're super-conservative and flush on each write.
-       */
-      outputStream.flush();
+        /*
+         * FIXME: Feature: Rotate on size and time by checking bytes written and
+         * setting shouldRotate = true if we're past a threshold.
+         */
+        counterGroup.addAndGet("sink.bytesWritten", (long) bytes.length);
+
+        /*
+         * FIXME: Feature: Control flush interval based on time or number of
+         * events. For now, we're super-conservative and flush on each write.
+         */
+        outputStream.flush();
+
+        transaction.commit();

-      transaction.commit();
+        return Status.READY;
+      } else {
+        transaction.rollback();
+
+        return Status.BACKOFF;
+      }
     } catch (Exception ex) {
       transaction.rollback();
       throw new EventDeliveryException("Failed to process event: " + event, ex);

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java Wed Sep 21 00:57:43 2011
@@ -87,12 +87,19 @@ public class PollableSourceRunner extend
       logger.debug("Polling runner starting. Source:{}", source);
       while (!shouldStop.get()) {
+        counterGroup.incrementAndGet("runner.polls");
+
         try {
-          source.process();
-          counterGroup.incrementAndGet("events.successful");
+          if (source.process().equals(PollableSource.Status.BACKOFF)) {
+            counterGroup.incrementAndGet("runner.backoffs");
+            Thread.sleep(500);
+          }
+
+        } catch (InterruptedException e) {
+          logger.info("Source runner interrupted. Exiting");
         } catch (Exception e) {
           logger.error("Unable to process event. Exception follows.", e);
-          counterGroup.incrementAndGet("events.failed");
+          counterGroup.incrementAndGet("runner.failures");
         }
       }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java Wed Sep 21 00:57:43 2011
@@ -16,7 +16,7 @@ public class SequenceGeneratorSource ext
   }
   @Override
-  public void process() throws InterruptedException, EventDeliveryException {
+  public Status process() throws EventDeliveryException {
     Channel channel = getChannel();
     Transaction transaction = channel.getTransaction();
@@ -26,8 +26,11 @@ public class SequenceGeneratorSource ext
       transaction.commit();
     } catch (Exception e) {
       transaction.rollback();
+    } finally {
+      transaction.close();
     }
-    /* FIXME: Add finally { transaction.close() } */
+
+    return Status.READY;
   }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java Wed Sep 21 00:57:43 2011
@@ -3,17 +3,25 @@ package org.apache.flume.source;
 import java.util.concurrent.CountDownLatch;
 import org.apache.flume.Channel;
+import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.PollableSource;
+import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 public class TestPollableSourceRunner {
+  private static final Logger logger = LoggerFactory
+      .getLogger(TestPollableSourceRunner.class);
+
   private PollableSourceRunner sourceRunner;
   @Before
@@ -26,6 +34,8 @@ public class TestPollableSourceRunner {
     final Channel channel = new MemoryChannel();
     final CountDownLatch latch = new CountDownLatch(50);
+    Configurables.configure(channel, new Context());
+
     PollableSource source = new PollableSource() {
       @Override
@@ -40,17 +50,29 @@ public class TestPollableSourceRunner {
       }
       @Override
-      public void process() throws InterruptedException, EventDeliveryException {
-        Event event = EventBuilder.withBody(String.valueOf(
-            "Event " + latch.getCount()).getBytes());
+      public Status process() throws EventDeliveryException {
+        Transaction transaction = channel.getTransaction();

-        latch.countDown();
-
-        if (latch.getCount() % 20 == 0) {
-          throw new EventDeliveryException("I don't like event:" + event);
+        try {
+          transaction.begin();
+          Event event = EventBuilder.withBody(String.valueOf(
+              "Event " + latch.getCount()).getBytes());
+
+          latch.countDown();
+
+          if (latch.getCount() % 20 == 0) {
+            throw new EventDeliveryException("I don't like event:" + event);
+          }
+          channel.put(event);
+          transaction.commit();
+          return Status.READY;
+        } catch (EventDeliveryException e) {
+          logger.error("Unable to deliver event. Exception follows.", e);
+          transaction.rollback();
+          return Status.BACKOFF;
+        } finally {
+          transaction.close();
         }
-
-        channel.put(event);
       }
       @Override

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java Wed Sep 21 00:57:43 2011
@@ -5,13 +5,18 @@ import org.apache.flume.EventDeliveryExc
 public class FlakeySequenceGeneratorSource extends SequenceGeneratorSource {
   @Override
-  public void process() throws EventDeliveryException, InterruptedException {
+  public Status process() throws EventDeliveryException {
     if (Math.round(Math.random()) == 1) {
-      Thread.sleep(1000);
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Do nothing.
+      }
+
       throw new EventDeliveryException("I'm broken!");
     } else {
-      super.process();
+      return super.process();
     }
   }
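
For readers skimming the diff, the READY/BACKOFF contract described in the log message can be sketched in isolation. This is a minimal illustration under stated assumptions, not code from this commit: BackoffSketch, Pollable, QueueSink and poll are hypothetical names, an in-memory queue stands in for a Flume channel, and the backoff interval is a parameter rather than the 500 ms hard-coded in the runners.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

class BackoffSketch {

  /** Same two values as the Status enum added in this commit. */
  enum Status { READY, BACKOFF }

  /** Hypothetical stand-in for PollableSink / PollableSource. */
  interface Pollable {
    Status process();
  }

  /** Hypothetical sink that drains an in-memory queue instead of a channel. */
  static class QueueSink implements Pollable {
    private final Queue<String> queue;
    int consumed = 0;

    QueueSink(Queue<String> queue) {
      this.queue = queue;
    }

    @Override
    public Status process() {
      String event = queue.poll();
      if (event == null) {
        // Nothing to take: hint that the caller should wait before retrying.
        return Status.BACKOFF;
      }
      consumed++;
      return Status.READY;
    }
  }

  /**
   * Calls process() up to maxPolls times and sleeps whenever BACKOFF is
   * returned. Returns the number of backoffs observed.
   */
  static int poll(Pollable pollable, int maxPolls, long backoffMillis) {
    int backoffs = 0;
    for (int i = 0; i < maxPolls; i++) {
      if (pollable.process() == Status.BACKOFF) {
        backoffs++;
        try {
          Thread.sleep(backoffMillis);
        } catch (InterruptedException e) {
          // Restore the interrupt flag and stop polling.
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    return backoffs;
  }

  public static void main(String[] args) {
    Queue<String> queue = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
    QueueSink sink = new QueueSink(queue);
    int backoffs = poll(sink, 5, 10L);
    // Three events are consumed, then the remaining two polls back off.
    System.out.println("consumed=" + sink.consumed + " backoffs=" + backoffs);
  }
}
```

Passing the interval as a parameter is one possible answer to the "Should this be configurable?" comment in PollableSinkRunner; the commit itself leaves that question open.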