flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-1763. FileChannel checkpoints should not be done without free space
Date Tue, 11 Dec 2012 01:14:58 GMT
Updated Branches:
  refs/heads/trunk e45d2fa33 -> 806a7b6ac


FLUME-1763. FileChannel checkpoints should not be done without free space

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/806a7b6a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/806a7b6a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/806a7b6a

Branch: refs/heads/trunk
Commit: 806a7b6ace9398de2645f073acc72de19913bfab
Parents: e45d2fa
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Mon Dec 10 17:13:51 2012 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Mon Dec 10 17:13:51 2012 -0800

----------------------------------------------------------------------
 .../org/apache/flume/channel/file/FileChannel.java |    7 ++
 .../channel/file/FileChannelConfiguration.java     |   11 +++
 .../java/org/apache/flume/channel/file/Log.java    |   32 +++++---
 .../org/apache/flume/channel/file/TestLog.java     |   68 ++++++++++++++-
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |    1 +
 5 files changed, 107 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/806a7b6a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index 64a1350..950ea8c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -79,6 +79,7 @@ public class FileChannel extends BasicChannelSemantics {
   private int transactionCapacity;
   private long checkpointInterval;
   private long maxFileSize;
+  private long minimumRequiredSpace;
   private File checkpointDir;
   private File[] dataDirs;
   private Log log;
@@ -174,6 +175,11 @@ public class FileChannel extends BasicChannelSemantics {
             FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE),
             FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE);
 
+    minimumRequiredSpace = Math.max(
+        context.getLong(FileChannelConfiguration.MINIMUM_REQUIRED_SPACE,
+            FileChannelConfiguration.DEFAULT_MINIMUM_REQUIRED_SPACE),
+            FileChannelConfiguration.FLOOR_MINIMUM_REQUIRED_SPACE);
+
     logWriteTimeout = context.getInteger(
         FileChannelConfiguration.LOG_WRITE_TIMEOUT,
         FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT);
