flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject git commit: FLUME-1570: StressSource batching does not work unless maxTotalEvents is specified
Date Tue, 18 Sep 2012 18:27:23 GMT
Updated Branches:
  refs/heads/trunk a78fc2663 -> 466c51902


FLUME-1570: StressSource batching does not work unless maxTotalEvents is specified

(Ted Malaska via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/466c5190
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/466c5190
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/466c5190

Branch: refs/heads/trunk
Commit: 466c519023028a4c72137c4a1063fd6296825e34
Parents: a78fc26
Author: Brock Noland <brock@apache.org>
Authored: Tue Sep 18 13:26:40 2012 -0500
Committer: Brock Noland <brock@apache.org>
Committed: Tue Sep 18 13:26:40 2012 -0500

----------------------------------------------------------------------
 .../java/org/apache/flume/source/StressSource.java |   15 +++--
 .../org/apache/flume/source/TestStressSource.java  |   46 +++++++++++++--
 2 files changed, 50 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/466c5190/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
index b4a00f5..562b983 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
@@ -21,6 +21,7 @@ package org.apache.flume.source;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
@@ -51,7 +52,8 @@ public class StressSource extends AbstractSource implements
   private int batchSize;
   private long lastSent = 0;
   private Event event;
-  private ArrayList<Event> eventBatchList;
+  private List<Event> eventBatchList;
+  private List<Event> eventBatchListToProcess;
 
   public StressSource() {
     counterGroup = new CounterGroup();
@@ -115,12 +117,13 @@ public class StressSource extends AbstractSource implements
       } else {
         long eventsLeft = maxTotalEvents - totalEventSent;
 
-        if (eventsLeft < batchSize) {
-          eventBatchList.subList(0, (int)eventsLeft - 1);
-          lastSent = eventsLeft;
+        if (maxTotalEvents >= 0 && eventsLeft < batchSize) {
+          eventBatchListToProcess = eventBatchList.subList(0, (int)eventsLeft);
+        } else {
+          eventBatchListToProcess = eventBatchList;
         }
-
-        getChannelProcessor().processEventBatch(eventBatchList);
+        lastSent = eventBatchListToProcess.size();
+        getChannelProcessor().processEventBatch(eventBatchListToProcess);
       }
 
       counterGroup.addAndGet("events.successful", lastSent);

http://git-wip-us.apache.org/repos/asf/flume/blob/466c5190/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
index e98a46f..28270f4 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.verify;
 import java.util.ArrayList;
 import java.util.List;
 
+import junit.framework.Assert;
 import junit.framework.TestCase;
 
 import org.apache.flume.ChannelException;
@@ -57,8 +58,8 @@ public class TestStressSource {
   }
 
   @SuppressWarnings("unchecked")
-  private List<Event> getEventList(StressSource source) {
-    return field("eventBatchList").ofType(List.class).in(source).get();
+  private List<Event> getLastProcessedEventList(StressSource source) {
+    return field("eventBatchListToProcess").ofType(List.class).in(source).get();
   }
 
   private CounterGroup getCounterGroup(StressSource source) {
@@ -96,10 +97,45 @@ public class TestStressSource {
         TestCase.assertTrue("Source should have sent all events in 4 batches", i == 4);
         break;
       }
+      if (i < 3) {
+        verify(mockProcessor,
+            times(i+1)).processEventBatch(getLastProcessedEventList(source));
+      } else {
+        verify(mockProcessor,
+            times(1)).processEventBatch(getLastProcessedEventList(source));
+      }
     }
-    verify(mockProcessor, times(4)).processEventBatch(getEventList(source));
-    TestCase.assertTrue("Number of successful events should be 35", getCounterGroup(source).get("events.successful") == 35);
-    TestCase.assertTrue("Number of failure events should be 0", getCounterGroup(source).get("events.failed") == 0);
+    long successfulEvents = getCounterGroup(source).get("events.successful");
+    TestCase.assertTrue("Number of successful events should be 35 but was " +
+        successfulEvents, successfulEvents == 35);
+    long failedEvents = getCounterGroup(source).get("events.failed");
+    TestCase.assertTrue("Number of failure events should be 0 but was " +
+        failedEvents, failedEvents == 0);
+  }
+
+  @Test
+  public void testBatchEventsWithoutMatTotalEvents() throws InterruptedException,
+      EventDeliveryException {
+    StressSource source = new StressSource();
+    source.setChannelProcessor(mockProcessor);
+    Context context = new Context();
+    context.put("batchSize", "10");
+    source.configure(context);
+
+    for (int i = 0; i < 10; i++) {
+      Assert.assertFalse("StressSource with no maxTotalEvents should not return " +
+          Status.BACKOFF, source.process() == Status.BACKOFF);
+    }
+    verify(mockProcessor,
+        times(10)).processEventBatch(getLastProcessedEventList(source));
+
+    long successfulEvents = getCounterGroup(source).get("events.successful");
+    TestCase.assertTrue("Number of successful events should be 100 but was " +
+        successfulEvents, successfulEvents == 100);
+
+    long failedEvents = getCounterGroup(source).get("events.failed");
+    TestCase.assertTrue("Number of failure events should be 0 but was " +
+        failedEvents, failedEvents == 0);
   }
 
   @Test


Mime
View raw message