flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esam...@apache.org
Subject svn commit: r1173447 - in /incubator/flume/branches/flume-728: flume-ng-core/src/main/java/org/apache/flume/ flume-ng-core/src/main/java/org/apache/flume/sink/ flume-ng-core/src/main/java/org/apache/flume/source/ flume-ng-core/src/test/java/org/apache/...
Date Wed, 21 Sep 2011 00:57:43 GMT
Author: esammer
Date: Wed Sep 21 00:57:43 2011
New Revision: 1173447

URL: http://svn.apache.org/viewvc?rev=1173447&view=rev
Log:
- Sources and sinks now hint as to their willingness to take more data by returning a status.
- Polling source / sink runners understand and respect a desire to back off.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSink.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSink.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSink.java
Wed Sep 21 00:57:43 2011
@@ -2,6 +2,10 @@ package org.apache.flume;
 
 public interface PollableSink extends Sink {
 
-  public void process() throws InterruptedException, EventDeliveryException;
+  public Status process() throws EventDeliveryException;
+
+  public static enum Status {
+    READY, BACKOFF
+  }
 
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java
Wed Sep 21 00:57:43 2011
@@ -2,10 +2,10 @@ package org.apache.flume;
 
 public interface PollableSource extends Source {
 
-  /*
-   * FIXME: Arvind removed InterruptedException from the interface in his
-   * branch.
-   */
-  public void process() throws InterruptedException, EventDeliveryException;
+  public Status process() throws EventDeliveryException;
+
+  public static enum Status {
+    READY, BACKOFF
+  }
 
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
Wed Sep 21 00:57:43 2011
@@ -31,7 +31,7 @@ public class LoggerSink extends Abstract
       .getLogger(LoggerSink.class);
 
   @Override
-  public void process() throws EventDeliveryException {
+  public Status process() throws EventDeliveryException {
     Channel channel = getChannel();
     Transaction transaction = channel.getTransaction();
     Event event = null;
@@ -39,8 +39,15 @@ public class LoggerSink extends Abstract
     try {
       transaction.begin();
       event = channel.take();
-      logger.info("Event: " + event);
-      transaction.commit();
+
+      if (event != null) {
+        logger.info("Event: " + event);
+        transaction.commit();
+        return Status.READY;
+      } else {
+        transaction.rollback();
+        return Status.BACKOFF;
+      }
     } catch (Exception ex) {
       transaction.rollback();
       throw new EventDeliveryException("Failed to log event: " + event, ex);

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
Wed Sep 21 00:57:43 2011
@@ -37,7 +37,7 @@ public class NullSink extends AbstractSi
   }
 
   @Override
-  public void process() throws EventDeliveryException {
+  public Status process() throws EventDeliveryException {
     Channel channel = getChannel();
     Transaction transaction = channel.getTransaction();
     Event event = null;
@@ -45,8 +45,15 @@ public class NullSink extends AbstractSi
     try {
       transaction.begin();
       event = channel.take();
-      //logger.debug("Consumed the event: " + event);
       transaction.commit();
+
+      if (event != null) {
+        // logger.debug("Consumed the event: " + event);
+        counterGroup.incrementAndGet("events.successful");
+        return Status.READY;
+      } else {
+        return Status.BACKOFF;
+      }
     } catch (Exception ex) {
       transaction.rollback();
       counterGroup.incrementAndGet("events.failed");
@@ -55,7 +62,6 @@ public class NullSink extends AbstractSi
     } finally {
       transaction.close();
     }
-    counterGroup.incrementAndGet("events.successful");
   }
 
   @Override

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/PollableSinkRunner.java
Wed Sep 21 00:57:43 2011
@@ -91,7 +91,11 @@ public class PollableSinkRunner extends 
 
       while (!shouldStop.get()) {
         try {
-          sink.process();
+          if (sink.process().equals(PollableSink.Status.BACKOFF)) {
+            counterGroup.incrementAndGet("runner.backoffs");
+            /* Should this be configurable? */
+            Thread.sleep(500);
+          }
         } catch (InterruptedException e) {
           logger.debug("Interrupted while processing an event. Exiting.");
           counterGroup.incrementAndGet("runner.interruptions");
@@ -101,7 +105,7 @@ public class PollableSinkRunner extends 
         }
       }
 
-      logger.debug("Polling sink runner exiting");
+      logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
     }
 
   }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
Wed Sep 21 00:57:43 2011
@@ -116,7 +116,7 @@ public class RollingFileSink extends Abs
   }
 
   @Override
-  public void process() throws EventDeliveryException {
+  public Status process() throws EventDeliveryException {
     if (shouldRotate) {
       logger.debug("Time to rotate {}", pathController.getCurrentFile());
 
@@ -158,23 +158,31 @@ public class RollingFileSink extends Abs
       transaction.begin();
       event = channel.take();
 
-      byte[] bytes = formatter.format(event);
+      if (event != null) {
+        byte[] bytes = formatter.format(event);
 
-      outputStream.write(bytes);
+        outputStream.write(bytes);
 
-      /*
-       * FIXME: Feature: Rotate on size and time by checking bytes written and
-       * setting shouldRotate = true if we're past a threshold.
-       */
-      counterGroup.addAndGet("sink.bytesWritten", (long) bytes.length);
-
-      /*
-       * FIXME: Feature: Control flush interval based on time or number of
-       * events. For now, we're super-conservative and flush on each write.
-       */
-      outputStream.flush();
+        /*
+         * FIXME: Feature: Rotate on size and time by checking bytes written and
+         * setting shouldRotate = true if we're past a threshold.
+         */
+        counterGroup.addAndGet("sink.bytesWritten", (long) bytes.length);
+
+        /*
+         * FIXME: Feature: Control flush interval based on time or number of
+         * events. For now, we're super-conservative and flush on each write.
+         */
+        outputStream.flush();
+
+        transaction.commit();
 
-      transaction.commit();
+        return Status.READY;
+      } else {
+        transaction.rollback();
+
+        return Status.BACKOFF;
+      }
     } catch (Exception ex) {
       transaction.rollback();
       throw new EventDeliveryException("Failed to process event: " + event, ex);

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
Wed Sep 21 00:57:43 2011
@@ -87,12 +87,19 @@ public class PollableSourceRunner extend
       logger.debug("Polling runner starting. Source:{}", source);
 
       while (!shouldStop.get()) {
+        counterGroup.incrementAndGet("runner.polls");
+
         try {
-          source.process();
-          counterGroup.incrementAndGet("events.successful");
+          if (source.process().equals(PollableSource.Status.BACKOFF)) {
+            counterGroup.incrementAndGet("runner.backoffs");
+            Thread.sleep(500);
+          }
+
+        } catch (InterruptedException e) {
+          logger.info("Source runner interrupted. Exiting");
         } catch (Exception e) {
           logger.error("Unable to process event. Exception follows.", e);
-          counterGroup.incrementAndGet("events.failed");
+          counterGroup.incrementAndGet("runner.failures");
         }
       }
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
Wed Sep 21 00:57:43 2011
@@ -16,7 +16,7 @@ public class SequenceGeneratorSource ext
   }
 
   @Override
-  public void process() throws InterruptedException, EventDeliveryException {
+  public Status process() throws EventDeliveryException {
     Channel channel = getChannel();
     Transaction transaction = channel.getTransaction();
 
@@ -26,8 +26,11 @@ public class SequenceGeneratorSource ext
       transaction.commit();
     } catch (Exception e) {
       transaction.rollback();
+    } finally {
+      transaction.close();
     }
-    /* FIXME: Add finally { transaction.close() } */
+
+    return Status.READY;
   }
 
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestPollableSourceRunner.java
Wed Sep 21 00:57:43 2011
@@ -3,17 +3,25 @@ package org.apache.flume.source;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.flume.Channel;
+import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.PollableSource;
+import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestPollableSourceRunner {
 
+  private static final Logger logger = LoggerFactory
+      .getLogger(TestPollableSourceRunner.class);
+
   private PollableSourceRunner sourceRunner;
 
   @Before
@@ -26,6 +34,8 @@ public class TestPollableSourceRunner {
     final Channel channel = new MemoryChannel();
     final CountDownLatch latch = new CountDownLatch(50);
 
+    Configurables.configure(channel, new Context());
+
     PollableSource source = new PollableSource() {
 
       @Override
@@ -40,17 +50,29 @@ public class TestPollableSourceRunner {
       }
 
       @Override
-      public void process() throws InterruptedException, EventDeliveryException {
-        Event event = EventBuilder.withBody(String.valueOf(
-            "Event " + latch.getCount()).getBytes());
+      public Status process() throws EventDeliveryException {
+        Transaction transaction = channel.getTransaction();
 
-        latch.countDown();
-
-        if (latch.getCount() % 20 == 0) {
-          throw new EventDeliveryException("I don't like event:" + event);
+        try {
+          transaction.begin();
+          Event event = EventBuilder.withBody(String.valueOf(
+              "Event " + latch.getCount()).getBytes());
+
+          latch.countDown();
+
+          if (latch.getCount() % 20 == 0) {
+            throw new EventDeliveryException("I don't like event:" + event);
+          }
+          channel.put(event);
+          transaction.commit();
+          return Status.READY;
+        } catch (EventDeliveryException e) {
+          logger.error("Unable to deliver event. Exception follows.", e);
+          transaction.rollback();
+          return Status.BACKOFF;
+        } finally {
+          transaction.close();
         }
-
-        channel.put(event);
       }
 
       @Override

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java?rev=1173447&r1=1173446&r2=1173447&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
Wed Sep 21 00:57:43 2011
@@ -5,13 +5,18 @@ import org.apache.flume.EventDeliveryExc
 public class FlakeySequenceGeneratorSource extends SequenceGeneratorSource {
 
   @Override
-  public void process() throws EventDeliveryException, InterruptedException {
+  public Status process() throws EventDeliveryException {
 
     if (Math.round(Math.random()) == 1) {
-      Thread.sleep(1000);
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Do nothing.
+      }
+
       throw new EventDeliveryException("I'm broken!");
     } else {
-      super.process();
+      return super.process();
     }
   }
 



Mime
View raw message