flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject [26/50] [abbrv] git commit: FLUME-1490. Option to limit number of events sent in Stress source.
Date Fri, 07 Sep 2012 23:28:52 GMT
FLUME-1490. Option to limit number of events sent in Stress source.

(Patrick Wendell via Will McQueen)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b66521ac
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b66521ac
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b66521ac

Branch: refs/heads/cdh-1.2.0+24_intuit
Commit: b66521acc694bf3034779c10f3c1d1a8bd8603e8
Parents: 12fef16
Author: Will McQueen <will@cloudera.com>
Authored: Fri Aug 17 15:19:23 2012 -0700
Committer: Mike Percy <mpercy@cloudera.com>
Committed: Fri Sep 7 14:03:05 2012 -0700

----------------------------------------------------------------------
 flume-ng-core/pom.xml                              |    7 +
 .../java/org/apache/flume/source/StressSource.java |   19 +++-
 .../flume/source/TestSequenceGeneratorSource.java  |    2 -
 .../org/apache/flume/source/TestStressSource.java  |  102 +++++++++++++++
 4 files changed, 127 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/b66521ac/flume-ng-core/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml
index f3b3240..a12e5b1 100644
--- a/flume-ng-core/pom.xml
+++ b/flume-ng-core/pom.xml
@@ -130,6 +130,13 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.easytesting</groupId>
+      <artifactId>fest-reflect</artifactId>
+      <version>1.4</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/b66521ac/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
index 4f7b255..5b73910 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
@@ -32,6 +32,11 @@ import org.apache.flume.event.EventBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Internal load-generating source implementation. Useful for tests.
+ *
+ * See {@link StressSource#configure(Context)} for configuration options.
+ */
 public class StressSource extends AbstractSource implements
   Configurable, PollableSource {
 
@@ -41,6 +46,8 @@ public class StressSource extends AbstractSource implements
   private CounterGroup counterGroup;
   private byte[] buffer;
   private Event event;
+  private long maxTotalEvents;
+  private long maxSuccessfulEvents;
 
   public StressSource() {
     counterGroup = new CounterGroup();
@@ -48,6 +55,11 @@ public class StressSource extends AbstractSource implements
   }
   @Override
   public void configure(Context context) {
+    /* Limit on the total number of events. */
+    maxTotalEvents = context.getLong("maxTotalEvents", -1L);
+    /* Limit on the total number of successful events. */
+    maxSuccessfulEvents = context.getLong("maxSuccessfulEvents", -1L);
+    /* Size of events to be generated. */
     int size = context.getInteger("size", 500);
     buffer = new byte[size];
     Arrays.fill(buffer, Byte.MAX_VALUE);
@@ -55,6 +67,12 @@ public class StressSource extends AbstractSource implements
   }
   @Override
   public Status process() throws EventDeliveryException {
+    if ((maxTotalEvents >= 0 &&
+        counterGroup.incrementAndGet("events.total") > maxTotalEvents) ||
+        (maxSuccessfulEvents >= 0 &&
+        counterGroup.get("events.successful") >= maxSuccessfulEvents)) {
+      return Status.BACKOFF;
+    }
     try {
       getChannelProcessor().processEvent(event);
       counterGroup.incrementAndGet("events.successful");
@@ -82,5 +100,4 @@ public class StressSource extends AbstractSource implements
 
     logger.info("Sequence generator source stopped. Metrics:{}", counterGroup);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/b66521ac/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
index 579b257..89dbeb2 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
@@ -103,8 +103,6 @@ public class TestSequenceGeneratorSource {
       Assert.assertArrayEquals(String.valueOf(i).getBytes(),
           new String(event.getBody()).getBytes());
     }
-
     source.stop();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/b66521ac/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
new file mode 100644
index 0000000..4ec16c7
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.source;
+
+import static org.fest.reflect.core.Reflection.field;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.channel.ChannelProcessor;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestStressSource {
+
+  private ChannelProcessor mockProcessor;
+
+  @Before
+  public void setUp() {
+    mockProcessor = mock(ChannelProcessor.class);
+  }
+
+  private Event getEvent(StressSource source) {
+    return field("event").ofType(Event.class)
+          .in(source)
+          .get();
+  }
+
+  @Test
+  public void testMaxTotalEvents() throws InterruptedException,
+      EventDeliveryException {
+    StressSource source = new StressSource();
+    source.setChannelProcessor(mockProcessor);
+    Context context = new Context();
+    context.put("maxTotalEvents", "35");
+    source.configure(context);
+
+    for (int i = 0; i < 50; i++) {
+      source.process();
+    }
+    verify(mockProcessor, times(35)).processEvent(getEvent(source));
+  }
+
+  @Test
+  public void testMaxSuccessfulEvents() throws InterruptedException,
+      EventDeliveryException {
+    StressSource source = new StressSource();
+    source.setChannelProcessor(mockProcessor);
+    Context context = new Context();
+    context.put("maxSuccessfulEvents", "35");
+    source.configure(context);
+
+    for (int i = 0; i < 10; i++) {
+      source.process();
+    }
+
+    // 1 failed call, 10 successful
+    doThrow(new ChannelException("stub")).when(
+        mockProcessor).processEvent(getEvent(source));
+    source.process();
+    doNothing().when(mockProcessor).processEvent(getEvent(source));
+    for (int i = 0; i < 10; i++) {
+      source.process();
+    }
+
+    // 1 failed call, then 50 attempts (only 15 succeed before the limit of 35)
+    doThrow(new ChannelException("stub")).when(
+        mockProcessor).processEvent(getEvent(source));
+    source.process();
+    doNothing().when(mockProcessor).processEvent(getEvent(source));
+    for (int i = 0; i < 50; i++) {
+      source.process();
+    }
+
+    // We should have called processEvent(evt) 37 times, twice for failures
+    // and 35 times for successful events.
+    verify(mockProcessor, times(37)).processEvent(getEvent(source));
+  }
+}


Mime
View raw message