@@ -258,6 +264,7 @@ public class FileChannel extends BasicChannelSemantics {
       Builder builder = new Log.Builder();
       builder.setCheckpointInterval(checkpointInterval);
       builder.setMaxFileSize(maxFileSize);
+      builder.setMinimumRequiredSpace(minimumRequiredSpace);
       builder.setQueueSize(capacity);
       builder.setLogWriteTimeout(logWriteTimeout);
       builder.setCheckpointDir(checkpointDir);

http://git-wip-us.apache.org/repos/asf/flume/blob/806a7b6a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
index 92cad77..24368b3 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
@@ -45,6 +45,17 @@ public class FileChannelConfiguration {
   public static final String MAX_FILE_SIZE = "maxFileSize";
   public static final long DEFAULT_MAX_FILE_SIZE =
         Integer.MAX_VALUE - (500L * 1024L * 1024L); // ~1.52 G
+
+  public static final String MINIMUM_REQUIRED_SPACE = "minimumRequiredSpace";
+  /**
+   * Minimum space required defaults to 500MB
+   */
+  public static final long DEFAULT_MINIMUM_REQUIRED_SPACE = 500L * 1024L * 1024L;
+  /**
+   * Minimum space floor is 1MB
+   */
+  public static final long FLOOR_MINIMUM_REQUIRED_SPACE = 1L * 1024L * 1024L;
+
   /**
    * Maximum capacity of the channel.
    * Default: 1,000,000

http://git-wip-us.apache.org/repos/asf/flume/blob/806a7b6a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 7906d30..829e35a 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -76,10 +76,6 @@ class Log {
   private static final Logger LOGGER = LoggerFactory.getLogger(Log.class);
   private static final int MIN_NUM_LOGS = 2;
   private static final String FILE_LOCK = "in_use.lock";
-  /**
-   * Each file system in use must have at least 10MB of space.
-   */
-  private static final long ABSOLUTE_MINIMUM_REQURED_SPACE = 10L * 1024L * 1024L;
   // for reader
   private final Map<Integer, LogFile.RandomReader> idLogFileMap = Collections
       .synchronizedMap(new HashMap<Integer, LogFile.RandomReader>());
@@ -96,6 +92,7 @@ class Log {
   private long checkpointInterval;
   private long maxFileSize;
   private final boolean useFastReplay;
+  private final long minimumRequiredSpace;
   private final Map<String, FileLock> locks;
   private final ReentrantReadWriteLock checkpointLock =
       new ReentrantReadWriteLock(true);
@@ -118,6 +115,7 @@ class Log {
 
   static class Builder {
     private long bCheckpointInterval;
+    private long bMinimumRequiredSpace;
     private long bMaxFileSize;
     private int bQueueCapacity;
     private File bCheckpointDir;
@@ -168,6 +166,11 @@ class Log {
       return this;
     }
 
+    Builder setMinimumRequiredSpace(long minimumRequiredSpace) {
+      bMinimumRequiredSpace = minimumRequiredSpace;
+      return this;
+    }
+
     Builder setCheckpointWriteTimeout(int checkpointTimeout){
       bCheckpointWriteTimeout = checkpointTimeout;
       return this;
@@ -201,15 +204,16 @@ class Log {
     Log build() throws IOException {
       return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
           bLogWriteTimeout, bCheckpointWriteTimeout, bCheckpointDir, bName,
-          useLogReplayV1, useFastReplay, bEncryptionKeyProvider,
-          bEncryptionKeyAlias, bEncryptionCipherProvider, bLogDirs);
+          useLogReplayV1, useFastReplay, bMinimumRequiredSpace,
+          bEncryptionKeyProvider, bEncryptionKeyAlias,
+          bEncryptionCipherProvider, bLogDirs);
     }
   }
 
   private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
       int logWriteTimeout, int checkpointWriteTimeout, File checkpointDir,
       String name, boolean useLogReplayV1, boolean useFastReplay,
-      @Nullable KeyProvider encryptionKeyProvider,
+      long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider,
       @Nullable String encryptionKeyAlias,
       @Nullable String encryptionCipherProvider, File... logDirs)
           throws IOException {
@@ -229,6 +233,7 @@ class Log {
     this.channelNameDescriptor = "[channel=" + name + "]";
     this.useLogReplayV1 = useLogReplayV1;
     this.useFastReplay = useFastReplay;
+    this.minimumRequiredSpace = minimumRequiredSpace;
     for (File logDir : logDirs) {
       Preconditions.checkArgument(logDir.isDirectory() || logDir.mkdirs(),
           "LogDir " + logDir + " could not be created");
@@ -467,7 +472,7 @@ class Log {
     ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put);
     int logFileIndex = nextLogWriter(transactionID);
     long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
-    long requiredSpace = ABSOLUTE_MINIMUM_REQURED_SPACE + buffer.limit();
+    long requiredSpace = minimumRequiredSpace + buffer.limit();
     if(usableSpace <= requiredSpace) {
       throw new IOException("Usable space exhaused, only " + usableSpace +
           " bytes remaining, required " + requiredSpace + " bytes");
@@ -510,7 +515,7 @@ class Log {
     ByteBuffer buffer = TransactionEventRecord.toByteBuffer(take);
     int logFileIndex = nextLogWriter(transactionID);
     long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
-    long requiredSpace = ABSOLUTE_MINIMUM_REQURED_SPACE + buffer.limit();
+    long requiredSpace = minimumRequiredSpace + buffer.limit();
     if(usableSpace <= requiredSpace) {
       throw new IOException("Usable space exhaused, only " + usableSpace +
           " bytes remaining, required " + requiredSpace + " bytes");
@@ -552,7 +557,7 @@ class Log {
     ByteBuffer buffer = TransactionEventRecord.toByteBuffer(rollback);
     int logFileIndex = nextLogWriter(transactionID);
     long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
-    long requiredSpace = ABSOLUTE_MINIMUM_REQURED_SPACE + buffer.limit();
+    long requiredSpace = minimumRequiredSpace + buffer.limit();
     if(usableSpace <= requiredSpace) {
       throw new IOException("Usable space exhaused, only " + usableSpace +
           " bytes remaining, required " + requiredSpace + " bytes");
@@ -718,7 +723,7 @@ class Log {
     ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit);
     int logFileIndex = nextLogWriter(transactionID);
     long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
-    long requiredSpace = ABSOLUTE_MINIMUM_REQURED_SPACE + buffer.limit();
+    long requiredSpace = minimumRequiredSpace + buffer.limit();
     if(usableSpace <= requiredSpace) {
       throw new IOException("Usable space exhaused, only " + usableSpace +
           " bytes remaining, required " + requiredSpace + " bytes");
@@ -830,6 +835,11 @@ class Log {
    */
   private Boolean writeCheckpoint(Boolean force) throws Exception {
     boolean checkpointCompleted = false;
+    long usableSpace = checkpointDir.getUsableSpace();
+    if(usableSpace <= minimumRequiredSpace) {
+      throw new IOException("Usable space exhaused, only " + usableSpace +
+          " bytes remaining, required " + minimumRequiredSpace + " bytes");
+    }
     boolean lockAcquired = tryLockExclusive();
     if(!lockAcquired) {
       return false;

http://git-wip-us.apache.org/repos/asf/flume/blob/806a7b6a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
index a165d6a..bc7b3cf 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
@@ -19,6 +19,7 @@
 package org.apache.flume.channel.file;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.List;
 
@@ -27,11 +28,14 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
 public class TestLog {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TestLog.class);
   private static final long MAX_FILE_SIZE = 1000;
   private static final int CAPACITY = 10000;
   private Log log;
@@ -144,7 +148,69 @@ public class TestLog {
     FlumeEventQueue queue = log.getFlumeEventQueue();
     Assert.assertNull(queue.removeHead(transactionID));
   }
-
+  @Test
+  public void testMinimumRequiredSpaceTooSmallOnStartup() throws IOException,
+    InterruptedException {
+    log.close();
+    log = new Log.Builder().setCheckpointInterval(
+        Long.MAX_VALUE).setMaxFileSize(
+            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
+            CAPACITY).setCheckpointDir(checkpointDir).setLogDirs(
+                dataDirs).setChannelName("testlog").
+                setMinimumRequiredSpace(Long.MAX_VALUE).build();
+    try {
+      log.replay();
+      Assert.fail();
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage(), e.getMessage()
+          .startsWith("Usable space exhaused"));
+    }
+  }
+  /**
+   * There is a race here in that someone could take up some space
+   */
+  @Test
+  public void testMinimumRequiredSpaceTooSmallForPut() throws IOException,
+    InterruptedException {
+    try {
+      doTestMinimumRequiredSpaceTooSmallForPut();
+    } catch (IOException e) {
+      LOGGER.info("Error during test, retrying", e);
+      doTestMinimumRequiredSpaceTooSmallForPut();
+    } catch (AssertionError e) {
+      LOGGER.info("Test failed, let's be sure it failed for good reason", e);
+      doTestMinimumRequiredSpaceTooSmallForPut();
+    }
+  }
+  public void doTestMinimumRequiredSpaceTooSmallForPut() throws IOException,
+    InterruptedException {
+    long minimumRequireSpace = checkpointDir.getUsableSpace() -
+        (10L* 1024L * 1024L);
+    log.close();
+    log = new Log.Builder().setCheckpointInterval(
+        Long.MAX_VALUE).setMaxFileSize(
+            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
+            CAPACITY).setCheckpointDir(checkpointDir).setLogDirs(
+                dataDirs).setChannelName("testlog").
+                setMinimumRequiredSpace(minimumRequireSpace).build();
+    log.replay();
+    File filler = new File(checkpointDir, "filler");
+    byte[] buffer = new byte[64 * 1024];
+    FileOutputStream out = new FileOutputStream(filler);
+    while(checkpointDir.getUsableSpace() > minimumRequireSpace) {
+      out.write(buffer);
+    }
+    out.close();
+    try {
+      FlumeEvent eventIn = TestUtils.newPersistableEvent();
+      long transactionID = ++this.transactionID;
+      log.put(transactionID, eventIn);
+      Assert.fail();
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage(), e.getMessage()
+          .startsWith("Usable space exhaused"));
+    }
+  }
   /**
    * After replay of the log, we should not find the event because the take
    * was committed

http://git-wip-us.apache.org/repos/asf/flume/blob/806a7b6a/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 21ca5cc..265f546 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1666,6 +1666,7 @@ dataDirs                                          ~/.flume/file-channel/data
 transactionCapacity                               1000                              The maximum size of transaction supported by the channel
 checkpointInterval                                30000                             Amount of time (in millis) between checkpoints
 maxFileSize                                       2146435071                        Max size (in bytes) of a single log file
+minimumRequiredSpace                              524288000                         Minimum Required free space (in bytes)
 capacity                                          1000000                           Maximum capacity of the channel
 keep-alive                                        3                                 Amount of time (in sec) to wait for a put operation
 write-timeout                                     3                                 Amount of time (in sec) to wait for a write operation


Mime
View raw message