Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 65B9B107BC for ; Thu, 5 Dec 2013 23:19:26 +0000 (UTC) Received: (qmail 91054 invoked by uid 500); 5 Dec 2013 23:19:26 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 91005 invoked by uid 500); 5 Dec 2013 23:19:25 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 90998 invoked by uid 99); 5 Dec 2013 23:19:25 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Dec 2013 23:19:25 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 3147B82C9E2; Thu, 5 Dec 2013 23:19:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mpercy@apache.org To: commits@flume.apache.org Message-Id: <5c294e3683354b27a262bc3b29b2fb54@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: FLUME-2255. Correctly handle ChannelExceptions in SpoolingDirectorySource Date: Thu, 5 Dec 2013 23:19:25 +0000 (UTC) Updated Branches: refs/heads/trunk 705abaf00 -> c23448fc9 FLUME-2255. 
Correctly handle ChannelExceptions in SpoolingDirectorySource (Hari Shreedharan via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c23448fc Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c23448fc Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c23448fc Branch: refs/heads/trunk Commit: c23448fc959844eece5a8ab2dbf091c2c4973a26 Parents: 705abaf Author: Mike Percy Authored: Thu Dec 5 12:58:03 2013 -0800 Committer: Mike Percy Committed: Thu Dec 5 12:58:03 2013 -0800 ---------------------------------------------------------------------- .../flume/source/SpoolDirectorySource.java | 47 +++++++++++++++- ...olDirectorySourceConfigurationConstants.java | 4 ++ .../flume/source/TestSpoolDirectorySource.java | 58 ++++++++++++++++++++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 1 + 4 files changed, 108 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/c23448fc/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java index 72c4059..0160215 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java @@ -67,6 +67,9 @@ Configurable, EventDrivenSource { private SourceCounter sourceCounter; ReliableSpoolingFileEventReader reader; private ScheduledExecutorService executor; + private boolean backoff = true; + private boolean hitChannelException = false; + private int maxBackoff; @Override public synchronized void start() { @@ -161,6 +164,8 @@ Configurable, EventDrivenSource { 
deserializerContext.put(LineDeserializer.MAXLINE_KEY, bufferMaxLineLength.toString()); } + + maxBackoff = context.getInteger(MAX_BACKOFF, DEFAULT_MAX_BACKOFF); if (sourceCounter == null) { sourceCounter = new SourceCounter(getName()); } @@ -171,6 +176,28 @@ Configurable, EventDrivenSource { return hasFatalError; } + + + /** + * The class always backs off, this exists only so that we can test without + * taking a really long time. + * @param backoff - whether the source should backoff if the channel is full + */ + @VisibleForTesting + protected void setBackOff(boolean backoff) { + this.backoff = backoff; + } + + @VisibleForTesting + protected boolean hitChannelException() { + return hitChannelException; + } + + @VisibleForTesting + protected SourceCounter getSourceCounter() { + return sourceCounter; + } + private class SpoolDirectoryRunnable implements Runnable { private ReliableSpoolingFileEventReader reader; private SourceCounter sourceCounter; @@ -183,6 +210,7 @@ Configurable, EventDrivenSource { @Override public void run() { + int backoffInterval = 250; try { while (true) { List events = reader.readEvents(batchSize); @@ -192,8 +220,23 @@ Configurable, EventDrivenSource { sourceCounter.addToEventReceivedCount(events.size()); sourceCounter.incrementAppendBatchReceivedCount(); - getChannelProcessor().processEventBatch(events); - reader.commit(); + try { + getChannelProcessor().processEventBatch(events); + reader.commit(); + } catch (ChannelException ex) { + logger.warn("The channel is full, and cannot write data now. The " + + "source will try again after " + String.valueOf(backoffInterval) + + " milliseconds"); + hitChannelException = true; + if (backoff) { + TimeUnit.MILLISECONDS.sleep(backoffInterval); + backoffInterval = backoffInterval << 1; + backoffInterval = backoffInterval >= maxBackoff ? 
maxBackoff : + backoffInterval; + } + continue; + } + backoffInterval = 250; sourceCounter.addToEventAcceptedCount(events.size()); sourceCounter.incrementAppendBatchAcceptedCount(); } http://git-wip-us.apache.org/repos/asf/flume/blob/c23448fc/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java index 7bfb0ee..a2befe8 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java @@ -74,4 +74,8 @@ public class SpoolDirectorySourceConfigurationConstants { public static final String DECODE_ERROR_POLICY = "decodeErrorPolicy"; public static final String DEFAULT_DECODE_ERROR_POLICY = DecodeErrorPolicy.FAIL.name(); + + public static final String MAX_BACKOFF = "maxBackoff"; + + public static final Integer DEFAULT_MAX_BACKOFF = 4000; } http://git-wip-us.apache.org/repos/asf/flume/blob/c23448fc/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java index 837cf15..9a546a5 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Lists; import 
org.apache.flume.Channel; import org.apache.flume.ChannelException; import org.apache.flume.ChannelSelector; @@ -33,6 +34,7 @@ import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; +import org.apache.flume.instrumentation.SourceCounter; import org.apache.flume.lifecycle.LifecycleController; import org.apache.flume.lifecycle.LifecycleState; import org.junit.After; @@ -163,4 +165,60 @@ public class TestSpoolDirectorySource { Assert.assertFalse("Fatal error on iteration " + i, source.hasFatalError()); } } + + @Test + public void testSourceDoesNotDieOnFullChannel() throws Exception { + + Context chContext = new Context(); + chContext.put("capacity", "2"); + chContext.put("transactionCapacity", "2"); + chContext.put("keep-alive", "0"); + channel.stop(); + Configurables.configure(channel, chContext); + + channel.start(); + Context context = new Context(); + File f1 = new File(tmpDir.getAbsolutePath() + "/file1"); + + Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + + "file1line5\nfile1line6\nfile1line7\nfile1line8\n", + f1, Charsets.UTF_8); + + + context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, + tmpDir.getAbsolutePath()); + + context.put(SpoolDirectorySourceConfigurationConstants.BATCH_SIZE, "2"); + Configurables.configure(source, context); + source.setBackOff(false); + source.start(); + + // Wait for the source to read enough events to fill up the channel. 
+ while(!source.hitChannelException()) { + Thread.sleep(50); + } + + List dataOut = Lists.newArrayList(); + + for (int i = 0; i < 8; ) { + Transaction tx = channel.getTransaction(); + tx.begin(); + Event e = channel.take(); + if (e != null) { + dataOut.add(new String(e.getBody(), "UTF-8")); + i++; + } + e = channel.take(); + if (e != null) { + dataOut.add(new String(e.getBody(), "UTF-8")); + i++; + } + tx.commit(); + tx.close(); + } + Assert.assertTrue("Expected to hit ChannelException, but did not!", + source.hitChannelException()); + Assert.assertEquals(8, dataOut.size()); + source.stop(); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/c23448fc/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 0f12427..8687cb7 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -951,6 +951,7 @@ fileHeaderKey file Header key to use when appending filename ignorePattern ^$ Regular expression specifying which files to ignore (skip) trackerDir .flumespool Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir. +maxBackoff 4000 The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, up to the value specified by this parameter. batchSize 100 Granularity at which to batch transfer to the channel inputCharset UTF-8 Character set used by deserializers that treat the input file as text. decodeErrorPolicy ``FAIL`` What to do when we see a non-decodable character in the input file.