flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arv...@apache.org
Subject svn commit: r1299547 - in /incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src: main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
Date Mon, 12 Mar 2012 03:14:33 GMT
Author: arvind
Date: Mon Mar 12 03:14:33 2012
New Revision: 1299547

URL: http://svn.apache.org/viewvc?rev=1299547&view=rev
Log:
FLUME-1009. HDFSEventSink should return BACKOFF when channel returns null.

(Brock Noland via Arvind Prabhakar)

Modified:
    incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
    incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java

Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1299547&r1=1299546&r2=1299547&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Mon Mar 12 03:14:33 2012
@@ -321,10 +321,13 @@ public class HDFSEventSink extends Abstr
 
     try {
       transaction.begin();
+      Event event = null;
       for (int txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
-        Event event = channel.take();
-        if (event == null)
+        event = null;
+        event = channel.take();
+        if (event == null) {
           break;
+        }
 
         // reconstruct the path name by substituting place holders
         String realPath = BucketPath.escapeString(path, event.getHeaders());
@@ -358,6 +361,9 @@ public class HDFSEventSink extends Abstr
       }
       batchMap.clear();
       transaction.commit();
+      if(event == null) {
+        return Status.BACKOFF;
+      }
       return Status.READY;
     } catch (IOException eIO) {
       transaction.rollback();

Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java?rev=1299547&r1=1299546&r2=1299547&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java Mon Mar 12 03:14:33 2012
@@ -113,6 +113,21 @@ public class TestHDFSEventSink {
   }
 
   @Test
+  public void testEmptyChannelResultsInStatusBackoff()
+      throws InterruptedException, LifecycleException, EventDeliveryException {
+    Context context = new Context();
+    Channel channel = new MemoryChannel();
+    context.put("hdfs.path", testPath);
+    context.put("keep-alive", "0");
+    Configurables.configure(sink, context);
+    Configurables.configure(channel, context);
+    sink.setChannel(channel);
+    sink.start();
+    Assert.assertEquals(Status.BACKOFF, sink.process());
+    sink.stop();
+  }
+
+  @Test
   public void testTextAppend() throws InterruptedException, LifecycleException,
       EventDeliveryException, IOException {
 



Mime
View raw message