flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-1893. File Channel could miss possible checkpoint corruption.
Date Wed, 08 May 2013 22:05:47 GMT
Updated Branches:
  refs/heads/flume-1.4 8125b762b -> 46eab1d12


FLUME-1893. File Channel could miss possible checkpoint corruption.

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/46eab1d1
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/46eab1d1
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/46eab1d1

Branch: refs/heads/flume-1.4
Commit: 46eab1d1299ce11aa67143b000ba476f9a55f7f6
Parents: 8125b76
Author: Mike Percy <mpercy@apache.org>
Authored: Wed May 8 15:05:09 2013 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Wed May 8 15:05:35 2013 -0700

----------------------------------------------------------------------
 .../channel/file/EventQueueBackingStoreFile.java   |    9 +--
 .../org/apache/flume/channel/file/FileChannel.java |    6 ++
 .../java/org/apache/flume/channel/file/Log.java    |    9 +++
 .../flume/channel/file/TestFileChannelRestart.java |   56 ++++++++++++++-
 4 files changed, 74 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/46eab1d1/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
index 5884800..2366cbc 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
@@ -103,14 +103,13 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore
{
         checkpointFile.length());
     elementsBuffer = mappedBuffer.asLongBuffer();
 
-    int version = (int) elementsBuffer.get(INDEX_VERSION);
-    if(version != getVersion()) {
+    long version = elementsBuffer.get(INDEX_VERSION);
+    if(version != (long) getVersion()) {
       throw new BadCheckpointException("Invalid version: " + version + " " +
               name + ", expected " + getVersion());
     }
-    long checkpointComplete =
-        (int) elementsBuffer.get(INDEX_CHECKPOINT_MARKER);
-    if(checkpointComplete != CHECKPOINT_COMPLETE) {
+    long checkpointComplete = elementsBuffer.get(INDEX_CHECKPOINT_MARKER);
+    if(checkpointComplete != (long) CHECKPOINT_COMPLETE) {
       throw new BadCheckpointException("Checkpoint was not completed correctly,"
               + " probably because the agent stopped while the channel was"
               + " checkpointing.");

http://git-wip-us.apache.org/repos/asf/flume/blob/46eab1d1/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index a7aa70c..cc0d38a 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -391,6 +391,12 @@ public class FileChannel extends BasicChannelSemantics {
     return log.didFastReplay();
   }
 
+
+  @VisibleForTesting
+  boolean didFullReplayDueToBadCheckpointException() {
+    return log.didFullReplayDueToBadCheckpointException();
+  }
+
   public boolean isOpen() {
     return open;
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/46eab1d1/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index e61437d..1918baa 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -122,6 +122,7 @@ class Log {
   private Key encryptionKey;
   private final long usableSpaceRefreshInterval;
   private boolean didFastReplay = false;
+  private boolean didFullReplayDueToBadCheckpointException = false;
   private final boolean useDualCheckpoints;
   private volatile boolean backupRestored = false;
 
@@ -454,6 +455,9 @@ class Log {
         // trigger fast replay if the channel is configured to.
         shouldFastReplay = this.useFastReplay;
         doReplay(queue, dataFiles, encryptionKeyProvider, shouldFastReplay);
+        if(!shouldFastReplay) {
+          didFullReplayDueToBadCheckpointException = true;
+        }
       }
 
 
@@ -541,6 +545,11 @@ class Log {
     return backupRestored;
   }
 
+  @VisibleForTesting
+  boolean didFullReplayDueToBadCheckpointException() {
+    return didFullReplayDueToBadCheckpointException;
+  }
+
   int getNextFileID() {
     Preconditions.checkState(open, "Log is closed");
     return nextFileID.get();

http://git-wip-us.apache.org/repos/asf/flume/blob/46eab1d1/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
index dc6fc45..d16f3d5 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -401,7 +401,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     File checkpoint = new File(checkpointDir, "checkpoint");
     RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
     writer.seek(EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER
-            * Serialization.SIZE_OF_LONG);
+      * Serialization.SIZE_OF_LONG);
     writer.writeLong(EventQueueBackingStoreFile.CHECKPOINT_INCOMPLETE);
     writer.getFD().sync();
     writer.close();
@@ -609,6 +609,60 @@ public class TestFileChannelRestart extends TestFileChannelBase {
       Assert.assertFalse(backupRestored);
     }
   }
+ 
+  //This test will fail without FLUME-1893
+  @Test
+  public void testCorruptCheckpointVersionMostSignificant4Bytes()
+    throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
+    writer.seek(EventQueueBackingStoreFile.INDEX_VERSION *
+      Serialization.SIZE_OF_LONG);
+    writer.write(new byte[]{(byte)1, (byte)5});
+    writer.getFD().sync();
+    writer.close();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    Assert.assertTrue(channel.didFullReplayDueToBadCheckpointException());
+    compareInputAndOut(in, out);
+  }
+
+  //This test will fail without FLUME-1893
+  @Test
+  public void testCorruptCheckpointCompleteMarkerMostSignificant4Bytes()
+    throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
+    writer.seek(EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER *
+      Serialization.SIZE_OF_LONG);
+    writer.write(new byte[]{(byte) 1, (byte) 5});
+    writer.getFD().sync();
+    writer.close();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    Assert.assertTrue(channel.didFullReplayDueToBadCheckpointException());
+    compareInputAndOut(in, out);
+  }
 
   @Test
   public void testWithExtraLogs()


Mime
View raw message