flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From johnyru...@apache.org
Subject flume git commit: FLUME-2729. Allow pollableSource backoff times to be configurable
Date Tue, 07 Jul 2015 00:32:52 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk d6943a662 -> b5e102bee


FLUME-2729. Allow pollableSource backoff times to be configurable

(Ted Malaska via Johny Rufus)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b5e102be
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b5e102be
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b5e102be

Branch: refs/heads/trunk
Commit: b5e102bee4d4b2783f3f65aab403a53c1ae8e401
Parents: d6943a6
Author: Johny Rufus <johnyrufus@apache.org>
Authored: Mon Jul 6 16:19:41 2015 -0700
Committer: Johny Rufus <johnyrufus@apache.org>
Committed: Mon Jul 6 16:19:41 2015 -0700

----------------------------------------------------------------------
 .../flume/channel/file/TestIntegration.java     |  3 +
 .../java/org/apache/flume/PollableSource.java   |  4 ++
 .../flume/source/AbstractPollableSource.java    | 24 ++++++-
 .../flume/source/PollableSourceConstants.java   | 28 ++++++++
 .../flume/source/PollableSourceRunner.java      |  8 +--
 .../flume/source/SequenceGeneratorSource.java   | 30 ++++-----
 .../org/apache/flume/source/StressSource.java   | 32 ++++-----
 .../source/TestAbstractPollableSource.java      | 68 ++++++++++++++++++++
 .../flume/source/TestPollableSourceRunner.java  | 10 +++
 .../source/TestSequenceGeneratorSource.java     |  1 +
 .../apache/flume/source/TestStressSource.java   |  4 ++
 .../apache/flume/source/kafka/KafkaSource.java  | 25 +++----
 12 files changed, 182 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java
