flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-1855. Sequence gen source should be able to stop after a fixed number of events.
Date Thu, 17 Jan 2013 08:51:57 GMT
Updated Branches:
  refs/heads/trunk 11fada202 -> 960d7c4b0


FLUME-1855. Sequence gen source should be able to stop after a fixed number of events.

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/960d7c4b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/960d7c4b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/960d7c4b

Branch: refs/heads/trunk
Commit: 960d7c4b053669d9eb7c24f032d55a2ff659820b
Parents: 11fada2
Author: Mike Percy <mpercy@apache.org>
Authored: Thu Jan 17 00:50:15 2013 -0800
Committer: Mike Percy <mpercy@apache.org>
Committed: Thu Jan 17 00:50:15 2013 -0800

----------------------------------------------------------------------
 .../flume/source/SequenceGeneratorSource.java      |   29 ++++++++++++---
 1 files changed, 24 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/960d7c4b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
index 1fbcf42..3cb1ccf 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
@@ -42,6 +42,8 @@ public class SequenceGeneratorSource extends AbstractSource implements
   private int batchSize;
   private CounterGroup counterGroup;
   private List<Event> batchArrayList;
+  private long totalEvents;
+  private long eventsSent = 0;
 
   public SequenceGeneratorSource() {
     sequence = 0;
@@ -58,28 +60,45 @@ public class SequenceGeneratorSource extends AbstractSource implements
     if (batchSize > 1) {
       batchArrayList = new ArrayList<Event>(batchSize);
     }
+    totalEvents = context.getLong("totalEvents", Long.MAX_VALUE);
   }
 
   @Override
   public Status process() throws EventDeliveryException {
 
+    Status status = Status.READY;
+    int i = 0;
     try {
       if (batchSize <= 1) {
-        getChannelProcessor().processEvent(
+        if(eventsSent < totalEvents) {
+          getChannelProcessor().processEvent(
             EventBuilder.withBody(String.valueOf(sequence++).getBytes()));
+          eventsSent++;
+        } else {
+          status = Status.BACKOFF;
+        }
       } else {
         batchArrayList.clear();
-        for (int i = 0; i < batchSize; i++) {
-          batchArrayList.add(i, EventBuilder.withBody(String.valueOf(sequence++).getBytes()));
+        for (i = 0; i < batchSize; i++) {
+          if(eventsSent < totalEvents){
+            batchArrayList.add(i, EventBuilder.withBody(String
+              .valueOf(sequence++).getBytes()));
+            eventsSent++;
+          } else {
+            status = Status.BACKOFF;
+          }
+        }
+        if(!batchArrayList.isEmpty()) {
+          getChannelProcessor().processEventBatch(batchArrayList);
         }
-        getChannelProcessor().processEventBatch(batchArrayList);
       }
       counterGroup.incrementAndGet("events.successful");
     } catch (ChannelException ex) {
       counterGroup.incrementAndGet("events.failed");
+      eventsSent -= i;
     }
 
-    return Status.READY;
+    return status;
   }
 
   @Override


Mime
View raw message