flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-1593. FileChannel race condition when log file rolls.
Date Sat, 22 Sep 2012 04:47:26 GMT
Updated Branches:
  refs/heads/trunk 41a19a54f -> 541b0c7e0


FLUME-1593. FileChannel race condition when log file rolls.

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/541b0c7e
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/541b0c7e
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/541b0c7e

Branch: refs/heads/trunk
Commit: 541b0c7e0baeeb5e53b5b630a38b60c68ffa2d81
Parents: 41a19a5
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Fri Sep 21 21:42:52 2012 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Fri Sep 21 21:42:52 2012 -0700

----------------------------------------------------------------------
 .../java/org/apache/flume/channel/file/Log.java    |   81 ++++++++++-----
 .../org/apache/flume/channel/file/LogFile.java     |   13 ++-
 .../channel/file/LogFileRetryableIOException.java  |   34 ++++++
 .../apache/flume/channel/file/TestFileChannel.java |    2 +-
 .../org/apache/flume/channel/file/TestLogFile.java |    9 +-
 5 files changed, 103 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/541b0c7e/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index e36eafb..1072259 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -425,16 +425,23 @@ class Log {
     Put put = new Put(transactionID, WriteOrderOracle.next(), flumeEvent);
     ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put);
     int logFileIndex = nextLogWriter(transactionID);
