flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject git commit: FLUME-1536: Support for batch size in StressSource
Date Mon, 10 Sep 2012 15:38:44 GMT
Updated Branches:
  refs/heads/trunk 99708a1da -> 17e65714b


FLUME-1536: Support for batch size in StressSource

(Ted Malaska via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/17e65714
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/17e65714
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/17e65714

Branch: refs/heads/trunk
Commit: 17e65714b9ba83dbe856aad913d9efc9c72f7c61
Parents: 99708a1
Author: Brock Noland <brock@apache.org>
Authored: Mon Sep 10 10:36:54 2012 -0500
Committer: Brock Noland <brock@apache.org>
Committed: Mon Sep 10 10:36:54 2012 -0500

----------------------------------------------------------------------
 .../java/org/apache/flume/source/StressSource.java |   62 +++++++++++++--
 .../org/apache/flume/source/TestStressSource.java  |   38 +++++++++
 2 files changed, 93 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/17e65714/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
index 5b73910..ba6f54a 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
@@ -19,6 +19,7 @@
 
 package org.apache.flume.source;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.flume.ChannelException;
@@ -45,39 +46,86 @@ public class StressSource extends AbstractSource implements
 
   private CounterGroup counterGroup;
   private byte[] buffer;
-  private Event event;
   private long maxTotalEvents;
   private long maxSuccessfulEvents;
+  private int batchSize;
+  private long lastSent = 0;
+  private Event event;
+  private ArrayList<Event> eventBatchList;
 
   public StressSource() {
     counterGroup = new CounterGroup();
 
   }
+
+  /**
+   * Read parameters from context
+   * <li>-maxTotalEvents = type long that defines the total number of events to be sent
+   * <li>-maxSuccessfulEvents = type long that defines the total number of events to be sent
+   * <li>-size = type int that defines the number of bytes in each event
+   * <li>-batchSize = type int that defines the number of events being sent in one batch
+   */
   @Override
   public void configure(Context context) {
     /* Limit on the total number of events. */
     maxTotalEvents = context.getLong("maxTotalEvents", -1L);
     /* Limit on the total number of successful events. */
     maxSuccessfulEvents = context.getLong("maxSuccessfulEvents", -1L);
+    /* Set max events in a batch submission */
+    batchSize = context.getInteger("batchSize", 1);
     /* Size of events to be generated. */
     int size = context.getInteger("size", 500);
-    buffer = new byte[size];
+
+    prepEventData(size);
+  }
+
+  private void prepEventData(int bufferSize) {
+    buffer = new byte[bufferSize];
     Arrays.fill(buffer, Byte.MAX_VALUE);
-    event = EventBuilder.withBody(buffer);
+
+    if (batchSize > 1) {
+      //Create event objects in case of batch test
+      eventBatchList = new ArrayList<Event>();
+
+      for (int i = 0; i < batchSize; i++)
+      {
+        eventBatchList.add(EventBuilder.withBody(buffer));
+      }
+    } else {
+      //Create single event in case of non-batch test
+      event = EventBuilder.withBody(buffer);
+    }
   }
+
   @Override
   public Status process() throws EventDeliveryException {
+    long totalEventSent = counterGroup.addAndGet("events.total", lastSent);
+
     if ((maxTotalEvents >= 0 &&
-        counterGroup.incrementAndGet("events.total") > maxTotalEvents) ||
+        totalEventSent >= maxTotalEvents) ||
         (maxSuccessfulEvents >= 0 &&
         counterGroup.get("events.successful") >= maxSuccessfulEvents)) {
       return Status.BACKOFF;
     }
     try {
-      getChannelProcessor().processEvent(event);
-      counterGroup.incrementAndGet("events.successful");
+      lastSent = batchSize;
+
+      if (batchSize == 1) {
+        getChannelProcessor().processEvent(event);
+      } else {
+        long eventsLeft = maxTotalEvents - totalEventSent;
+
+        if (eventsLeft < batchSize) {
+          eventBatchList.subList(0, (int)eventsLeft - 1);
+          lastSent = eventsLeft;
+        }
+
+        getChannelProcessor().processEventBatch(eventBatchList);
+      }
+
+      counterGroup.addAndGet("events.successful", lastSent);
     } catch (ChannelException ex) {
-      counterGroup.incrementAndGet("events.failed");
+      counterGroup.addAndGet("events.failed", lastSent);
       return Status.BACKOFF;
     }
     return Status.READY;

http://git-wip-us.apache.org/repos/asf/flume/blob/17e65714/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
index 4ec16c7..e98a46f 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
@@ -26,10 +26,17 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
+import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
+import org.apache.flume.PollableSource.Status;
 import org.apache.flume.channel.ChannelProcessor;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,6 +56,16 @@ public class TestStressSource {
           .get();
   }
 
+  @SuppressWarnings("unchecked")
+  private List<Event> getEventList(StressSource source) {
+    return field("eventBatchList").ofType(List.class).in(source).get();
+  }
+
+  private CounterGroup getCounterGroup(StressSource source) {
+    return field("counterGroup").ofType(CounterGroup.class).in(source).get();
+  }
+
+
   @Test
   public void testMaxTotalEvents() throws InterruptedException,
       EventDeliveryException {
@@ -65,6 +82,27 @@ public class TestStressSource {
   }
 
   @Test
+  public void testBatchEvents() throws InterruptedException,
+      EventDeliveryException {
+    StressSource source = new StressSource();
+    source.setChannelProcessor(mockProcessor);
+    Context context = new Context();
+    context.put("maxTotalEvents", "35");
+    context.put("batchSize", "10");
+    source.configure(context);
+
+    for (int i = 0; i < 50; i++) {
+      if (source.process() == Status.BACKOFF) {
+        TestCase.assertTrue("Source should have sent all events in 4 batches", i == 4);
+        break;
+      }
+    }
+    verify(mockProcessor, times(4)).processEventBatch(getEventList(source));
+    TestCase.assertTrue("Number of successful events should be 35", getCounterGroup(source).get("events.successful") == 35);
+    TestCase.assertTrue("Number of failure events should be 0", getCounterGroup(source).get("events.failed") == 0);
+  }
+
+  @Test
   public void testMaxSuccessfulEvents() throws InterruptedException,
       EventDeliveryException {
     StressSource source = new StressSource();


Mime
View raw message