flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject [10/11] git commit: FLUME-1762: File Channel should recover automatically if the checkpoint is incomplete or bad by deleting the contents of the checkpoint directory
Date Thu, 20 Dec 2012 08:15:20 GMT
FLUME-1762: File Channel should recover automatically if the checkpoint is incomplete or bad by deleting the contents of the checkpoint directory

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/38e67d5a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/38e67d5a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/38e67d5a

Branch: refs/heads/flume-1.3.1
Commit: 38e67d5acfec3f534f7f33b275967b0adb0266ca
Parents: 019358d
Author: Brock Noland <brock@apache.org>
Authored: Mon Dec 10 13:38:23 2012 -0600
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Thu Dec 20 00:12:26 2012 -0800

----------------------------------------------------------------------
 .../flume/channel/file/BadCheckpointException.java |   36 +++
 .../file/EventQueueBackingStoreFactory.java        |   25 +-
 .../channel/file/EventQueueBackingStoreFile.java   |   21 +-
 .../channel/file/EventQueueBackingStoreFileV2.java |    2 +-
 .../channel/file/EventQueueBackingStoreFileV3.java |   29 ++-
 .../apache/flume/channel/file/FlumeEventQueue.java |   11 +-
 .../java/org/apache/flume/channel/file/Log.java    |   76 ++++--
 .../apache/flume/channel/file/Serialization.java   |   32 ++
 .../file/TestEventQueueBackingStoreFactory.java    |  195 +++++++++++--
 .../flume/channel/file/TestFileChannelRestart.java |  228 ++++++++++++++-
 .../flume/channel/file/TestFlumeEventQueue.java    |   74 +++++-
 11 files changed, 633 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java
