Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 44018DB37 for ; Mon, 10 Sep 2012 15:38:45 +0000 (UTC) Received: (qmail 72945 invoked by uid 500); 10 Sep 2012 15:38:45 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 72895 invoked by uid 500); 10 Sep 2012 15:38:45 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 72888 invoked by uid 99); 10 Sep 2012 15:38:45 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Sep 2012 15:38:45 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id ED4B328FF2; Mon, 10 Sep 2012 15:38:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: brock@apache.org To: commits@flume.apache.org X-Mailer: ASF-Git Admin Mailer Subject: git commit: FLUME-1536: Support for batch size in StressSource Message-Id: <20120910153844.ED4B328FF2@tyr.zones.apache.org> Date: Mon, 10 Sep 2012 15:38:44 +0000 (UTC) Updated Branches: refs/heads/trunk 99708a1da -> 17e65714b FLUME-1536: Support for batch size in StressSource (Ted Malaska via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/17e65714 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/17e65714 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/17e65714 Branch: refs/heads/trunk Commit: 17e65714b9ba83dbe856aad913d9efc9c72f7c61 Parents: 99708a1 Author: Brock Noland Authored: Mon Sep 10 10:36:54 2012 -0500 Committer: Brock Noland Committed: Mon Sep 10 10:36:54 2012 -0500 ---------------------------------------------------------------------- .../java/org/apache/flume/source/StressSource.java | 62 +++++++++++++-- .../org/apache/flume/source/TestStressSource.java | 38 +++++++++ 2 files changed, 93 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/17e65714/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java index 5b73910..ba6f54a 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java @@ -19,6 +19,7 @@ package org.apache.flume.source; +import java.util.ArrayList; import java.util.Arrays; import org.apache.flume.ChannelException; @@ -45,39 +46,86 @@ public class StressSource extends AbstractSource implements private CounterGroup counterGroup; private byte[] buffer; - private Event event; private long maxTotalEvents; private long maxSuccessfulEvents; + private int batchSize; + private long lastSent = 0; + private Event event; + private ArrayList eventBatchList; public StressSource() { counterGroup = new CounterGroup(); } + + /** + * Read parameters from context + *
  • -maxTotalEvents = type long that defines the total number of events to be sent + *
  • -maxSuccessfulEvents = type long that defines the total number of events to be sent + *
  • -size = type int that defines the number of bytes in each event + *
  • -batchSize = type int that defines the number of events being sent in one batch + */ @Override public void configure(Context context) { /* Limit on the total number of events. */ maxTotalEvents = context.getLong("maxTotalEvents", -1L); /* Limit on the total number of successful events. */ maxSuccessfulEvents = context.getLong("maxSuccessfulEvents", -1L); + /* Set max events in a batch submission */ + batchSize = context.getInteger("batchSize", 1); /* Size of events to be generated. */ int size = context.getInteger("size", 500); - buffer = new byte[size]; + + prepEventData(size); + } + + private void prepEventData(int bufferSize) { + buffer = new byte[bufferSize]; Arrays.fill(buffer, Byte.MAX_VALUE); - event = EventBuilder.withBody(buffer); + + if (batchSize > 1) { + //Create event objects in case of batch test + eventBatchList = new ArrayList(); + + for (int i = 0; i < batchSize; i++) + { + eventBatchList.add(EventBuilder.withBody(buffer)); + } + } else { + //Create single event in case of non-batch test + event = EventBuilder.withBody(buffer); + } } + @Override public Status process() throws EventDeliveryException { + long totalEventSent = counterGroup.addAndGet("events.total", lastSent); + if ((maxTotalEvents >= 0 && - counterGroup.incrementAndGet("events.total") > maxTotalEvents) || + totalEventSent >= maxTotalEvents) || (maxSuccessfulEvents >= 0 && counterGroup.get("events.successful") >= maxSuccessfulEvents)) { return Status.BACKOFF; } try { - getChannelProcessor().processEvent(event); - counterGroup.incrementAndGet("events.successful"); + lastSent = batchSize; + + if (batchSize == 1) { + getChannelProcessor().processEvent(event); + } else { + long eventsLeft = maxTotalEvents - totalEventSent; + + if (eventsLeft < batchSize) { + eventBatchList.subList(0, (int)eventsLeft - 1); + lastSent = eventsLeft; + } + + getChannelProcessor().processEventBatch(eventBatchList); + } + + counterGroup.addAndGet("events.successful", lastSent); } catch (ChannelException ex) { - counterGroup.incrementAndGet("events.failed"); + counterGroup.addAndGet("events.failed", lastSent); return Status.BACKOFF; } return Status.READY; http://git-wip-us.apache.org/repos/asf/flume/blob/17e65714/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java index 4ec16c7..e98a46f 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java @@ -26,10 +26,17 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.util.ArrayList; +import java.util.List; + +import junit.framework.TestCase; + import org.apache.flume.ChannelException; import org.apache.flume.Context; +import org.apache.flume.CounterGroup; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; +import org.apache.flume.PollableSource.Status; import org.apache.flume.channel.ChannelProcessor; import org.junit.Before; import org.junit.Test; @@ -49,6 +56,16 @@ public class TestStressSource { .get(); } + @SuppressWarnings("unchecked") + private List getEventList(StressSource source) { + return field("eventBatchList").ofType(List.class).in(source).get(); + } + + private CounterGroup getCounterGroup(StressSource source) { + return field("counterGroup").ofType(CounterGroup.class).in(source).get(); + } + + @Test public void testMaxTotalEvents() throws InterruptedException, EventDeliveryException { @@ -65,6 +82,27 @@ public class TestStressSource { } @Test + public void testBatchEvents() throws InterruptedException, + EventDeliveryException { + StressSource source = new StressSource(); + source.setChannelProcessor(mockProcessor); + Context context = new Context(); + context.put("maxTotalEvents", "35"); + context.put("batchSize", "10"); + source.configure(context); + + for (int i = 0; i < 50; i++) { + if (source.process() == Status.BACKOFF) { + TestCase.assertTrue("Source should have sent all events in 4 batches", i == 4); + break; + } + } + verify(mockProcessor, times(4)).processEventBatch(getEventList(source)); + TestCase.assertTrue("Number of successful events should be 35", getCounterGroup(source).get("events.successful") == 35); + TestCase.assertTrue("Number of failure events should be 0", getCounterGroup(source).get("events.failed") == 0); + } + + @Test public void testMaxSuccessfulEvents() throws InterruptedException, EventDeliveryException { StressSource source = new StressSource();