flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject svn commit: r1353848 - in /incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file: Log.java LogFile.java
Date Tue, 26 Jun 2012 07:51:18 GMT
Author: mpercy
Date: Tue Jun 26 07:51:17 2012
New Revision: 1353848

URL: http://svn.apache.org/viewvc?rev=1353848&view=rev
Log:
FLUME-1319. Log replay optimization.

(Arvind Prabhakar via Mike Percy)

Modified:
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java?rev=1353848&r1=1353847&r2=1353848&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
(original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
Tue Jun 26 07:51:17 2012
@@ -273,10 +273,31 @@ class Log {
   FlumeEvent get(FlumeEventPointer pointer) throws IOException,
   InterruptedException {
     Preconditions.checkState(open, "Log is closed");
-    int id = pointer.getFileID();
-    LogFile.RandomReader logFile = idLogFileMap.get(id);
-    Preconditions.checkNotNull(logFile, "LogFile is null for id " + id);
-    return logFile.get(pointer.getOffset());
+
+    boolean lockAcquired = false;
+    try {
+      lockAcquired = checkpointReadLock.tryLock(logWriteTimeout, TimeUnit.SECONDS);
+    } catch (InterruptedException ex) {
+      LOGGER.warn("Interrupted while waiting for log write lock", ex);
+      Thread.currentThread().interrupt();
+    }
+
+    if (!lockAcquired) {
+      throw new IOException("Failed to obtain lock for writing to the log. "
+          + "Try increasing the log write timeout value or disabling it by "
+          + "setting it to 0.");
+    }
+
+    try {
+      int id = pointer.getFileID();
+      LogFile.RandomReader logFile = idLogFileMap.get(id);
+      Preconditions.checkNotNull(logFile, "LogFile is null for id " + id);
+      return logFile.get(pointer.getOffset());
+    } finally {
+      if (lockAcquired) {
+        checkpointReadLock.unlock();
+      }
+    }
   }
 
   /**

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java?rev=1353848&r1=1353847&r2=1353848&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
(original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
Tue Jun 26 07:51:17 2012
@@ -287,6 +287,7 @@ class LogFile {
     private final int logFileID;
     private final long lastCheckpointPosition;
     private final long lastCheckpointTimestamp;
+    private final File file;
 
     /**
      * Construct a Sequential Log Reader object
@@ -295,6 +296,7 @@ class LogFile {
      * @throws EOFException if the file is empty
      */
     SequentialReader(File file) throws IOException, EOFException {
+      this.file = file;
       fileHandle = new RandomAccessFile(file, "r");
       fileChannel = fileHandle.getChannel();
       version = fileHandle.readInt();
@@ -318,14 +320,14 @@ class LogFile {
     void skipToLastCheckpointPosition(long checkpointTimestamp)
         throws IOException {
       if (lastCheckpointPosition > 0L
-          && lastCheckpointTimestamp == checkpointTimestamp) {
+          && lastCheckpointTimestamp <= checkpointTimestamp) {
         LOG.info("fast-forward to checkpoint position: "
                   + lastCheckpointPosition);
         fileChannel.position(lastCheckpointPosition);
       } else {
-        LOG.warn("Checkpoint was not done or did not match."
-            + "Replaying the entire log: file = " + lastCheckpointTimestamp
-            + ", queue: " + checkpointTimestamp);
+        LOG.warn("Checkpoint for file(" + file.getAbsolutePath() + ") "
+            + "is: " + lastCheckpointTimestamp + ", which is beyond the "
+            + "requested checkpoint time: " + checkpointTimestamp + ". ");
       }
     }
     Pair<Integer, TransactionEventRecord> next() throws IOException {



Mime
View raw message