flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-2255. Correctly handle ChannelExceptions in SpoolingDirectorySource
Date Thu, 05 Dec 2013 23:19:25 GMT
Updated Branches:
  refs/heads/trunk 705abaf00 -> c23448fc9


FLUME-2255. Correctly handle ChannelExceptions in SpoolingDirectorySource

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c23448fc
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c23448fc
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c23448fc

Branch: refs/heads/trunk
Commit: c23448fc959844eece5a8ab2dbf091c2c4973a26
Parents: 705abaf
Author: Mike Percy <mpercy@cloudera.com>
Authored: Thu Dec 5 12:58:03 2013 -0800
Committer: Mike Percy <mpercy@cloudera.com>
Committed: Thu Dec 5 12:58:03 2013 -0800

----------------------------------------------------------------------
 .../flume/source/SpoolDirectorySource.java      | 47 +++++++++++++++-
 ...olDirectorySourceConfigurationConstants.java |  4 ++
 .../flume/source/TestSpoolDirectorySource.java  | 58 ++++++++++++++++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  1 +
 4 files changed, 108 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/c23448fc/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 72c4059..0160215 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -67,6 +67,9 @@ Configurable, EventDrivenSource {
   private SourceCounter sourceCounter;
   ReliableSpoolingFileEventReader reader;
   private ScheduledExecutorService executor;
+  private boolean backoff = true;
+  private boolean hitChannelException = false;
+  private int maxBackoff;
 
   @Override
   public synchronized void start() {
@@ -161,6 +164,8 @@ Configurable, EventDrivenSource {
       deserializerContext.put(LineDeserializer.MAXLINE_KEY,
           bufferMaxLineLength.toString());
     }
+
+    maxBackoff = context.getInteger(MAX_BACKOFF, DEFAULT_MAX_BACKOFF);
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());
     }
@@ -171,6 +176,28 @@ Configurable, EventDrivenSource {
     return hasFatalError;
   }
 
+
+
+  /**
+   * The class always backs off; this setter exists only so that tests can run
+   * without taking a really long time.
+   * @param backoff - whether the source should back off if the channel is full
+   */
+  @VisibleForTesting
+  protected void setBackOff(boolean backoff) {
+    this.backoff = backoff;
+  }
+
+  @VisibleForTesting
+  protected boolean hitChannelException() {
+    return hitChannelException;
+  }
+
+  @VisibleForTesting
+  protected SourceCounter getSourceCounter() {
+    return sourceCounter;
+  }
+
   private class SpoolDirectoryRunnable implements Runnable {
     private ReliableSpoolingFileEventReader reader;
     private SourceCounter sourceCounter;
@@ -183,6 +210,7 @@ Configurable, EventDrivenSource {
 
     @Override
     public void run() {
+      int backoffInterval = 250;
       try {
         while (true) {
           List<Event> events = reader.readEvents(batchSize);
@@ -192,8 +220,23 @@ Configurable, EventDrivenSource {
           sourceCounter.addToEventReceivedCount(events.size());
           sourceCounter.incrementAppendBatchReceivedCount();
 
-          getChannelProcessor().processEventBatch(events);
-          reader.commit();
+          try {
+            getChannelProcessor().processEventBatch(events);
+            reader.commit();
+          } catch (ChannelException ex) {
+            logger.warn("The channel is full, and cannot write data now. The " +
+              "source will try again after " + String.valueOf(backoffInterval) +
+              " milliseconds");
+            hitChannelException = true;
+            if (backoff) {
+              TimeUnit.MILLISECONDS.sleep(backoffInterval);
+              backoffInterval = backoffInterval << 1;
+              backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
+                                backoffInterval;
+            }
+            continue;
+          }
+          backoffInterval = 250;
           sourceCounter.addToEventAcceptedCount(events.size());
           sourceCounter.incrementAppendBatchAcceptedCount();
         }

http://git-wip-us.apache.org/repos/asf/flume/blob/c23448fc/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index 7bfb0ee..a2befe8 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -74,4 +74,8 @@ public class SpoolDirectorySourceConfigurationConstants {
   public static final String DECODE_ERROR_POLICY = "decodeErrorPolicy";
   public static final String DEFAULT_DECODE_ERROR_POLICY =
       DecodeErrorPolicy.FAIL.name();
+
+  public static final String MAX_BACKOFF = "maxBackoff";
+
+  public static final Integer DEFAULT_MAX_BACKOFF = 4000;
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/c23448fc/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
index 837cf15..9a546a5 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Lists;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
@@ -33,6 +34,7 @@ import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.junit.After;
@@ -163,4 +165,60 @@ public class TestSpoolDirectorySource {
       Assert.assertFalse("Fatal error on iteration " + i, source.hasFatalError());
     }
   }
+
+  @Test
+  public void testSourceDoesNotDieOnFullChannel() throws Exception {
+
+    Context chContext = new Context();
+    chContext.put("capacity", "2");
+    chContext.put("transactionCapacity", "2");
+    chContext.put("keep-alive", "0");
+    channel.stop();
+    Configurables.configure(channel, chContext);
+
+    channel.start();
+    Context context = new Context();
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+      "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+      f1, Charsets.UTF_8);
+
+
+    context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
+      tmpDir.getAbsolutePath());
+
+    context.put(SpoolDirectorySourceConfigurationConstants.BATCH_SIZE, "2");
+    Configurables.configure(source, context);
+    source.setBackOff(false);
+    source.start();
+
+    // Wait for the source to read enough events to fill up the channel.
+    while(!source.hitChannelException()) {
+      Thread.sleep(50);
+    }
+
+    List<String> dataOut = Lists.newArrayList();
+
+    for (int i = 0; i < 8; ) {
+      Transaction tx = channel.getTransaction();
+      tx.begin();
+      Event e = channel.take();
+      if (e != null) {
+        dataOut.add(new String(e.getBody(), "UTF-8"));
+        i++;
+      }
+      e = channel.take();
+      if (e != null) {
+        dataOut.add(new String(e.getBody(), "UTF-8"));
+        i++;
+      }
+      tx.commit();
+      tx.close();
+    }
+    Assert.assertTrue("Expected to hit ChannelException, but did not!",
+      source.hitChannelException());
+    Assert.assertEquals(8, dataOut.size());
+    source.stop();
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/c23448fc/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 0f12427..8687cb7 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -951,6 +951,7 @@ fileHeaderKey         file            Header key to use when appending
filename
 ignorePattern         ^$              Regular expression specifying which files to ignore
(skip)
 trackerDir            .flumespool     Directory to store metadata related to processing of
files.
                                       If this path is not an absolute path, then it is interpreted
as relative to the spoolDir.
+maxBackoff            4000            The maximum time (in millis) to wait between consecutive
attempts to write to the channel(s) if the channel is full. The source will start at a low
backoff and increase it exponentially each time the channel throws a ChannelException, up to
the value specified by this parameter.
 batchSize             100             Granularity at which to batch transfer to the channel
 inputCharset          UTF-8           Character set used by deserializers that treat the
input file as text.
 decodeErrorPolicy     ``FAIL``        What to do when we see a non-decodable character in
the input file.


Mime
View raw message