flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject git commit: FLUME-2307. Remove Log writetimeout
Date Mon, 10 Feb 2014 21:25:14 GMT
Updated Branches:
  refs/heads/trunk ba0b2685b -> b4ddd5829


FLUME-2307. Remove Log writetimeout

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b4ddd582
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b4ddd582
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b4ddd582

Branch: refs/heads/trunk
Commit: b4ddd5829897f758f869a5fc3b08dcbf4b55156a
Parents: ba0b268
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Mon Feb 10 13:23:49 2014 -0800
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Mon Feb 10 13:23:49 2014 -0800

----------------------------------------------------------------------
 .../apache/flume/channel/file/FileChannel.java  | 82 +++--------------
 .../channel/file/FileChannelConfiguration.java  | 13 ---
 .../java/org/apache/flume/channel/file/Log.java | 96 +++++---------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  2 -
 4 files changed, 38 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/b4ddd582/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index 2cd7f03..71b26f7 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -90,8 +90,6 @@ public class FileChannel extends BasicChannelSemantics {
   private Semaphore queueRemaining;
   private final ThreadLocal<FileBackedTransaction> transactions =
       new ThreadLocal<FileBackedTransaction>();
-  private int logWriteTimeout;
-  private int checkpointWriteTimeout;
   private String channelNameDescriptor = "[channel=unknown]";
   private ChannelCounter channelCounter;
   private boolean useLogReplayV1;
@@ -190,39 +188,14 @@ public class FileChannel extends BasicChannelSemantics {
 
     // cannot be over FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE
     maxFileSize = Math.min(
-        context.getLong(FileChannelConfiguration.MAX_FILE_SIZE,
-            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE),
-            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE);
+      context.getLong(FileChannelConfiguration.MAX_FILE_SIZE,
+        FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE),
+      FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE);
 
     minimumRequiredSpace = Math.max(
-        context.getLong(FileChannelConfiguration.MINIMUM_REQUIRED_SPACE,
-            FileChannelConfiguration.DEFAULT_MINIMUM_REQUIRED_SPACE),
-            FileChannelConfiguration.FLOOR_MINIMUM_REQUIRED_SPACE);
-
-    logWriteTimeout = context.getInteger(
-        FileChannelConfiguration.LOG_WRITE_TIMEOUT,
-        FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT);
-
-    if (logWriteTimeout < 0) {
-      LOG.warn("Log write time out is invalid: " + logWriteTimeout
-          + ", using default: "
-          + FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT);
-
-      logWriteTimeout = FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT;
-    }
-
-    checkpointWriteTimeout = context.getInteger(
-        FileChannelConfiguration.CHECKPOINT_WRITE_TIMEOUT,
-        FileChannelConfiguration.DEFAULT_CHECKPOINT_WRITE_TIMEOUT);
-
-    if (checkpointWriteTimeout < 0) {
-      LOG.warn("Checkpoint write time out is invalid: " + checkpointWriteTimeout
-          + ", using default: "
-          + FileChannelConfiguration.DEFAULT_CHECKPOINT_WRITE_TIMEOUT);
-
-      checkpointWriteTimeout =
-          FileChannelConfiguration.DEFAULT_CHECKPOINT_WRITE_TIMEOUT;
-    }
+      context.getLong(FileChannelConfiguration.MINIMUM_REQUIRED_SPACE,
+        FileChannelConfiguration.DEFAULT_MINIMUM_REQUIRED_SPACE),
+      FileChannelConfiguration.FLOOR_MINIMUM_REQUIRED_SPACE);
 
     useLogReplayV1 = context.getBoolean(
         FileChannelConfiguration.USE_LOG_REPLAY_V1,
@@ -285,11 +258,9 @@ public class FileChannel extends BasicChannelSemantics {
       builder.setMaxFileSize(maxFileSize);
       builder.setMinimumRequiredSpace(minimumRequiredSpace);
       builder.setQueueSize(capacity);
-      builder.setLogWriteTimeout(logWriteTimeout);
       builder.setCheckpointDir(checkpointDir);
       builder.setLogDirs(dataDirs);
       builder.setChannelName(getName());
-      builder.setCheckpointWriteTimeout(checkpointWriteTimeout);
       builder.setUseLogReplayV1(useLogReplayV1);
       builder.setUseFastReplay(useFastReplay);
       builder.setEncryptionKeyProvider(encryptionKeyProvider);
@@ -471,13 +442,8 @@ public class FileChannel extends BasicChannelSemantics {
             + channelNameDescriptor);
       }
       boolean success = false;
-      boolean lockAcquired = log.tryLockShared();
+      log.lockShared();
       try {
-        if(!lockAcquired) {
-          throw new ChannelException("Failed to obtain lock for writing to the "
-              + "log. Try increasing the log write timeout value. " +
-              channelNameDescriptor);
-        }
         FlumeEventPointer ptr = log.put(transactionID, event);
         Preconditions.checkState(putList.offer(ptr), "putList offer failed "
           + channelNameDescriptor);
@@ -487,9 +453,7 @@ public class FileChannel extends BasicChannelSemantics {
         throw new ChannelException("Put failed due to IO error "
                 + channelNameDescriptor, e);
       } finally {
-        if(lockAcquired) {
-          log.unlockShared();
-        }
+        log.unlockShared();
         if(!success) {
           // release slot obtained in the case
           // the put fails for any reason
@@ -507,12 +471,7 @@ public class FileChannel extends BasicChannelSemantics {
             "increasing capacity, or increasing thread count. "
                + channelNameDescriptor);
       }
-      if(!log.tryLockShared()) {
-        throw new ChannelException("Failed to obtain lock for writing to the "
-            + "log. Try increasing the log write timeout value. " +
-            channelNameDescriptor);
-      }
-
+      log.lockShared();
       /*
        * 1. Take an event which is in the queue.
        * 2. If getting that event does not throw NoopRecordException,
@@ -557,11 +516,7 @@ public class FileChannel extends BasicChannelSemantics {
       if(puts > 0) {
         Preconditions.checkState(takes == 0, "nonzero puts and takes "
                 + channelNameDescriptor);
-        if(!log.tryLockShared()) {
-          throw new ChannelException("Failed to obtain lock for writing to the "
-              + "log. Try increasing the log write timeout value. " +
-              channelNameDescriptor);
-        }
+        log.lockShared();
         try {
           log.commitPut(transactionID);
           channelCounter.addToEventPutSuccessCount(puts);
@@ -589,11 +544,7 @@ public class FileChannel extends BasicChannelSemantics {
         }
 
       } else if (takes > 0) {
-        if(!log.tryLockShared()) {
-          throw new ChannelException("Failed to obtain lock for writing to the "
-              + "log. Try increasing the log write timeout value. " +
-              channelNameDescriptor);
-        }
+        log.lockShared();
         try {
           log.commitTake(transactionID);
           queue.completeTransaction(transactionID);
@@ -614,13 +565,8 @@ public class FileChannel extends BasicChannelSemantics {
     protected void doRollback() throws InterruptedException {
       int puts = putList.size();
       int takes = takeList.size();
-      boolean lockAcquired = log.tryLockShared();
+      log.lockShared();
       try {
-        if(!lockAcquired) {
-          throw new ChannelException("Failed to obtain lock for writing to the "
-              + "log. Try increasing the log write timeout value. " +
-              channelNameDescriptor);
-        }
         if(takes > 0) {
           Preconditions.checkState(puts == 0, "nonzero puts and takes "
               + channelNameDescriptor);
@@ -641,9 +587,7 @@ public class FileChannel extends BasicChannelSemantics {
         throw new ChannelException("Commit failed due to IO error "
             + channelNameDescriptor, e);
       } finally {
-        if(lockAcquired) {
-          log.unlockShared();
-        }
+        log.unlockShared();
         // since rollback is being called, puts will never make it on
         // to the queue and we need to be sure to release the resources
         queueRemaining.release(puts);

http://git-wip-us.apache.org/repos/asf/flume/blob/b4ddd582/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
index 10ca11f..e4bc879 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
@@ -76,19 +76,6 @@ public class FileChannelConfiguration {
   public static final int DEFAULT_KEEP_ALIVE = 3;
 
   /**
-   * The amount of time in seconds a writer will wait before failing when
-   * checkpoint is enqueued or in progress.
-   */
-  public static final String LOG_WRITE_TIMEOUT = "write-timeout";
-  public static final int DEFAULT_WRITE_TIMEOUT = 10;
-
-  /**
-   * The amount of time in seconds the channel should wait to write the
-   * checkpoint when some other operation(s) are enqueued or in progress.
-   */
-  public static final String CHECKPOINT_WRITE_TIMEOUT = "checkpoint-timeout";
-  public static final int DEFAULT_CHECKPOINT_WRITE_TIMEOUT = 600;
-  /**
    * Turn on Flume 1.2 log replay logic
    */
   public static final String USE_LOG_REPLAY_V1 = "use-log-replay-v1";

http://git-wip-us.apache.org/repos/asf/flume/blob/b4ddd582/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 70106cb..579ee35 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -26,7 +26,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.io.FileUtils;
-import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
@@ -66,8 +65,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
  * the on disk write ahead log with the last checkpoint of the queue.
  *
  * Before calling any of commitPut/commitTake/get/put/rollback/take
- * Log.tryLockShared should be called and the above operations
- * should only be called if tryLockShared returns true. After
+ * {@linkplain org.apache.flume.channel.file.Log#lockShared()}
+ * should be called. After
  * the operation and any additional modifications of the
  * FlumeEventQueue, the Log.unlockShared method should be called.
  */
@@ -114,9 +113,7 @@ public class Log {
    * Exclusive lock
    */
   private final WriteLock checkpointWriterLock = checkpointLock.writeLock();
-  private int logWriteTimeout;
   private final String channelNameDescriptor;
-  private int checkpointWriteTimeout;
   private boolean useLogReplayV1;
   private KeyProvider encryptionKeyProvider;
   private String encryptionCipherProvider;
@@ -143,11 +140,7 @@ public class Log {
     private int bQueueCapacity;
     private File bCheckpointDir;
     private File[] bLogDirs;
-    private int bLogWriteTimeout =
-        FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT;
     private String bName;
-    private int bCheckpointWriteTimeout =
-        FileChannelConfiguration.DEFAULT_CHECKPOINT_WRITE_TIMEOUT;
     private boolean useLogReplayV1;
     private boolean useFastReplay;
     private KeyProvider bEncryptionKeyProvider;
@@ -187,11 +180,6 @@ public class Log {
       return this;
     }
 
-    Builder setLogWriteTimeout(int timeout) {
-      bLogWriteTimeout = timeout;
-      return this;
-    }
-
     Builder setChannelName(String name) {
       bName = name;
       return this;
@@ -202,11 +190,6 @@ public class Log {
       return this;
     }
 
-    Builder setCheckpointWriteTimeout(int checkpointTimeout){
-      bCheckpointWriteTimeout = checkpointTimeout;
-      return this;
-    }
-
     Builder setUseLogReplayV1(boolean useLogReplayV1){
       this.useLogReplayV1 = useLogReplayV1;
       return this;
@@ -244,23 +227,21 @@ public class Log {
 
     Log build() throws IOException {
       return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
-          bLogWriteTimeout, bCheckpointWriteTimeout, bUseDualCheckpoints,
-          bCheckpointDir, bBackupCheckpointDir, bName,
-          useLogReplayV1, useFastReplay, bMinimumRequiredSpace,
-          bEncryptionKeyProvider, bEncryptionKeyAlias,
-          bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
-          bLogDirs);
+        bUseDualCheckpoints, bCheckpointDir, bBackupCheckpointDir, bName,
+        useLogReplayV1, useFastReplay, bMinimumRequiredSpace,
+        bEncryptionKeyProvider, bEncryptionKeyAlias,
+        bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
+        bLogDirs);
     }
   }
 
   private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
-      int logWriteTimeout, int checkpointWriteTimeout,
-      boolean useDualCheckpoints, File checkpointDir, File backupCheckpointDir,
-      String name, boolean useLogReplayV1, boolean useFastReplay,
-      long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider,
-      @Nullable String encryptionKeyAlias,
-      @Nullable String encryptionCipherProvider,
-      long usableSpaceRefreshInterval, File... logDirs)
+    boolean useDualCheckpoints, File checkpointDir, File backupCheckpointDir,
+    String name, boolean useLogReplayV1, boolean useFastReplay,
+    long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider,
+    @Nullable String encryptionKeyAlias,
+    @Nullable String encryptionCipherProvider,
+    long usableSpaceRefreshInterval, File... logDirs)
           throws IOException {
     Preconditions.checkArgument(checkpointInterval > 0,
       "checkpointInterval <= 0");
@@ -337,8 +318,6 @@ public class Log {
     this.checkpointDir = checkpointDir;
     this.backupCheckpointDir = backupCheckpointDir;
     this.logDirs = logDirs;
-    this.logWriteTimeout = logWriteTimeout;
-    this.checkpointWriteTimeout = checkpointWriteTimeout;
     logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length);
     workerExecutor = Executors.newSingleThreadScheduledExecutor(new
       ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name)
@@ -356,9 +335,7 @@ public class Log {
   void replay() throws IOException {
     Preconditions.checkState(!open, "Cannot replay after Log has been opened");
 
-    Preconditions.checkState(tryLockExclusive(), "Cannot obtain lock on "
-        + channelNameDescriptor);
-
+    lockExclusive();
     try {
       /*
        * First we are going to look through the data directories
@@ -751,28 +728,12 @@ public class Log {
   }
 
 
-  private boolean tryLockExclusive() {
-    try {
-      return checkpointWriterLock.tryLock(checkpointWriteTimeout,
-          TimeUnit.SECONDS);
-    } catch (InterruptedException ex) {
-      LOGGER.warn("Interrupted while waiting for log exclusive lock", ex);
-      Thread.currentThread().interrupt();
-    }
-    return false;
-  }
   private void unlockExclusive()  {
     checkpointWriterLock.unlock();
   }
 
-  boolean tryLockShared() {
-    try {
-      return checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS);
-    } catch (InterruptedException ex) {
-      LOGGER.warn("Interrupted while waiting for log shared lock", ex);
-      Thread.currentThread().interrupt();
-    }
-    return false;
+  void lockShared() {
+    checkpointReadLock.lock();
   }
 
   void unlockShared()  {
@@ -929,29 +890,25 @@ public class Log {
    * @param index
    * @throws IOException
    */
-    private synchronized void roll(int index, ByteBuffer buffer)
-      throws IOException {
-    if (!tryLockShared()) {
-      throw new ChannelException("Failed to obtain lock for writing to the "
-          + "log. Try increasing the log write timeout value. " +
-          channelNameDescriptor);
-    }
+  private synchronized void roll(int index, ByteBuffer buffer)
+    throws IOException {
+    lockShared();
 
     try {
       LogFile.Writer oldLogFile = logFiles.get(index);
       // check to make sure a roll is actually required due to
       // the possibility of multiple writes waiting on lock
-      if(oldLogFile == null || buffer == null ||
-          oldLogFile.isRollRequired(buffer)) {
+      if (oldLogFile == null || buffer == null ||
+        oldLogFile.isRollRequired(buffer)) {
         try {
           LOGGER.info("Roll start " + logDirs[index]);
           int fileID = nextFileID.incrementAndGet();
           File file = new File(logDirs[index], PREFIX + fileID);
           LogFile.Writer writer = LogFileFactory.getWriter(file, fileID,
-              maxFileSize, encryptionKey, encryptionKeyAlias,
-              encryptionCipherProvider, usableSpaceRefreshInterval);
+            maxFileSize, encryptionKey, encryptionKeyAlias,
+            encryptionCipherProvider, usableSpaceRefreshInterval);
           idLogFileMap.put(fileID, LogFileFactory.getRandomReader(file,
-              encryptionKeyProvider));
+            encryptionKeyProvider));
           // writer from this point on will get new reference
           logFiles.set(index, writer);
           // close out old log
@@ -988,10 +945,7 @@ public class Log {
       throw new IOException("Usable space exhaused, only " + usableSpace +
           " bytes remaining, required " + minimumRequiredSpace + " bytes");
     }
-    boolean lockAcquired = tryLockExclusive();
-    if(!lockAcquired) {
-      return false;
-    }
+    lockExclusive();
     SortedSet<Integer> logFileRefCountsAll = null, logFileRefCountsActive = null;
     try {
       if (queue.checkpoint(force)) {

http://git-wip-us.apache.org/repos/asf/flume/blob/b4ddd582/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index d120a74..1ec5a22 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2180,8 +2180,6 @@ maxFileSize                                       2146435071
 minimumRequiredSpace                              524288000                         Minimum Required free space (in bytes). To avoid data corruption, File Channel stops accepting take/put requests when free space drops below this value
 capacity                                          1000000                           Maximum capacity of the channel
 keep-alive                                        3                                 Amount of time (in sec) to wait for a put operation
-write-timeout                                     10                                Amount of time (in sec) to wait for a write operation
-checkpoint-timeout                                600                               Expert: Amount of time (in sec) to wait for a checkpoint
 use-log-replay-v1                                 false                             Expert: Use old replay logic
 use-fast-replay                                   false                             Expert: Replay without using queue
 encryption.activeKey                              --                                Key name used to encrypt new data


Mime
View raw message