index 4e2f940..2fbe116 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java
@@ -94,6 +94,9 @@ public class TestIntegration {
     SequenceGeneratorSource source = new SequenceGeneratorSource();
     CountingSourceRunner sourceRunner = new CountingSourceRunner(source, channel);
 
+    source.configure(context);
+    source.start();
+
     NullSink sink = new NullSink();
     sink.setChannel(channel);
     CountingSinkRunner sinkRunner = new CountingSinkRunner(sink);

http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java b/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java
index e872b0c..764810b 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java
@@ -47,6 +47,10 @@ public interface PollableSource extends Source {
    */
   public Status process() throws EventDeliveryException;
 
+  public long getBackOffSleepIncrement();
+
+  public long getMaxBackOffSleepInterval();
+
   public static enum Status {
     READY, BACKOFF
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java
index 356f4d4..33e1acc 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flume.source;
 
+import org.apache.flume.Context;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.PollableSource;
@@ -39,6 +40,9 @@ import org.apache.flume.annotations.InterfaceStability;
 public abstract class AbstractPollableSource extends BasicSourceSemantics
   implements PollableSource {
 
+  long backoffSleepIncrement = PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT;
+  long maxBackoffSleep = PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP;
+
   public AbstractPollableSource() {
     super();
   }
@@ -49,10 +53,28 @@ public abstract class AbstractPollableSource extends BasicSourceSemantics
           exception);
     }
     if(!isStarted()) {
-      throw new EventDeliveryException("Source is not started");
+      throw new EventDeliveryException("Source is not started.  It is in '" + getLifecycleState() + "' state");
     }
     return doProcess();
   }
 
+  @Override
+  public synchronized void configure(Context context) {
+    super.configure(context);
+    backoffSleepIncrement =
+            context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT,
+                    PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
+    maxBackoffSleep = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,
+            PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);
+  }
+
+  public long getBackOffSleepIncrement() {
+    return backoffSleepIncrement;
+  }
+
+  public long getMaxBackOffSleepInterval() {
+    return maxBackoffSleep;
+  }
+
   protected abstract Status doProcess() throws EventDeliveryException;
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceConstants.java
new file mode 100644
index 0000000..f13207d
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceConstants.java
@@ -0,0 +1,28 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.flume.source;
+
+public class PollableSourceConstants {
+
+  public static final String BACKOFF_SLEEP_INCREMENT = "backoffSleepIncrement";
+  public static final String MAX_BACKOFF_SLEEP = "maxBackoffSleep";
+  public static final long DEFAULT_BACKOFF_SLEEP_INCREMENT = 1000;
+  public static final long DEFAULT_MAX_BACKOFF_SLEEP = 5000;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java b/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
index f6c64b3..ea37703 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
@@ -50,8 +50,6 @@ public class PollableSourceRunner extends SourceRunner {
 
   private static final Logger logger = LoggerFactory
       .getLogger(PollableSourceRunner.class);
-  private static final long backoffSleepIncrement = 1000;
-  private static final long maxBackoffSleep = 5000;
 
   private AtomicBoolean shouldStop;
 
@@ -141,7 +139,7 @@ public class PollableSourceRunner extends SourceRunner {
 
             Thread.sleep(Math.min(
                 counterGroup.incrementAndGet("runner.backoffs.consecutive")
-                * backoffSleepIncrement, maxBackoffSleep));
+                * source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
           } else {
             counterGroup.set("runner.backoffs.consecutive", 0L);
           }
@@ -154,9 +152,9 @@ public class PollableSourceRunner extends SourceRunner {
         } catch (Exception e) {
           counterGroup.incrementAndGet("runner.errors");
           logger.error("Unhandled exception, logging and sleeping for " +
-              maxBackoffSleep + "ms", e);
+              source.getMaxBackOffSleepInterval() + "ms", e);
           try {
-            Thread.sleep(maxBackoffSleep);
+            Thread.sleep(source.getMaxBackOffSleepInterval());
           } catch (InterruptedException ex) {
             Thread.currentThread().interrupt();
           }

http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
index 51e021a..1214635 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
@@ -25,15 +25,15 @@ import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSource;
+import org.apache.flume.FlumeException;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.SourceCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SequenceGeneratorSource extends AbstractSource implements
-    PollableSource, Configurable {
+public class SequenceGeneratorSource extends AbstractPollableSource implements
+        Configurable {
 
   private static final Logger logger = LoggerFactory
       .getLogger(SequenceGeneratorSource.class);
@@ -54,7 +54,7 @@ public class SequenceGeneratorSource extends AbstractSource implements
    * <li>batchSize = type int that defines the size of event batches
    */
   @Override
-  public void configure(Context context) {
+  protected void doConfigure(Context context) throws FlumeException {
     batchSize = context.getInteger("batchSize", 1);
     if (batchSize > 1) {
       batchArrayList = new ArrayList<Event>(batchSize);
@@ -66,15 +66,14 @@ public class SequenceGeneratorSource extends AbstractSource implements
   }
 
   @Override
-  public Status process() throws EventDeliveryException {
-
+  protected Status doProcess() throws EventDeliveryException {
     Status status = Status.READY;
     int i = 0;
     try {
       if (batchSize <= 1) {
         if(eventsSent < totalEvents) {
           getChannelProcessor().processEvent(
-            EventBuilder.withBody(String.valueOf(sequence++).getBytes()));
+                  EventBuilder.withBody(String.valueOf(sequence++).getBytes()));
           sourceCounter.incrementEventAcceptedCount();
           eventsSent++;
         } else {
@@ -85,7 +84,7 @@ public class SequenceGeneratorSource extends AbstractSource implements
         for (i = 0; i < batchSize; i++) {
           if(eventsSent < totalEvents){
             batchArrayList.add(i, EventBuilder.withBody(String
-              .valueOf(sequence++).getBytes()));
+                    .valueOf(sequence++).getBytes()));
             eventsSent++;
           } else {
             status = Status.BACKOFF;
@@ -107,22 +106,19 @@ public class SequenceGeneratorSource extends AbstractSource implements
   }
 
   @Override
-  public void start() {
-    logger.info("Sequence generator source starting");
-
-    super.start();
+  protected void doStart() throws FlumeException {
+    logger.info("Sequence generator source do starting");
     sourceCounter.start();
-    logger.debug("Sequence generator source started");
+    logger.debug("Sequence generator source do started");
   }
 
   @Override
-  public void stop() {
-    logger.info("Sequence generator source stopping");
+  protected void doStop() throws FlumeException {
+    logger.info("Sequence generator source do stopping");
 
-    super.stop();
     sourceCounter.stop();
 
-    logger.info("Sequence generator source stopped. Metrics:{}",getName(), sourceCounter);
+    logger.info("Sequence generator source do stopped. Metrics:{}",getName(), sourceCounter);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
index 0e7020b..9aa1477 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
@@ -28,7 +28,7 @@ import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
-import org.apache.flume.PollableSource;
+import org.apache.flume.FlumeException;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
 import org.slf4j.Logger;
@@ -53,8 +53,8 @@ import org.slf4j.LoggerFactory;
  *
  * See {@link StressSource#configure(Context)} for configuration options.
  */
-public class StressSource extends AbstractSource implements
-  Configurable, PollableSource {
+public class StressSource extends AbstractPollableSource implements
+  Configurable {
 
   private static final Logger logger = LoggerFactory
       .getLogger(StressSource.class);
@@ -81,7 +81,7 @@ public class StressSource extends AbstractSource implements
   * <li>-batchSize = type int that defines the number of Events being sent in one batch
    */
   @Override
-  public void configure(Context context) {
+  protected void doConfigure(Context context) throws FlumeException {
     /* Limit on the total number of events. */
     maxTotalEvents = context.getLong("maxTotalEvents", -1L);
     /* Limit on the total number of successful events. */
@@ -113,13 +113,13 @@ public class StressSource extends AbstractSource implements
   }
 
   @Override
-  public Status process() throws EventDeliveryException {
+  protected Status doProcess() throws EventDeliveryException {
     long totalEventSent = counterGroup.addAndGet("events.total", lastSent);
 
     if ((maxTotalEvents >= 0 &&
-        totalEventSent >= maxTotalEvents) ||
-        (maxSuccessfulEvents >= 0 &&
-        counterGroup.get("events.successful") >= maxSuccessfulEvents)) {
+            totalEventSent >= maxTotalEvents) ||
+            (maxSuccessfulEvents >= 0 &&
+                    counterGroup.get("events.successful") >= maxSuccessfulEvents)) {
       return Status.BACKOFF;
     }
     try {
@@ -148,20 +148,12 @@ public class StressSource extends AbstractSource implements
   }
 
   @Override
-  public void start() {
-    logger.info("Stress source starting");
-
-    super.start();
-
-    logger.debug("Stress source started");
+  protected void doStart() throws FlumeException {
+    logger.info("Stress source doStart finished");
   }
 
   @Override
-  public void stop() {
-    logger.info("Stress source stopping");
-
-    super.stop();
-
-    logger.info("Stress source stopped. Metrics:{}", counterGroup);
+  protected void doStop() throws FlumeException {
+    logger.info("Stress source do stop. Metrics:{}", counterGroup);
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java
index 02a2f0c..d385abe 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java
@@ -23,8 +23,10 @@ import static org.mockito.Mockito.*;
 import org.apache.flume.Context;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import java.util.HashMap;
 
 public class TestAbstractPollableSource {
 
@@ -61,4 +63,70 @@ public class TestAbstractPollableSource {
     source.process();
   }
 
+  @Test
+  public void voidBackOffConfig() {
+    source = spy(new AbstractPollableSource() {
+      @Override
+      protected Status doProcess() throws EventDeliveryException {
+        return Status.BACKOFF;
+      }
+      @Override
+      protected void doConfigure(Context context) throws FlumeException {
+      }
+      @Override
+      protected void doStart() throws FlumeException {
+
+      }
+      @Override
+      protected void doStop() throws FlumeException {
+
+      }
+    });
+
+    HashMap<String, String> inputConfigs = new HashMap<String,String>();
+    inputConfigs.put(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT, "42");
+    inputConfigs.put(PollableSourceConstants.MAX_BACKOFF_SLEEP, "4242");
+
+    Context context = new Context(inputConfigs);
+
+    source.configure(context);
+    Assert.assertEquals("BackOffSleepIncrement should equal 42 but it equals " + source.getBackOffSleepIncrement(),
+            42l, source.getBackOffSleepIncrement());
+    Assert.assertEquals("BackOffSleepIncrement should equal 42 but it equals " + source.getMaxBackOffSleepInterval(),
+            4242l, source.getMaxBackOffSleepInterval());
+  }
+
+  @Test
+  public void voidBackOffConfigDefaults() {
+    source = spy(new AbstractPollableSource() {
+      @Override
+      protected Status doProcess() throws EventDeliveryException {
+        return Status.BACKOFF;
+      }
+      @Override
+      protected void doConfigure(Context context) throws FlumeException {
+      }
+      @Override
+      protected void doStart() throws FlumeException {
+
+      }
+      @Override
+      protected void doStop() throws FlumeException {
+
+      }
+    });
+
+    HashMap<String, String> inputConfigs = new HashMap<String,String>();
+
+    Assert.assertEquals("BackOffSleepIncrement should equal " +
+                    PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT +
+                    " but it equals " + source.getBackOffSleepIncrement(),
+            PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT, source.getBackOffSleepIncrement());
+
+    Assert.assertEquals("BackOffSleepIncrement should equal " +
+                    PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP +
+                    " but it equals " + source.getMaxBackOffSleepInterval(),
+            PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP, source.getMaxBackOffSleepInterval());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java
index 4d4222d..d706e9b 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java
@@ -95,6 +95,16 @@ public class TestPollableSourceRunner {
       }
 
       @Override
+      public long getBackOffSleepIncrement() {
+        return PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT;
+      }
+
+      @Override
+      public long getMaxBackOffSleepInterval() {
+        return PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP;
+      }
+
+      @Override
       public void start() {
         // Unused.
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
index c9d3e20..2bbcdaf 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
@@ -64,6 +64,7 @@ public class TestSequenceGeneratorSource {
     rcs.setChannels(channels);
 
     source.setChannelProcessor(new ChannelProcessor(rcs));
+    source.start();
 
     for (long i = 0; i < 100; i++) {
       source.process();

http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
index 28270f4..a651281 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
@@ -75,6 +75,7 @@ public class TestStressSource {
     Context context = new Context();
     context.put("maxTotalEvents", "35");
     source.configure(context);
+    source.start();
 
     for (int i = 0; i < 50; i++) {
       source.process();
@@ -91,6 +92,7 @@ public class TestStressSource {
     context.put("maxTotalEvents", "35");
     context.put("batchSize", "10");
     source.configure(context);
+    source.start();
 
     for (int i = 0; i < 50; i++) {
       if (source.process() == Status.BACKOFF) {
@@ -121,6 +123,7 @@ public class TestStressSource {
     Context context = new Context();
     context.put("batchSize", "10");
     source.configure(context);
+    source.start();
 
     for (int i = 0; i < 10; i++) {
       Assert.assertFalse("StressSource with no maxTotalEvents should not return " +
@@ -146,6 +149,7 @@ public class TestStressSource {
     Context context = new Context();
     context.put("maxSuccessfulEvents", "35");
     source.configure(context);
+    source.start();
 
     for (int i = 0; i < 10; i++) {
       source.process();

http://git-wip-us.apache.org/repos/asf/flume/blob/b5e102be/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 3777639..fd1dd3c 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -26,15 +26,17 @@ import kafka.consumer.ConsumerIterator;
 import kafka.consumer.ConsumerTimeoutException;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
-
 import kafka.message.MessageAndMetadata;
+
 import org.apache.flume.*;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.instrumentation.kafka.KafkaSourceCounter;
+import org.apache.flume.source.AbstractPollableSource;
 import org.apache.flume.source.AbstractSource;
+import org.apache.flume.source.BasicSourceSemantics;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,8 +68,8 @@ import org.slf4j.LoggerFactory;
  * Any property starting with "kafka" will be passed to the kafka consumer So
  * you can use any configuration supported by Kafka 0.8.1.1
  */
-public class KafkaSource extends AbstractSource
-        implements Configurable, PollableSource {
+public class KafkaSource extends AbstractPollableSource
+        implements Configurable {
   private static final Logger log = LoggerFactory.getLogger(KafkaSource.class);
   private ConsumerConnector consumer;
   private ConsumerIterator<byte[],byte[]> it;
@@ -81,8 +83,8 @@ public class KafkaSource extends AbstractSource
   private final List<Event> eventList = new ArrayList<Event>();
   private KafkaSourceCounter counter;
 
-  public Status process() throws EventDeliveryException {
-
+  @Override
+  protected Status doProcess() throws EventDeliveryException {
     byte[] kafkaMessage;
     byte[] kafkaKey;
     Event event;
@@ -168,7 +170,8 @@ public class KafkaSource extends AbstractSource
    *
    * @param context
    */
-  public void configure(Context context) {
+  @Override
+  protected void doConfigure(Context context) throws FlumeException {
     this.context = context;
     batchUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_SIZE,
             KafkaSourceConstants.DEFAULT_BATCH_SIZE);
@@ -192,7 +195,7 @@ public class KafkaSource extends AbstractSource
   }
 
   @Override
-  public synchronized void start() {
+  protected void doStart() throws FlumeException {
     log.info("Starting {}...", this);
 
     try {
@@ -221,21 +224,19 @@ public class KafkaSource extends AbstractSource
     } catch (Exception e) {
       throw new FlumeException("Unable to get message iterator from Kafka", e);
     }
-    log.info("Kafka source {} started.", getName());
+    log.info("Kafka source {} do started.", getName());
     counter.start();
-    super.start();
   }
 
   @Override
-  public synchronized void stop() {
+  protected void doStop() throws FlumeException {
     if (consumer != null) {
       // exit cleanly. This syncs offsets of messages read to ZooKeeper
       // to avoid reading the same messages again
       consumer.shutdown();
     }
     counter.stop();
-    log.info("Kafka Source {} stopped. Metrics: {}", getName(), counter);
-    super.stop();
+    log.info("Kafka Source {} do stopped. Metrics: {}", getName(), counter);
   }
 
   /**


Mime
View raw message