flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arv...@apache.org
Subject svn commit: r1354800 - in /incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file: FileChannel.java FileChannelConfiguration.java Log.java
Date Thu, 28 Jun 2012 02:51:23 GMT
Author: arvind
Date: Thu Jun 28 02:51:22 2012
New Revision: 1354800

URL: http://svn.apache.org/viewvc?rev=1354800&view=rev
Log:
FLUME-1327. File Channel can deadlock during checkpoint.

(Hari Shreedharan via Arvind Prabhakar)

Modified:
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java?rev=1354800&r1=1354799&r2=1354800&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
(original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
Thu Jun 28 02:51:22 2012
@@ -82,6 +82,7 @@ public class FileChannel extends BasicCh
   private final ThreadLocal<FileBackedTransaction> transactions =
       new ThreadLocal<FileBackedTransaction>();
   private int logWriteTimeout;
+  private int checkpointWriteTimeout;
   private String channelNameDescriptor = "[channel=unknown]";
 
   @Override
@@ -180,6 +181,19 @@ public class FileChannel extends BasicCh
       logWriteTimeout = FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT;
     }
 
+    checkpointWriteTimeout = context.getInteger(
+        FileChannelConfiguration.CHECKPOINT_WRITE_TIMEOUT,
+        FileChannelConfiguration.DEFAULT_CHECKPOINT_WRITE_TIMEOUT);
+
+    if (checkpointWriteTimeout < 0) {
+      LOG.warn("Checkpoint write time out is invalid: " + checkpointWriteTimeout
+          + ", using default: "
+          + FileChannelConfiguration.DEFAULT_CHECKPOINT_WRITE_TIMEOUT);
+
+      checkpointWriteTimeout =
+          FileChannelConfiguration.DEFAULT_CHECKPOINT_WRITE_TIMEOUT;
+    }
+
 
     if(queueRemaining == null) {
       queueRemaining = new Semaphore(capacity, true);
@@ -202,7 +216,7 @@ public class FileChannel extends BasicCh
       builder.setCheckpointDir(checkpointDir);
       builder.setLogDirs(dataDirs);
       builder.setChannelName(getName());
-
+      builder.setCheckpointWriteTimeout(checkpointWriteTimeout);
       log = builder.build();
 
       log.replay();

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java?rev=1354800&r1=1354799&r2=1354800&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
(original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
Thu Jun 28 02:51:22 2012
@@ -65,4 +65,11 @@ public class FileChannelConfiguration {
    */
   public static final String LOG_WRITE_TIMEOUT = "write-timeout";
   public static final int DEFAULT_WRITE_TIMEOUT = 10;
+
+  /**
+   * The amount of time in seconds the channel should wait to write the
+   * checkpoint when some other operation(s) are enqueued or in progress.
+   */
+  public static final String CHECKPOINT_WRITE_TIMEOUT = "checkpoint-timeout";
+  public static final int DEFAULT_CHECKPOINT_WRITE_TIMEOUT = 600;
 }

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java?rev=1354800&r1=1354799&r2=1354800&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
(original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
Thu Jun 28 02:51:22 2012
@@ -81,6 +81,7 @@ class Log {
   private int logWriteTimeout;
   private final String channelName;
   private final String channelNameDescriptor;
+  private int checkpointWriteTimeout;
 
   static class Builder {
     private long bCheckpointInterval;
@@ -91,6 +92,8 @@ class Log {
     private int bLogWriteTimeout =
         FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT;
     private String bName;
+    private int bCheckpointWriteTimeout =
+        FileChannelConfiguration.DEFAULT_CHECKPOINT_WRITE_TIMEOUT;
 
     Builder setCheckpointInterval(long interval) {
       bCheckpointInterval = interval;
@@ -127,14 +130,21 @@ class Log {
       return this;
     }
 
+    Builder setCheckpointWriteTimeout(int checkpointTimeout){
+      bCheckpointWriteTimeout = checkpointTimeout;
+      return this;
+    }
+
     Log build() throws IOException {
       return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
-          bLogWriteTimeout, bCheckpointDir, bName, bLogDirs);
+          bLogWriteTimeout, bCheckpointWriteTimeout, bCheckpointDir, bName,
+          bLogDirs);
     }
   }
 
   private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
-      int logWriteTimeout, File checkpointDir, String name, File... logDirs)
+      int logWriteTimeout, int checkpointWriteTimeout, File checkpointDir,
+      String name, File... logDirs)
           throws IOException {
     Preconditions.checkArgument(checkpointInterval > 0,
         "checkpointInterval <= 0");
@@ -173,6 +183,7 @@ class Log {
     this.checkpointDir = checkpointDir;
     this.logDirs = logDirs;
     this.logWriteTimeout = logWriteTimeout;
+    this.checkpointWriteTimeout = checkpointWriteTimeout;
     logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length);
     worker = new BackgroundWorker(this);
     worker.setName("Log-BackgroundWorker");
@@ -185,7 +196,7 @@ class Log {
    * directly before the shutdown or crash.
    * @throws IOException
    */
-  synchronized void replay() throws IOException {
+  void replay() throws IOException {
     Preconditions.checkState(!open, "Cannot replay after Log as been opened");
 
     checkpointWriterLock.lock();
@@ -623,7 +634,9 @@ class Log {
    * rolls
    *
    * Synchronization required since both synchronized and unsynchronized
-   * methods call this method.
+   * methods call this method, and this method acquires only a
+   * read lock. The synchronization guarantees that multiple threads don't
+   * roll at the same time.
    * @param index
    * @throws IOException
    */
@@ -675,20 +688,33 @@ class Log {
     }
   }
 
-  private synchronized void writeCheckpoint() throws IOException {
-    writeCheckpoint(false);
+  private boolean writeCheckpoint() throws IOException {
+    return writeCheckpoint(false);
   }
 
   /**
    * Write the current checkpoint object and then swap objects so that
    * the next checkpoint occurs on the other checkpoint directory.
    *
-   * Synchronization required since both synchronized and unsynchronized
+   * Synchronization is not required because this method acquires a
+   * write lock. So this method gets exclusive access to all the
+   * data structures this method accesses.
    * @param force  a flag to force the writing of checkpoint
    * @throws IOException if we are unable to write the checkpoint out to disk
    */
-  private synchronized void writeCheckpoint(boolean force) throws IOException {
-    checkpointWriterLock.lock();
+  private boolean writeCheckpoint(boolean force)
+      throws IOException {
+    boolean lockAcquired = false;
+    try {
+      lockAcquired = checkpointWriterLock.tryLock(this.checkpointWriteTimeout,
+          TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.warn("Interrupted while waiting to acquire lock.", e);
+      Thread.currentThread().interrupt();
+    }
+    if(!lockAcquired) {
+      return false;
+    }
     try {
       if (queue.checkpoint(force) || force) {
         long ts = queue.getTimestamp();
@@ -727,6 +753,9 @@ class Log {
     } finally {
       checkpointWriterLock.unlock();
     }
+    //Since the exception is not caught, this will not be returned if
+    //an exception is thrown from the try.
+    return true;
   }
 
   private void removeOldLogs() {
@@ -860,8 +889,9 @@ class Log {
             long currentTime = System.currentTimeMillis();
             long elapsed = currentTime - lastCheckTime;
             if (elapsed > log.checkpointInterval) {
-              log.writeCheckpoint();
-              lastCheckTime = currentTime;
+              if(log.writeCheckpoint()) {
+                lastCheckTime = currentTime;
+              }
             }
           }
           if(log.open) {



Mime
View raw message