flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject git commit: FLUME-1610: HDFSEventSink and bucket writer have a race condition
Date Tue, 25 Sep 2012 22:19:13 GMT
Updated Branches:
  refs/heads/trunk 79ff3adf0 -> d11370178


FLUME-1610: HDFSEventSink and bucket writer have a race condition

(Mike Percy via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d1137017
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d1137017
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d1137017

Branch: refs/heads/trunk
Commit: d1137017866f9c666f47b55452531e01a27fbdd8
Parents: 79ff3ad
Author: Brock Noland <brock@apache.org>
Authored: Tue Sep 25 17:17:50 2012 -0500
Committer: Brock Noland <brock@apache.org>
Committed: Tue Sep 25 17:17:50 2012 -0500

----------------------------------------------------------------------
 .../org/apache/flume/sink/hdfs/BucketWriter.java   |   19 +++--
 .../org/apache/flume/sink/hdfs/HDFSEventSink.java  |    4 +-
 .../apache/flume/sink/hdfs/TestBucketWriter.java   |   59 +++++++++++++++
 3 files changed, 71 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d1137017/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 6408eb9..bce8e11 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -236,6 +236,7 @@ class BucketWriter {
    * @throws IOException On failure to rename if temp file exists.
    */
   public synchronized void close() throws IOException, InterruptedException {
+    flush();
     runPrivileged(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
@@ -281,13 +282,15 @@ class BucketWriter {
    * flush the data
    */
   public synchronized void flush() throws IOException, InterruptedException {
-    runPrivileged(new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws Exception {
-        doFlush();
-        return null;
-      }
-    });
+    if (!isBatchComplete()) {
+      runPrivileged(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          doFlush();
+          return null;
+        }
+      });
+    }
   }
 
   /**
@@ -384,7 +387,7 @@ class BucketWriter {
         ", bucketPath = " + bucketPath + " ]";
   }
 
-  public boolean isBatchComplete() {
+  private boolean isBatchComplete() {
     return (batchCounter == 0);
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/d1137017/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 9a76ecb..5ec9eb8 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -426,9 +426,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
 
       // flush all pending buckets before committing the transaction
       for (BucketWriter bucketWriter : writers) {
-        if (!bucketWriter.isBatchComplete()) {
-          flush(bucketWriter);
-        }
+        flush(bucketWriter);
       }
 
       transaction.commit();

http://git-wip-us.apache.org/repos/asf/flume/blob/d1137017/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index bb12188..60f1830 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -27,7 +27,10 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -36,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Charsets;
+import java.io.File;
 
 public class TestBucketWriter {
 
@@ -155,4 +159,59 @@ public class TestBucketWriter {
     Assert.assertEquals("files closed", 2, hdfsWriter.getFilesClosed());
   }
 
+  @Test
+  public void testIntervalRollerBug() throws IOException, InterruptedException {
+    final int ROLL_INTERVAL = 1; // seconds
+    final int NUM_EVENTS = 10;
+
+    HDFSWriter hdfsWriter = new HDFSWriter() {
+      private volatile boolean open = false;
+      @Override
+      public void configure(Context context) {
+
+      }
+      @Override
+      public void sync() throws IOException {
+        if(!open) {
+          throw new IOException("closed");
+        }
+      }
+      @Override
+      public void open(String filePath, CompressionCodec codec,
+          CompressionType cType, FlumeFormatter fmt) throws IOException {
+        open = true;
+      }
+      @Override
+      public void open(String filePath, FlumeFormatter fmt) throws IOException {
+        open = true;
+      }
+      @Override
+      public void close() throws IOException {
+        open = false;
+      }
+      @Override
+      public void append(Event e, FlumeFormatter fmt) throws IOException {
+        // we just re-open in append if closed
+        open = true;
+      }
+    };
+    HDFSTextFormatter formatter = new HDFSTextFormatter();
+    File tmpFile = File.createTempFile("flume", "test");
+    tmpFile.deleteOnExit();
+    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
+        tmpFile.getName(), null, SequenceFile.CompressionType.NONE, hdfsWriter,
+        formatter, timedRollerPool, null,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
+
+    Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
+    for (int i = 0; i < NUM_EVENTS - 1; i++) {
+      bucketWriter.append(e);
+    }
+
+    // sleep to force a roll... wait 2x interval just to be sure
+    Thread.sleep(2 * ROLL_INTERVAL * 1000L);
+
+    bucketWriter.flush(); // throws closed exception
+  }
+
 }


Mime
View raw message