Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 13AB110BB4 for ; Wed, 8 May 2013 22:05:48 +0000 (UTC) Received: (qmail 23825 invoked by uid 500); 8 May 2013 22:05:48 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 23806 invoked by uid 500); 8 May 2013 22:05:48 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 23799 invoked by uid 99); 8 May 2013 22:05:48 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 May 2013 22:05:47 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id BCC5088A0CF; Wed, 8 May 2013 22:05:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mpercy@apache.org To: commits@flume.apache.org Message-Id: <3b6b11fcdbc34d54b4bf1c5b139ad6b8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: FLUME-1893. File Channel could miss possible checkpoint corruption. Date: Wed, 8 May 2013 22:05:47 +0000 (UTC) Updated Branches: refs/heads/flume-1.4 8125b762b -> 46eab1d12 FLUME-1893. File Channel could miss possible checkpoint corruption. (Hari Shreedharan via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/46eab1d1 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/46eab1d1 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/46eab1d1 Branch: refs/heads/flume-1.4 Commit: 46eab1d1299ce11aa67143b000ba476f9a55f7f6 Parents: 8125b76 Author: Mike Percy Authored: Wed May 8 15:05:09 2013 -0700 Committer: Mike Percy Committed: Wed May 8 15:05:35 2013 -0700 ---------------------------------------------------------------------- .../channel/file/EventQueueBackingStoreFile.java | 9 +-- .../org/apache/flume/channel/file/FileChannel.java | 6 ++ .../java/org/apache/flume/channel/file/Log.java | 9 +++ .../flume/channel/file/TestFileChannelRestart.java | 56 ++++++++++++++- 4 files changed, 74 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/46eab1d1/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java index 5884800..2366cbc 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java @@ -103,14 +103,13 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { checkpointFile.length()); elementsBuffer = mappedBuffer.asLongBuffer(); - int version = (int) elementsBuffer.get(INDEX_VERSION); - if(version != getVersion()) { + long version = elementsBuffer.get(INDEX_VERSION); + if(version != (long) getVersion()) { throw new BadCheckpointException("Invalid version: " + version + " " + name + ", expected " + getVersion()); } - long checkpointComplete = - (int) elementsBuffer.get(INDEX_CHECKPOINT_MARKER); - if(checkpointComplete != CHECKPOINT_COMPLETE) { + long checkpointComplete = elementsBuffer.get(INDEX_CHECKPOINT_MARKER); + if(checkpointComplete != (long) CHECKPOINT_COMPLETE) { throw new BadCheckpointException("Checkpoint was not completed correctly," + " probably because the agent stopped while the channel was" + " checkpointing."); http://git-wip-us.apache.org/repos/asf/flume/blob/46eab1d1/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java index a7aa70c..cc0d38a 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java @@ -391,6 +391,12 @@ public class FileChannel extends BasicChannelSemantics { return log.didFastReplay(); } + + @VisibleForTesting + boolean didFullReplayDueToBadCheckpointException() { + return log.didFullReplayDueToBadCheckpointException(); + } + public boolean isOpen() { return open; } http://git-wip-us.apache.org/repos/asf/flume/blob/46eab1d1/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index e61437d..1918baa 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -122,6 +122,7 @@ class Log { private Key encryptionKey; private final long usableSpaceRefreshInterval; private boolean didFastReplay = false; + private boolean didFullReplayDueToBadCheckpointException = false; private final boolean useDualCheckpoints; private volatile boolean backupRestored = false; @@ -454,6 +455,9 @@ class Log { // trigger fast replay if the channel is configured to. shouldFastReplay = this.useFastReplay; doReplay(queue, dataFiles, encryptionKeyProvider, shouldFastReplay); + if(!shouldFastReplay) { + didFullReplayDueToBadCheckpointException = true; + } } @@ -541,6 +545,11 @@ class Log { return backupRestored; } + @VisibleForTesting + boolean didFullReplayDueToBadCheckpointException() { + return didFullReplayDueToBadCheckpointException; + } + int getNextFileID() { Preconditions.checkState(open, "Log is closed"); return nextFileID.get(); http://git-wip-us.apache.org/repos/asf/flume/blob/46eab1d1/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java index dc6fc45..d16f3d5 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java @@ -401,7 +401,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { File checkpoint = new File(checkpointDir, "checkpoint"); RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw"); writer.seek(EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER - * Serialization.SIZE_OF_LONG); + * Serialization.SIZE_OF_LONG); writer.writeLong(EventQueueBackingStoreFile.CHECKPOINT_INCOMPLETE); writer.getFD().sync(); writer.close(); @@ -609,6 +609,60 @@ public class TestFileChannelRestart extends TestFileChannelBase { Assert.assertFalse(backupRestored); } } + + //This test will fail without FLUME-1893 + @Test + public void testCorruptCheckpointVersionMostSignificant4Bytes() + throws Exception { + Map overrides = Maps.newHashMap(); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set in = putEvents(channel, "restart", 10, 100); + Assert.assertEquals(100, in.size()); + forceCheckpoint(channel); + channel.stop(); + File checkpoint = new File(checkpointDir, "checkpoint"); + RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw"); + writer.seek(EventQueueBackingStoreFile.INDEX_VERSION * + Serialization.SIZE_OF_LONG); + writer.write(new byte[]{(byte)1, (byte)5}); + writer.getFD().sync(); + writer.close(); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set out = consumeChannel(channel); + Assert.assertTrue(channel.didFullReplayDueToBadCheckpointException()); + compareInputAndOut(in, out); + } + + //This test will fail without FLUME-1893 + @Test + public void testCorruptCheckpointCompleteMarkerMostSignificant4Bytes() + throws Exception { + Map overrides = Maps.newHashMap(); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set in = putEvents(channel, "restart", 10, 100); + Assert.assertEquals(100, in.size()); + forceCheckpoint(channel); + channel.stop(); + File checkpoint = new File(checkpointDir, "checkpoint"); + RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw"); + writer.seek(EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER * + Serialization.SIZE_OF_LONG); + writer.write(new byte[]{(byte) 1, (byte) 5}); + writer.getFD().sync(); + writer.close(); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set out = consumeChannel(channel); + Assert.assertTrue(channel.didFullReplayDueToBadCheckpointException()); + compareInputAndOut(in, out); + } @Test public void testWithExtraLogs()