-    if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
-      roll(logFileIndex, buffer);
-    }
     boolean error = true;
     try {
-      FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer);
-      error = false;
-      return ptr;
+      try {
+        FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer);
+        error = false;
+        return ptr;
+      } catch (LogFileRetryableIOException e) {
+        if(!open) {
+          throw e;
+        }
+        roll(logFileIndex, buffer);
+        FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer);
+        error = false;
+        return ptr;
+      }
     } finally {
-      if (error) {
+      if(error && open) {
         roll(logFileIndex);
       }
     }
@@ -455,15 +462,21 @@ class Log {
         pointer.getOffset(), pointer.getFileID());
     ByteBuffer buffer = TransactionEventRecord.toByteBuffer(take);
     int logFileIndex = nextLogWriter(transactionID);
-    if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
-      roll(logFileIndex, buffer);
-    }
     boolean error = true;
     try {
-      logFiles.get(logFileIndex).take(buffer);
-      error = false;
+      try {
+        logFiles.get(logFileIndex).take(buffer);
+        error = false;
+      } catch (LogFileRetryableIOException e) {
+        if(!open) {
+          throw e;
+        }
+        roll(logFileIndex, buffer);
+        logFiles.get(logFileIndex).take(buffer);
+        error = false;
+      }
     } finally {
-      if (error) {
+      if(error && open) {
         roll(logFileIndex);
       }
     }
@@ -485,15 +498,21 @@ class Log {
     Rollback rollback = new Rollback(transactionID, WriteOrderOracle.next());
     ByteBuffer buffer = TransactionEventRecord.toByteBuffer(rollback);
     int logFileIndex = nextLogWriter(transactionID);
-    if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
-      roll(logFileIndex, buffer);
-    }
     boolean error = true;
     try {
-      logFiles.get(logFileIndex).rollback(buffer);
-      error = false;
+      try {
+        logFiles.get(logFileIndex).rollback(buffer);
+        error = false;
+      } catch (LogFileRetryableIOException e) {
+        if(!open) {
+          throw e;
+        }
+        roll(logFileIndex, buffer);
+        logFiles.get(logFileIndex).rollback(buffer);
+        error = false;
+      }
     } finally {
-      if (error) {
+      if(error && open) {
         roll(logFileIndex);
       }
     }
@@ -631,25 +650,31 @@ class Log {
    * @throws IOException
    */
   private void commit(long transactionID, short type) throws IOException {
-
     Preconditions.checkState(open, "Log is closed");
     Commit commit = new Commit(transactionID, WriteOrderOracle.next(), type);
     ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit);
     int logFileIndex = nextLogWriter(transactionID);
-    if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
-      roll(logFileIndex, buffer);
-    }
     boolean error = true;
     try {
-      logFiles.get(logFileIndex).commit(buffer);
-      error = false;
+      try {
+        logFiles.get(logFileIndex).commit(buffer);
+        error = false;
+      } catch (LogFileRetryableIOException e) {
+        if(!open) {
+          throw e;
+        }
+        roll(logFileIndex, buffer);
+        logFiles.get(logFileIndex).commit(buffer);
+        error = false;
+      }
     } finally {
-      if (error) {
+      if(error && open) {
         roll(logFileIndex);
       }
     }
   }
 
+
   /**
    * Atomic so not synchronization required.
    * @return
@@ -660,6 +685,7 @@ class Log {
   /**
    * Unconditionally roll
    * Synchronization done internally
+   *
    * @param index
    * @throws IOException
    */
@@ -677,10 +703,11 @@ class Log {
    * methods call this method, and this method acquires only a
    * read lock. The synchronization guarantees that multiple threads don't
    * roll at the same time.
+   *
    * @param index
    * @throws IOException
    */
-  private synchronized void roll(int index, ByteBuffer buffer)
+    private synchronized void roll(int index, ByteBuffer buffer)
       throws IOException {
     if (!tryLockShared()) {
       throw new IOException("Failed to obtain lock for writing to the log. "

http://git-wip-us.apache.org/repos/asf/flume/blob/541b0c7e/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
index 8071140..a2c790c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
@@ -187,10 +187,15 @@ abstract class LogFile {
       sync();
     }
     private Pair<Integer, Integer> write(ByteBuffer buffer) throws IOException {
-      Preconditions.checkState(isOpen(), "File closed");
+      if(!isOpen()) {
+        throw new LogFileRetryableIOException("File closed " + file);
+      }
       long length = position();
       long expectedLength = length + (long) buffer.limit();
-      Preconditions.checkArgument(expectedLength < (long) Integer.MAX_VALUE);
+      if(expectedLength > maxFileSize) {
+        throw new LogFileRetryableIOException(expectedLength + " > " +
+            maxFileSize);
+      }
       int offset = (int)length;
       Preconditions.checkState(offset >= 0, String.valueOf(offset));
       // OP_RECORD + size + buffer
@@ -208,7 +213,9 @@ abstract class LogFile {
       return isOpen() && position() + (long) buffer.limit() > getMaxSize();
     }
     private void sync() throws IOException {
-      Preconditions.checkState(isOpen(), "File closed");
+      if(!isOpen()) {
+        throw new LogFileRetryableIOException("File closed " + file);
+      }
       getFileChannel().force(false);
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/541b0c7e/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java
new file mode 100644
index 0000000..9447652
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.IOException;
+
+public class LogFileRetryableIOException extends IOException {
+  private static final long serialVersionUID = -2747112999806160431L;
+  public LogFileRetryableIOException() {
+    super();
+  }
+  public LogFileRetryableIOException(String msg) {
+    super(msg);
+  }
+  public LogFileRetryableIOException(String msg, Throwable t) {
+    super(msg, t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/541b0c7e/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index 8baf8fe..c12e7d2 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -450,7 +450,7 @@ public class TestFileChannel extends TestFileChannelBase {
   public void testReferenceCounts() throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000");
-    overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "20");
+    overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "100");
     final FileChannel channel = createFileChannel(overrides);
     channel.start();
     putEvents(channel, "testing-reference-counting", 1, 15);

http://git-wip-us.apache.org/repos/asf/flume/blob/541b0c7e/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
index 87d9c3f..9fc834e 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
@@ -52,8 +52,8 @@ public class TestLogFile {
     dataDir = Files.createTempDir();
     dataFile = new File(dataDir, String.valueOf(fileID));
     Assert.assertTrue(dataDir.isDirectory());
-    logFileWriter = LogFileFactory.getWriter(dataFile, fileID, 1000, null, null,
-        null);
+    logFileWriter = LogFileFactory.getWriter(dataFile, fileID,
+        Integer.MAX_VALUE, null, null, null);
   }
   @After
   public void cleanup() throws IOException {
@@ -69,7 +69,7 @@ public class TestLogFile {
   public void testWriterRefusesToOverwriteFile() throws IOException {
     Assert.assertTrue(dataFile.isFile() || dataFile.createNewFile());
     try {
-      LogFileFactory.getWriter(dataFile, fileID, 1000, null, null,
+      LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null,
           null);
       Assert.fail();
     } catch (IllegalStateException e) {
@@ -83,7 +83,7 @@ public class TestLogFile {
     Assert.assertFalse(dataFile.exists());
     Assert.assertTrue(dataFile.mkdirs());
     try {
-      LogFileFactory.getWriter(dataFile, fileID, 1000, null, null,
+      LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null,
           null);
       Assert.fail();
     } catch (IllegalStateException e) {
@@ -138,7 +138,6 @@ public class TestLogFile {
     for(Throwable throwable : errors) {
       Throwables.propagate(throwable);
     }
-    Assert.assertTrue(logFileWriter.isRollRequired(ByteBuffer.allocate(0)));
   }
   @Test
   public void testReader() throws InterruptedException, IOException {


Mime
View raw message