new file mode 100644
index 0000000..588506a
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.channel.file;
+
+import org.apache.flume.FlumeException;
+
+/**
+ * Exception thrown when the checkpoint directory contains invalid data,
+ * probably due to the channel stopping while the checkpoint was written.
+ */
+public class BadCheckpointException extends FlumeException{
+  private static final long serialVersionUID = -5038652693746472779L;
+
+  public BadCheckpointException(String msg) {
+    super(msg);
+  }
+  public BadCheckpointException(String msg, Throwable t) {
+    super(msg, t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
index 6c07152..faec50b 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
@@ -44,15 +44,17 @@ class EventQueueBackingStoreFactory {
       boolean metaDataExists = metaDataFile.exists();
       if(metaDataExists) {
         // if we have a metadata file but no checkpoint file, we have a problem
+        // (the same applies to an empty checkpoint file), so signal a bad
+        // checkpoint; the caller wipes the checkpoint directory and replays.
         if(!checkpointExists || checkpointFile.length() == 0) {
-          LOG.error("MetaData file for checkpoint " +
-              " exists but checkpoint does not. Checkpoint = " + checkpointFile +
-              ", metaDataFile = " + metaDataFile);
-          throw new IllegalStateException(
-              "The last checkpoint was not completed correctly. Please delete "
-                  + "the checkpoint files: " + checkpointFile + " and "
-                  + Serialization.getMetaDataFile(checkpointFile)
-                  + " to rebuild the checkpoint and start again. " + name);
+          LOG.warn("MetaData file for checkpoint exists but checkpoint file "
+                  + "is missing or empty. Checkpoint = " + checkpointFile
+                  + ", metaDataFile = " + metaDataFile);
+          throw new BadCheckpointException(
+                  "The last checkpoint was not completed correctly: the "
+                  + "metadata file exists but the checkpoint file is missing "
+                  + "or empty. Checkpoint directory: "
+                  + checkpointFile.getParentFile());
         }
       }
       // brand new, use v3
@@ -76,11 +77,9 @@ class EventQueueBackingStoreFactory {
       }
       LOG.error("Found version " + Integer.toHexString(version) + " in " +
           checkpointFile);
-      throw new IllegalStateException(
-          "The last checkpoint was not completed correctly. Please delete "
-              + "the checkpoint files: " + checkpointFile + " and "
-              + Serialization.getMetaDataFile(checkpointFile)
-              + " to rebuild the checkpoint and start again. " + name);
+      throw new BadCheckpointException("Checkpoint file " + checkpointFile
+              + " has unexpected version " + Integer.toHexString(version)
+              + " and no metadata file was found.");
     } finally {
       if(checkpointFileHandle != null) {
         try {
@@ -107,4 +105,5 @@ class EventQueueBackingStoreFactory {
         metaDataFile);
     return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
index 5eaf8c2..186b15a 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
@@ -58,7 +58,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
   protected final File checkpointFile;
 
   protected EventQueueBackingStoreFile(int capacity, String name,
-      File checkpointFile) throws IOException {
+      File checkpointFile) throws IOException, BadCheckpointException {
     super(capacity, name);
     this.checkpointFile = checkpointFile;
     checkpointFileHandle = new RandomAccessFile(checkpointFile, "rw");
@@ -77,23 +77,24 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
           ((checkpointFile.length() / Serialization.SIZE_OF_LONG) - HEADER_SIZE)
           + ". See FileChannel documentation on how to change a channels" +
           " capacity.";
-      throw new IllegalStateException(msg);
+      throw new BadCheckpointException(msg);
     }
     mappedBuffer = checkpointFileHandle.getChannel().map(MapMode.READ_WRITE, 0,
         checkpointFile.length());
     elementsBuffer = mappedBuffer.asLongBuffer();
 
     int version = (int) elementsBuffer.get(INDEX_VERSION);
-    Preconditions.checkState(version == getVersion(),
-        "Invalid version: " + version + " " + name + ", expected " + getVersion());
-
+    if(version != getVersion()) {
+      throw new BadCheckpointException("Invalid version: " + version + " " +
+              name + ", expected " + getVersion());
+    }
     long checkpointComplete =
         (int) elementsBuffer.get(INDEX_CHECKPOINT_MARKER);
-    Preconditions.checkState(checkpointComplete == CHECKPOINT_COMPLETE,
-        "The last checkpoint was not completed correctly. Please delete "
-            + "the checkpoint files: " + checkpointFile + " and "
-            + Serialization.getMetaDataFile(checkpointFile)
-            + " to rebuild the checkpoint and start again. " + name);
+    if(checkpointComplete != CHECKPOINT_COMPLETE) {
+      throw new BadCheckpointException("Checkpoint was not completed correctly,"
+              + " probably because the agent stopped while the channel was"
+              + " checkpointing.");
+    }
   }
 
   protected long getCheckpointLogWriteOrderID() {

http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
index 8bbc081..abd2ea3 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
@@ -35,7 +35,7 @@ final class EventQueueBackingStoreFileV2 extends EventQueueBackingStoreFile {
   private static final int MAX_ACTIVE_LOGS = 1024;
 
   EventQueueBackingStoreFileV2(File checkpointFile, int capacity, String name)
-      throws IOException {
+      throws IOException, BadCheckpointException {
     super(capacity, name, checkpointFile);
     Preconditions.checkArgument(capacity > 0,
         "capacity must be greater than 0 " + capacity);

http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
index c24f89f..451a9d4 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
 
 final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
   private static final Logger LOG = LoggerFactory
@@ -38,7 +39,7 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
   private final File metaDataFile;
 
   EventQueueBackingStoreFileV3(File checkpointFile, int capacity, String name)
-      throws IOException {
+      throws IOException, BadCheckpointException {
     super(capacity, name, checkpointFile);
     Preconditions.checkArgument(capacity > 0,
         "capacity must be greater than 0 " + capacity);
@@ -50,20 +51,22 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
         LOG.info("Reading checkpoint metadata from " + metaDataFile);
         ProtosFactory.Checkpoint checkpoint =
             ProtosFactory.Checkpoint.parseDelimitedFrom(inputStream);
+        if(checkpoint == null) {
+          throw new BadCheckpointException("The checkpoint metadata file does "
+                  + "not exist or has zero length");
+        }
         int version = checkpoint.getVersion();
-        Preconditions.checkState(version == getVersion(),
-            "Invalid version: " + version + " " + name + ", expected "
-                + getVersion());
+        if(version != getVersion()) {
+          throw new BadCheckpointException("Invalid version: " + version +
+                  " " + name + ", expected " + getVersion());
+        }
         long logWriteOrderID = checkpoint.getWriteOrderID();
         if(logWriteOrderID != getCheckpointLogWriteOrderID()) {
-          LOG.error("Checkpoint and Meta files have differing " +
+          String msg = "Checkpoint and Meta files have differing " +
               "logWriteOrderIDs " + getCheckpointLogWriteOrderID() + ", and "
-              + logWriteOrderID);
-          throw new IllegalStateException(
-              "The last checkpoint was not completed correctly. Please delete "
-                  + "the checkpoint files: " + checkpointFile + " and "
-                  + Serialization.getMetaDataFile(checkpointFile)
-                  + " to rebuild the checkpoint and start again. " + name);
+              + logWriteOrderID;
+          LOG.warn(msg);
+          throw new BadCheckpointException(msg);
         }
         WriteOrderOracle.setSeed(logWriteOrderID);
         setLogWriteOrderID(logWriteOrderID);
@@ -74,6 +77,10 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
           Integer count = activeLog.getCount();
           logFileIDReferenceCounts.put(logFileID, new AtomicInteger(count));
         }
+      } catch (InvalidProtocolBufferException ex) {
+        throw new BadCheckpointException("Checkpoint metadata file is invalid. "
+                + "The agent might have been stopped while it was being "
+                + "written", ex);
       } finally {
         try {
           inputStream.close();

http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index 36553c5..74a2bc8 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -79,11 +79,13 @@ final class FlumeEventQueue {
     }
   }
 
-  SetMultimap<Long, Long> deserializeInflightPuts() throws IOException{
+  SetMultimap<Long, Long> deserializeInflightPuts()
+          throws IOException, BadCheckpointException{
     return inflightPuts.deserialize();
   }
 
-  SetMultimap<Long, Long> deserializeInflightTakes() throws IOException{
+  SetMultimap<Long, Long> deserializeInflightTakes()
+          throws IOException, BadCheckpointException{
     return inflightTakes.deserialize();
   }
 
@@ -467,7 +469,8 @@ final class FlumeEventQueue {
      * @return - map of inflight events per txnID.
      *
      */
-    public SetMultimap<Long, Long> deserialize() throws IOException {
+    public SetMultimap<Long, Long> deserialize()
+            throws IOException, BadCheckpointException {
       SetMultimap<Long, Long> inflights = HashMultimap.create();
       if (!fileChannel.isOpen()) {
         file = new RandomAccessFile(inflightEventsFile, "rw");
@@ -484,7 +487,7 @@ final class FlumeEventQueue {
       fileChannel.read(buffer);
       byte[] fileChecksum = digest.digest(buffer.array());
       if (!Arrays.equals(checksum, fileChecksum)) {
-        throw new IllegalStateException("Checksum of inflights file differs"
+        throw new BadCheckpointException("Checksum of inflights file differs"
                 + " from the checksum expected.");
       }
       buffer.position(0);

http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 6d1cf51..ea98e5d 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -336,33 +336,50 @@ class Log {
       }
       File inflightTakesFile = new File(checkpointDir, "inflighttakes");
       File inflightPutsFile = new File(checkpointDir, "inflightputs");
-      EventQueueBackingStore backingStore =
-          EventQueueBackingStoreFactory.get(checkpointFile, queueCapacity,
-              channelNameDescriptor);
-      queue = new FlumeEventQueue(backingStore, inflightTakesFile,
-            inflightPutsFile);
-      LOGGER.info("Last Checkpoint " + new Date(checkpointFile.lastModified())
-        + ", queue depth = " + queue.getSize());
+      EventQueueBackingStore backingStore = null;
 
-      /*
-       * We now have everything we need to actually replay the log files
-       * the queue, the timestamp the queue was written to disk, and
-       * the list of data files.
-       */
-      CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles,
-          queue);
-      if(useFastReplay && rebuilder.rebuild()) {
-        LOGGER.info("Fast replay successful.");
-      } else {
-        ReplayHandler replayHandler = new ReplayHandler(queue,
-            encryptionKeyProvider);
-        if(useLogReplayV1) {
-          LOGGER.info("Replaying logs with v1 replay logic");
-          replayHandler.replayLogv1(dataFiles);
-        } else {
-          LOGGER.info("Replaying logs with v2 replay logic");
-          replayHandler.replayLog(dataFiles);
+
+      try {
+        backingStore =
+                EventQueueBackingStoreFactory.get(checkpointFile, queueCapacity,
+                channelNameDescriptor);
+        queue = new FlumeEventQueue(backingStore, inflightTakesFile,
+                inflightPutsFile);
+        LOGGER.info("Last Checkpoint " + new Date(checkpointFile.lastModified())
+                + ", queue depth = " + queue.getSize());
+
+        /*
+         * We now have everything we need to actually replay the log files
+         * the queue, the timestamp the queue was written to disk, and
+         * the list of data files.
+         *
+         * This will throw if and only if checkpoint file was fine,
+         * but the inflights were not. If the checkpoint was bad, the backing
+         * store factory would have thrown.
+         */
+        doReplay(queue, dataFiles, encryptionKeyProvider);
+      } catch (BadCheckpointException ex) {
+        LOGGER.warn("Checkpoint may not have completed successfully. "
+                + "Forcing full replay, this may take a while.", ex);
+        // Close the backing store opened on the bad checkpoint (if any)
+        // before the checkpoint directory is wiped and reopened below.
+        if(backingStore != null) {
+          try {
+            backingStore.close();
+          } catch (Exception closeEx) {
+            LOGGER.warn("Error while closing the bad checkpoint "
+                + checkpointFile, closeEx);
+          }
+        }
+        if(!Serialization.deleteAllFiles(checkpointDir)) {
+          throw new IOException("Could not delete files in checkpoint " +
+              "directory to recover from a corrupt or incomplete checkpoint");
         }
+        backingStore = EventQueueBackingStoreFactory.get(checkpointFile,
+                queueCapacity, channelNameDescriptor);
+        queue = new FlumeEventQueue(backingStore, inflightTakesFile,
+                inflightPutsFile);
+        doReplay(queue, dataFiles, encryptionKeyProvider);
       }
 
 
@@ -388,6 +395,25 @@ class Log {
     }
   }
 
+  private void doReplay(FlumeEventQueue queue, List<File> dataFiles,
+          KeyProvider encryptionKeyProvider) throws Exception {
+    CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles,
+            queue);
+    if (useFastReplay && rebuilder.rebuild()) {
+      LOGGER.info("Fast replay successful.");
+    } else {
+      ReplayHandler replayHandler = new ReplayHandler(queue,
+              encryptionKeyProvider);
+      if (useLogReplayV1) {
+        LOGGER.info("Replaying logs with v1 replay logic");
+        replayHandler.replayLogv1(dataFiles);
+      } else {
+        LOGGER.info("Replaying logs with v2 replay logic");
+        replayHandler.replayLog(dataFiles);
+      }
+    }
+  }
+
   int getNextFileID() {
     Preconditions.checkState(open, "Log is closed");
     return nextFileID.get();

http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
index ef8cf72..7094d3c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
@@ -18,6 +18,10 @@
  */
 package org.apache.flume.channel.file;
 
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 
 class Serialization {
@@ -34,6 +38,8 @@ class Serialization {
   static final String METADATA_TMP_FILENAME = ".tmp";
   static final String OLD_METADATA_FILENAME = METADATA_FILENAME + ".old";
 
+  public static final Logger LOG = LoggerFactory.getLogger(Serialization.class);
+
   static File getMetaDataTempFile(File metaDataFile) {
     String metaDataFileName = metaDataFile.getName() + METADATA_TMP_FILENAME;
     return new File(metaDataFile.getParentFile(), metaDataFileName);
@@ -50,4 +56,44 @@ class Serialization {
     String oldMetaDataFileName = file.getName() + OLD_METADATA_FILENAME;
     return new File(file.getParentFile(), oldMetaDataFileName);
   }
+
+  /**
+   * Deletes every entry in the given directory. Sub-directories are removed
+   * recursively; the directory itself is left in place.
+   *
+   * @param checkpointDir - The directory whose contents are to be deleted
+   * @return - true if all entries were successfully deleted; false if
+   *         checkpointDir is not a directory, its contents cannot be listed,
+   *         or any entry could not be deleted.
+   */
+  static boolean deleteAllFiles(File checkpointDir) {
+    if (!checkpointDir.isDirectory()) {
+      return false;
+    }
+    // listFiles() returns null, not an empty array, on an I/O error.
+    File[] files = checkpointDir.listFiles();
+    if (files == null) {
+      LOG.error("Could not list the contents of the checkpoint directory: "
+          + checkpointDir);
+      return false;
+    }
+    StringBuilder builder = new StringBuilder("Deleted the following files from"
+        + " the checkpoint directory: ");
+    // Empty before the first name so the list does not start with ", ".
+    String separator = "";
+    for (File file : files) {
+      if (!FileUtils.deleteQuietly(file)) {
+        // Report what was removed before the failure, then the failure.
+        LOG.info(builder.toString());
+        LOG.error("Error while attempting to delete: " +
+            file.getName());
+        return false;
+      }
+      builder.append(separator).append(file.getName());
+      separator = ", ";
+    }
+    builder.append(".");
+    LOG.info(builder.toString());
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
index b1a55be..dfb3bf9 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
@@ -35,6 +35,11 @@ import org.junit.Test;
 
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.FileOutputStream;
+import java.io.RandomAccessFile;
+import java.util.Random;
+import org.apache.flume.channel.file.proto.ProtosFactory;
 
 public class TestEventQueueBackingStoreFactory {
   static final List<Long> pointersInTestCheckpoint = Arrays.asList(new Long[] {
@@ -81,38 +86,26 @@ public class TestEventQueueBackingStoreFactory {
     verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", false),
         Serialization.VERSION_2, pointersInTestCheckpoint);
   }
-  @Test
+  @Test (expected = BadCheckpointException.class)
   public void testDecreaseCapacity() throws Exception {
     Assert.assertTrue(checkpoint.delete());
     EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
-        get(checkpoint, 10, "test");
+            get(checkpoint, 10, "test");
     backingStore.close();
-    try {
-      EventQueueBackingStoreFactory.get(checkpoint, 9, "test");
-      Assert.fail();
-    } catch (IllegalStateException e) {
-      String expected = "Configured capacity is 9 but the  checkpoint file " +
-            "capacity is 10. See FileChannel documentation on how to change " +
-            "a channels capacity.";
-      Assert.assertEquals(expected, e.getMessage());
-    }
+    EventQueueBackingStoreFactory.get(checkpoint, 9, "test");
+    Assert.fail();
   }
-  @Test
+
+  @Test (expected = BadCheckpointException.class)
   public void testIncreaseCapacity() throws Exception {
     Assert.assertTrue(checkpoint.delete());
     EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
-        get(checkpoint, 10, "test");
+            get(checkpoint, 10, "test");
     backingStore.close();
-    try {
-      EventQueueBackingStoreFactory.get(checkpoint, 11, "test");
-      Assert.fail();
-    } catch (IllegalStateException e) {
-      String expected = "Configured capacity is 11 but the  checkpoint file " +
-      		"capacity is 10. See FileChannel documentation on how to change " +
-      		"a channels capacity.";
-      Assert.assertEquals(expected, e.getMessage());
-    }
+    EventQueueBackingStoreFactory.get(checkpoint, 11, "test");
+    Assert.fail();
   }
+
   @Test
   public void testNewCheckpoint() throws Exception {
     Assert.assertTrue(checkpoint.delete());
@@ -120,6 +113,164 @@ public class TestEventQueueBackingStoreFactory {
         Serialization.VERSION_3, Collections.<Long>emptyList());
   }
 
+  @Test (expected = BadCheckpointException.class)
+  public void testCheckpointBadVersion() throws Exception {
+     RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
+    try {
+    EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
+            get(checkpoint, 10, "test");
+    backingStore.close();
+    writer.seek(
+            EventQueueBackingStoreFile.INDEX_VERSION * Serialization.SIZE_OF_LONG);
+    writer.writeLong(94L);
+    writer.getFD().sync();
+
+    backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+    } finally {
+      writer.close();
+    }
+  }
+
+  @Test(expected = BadCheckpointException.class)
+  public void testIncompleteCheckpoint() throws Exception {
+    RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
+
+    try {
+    EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
+            get(checkpoint, 10, "test");
+    backingStore.close();
+    writer.seek(
+            EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER *
+            Serialization.SIZE_OF_LONG);
+    writer.writeLong(EventQueueBackingStoreFile.CHECKPOINT_INCOMPLETE);
+    writer.getFD().sync();
+    backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+    } finally {
+      writer.close();
+    }
+  }
+
+  @Test(expected = BadCheckpointException.class)
+  public void testCheckpointVersionNotEqualToMeta() throws Exception {
+    RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
+    try {
+      EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
+              get(checkpoint, 10, "test");
+      backingStore.close();
+      writer.seek(
+              EventQueueBackingStoreFile.INDEX_VERSION
+              * Serialization.SIZE_OF_LONG);
+      writer.writeLong(2L);
+      writer.getFD().sync();
+      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+    } finally {
+      writer.close();
+    }
+  }
+
+  @Test(expected = BadCheckpointException.class)
+  public void testCheckpointVersionNotEqualToMeta2() throws Exception {
+    FileOutputStream os = null;
+    try {
+      EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
+              get(checkpoint, 10, "test");
+      backingStore.close();
+      Assert.assertTrue(checkpoint.exists());
+      Assert.assertTrue(Serialization.getMetaDataFile(checkpoint).length() != 0);
+      FileInputStream is = new FileInputStream(Serialization.getMetaDataFile(checkpoint));
+      ProtosFactory.Checkpoint meta = ProtosFactory.Checkpoint.parseDelimitedFrom(is);
+      Assert.assertNotNull(meta);
+      is.close();
+      os = new FileOutputStream(
+              Serialization.getMetaDataFile(checkpoint));
+      meta.toBuilder().setVersion(2).build().writeDelimitedTo(os);
+      os.flush();
+      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+    } finally {
+      os.close();
+    }
+  }
+
+  @Test(expected = BadCheckpointException.class)
+  public void testCheckpointOrderIdNotEqualToMeta() throws Exception {
+    RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
+    try {
+      EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
+              get(checkpoint, 10, "test");
+      backingStore.close();
+      writer.seek(
+              EventQueueBackingStoreFile.INDEX_WRITE_ORDER_ID
+              * Serialization.SIZE_OF_LONG);
+      writer.writeLong(2L);
+      writer.getFD().sync();
+      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+    } finally {
+      writer.close();
+    }
+  }
+
+  @Test(expected = BadCheckpointException.class)
+  public void testCheckpointOrderIdNotEqualToMeta2() throws Exception {
+    FileOutputStream os = null;
+    try {
+      EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
+              get(checkpoint, 10, "test");
+      backingStore.close();
+      Assert.assertTrue(checkpoint.exists());
+      Assert.assertTrue(Serialization.getMetaDataFile(checkpoint).length() != 0);
+      FileInputStream is = new FileInputStream(Serialization.getMetaDataFile(checkpoint));
+      ProtosFactory.Checkpoint meta = ProtosFactory.Checkpoint.parseDelimitedFrom(is);
+      Assert.assertNotNull(meta);
+      is.close();
+      os = new FileOutputStream(
+              Serialization.getMetaDataFile(checkpoint));
+      meta.toBuilder().setWriteOrderID(1).build().writeDelimitedTo(os);
+      os.flush();
+      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+    } finally {
+      os.close();
+    }
+  }
+
+
+  @Test(expected = BadCheckpointException.class)
+  public void testTruncateMeta() throws Exception {
+    EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
+            get(checkpoint, 10, "test");
+    backingStore.close();
+    Assert.assertTrue(checkpoint.exists());
+    File metaFile = Serialization.getMetaDataFile(checkpoint);
+    Assert.assertTrue(metaFile.length() != 0);
+    RandomAccessFile writer = new RandomAccessFile(metaFile, "rw");
+    writer.setLength(0);
+    writer.getFD().sync();
+    writer.close();
+    backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+  }
+
+  @Test (expected = InvalidProtocolBufferException.class)
+  public void testCorruptMeta() throws Throwable {
+    EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
+            get(checkpoint, 10, "test");
+    backingStore.close();
+    Assert.assertTrue(checkpoint.exists());
+    File metaFile = Serialization.getMetaDataFile(checkpoint);
+    Assert.assertTrue(metaFile.length() != 0);
+    RandomAccessFile writer = new RandomAccessFile(metaFile, "rw");
+    writer.seek(10);
+    writer.writeLong(new Random().nextLong());
+    writer.getFD().sync();
+    writer.close();
+    try {
+      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+    } catch (BadCheckpointException ex) {
+      throw ex.getCause();
+    }
+  }
+
+
+
+
   private void verify(EventQueueBackingStore backingStore, long expectedVersion,
       List<Long> expectedPointers)
       throws Exception {

http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
index 3f90805..f548f31 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -32,6 +32,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Maps;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.RandomAccessFile;
+import java.util.Random;
+import org.apache.flume.channel.file.proto.ProtosFactory;
 
 public class TestFileChannelRestart extends TestFileChannelBase {
   protected static final Logger LOG = LoggerFactory
@@ -115,13 +120,14 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     compareInputAndOut(in, out);
   }
   @Test
-  public void testRestartFailsWhenMetaDataExistsButCheckpointDoesNot()
+  public void testRestartWhenMetaDataExistsButCheckpointDoesNot() // metadata kept, checkpoint missing: restart must succeed
       throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    Assert.assertEquals(1,  putEvents(channel, "restart", 1, 1).size());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
@@ -130,16 +136,21 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Assert.assertTrue(checkpointMetaData.exists());
     channel = createFileChannel(overrides);
     channel.start();
-    Assert.assertFalse(channel.isOpen());
+    Assert.assertTrue(channel.isOpen()); // previously asserted the channel failed to open
+    Assert.assertTrue(checkpoint.exists());
+    Assert.assertTrue(checkpointMetaData.exists());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
   }
   @Test
-  public void testRestartFailsWhenCheckpointExistsButMetaDoesNot()
+  public void testRestartWhenCheckpointExistsButMetaDoesNot() // checkpoint kept, metadata missing: restart must succeed
       throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    Assert.assertEquals(1,  putEvents(channel, "restart", 1, 1).size());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
@@ -148,8 +159,213 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Assert.assertTrue(checkpoint.exists());
     channel = createFileChannel(overrides);
     channel.start();
-    Assert.assertFalse(channel.isOpen());
+    Assert.assertTrue(channel.isOpen()); // previously asserted the channel failed to open
+    Assert.assertTrue(checkpoint.exists());
+    Assert.assertTrue(checkpointMetaData.exists()); // metadata file must be recreated on start
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+
+  @Test
+  public void testRestartWhenNoCheckpointExists() throws Exception { // delete checkpoint and metadata, then restart
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
+    Assert.assertTrue(checkpointMetaData.delete());
+    Assert.assertTrue(checkpoint.delete());
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Assert.assertTrue(checkpoint.exists()); // both files must be recreated on start
+    Assert.assertTrue(checkpointMetaData.exists());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
   }
+
+  @Test
+  public void testBadCheckpointVersion() throws Exception{ // overwrite the checkpoint file's version slot, then restart
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
+    writer.seek(EventQueueBackingStoreFile.INDEX_VERSION *
+            Serialization.SIZE_OF_LONG);
+    writer.writeLong(2L); // presumably a version this store rejects; TODO confirm
+    writer.getFD().sync();
+    writer.close();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+
+  @Test
+  public void testBadCheckpointMetaVersion() throws Exception { // rewrite metadata with version 2, then restart
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    FileInputStream is = new FileInputStream(Serialization.getMetaDataFile(checkpoint));
+    ProtosFactory.Checkpoint meta = ProtosFactory.Checkpoint.parseDelimitedFrom(is);
+    is.close(); // close before asserting so a failed assertion cannot leak the stream
+    Assert.assertNotNull(meta);
+    FileOutputStream os = new FileOutputStream(
+            Serialization.getMetaDataFile(checkpoint));
+    meta.toBuilder().setVersion(2).build().writeDelimitedTo(os);
+    os.close(); // flush() is a no-op here; close to release the handle before restart
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+
+  @Test
+  public void testDifferingOrderIDCheckpointAndMetaVersion() throws Exception { // metadata write order ID forced to 12
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    FileInputStream is = new FileInputStream(Serialization.getMetaDataFile(checkpoint));
+    ProtosFactory.Checkpoint meta = ProtosFactory.Checkpoint.parseDelimitedFrom(is);
+    is.close(); // close before asserting so a failed assertion cannot leak the stream
+    Assert.assertNotNull(meta);
+    FileOutputStream os = new FileOutputStream(
+            Serialization.getMetaDataFile(checkpoint));
+    meta.toBuilder().setWriteOrderID(12).build().writeDelimitedTo(os);
+    os.close(); // flush() is a no-op here; close to release the handle before restart
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+
+  @Test
+  public void testIncompleteCheckpoint() throws Exception { // mark the checkpoint file incomplete, then restart
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
+    writer.seek(EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER
+            * Serialization.SIZE_OF_LONG);
+    writer.writeLong(EventQueueBackingStoreFile.CHECKPOINT_INCOMPLETE); // write the incomplete marker into the marker slot
+    writer.getFD().sync();
+    writer.close();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+
+  @Test
+  public void testCorruptInflightPuts() throws Exception { // corrupt checkpointDir/inflightPuts before restart
+    testCorruptInflights("inflightPuts");
+  }
+
+  @Test
+  public void testCorruptInflightTakes() throws Exception { // corrupt checkpointDir/inflightTakes before restart
+    testCorruptInflights("inflightTakes");
+  }
+
+  private void testCorruptInflights(String name) throws Exception { // corrupt the named inflight file, restart, recover all events
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    RandomAccessFile writer = new RandomAccessFile(new File(checkpointDir, name), "rw");
+    writer.write(new Random().nextInt()); // write(int) stores only the low-order byte, at offset 0
+    writer.close(); // previously never closed, leaking the handle across the restart
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+
+  @Test
+  public void testTruncatedCheckpointMeta() throws Exception { // truncate the metadata file to zero bytes, then restart
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    RandomAccessFile writer = new RandomAccessFile(
+            Serialization.getMetaDataFile(checkpoint), "rw");
+    writer.setLength(0);
+    writer.getFD().sync();
+    writer.close();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+
+  @Test
+  public void testCorruptCheckpointMeta() throws Exception { // overwrite 8 bytes at offset 10 of the metadata file
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    RandomAccessFile writer = new RandomAccessFile(
+            Serialization.getMetaDataFile(checkpoint), "rw");
+    writer.seek(10);
+    writer.writeLong(new Random().nextLong());
+    writer.getFD().sync();
+    writer.close();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+
+
   @Test
   public void testWithExtraLogs()
       throws Exception {

http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
index 0173390..203cbf2 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
@@ -38,6 +38,7 @@ import org.junit.runners.Parameterized.Parameters;
 import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
+import java.io.RandomAccessFile;
 
 @RunWith(value = Parameterized.class)
 public class TestFlumeEventQueue {
@@ -74,11 +75,11 @@ public class TestFlumeEventQueue {
   }
 
   @Parameters
-  public static Collection<Object[]> data() throws IOException {
+  public static Collection<Object[]> data() throws Exception {
     Object[][] data = new Object[][] { {
       new EventQueueBackingStoreSupplier() {
         @Override
-        public EventQueueBackingStore get() throws IOException {
+        public EventQueueBackingStore get() throws Exception {
           Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
           return new EventQueueBackingStoreFileV2(getCheckpoint(), 1000,
               "test");
@@ -87,7 +88,7 @@ public class TestFlumeEventQueue {
     }, {
       new EventQueueBackingStoreSupplier() {
         @Override
-        public EventQueueBackingStore get() throws IOException {
+        public EventQueueBackingStore get() throws Exception {
           Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
           return new EventQueueBackingStoreFileV3(getCheckpoint(), 1000,
               "test");
@@ -345,5 +346,70 @@ public class TestFlumeEventQueue {
             txnID2).contains(new FlumeEventPointer(2, 2).toLong()));
 
   }
-}
 
+  @Test
+  public void testCorruptInflightPuts() throws Exception {
+    // Checkpoint uncommitted puts, corrupt the inflightPuts file, then
+    // require that reloading the queue (and only the reload) rejects it.
+    queue = new FlumeEventQueue(backingStore,
+            backingStoreSupplier.getInflightTakes(),
+            backingStoreSupplier.getInflightPuts());
+    long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
+    long txnID2 = txnID1 + 1;
+    queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
+    queue.addWithoutCommit(new FlumeEventPointer(2, 1), txnID1);
+    queue.addWithoutCommit(new FlumeEventPointer(2, 2), txnID2);
+    queue.checkpoint(true);
+    TimeUnit.SECONDS.sleep(3L);
+    RandomAccessFile inflight = new RandomAccessFile(
+            backingStoreSupplier.getInflightPuts(), "rw");
+    try {
+      inflight.seek(0);
+      inflight.writeInt(new Random().nextInt());
+    } finally {
+      inflight.close();
+    }
+    try {
+      queue = new FlumeEventQueue(backingStore,
+              backingStoreSupplier.getInflightTakes(),
+              backingStoreSupplier.getInflightPuts());
+      queue.deserializeInflightPuts();
+      Assert.fail("Expected BadCheckpointException for corrupt inflightPuts");
+    } catch (BadCheckpointException expected) {
+      // Expected: raised while reloading the corrupted inflightPuts file.
+    }
+  }
+
+  @Test
+  public void testCorruptInflightTakes() throws Exception {
+    // Checkpoint uncommitted entries, corrupt the inflightTakes file, then
+    // require that reloading the queue (and only the reload) rejects it.
+    queue = new FlumeEventQueue(backingStore,
+            backingStoreSupplier.getInflightTakes(),
+            backingStoreSupplier.getInflightPuts());
+    long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
+    long txnID2 = txnID1 + 1;
+    queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
+    queue.addWithoutCommit(new FlumeEventPointer(2, 1), txnID1);
+    queue.addWithoutCommit(new FlumeEventPointer(2, 2), txnID2);
+    queue.checkpoint(true);
+    TimeUnit.SECONDS.sleep(3L);
+    RandomAccessFile inflight = new RandomAccessFile(
+            backingStoreSupplier.getInflightTakes(), "rw");
+    try {
+      inflight.seek(0);
+      inflight.writeInt(new Random().nextInt());
+    } finally {
+      inflight.close();
+    }
+    try {
+      queue = new FlumeEventQueue(backingStore,
+              backingStoreSupplier.getInflightTakes(),
+              backingStoreSupplier.getInflightPuts());
+      queue.deserializeInflightTakes();
+      Assert.fail("Expected BadCheckpointException for corrupt inflightTakes");
+    } catch (BadCheckpointException expected) {
+      // Expected: raised while reloading the corrupted inflightTakes file.
+    }
+  }
+}


Mime
View raw message