flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-1572. Add batching support to FILE_ROLL sink.
Date Thu, 13 Sep 2012 01:05:40 GMT
Updated Branches:
  refs/heads/flume-1.3.0 65f58c431 -> 2a8c3450c


FLUME-1572. Add batching support to FILE_ROLL sink.

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2a8c3450
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2a8c3450
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2a8c3450

Branch: refs/heads/flume-1.3.0
Commit: 2a8c3450cf00c7b45125cad541d2b27f600b12f7
Parents: 65f58c4
Author: Mike Percy <mpercy@apache.org>
Authored: Wed Sep 12 17:33:16 2012 -0700
Committer: Hari Shreedharan <harishreedharan@gmail.com>
Committed: Wed Sep 12 18:05:31 2012 -0700

----------------------------------------------------------------------
 .../org/apache/flume/sink/RollingFileSink.java     |   47 ++++++++------
 .../org/apache/flume/sink/TestRollingFileSink.java |    3 +
 2 files changed, 30 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/2a8c3450/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
index e5e97ff..a94eea1 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
@@ -47,6 +47,9 @@ public class RollingFileSink extends AbstractSink implements Configurable {
   private static final Logger logger = LoggerFactory
       .getLogger(RollingFileSink.class);
   private static final long defaultRollInterval = 30;
+  private static final int defaultBatchSize = 100;
+
+  private int batchSize = defaultBatchSize;
 
   private File directory;
   private long rollInterval;
@@ -90,6 +93,8 @@ public class RollingFileSink extends AbstractSink implements Configurable {
       this.rollInterval = Long.parseLong(rollInterval);
     }
 
+    batchSize = context.getInteger("sink.batchSize", defaultBatchSize);
+
     this.directory = new File(directory);
   }
 
@@ -175,30 +180,32 @@ public class RollingFileSink extends AbstractSink implements Configurable {
 
     try {
       transaction.begin();
-      event = channel.take();
-
-      if (event != null) {
-        serializer.write(event);
-
-        /*
-         * FIXME: Feature: Rotate on size and time by checking bytes written and
-         * setting shouldRotate = true if we're past a threshold.
-         */
-
-        /*
-         * FIXME: Feature: Control flush interval based on time or number of
-         * events. For now, we're super-conservative and flush on each write.
-         */
-        serializer.flush();
-        outputStream.flush();
-      } else {
-        // No events found, request back-off semantics from runner
-        result = Status.BACKOFF;
+      for (int i = 0; i < batchSize; i++) {
+        event = channel.take();
+        if (event != null) {
+          serializer.write(event);
+
+          /*
+           * FIXME: Feature: Rotate on size and time by checking bytes written and
+           * setting shouldRotate = true if we're past a threshold.
+           */
+
+          /*
+           * FIXME: Feature: Control flush interval based on time or number of
+           * events. For now, we're super-conservative and flush on each write.
+           */
+        } else {
+          // No events found, request back-off semantics from runner
+          result = Status.BACKOFF;
+          break;
+        }
       }
+      serializer.flush();
+      outputStream.flush();
       transaction.commit();
     } catch (Exception ex) {
       transaction.rollback();
-      throw new EventDeliveryException("Failed to process event: " + event, ex);
+      throw new EventDeliveryException("Failed to process transaction", ex);
     } finally {
       transaction.close();
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/2a8c3450/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
index 10c9b82..07fa644 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
@@ -84,6 +84,7 @@ public class TestRollingFileSink {
 
     context.put("sink.directory", tmpDir.getPath());
     context.put("sink.rollInterval", "1");
+    context.put("sink.batchSize", "1");
 
     Configurables.configure(sink, context);
 
@@ -131,6 +132,8 @@ public class TestRollingFileSink {
 
     context.put("sink.directory", tmpDir.getPath());
     context.put("sink.rollInterval", "0");
+    context.put("sink.batchSize", "1");
+
 
     Configurables.configure(sink, context);
 


Mime
View raw message