flume-commits mailing list archives

From hshreedha...@apache.org
Subject [3/50] [abbrv] FLUME-1487. FileChannel format needs to be extensible.
Date Fri, 07 Sep 2012 23:28:52 GMT
http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
index 2867fc7..c4e0424 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
@@ -36,14 +36,12 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-/**
- * Represents a single data file on disk. Has methods to write,
- * read sequentially (replay), and read randomly (channel takes).
- */
-class LogFile {
+abstract class LogFile {
 
   private static final Logger LOG = LoggerFactory
       .getLogger(LogFile.class);
+
+
   /**
    * This class preallocates the data files 1MB at a time to avoid
    * the updating of the inode on each write and to avoid the disk
@@ -52,117 +50,106 @@ class LogFile {
   private static final ByteBuffer FILL = DirectMemoryUtils.
       allocate(1024 * 1024); // preallocation, 1MB
 
-  private static final byte OP_RECORD = Byte.MAX_VALUE;
-  private static final byte OP_EOF = Byte.MIN_VALUE;
+  protected static final byte OP_RECORD = Byte.MAX_VALUE;
+  protected static final byte OP_EOF = Byte.MIN_VALUE;
 
   static {
     for (int i = 0; i < FILL.capacity(); i++) {
       FILL.put(OP_EOF);
     }
   }
-  private static final int VERSION = 2;
 
+  abstract static class MetaDataWriter {
+    private final File file;
+    private final int logFileID;
+    private final RandomAccessFile writeFileHandle;
+
+    private long lastCheckpointOffset;
+    private long lastCheckpointWriteOrderID;
+
+    protected MetaDataWriter(File file, int logFileID) throws IOException {
+      this.file = file;
+      this.logFileID = logFileID;
+      writeFileHandle = new RandomAccessFile(file, "rw");
+
+    }
+    protected RandomAccessFile getFileHandle() {
+      return writeFileHandle;
+    }
+    protected void setLastCheckpointOffset(long lastCheckpointOffset) {
+      this.lastCheckpointOffset = lastCheckpointOffset;
+    }
+    protected void setLastCheckpointWriteOrderID(long lastCheckpointWriteOrderID) {
+      this.lastCheckpointWriteOrderID = lastCheckpointWriteOrderID;
+    }
+    protected long getLastCheckpointOffset() {
+      return lastCheckpointOffset;
+    }
+    protected long getLastCheckpointWriteOrderID() {
+      return lastCheckpointWriteOrderID;
+    }
+    protected File getFile() {
+      return file;
+    }
+    protected int getLogFileID() {
+      return logFileID;
+    }
+    void markCheckpoint(long logWriteOrderID)
+        throws IOException {
+      markCheckpoint(lastCheckpointOffset, logWriteOrderID);
+    }
+    abstract void markCheckpoint(long currentPosition, long logWriteOrderID)
+        throws IOException;
+
+    abstract int getVersion();
+
+    void close() {
+      try {
+        writeFileHandle.close();
+      } catch (IOException e) {
+        LOG.warn("Unable to close " + file, e);
+      }
+    }
+  }
 
-  static class Writer {
-    private final int fileID;
+  abstract static class Writer {
+    private final int logFileID;
     private final File file;
     private final long maxFileSize;
     private final RandomAccessFile writeFileHandle;
     private final FileChannel writeFileChannel;
-    private final long checkpointPositionMarker;
-
     private volatile boolean open;
 
     Writer(File file, int logFileID, long maxFileSize)
         throws IOException {
-      this(file, logFileID, maxFileSize, true);
-    }
-
-    Writer(File file, int logFileID, long maxFileSize, boolean active)
-        throws IOException {
       this.file = file;
-      fileID = logFileID;
+      this.logFileID = logFileID;
       this.maxFileSize = Math.min(maxFileSize,
           FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE);
       writeFileHandle = new RandomAccessFile(file, "rw");
-      if (active) {
-        writeFileHandle.writeInt(VERSION);
-        writeFileHandle.writeInt(fileID);
-        checkpointPositionMarker = writeFileHandle.getFilePointer();
-        // checkpoint marker
-        writeFileHandle.writeLong(0L);
-        // timestamp placeholder
-        writeFileHandle.writeLong(0L);
-        writeFileChannel = writeFileHandle.getChannel();
-        writeFileChannel.force(true);
-      } else {
-        int version = writeFileHandle.readInt();
-        if (version != VERSION) {
-          throw new IOException("The version of log file: "
-              + file.getCanonicalPath() + " is different from expected "
-              + " version: expected = " + VERSION + ", found = " + version);
-        }
-        int fid = writeFileHandle.readInt();
-        if (fid != logFileID) {
-          throw new IOException("The file id of log file: "
-              + file.getCanonicalPath() + " is different from expected "
-              + " id: expected = " + logFileID + ", found = " + fid);
-        }
-        checkpointPositionMarker = writeFileHandle.getFilePointer();
-        long chkptMarker = writeFileHandle.readLong();
-        long chkptTimestamp = writeFileHandle.readLong();
-        LOG.info("File: " + file.getCanonicalPath() + " was last checkpointed "
-             + "at position: " + chkptMarker + ", ts: " + chkptTimestamp);
-        writeFileChannel = writeFileHandle.getChannel();
-
-        // Jump to the last position
-        writeFileChannel.position(chkptMarker);
-      }
+      writeFileChannel = writeFileHandle.getChannel();
       LOG.info("Opened " + file);
       open = true;
     }
 
-    File getFile() {
-      return file;
-    }
+    abstract int getVersion();
 
-    synchronized void markCheckpoint(long logWriteOrderID) throws IOException {
-      long currentPosition = writeFileChannel.position();
-      writeFileHandle.seek(checkpointPositionMarker);
-      writeFileHandle.writeLong(currentPosition);
-      writeFileHandle.writeLong(logWriteOrderID);
-      writeFileChannel.position(currentPosition);
-      LOG.info("Noted checkpoint for file: " + file + ", id: " + fileID
-          + ", checkpoint position: " + currentPosition);
+    int getLogFileID() {
+      return logFileID;
     }
 
+    File getFile() {
+      return file;
+    }
     String getParent() {
       return file.getParent();
     }
-
-    synchronized void close() {
-      if(open) {
-        open = false;
-        if(writeFileChannel.isOpen()) {
-          LOG.info("Closing " + file);
-          try {
-            writeFileChannel.force(true);
-          } catch (IOException e) {
-            LOG.warn("Unable to flush to disk", e);
-          }
-          try {
-            writeFileHandle.close();
-          } catch (IOException e) {
-            LOG.info("Unable to close", e);
-          }
-        }
-      }
+    long getMaxSize() {
+      return maxFileSize;
     }
-
-    synchronized long length() throws IOException {
-      return writeFileChannel.position();
+    synchronized long position() throws IOException {
+      return getFileChannel().position();
     }
-
     synchronized FlumeEventPointer put(ByteBuffer buffer) throws IOException {
       Pair<Integer, Integer> pair = write(buffer);
       return new FlumeEventPointer(pair.getLeft(), pair.getRight());
@@ -177,49 +164,72 @@ class LogFile {
       write(buffer);
       sync();
     }
-
-    synchronized boolean isRollRequired(ByteBuffer buffer) throws IOException {
-      return open && length() + (long) buffer.capacity() > maxFileSize;
-    }
-
-    int getFileID() {
-      return fileID;
-    }
-    private void sync() throws IOException {
-      Preconditions.checkState(open, "File closed");
-      writeFileChannel.force(false);
-    }
     private Pair<Integer, Integer> write(ByteBuffer buffer) throws IOException {
-      Preconditions.checkState(open, "File closed");
-      long length = length();
+      Preconditions.checkState(isOpen(), "File closed");
+      long length = position();
       long expectedLength = length + (long) buffer.capacity();
       Preconditions.checkArgument(expectedLength < (long) Integer.MAX_VALUE);
       int offset = (int)length;
-      Preconditions.checkState(offset > 0);
+      Preconditions.checkState(offset >= 0, String.valueOf(offset));
       int recordLength = 1 + buffer.capacity();
       preallocate(recordLength);
       ByteBuffer toWrite = ByteBuffer.allocate(recordLength);
       toWrite.put(OP_RECORD);
       toWrite.put(buffer);
       toWrite.position(0);
-      int wrote = writeFileChannel.write(toWrite);
+      int wrote = getFileChannel().write(toWrite);
       Preconditions.checkState(wrote == toWrite.limit());
-      return Pair.of(fileID, offset);
+      return Pair.of(getLogFileID(), offset);
+    }
+    synchronized boolean isRollRequired(ByteBuffer buffer) throws IOException {
+      return isOpen() && position() + (long) buffer.capacity() > getMaxSize();
+    }
+    private void sync() throws IOException {
+      Preconditions.checkState(isOpen(), "File closed");
+      getFileChannel().force(false);
+    }
+
+
+    protected boolean isOpen() {
+      return open;
+    }
+    protected RandomAccessFile getFileHandle() {
+      return writeFileHandle;
+    }
+    protected FileChannel getFileChannel() {
+      return writeFileChannel;
+    }
+    synchronized void close() {
+      if(open) {
+        open = false;
+        if(writeFileChannel.isOpen()) {
+          LOG.info("Closing " + file);
+          try {
+            writeFileChannel.force(true);
+          } catch (IOException e) {
+            LOG.warn("Unable to flush to disk " + file, e);
+          }
+          try {
+            writeFileHandle.close();
+          } catch (IOException e) {
+            LOG.warn("Unable to close " + file, e);
+          }
+        }
+      }
     }
-    private void preallocate(int size) throws IOException {
-      long position = writeFileChannel.position();
-      if(position + size > writeFileChannel.size()) {
+    protected void preallocate(int size) throws IOException {
+      long position = position();
+      if(position + size > getFileChannel().size()) {
         LOG.debug("Preallocating at position " + position);
         synchronized (FILL) {
           FILL.position(0);
-          writeFileChannel.write(FILL, position);
+          getFileChannel().write(FILL, position);
         }
       }
     }
-
   }
 
-  static class RandomReader {
+  abstract static class RandomReader {
     private final File file;
     private final BlockingQueue<RandomAccessFile> readFileHandles =
         new ArrayBlockingQueue<RandomAccessFile>(50, true);
@@ -231,6 +241,11 @@ class LogFile {
       open = true;
     }
 
+    protected abstract TransactionEventRecord doGet(RandomAccessFile fileHandle)
+        throws IOException;
+
+    abstract int getVersion();
+
     File getFile() {
       return file;
     }
@@ -242,9 +257,9 @@ class LogFile {
       try {
         fileHandle.seek(offset);
         byte operation = fileHandle.readByte();
-        Preconditions.checkState(operation == OP_RECORD, Integer.toHexString(operation));
-        TransactionEventRecord record = TransactionEventRecord.
-            fromDataInput(fileHandle);
+        Preconditions.checkState(operation == OP_RECORD,
+            Integer.toHexString(operation));
+        TransactionEventRecord record = doGet(fileHandle);
         if(!(record instanceof Put)) {
           Preconditions.checkState(false, "Record is " +
               record.getClass().getSimpleName());
@@ -253,12 +268,13 @@ class LogFile {
         return ((Put)record).getEvent();
       } finally {
         if(error) {
-          close(fileHandle);
+          close(fileHandle, file);
         } else {
           checkIn(fileHandle);
         }
       }
     }
+
     synchronized void close() {
       if(open) {
         open = false;
@@ -270,28 +286,30 @@ class LogFile {
               try {
                 fileHandle.close();
               } catch (IOException e) {
-                LOG.info("Unable to close fileHandle for " + file);
+                LOG.warn("Unable to close fileHandle for " + file, e);
               }
             }
           }
           fileHandles.clear();
           try {
-            Thread.sleep(100L);
+            Thread.sleep(5L);
           } catch (InterruptedException e) {
             // this is uninterruptible
           }
         }
       }
     }
+
     private RandomAccessFile open() throws IOException {
       return new RandomAccessFile(file, "r");
     }
 
     private void checkIn(RandomAccessFile fileHandle) {
       if(!readFileHandles.offer(fileHandle)) {
-        close(fileHandle);
+        close(fileHandle, file);
       }
     }
+
     private RandomAccessFile checkOut()
         throws IOException, InterruptedException {
       RandomAccessFile fileHandle = readFileHandles.poll();
@@ -306,24 +324,27 @@ class LogFile {
       }
       return readFileHandles.take();
     }
-    private static void close(RandomAccessFile fileHandle) {
+    private static void close(RandomAccessFile fileHandle, File file) {
       if(fileHandle != null) {
         try {
           fileHandle.close();
-        } catch (IOException e) {}
+        } catch (IOException e) {
+          LOG.warn("Unable to close " + file, e);
+        }
       }
     }
   }
 
-  static class SequentialReader {
+  abstract static class SequentialReader {
+
     private final RandomAccessFile fileHandle;
     private final FileChannel fileChannel;
-    private final int version;
-    private final int logFileID;
-    private final long lastCheckpointPosition;
-    private final long lastCheckpointTimestamp;
     private final File file;
 
+    private int logFileID;
+    private long lastCheckpointPosition;
+    private long lastCheckpointWriteOrderID;
+
     /**
      * Construct a Sequential Log Reader object
      * @param file
@@ -334,38 +355,44 @@ class LogFile {
       this.file = file;
       fileHandle = new RandomAccessFile(file, "r");
       fileChannel = fileHandle.getChannel();
-      version = fileHandle.readInt();
-      if(version != VERSION) {
-        throw new IOException("Version is " + Integer.toHexString(version) +
-            " expected " + Integer.toHexString(VERSION)
-            + " file: " + file.getCanonicalPath());
-      }
-      logFileID = fileHandle.readInt();
-      lastCheckpointPosition = fileHandle.readLong();
-      lastCheckpointTimestamp = fileHandle.readLong();
+    }
+    abstract LogRecord doNext(int offset) throws IOException;
 
+    abstract int getVersion();
+
+    protected void setLastCheckpointPosition(long lastCheckpointPosition) {
+      this.lastCheckpointPosition = lastCheckpointPosition;
+    }
+    protected void setLastCheckpointWriteOrderID(long lastCheckpointWriteOrderID) {
+      this.lastCheckpointWriteOrderID = lastCheckpointWriteOrderID;
+    }
+    protected void setLogFileID(int logFileID) {
+      this.logFileID = logFileID;
      Preconditions.checkArgument(logFileID >= 0, "LogFileID is negative: "
           + Integer.toHexString(logFileID));
+
     }
-    int getVersion() {
-      return version;
+    protected RandomAccessFile getFileHandle() {
+      return fileHandle;
     }
     int getLogFileID() {
       return logFileID;
     }
-    void skipToLastCheckpointPosition(long checkpointTimestamp)
+    void skipToLastCheckpointPosition(long checkpointWriteOrderID)
         throws IOException {
       if (lastCheckpointPosition > 0L
-          && lastCheckpointTimestamp <= checkpointTimestamp) {
+          && lastCheckpointWriteOrderID <= checkpointWriteOrderID) {
         LOG.info("fast-forward to checkpoint position: "
                   + lastCheckpointPosition);
         fileChannel.position(lastCheckpointPosition);
       } else {
         LOG.warn("Checkpoint for file(" + file.getAbsolutePath() + ") "
-            + "is: " + lastCheckpointTimestamp + ", which is beyond the "
-            + "requested checkpoint time: " + checkpointTimestamp + ". ");
+            + "is: " + lastCheckpointWriteOrderID + ", which is beyond the "
+            + "requested checkpoint time: " + checkpointWriteOrderID
+            + " and position " + lastCheckpointPosition);
       }
     }
+
     LogRecord next() throws IOException {
       int offset = -1;
       try {
@@ -376,6 +403,7 @@ class LogFile {
                 + ", position: " + position);
         }
         offset = (int) position;
+        Preconditions.checkState(offset >= 0);
         byte operation = fileHandle.readByte();
         if(operation != OP_RECORD) {
           if(operation == OP_EOF) {
@@ -386,10 +414,7 @@ class LogFile {
           }
           return null;
         }
-        TransactionEventRecord record = TransactionEventRecord.
-            fromDataInput(fileHandle);
-        Preconditions.checkState(offset > 0);
-        return new LogRecord(logFileID, offset, record);
+        return doNext(offset);
       } catch(EOFException e) {
         return null;
       } catch (IOException e) {
@@ -397,6 +422,7 @@ class LogFile {
             file.getCanonicalPath() + " at offset " + offset, e);
       }
     }
+
     void close() {
       if(fileHandle != null) {
         try {
@@ -405,12 +431,11 @@ class LogFile {
       }
     }
   }
-
   public static void main(String[] args) throws EOFException, IOException {
     File file = new File(args[0]);
     LogFile.SequentialReader reader = null;
     try {
-      reader = new LogFile.SequentialReader(file);
+      reader = LogFileFactory.getSequentialReader(file);
       LogRecord entry;
       FlumeEventPointer ptr;
       // for puts the fileId is the fileID of the file they exist in
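
The refactoring above turns LogFile from a concrete class into an abstract base built on the template-method pattern: Writer, RandomReader, and SequentialReader keep the shared file-handling mechanics, while the version-specific subclasses that follow supply getVersion() plus the record-codec hooks (doGet/doNext). A minimal, self-contained sketch of that split; every name below is an illustrative stand-in, not part of the patch:

import java.io.IOException;

// Illustrative stand-ins for the LogFile hierarchy above; only
// getVersion() and doNext() vary per on-disk format.
abstract class FormatReader {
  // Shared replay mechanics live in the base class.
  final String replayOne(int offset) throws IOException {
    return "v" + getVersion() + " record at " + offset + ": " + doNext(offset);
  }
  abstract int getVersion();                  // per-format constant (2 or 3)
  abstract String doNext(int offset) throws IOException; // per-format decode
}

class V2Reader extends FormatReader {
  @Override int getVersion() { return 2; }
  @Override String doNext(int offset) { return "Writable-encoded body"; }
}

class V3Reader extends FormatReader {
  @Override int getVersion() { return 3; }
  @Override String doNext(int offset) { return "protobuf-encoded body"; }
}

public class TemplateDemo {
  public static void main(String[] args) throws IOException {
    System.out.println(new V2Reader().replayOne(42));
    System.out.println(new V3Reader().replayOne(42));
  }
}

Keeping the framing and error handling in a final base method is what lets V2 and V3 differ only in how a record body is encoded and decoded.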

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
new file mode 100644
index 0000000..8de37b8
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("deprecation")
+class LogFileFactory {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(LogFileFactory.class);
+  private LogFileFactory() {}
+
+  static LogFile.MetaDataWriter getMetaDataWriter(File file, int logFileID)
+      throws IOException {
+    RandomAccessFile logFile = null;
+    try {
+      File metaDataFile = Serialization.getMetaDataFile(file);
+      if(metaDataFile.exists()) {
+        return new LogFileV3.MetaDataWriter(file, logFileID);
+      }
+      logFile = new RandomAccessFile(file, "r");
+      int version = logFile.readInt();
+      if(Serialization.VERSION_2 == version) {
+        return new LogFileV2.MetaDataWriter(file, logFileID);
+      }
+      throw new IOException("File " + file + " has bad version " +
+          Integer.toHexString(version));
+    } finally {
+      if(logFile != null) {
+        try {
+          logFile.close();
+        } catch(IOException e) {
+          LOGGER.warn("Unable to close " + file, e);
+        }
+      }
+    }
+  }
+
+  static LogFile.Writer getWriter(File file, int logFileID,
+      long maxFileSize) throws IOException {
+    if(!(file.exists() || file.createNewFile())) {
+      throw new IOException("Cannot create " + file);
+    }
+    return new LogFileV3.Writer(file, logFileID, maxFileSize);
+  }
+
+  static LogFile.RandomReader getRandomReader(File file)
+      throws IOException {
+    RandomAccessFile logFile = new RandomAccessFile(file, "r");
+    try {
+      File metaDataFile = Serialization.getMetaDataFile(file);
+      // either this is a RandomReader for a just-created file or
+      // the metadata file exists, and as such it's V3
+      if(logFile.length() == 0L || metaDataFile.exists()) {
+        return new LogFileV3.RandomReader(file);
+      }
+      int version = logFile.readInt();
+      if(Serialization.VERSION_2 == version) {
+        return new LogFileV2.RandomReader(file);
+      }
+      throw new IOException("File " + file + " has bad version " +
+          Integer.toHexString(version));
+    } finally {
+      if(logFile != null) {
+        try {
+          logFile.close();
+        } catch(IOException e) {
+          LOGGER.warn("Unable to close " + file, e);
+        }
+      }
+    }
+  }
+
+  static LogFile.SequentialReader getSequentialReader(File file)
+      throws IOException {
+    RandomAccessFile logFile = null;
+    try {
+      File metaDataFile = Serialization.getMetaDataFile(file);
+      if(metaDataFile.exists()) {
+        return new LogFileV3.SequentialReader(file);
+      }
+      logFile = new RandomAccessFile(file, "r");
+      int version = logFile.readInt();
+      if(Serialization.VERSION_2 == version) {
+        return new LogFileV2.SequentialReader(file);
+      }
+      throw new IOException("File " + file + " has bad version " +
+          Integer.toHexString(version));
+    } finally {
+      if(logFile != null) {
+        try {
+          logFile.close();
+        } catch(IOException e) {
+          LOGGER.warn("Unable to close " + file, e);
+        }
+      }
+    }
+  }
+}
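
LogFileFactory encodes the version-detection rule: a data file is treated as V3 when its ".meta" sidecar exists (or when the file is empty, i.e. freshly created); otherwise the leading int of the data file itself is read as a V2 version stamp. A standalone sketch of that rule; detectVersion and the local constants are hypothetical stand-ins for the package-private Serialization fields:

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

// Hypothetical sketch of LogFileFactory's detection rule above.
public class VersionDetect {
  static final int VERSION_2 = 2;  // mirrors Serialization.VERSION_2
  static final int VERSION_3 = 3;  // mirrors Serialization.VERSION_3

  static int detectVersion(File dataFile) throws IOException {
    // Serialization.getMetaDataFile(): "<name>.meta" next to the data file
    File metaDataFile = new File(dataFile.getParentFile(),
        dataFile.getName() + ".meta");
    // V3 keeps its header in the sidecar; a brand-new, empty file is V3 too.
    if (dataFile.length() == 0L || metaDataFile.exists()) {
      return VERSION_3;
    }
    RandomAccessFile handle = new RandomAccessFile(dataFile, "r");
    try {
      int version = handle.readInt();  // V2 stamps the data file itself
      if (version == VERSION_2) {
        return VERSION_2;
      }
      throw new IOException("File " + dataFile + " has bad version "
          + Integer.toHexString(version));
    } finally {
      handle.close();
    }
  }

  public static void main(String[] args) throws IOException {
    File f = File.createTempFile("log-detect", ".tmp"); // hypothetical file
    System.out.println("version = " + detectVersion(f)); // empty file -> 3
  }
}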

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
new file mode 100644
index 0000000..c81b290
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents a single data file on disk. Has methods to write,
+ * read sequentially (replay), and read randomly (channel takes).
+ */
+@Deprecated
+class LogFileV2 extends LogFile {
+  protected static final Logger LOGGER =
+      LoggerFactory.getLogger(LogFileV2.class);
+
+  private static final long OFFSET_CHECKPOINT = 2 * Serialization.SIZE_OF_INT;
+
+  private LogFileV2() {}
+
+  static class MetaDataWriter extends LogFile.MetaDataWriter {
+
+    protected MetaDataWriter(File file, int logFileID) throws IOException {
+      super(file, logFileID);
+      boolean error = true;
+      try {
+        RandomAccessFile writeFileHandle = getFileHandle();
+        int version = writeFileHandle.readInt();
+        if (version != getVersion()) {
+          throw new IOException("The version of log file: "
+              + file.getCanonicalPath() + " is different from expected"
+              + " version: expected = " + getVersion() + ", found = " + version);
+        }
+        int fid = writeFileHandle.readInt();
+        if (fid != logFileID) {
+          throw new IOException("The file id of log file: "
+              + file.getCanonicalPath() + " is different from expected"
+              + " id: expected = " + logFileID + ", found = " + fid);
+        }
+        setLastCheckpointOffset(writeFileHandle.readLong());
+        setLastCheckpointWriteOrderID(writeFileHandle.readLong());
+        LOGGER.info("File: " + file.getCanonicalPath() + " was last checkpointed "
+            + "at position: " + getLastCheckpointOffset()
+            + ", logWriteOrderID: " + getLastCheckpointWriteOrderID());
+        error = false;
+      } finally {
+        if(error) {
+          close();
+        }
+      }
+    }
+
+    @Override
+    int getVersion() {
+      return Serialization.VERSION_2;
+    }
+
+    @Override
+    void markCheckpoint(long currentPosition, long logWriteOrderID)
+        throws IOException {
+      RandomAccessFile writeFileHandle = getFileHandle();
+      writeFileHandle.seek(OFFSET_CHECKPOINT);
+      writeFileHandle.writeLong(currentPosition);
+      writeFileHandle.writeLong(logWriteOrderID);
+      writeFileHandle.getChannel().force(true);
+      LOGGER.info("Noted checkpoint for file: " + getFile() + ", id: "
+          + getLogFileID() + ", checkpoint position: " + currentPosition
+          + ", logWriteOrderID: " + logWriteOrderID);
+
+    }
+  }
+
+  static class Writer extends LogFile.Writer {
+
+    Writer(File file, int logFileID, long maxFileSize)
+        throws IOException {
+      super(file, logFileID, maxFileSize);
+      RandomAccessFile writeFileHandle = getFileHandle();
+      writeFileHandle.writeInt(getVersion());
+      writeFileHandle.writeInt(logFileID);
+      // checkpoint marker
+      writeFileHandle.writeLong(0L);
+      // logWriteOrderID placeholder
+      writeFileHandle.writeLong(0L);
+      getFileChannel().force(true);
+
+    }
+    @Override
+    int getVersion() {
+      return Serialization.VERSION_2;
+    }
+  }
+
+  static class RandomReader extends LogFile.RandomReader {
+    RandomReader(File file) throws IOException {
+      super(file);
+    }
+    @Override
+    int getVersion() {
+      return Serialization.VERSION_2;
+    }
+    @Override
+    protected TransactionEventRecord doGet(RandomAccessFile fileHandle)
+        throws IOException {
+      return TransactionEventRecord.fromDataInputV2(fileHandle);
+    }
+  }
+
+  static class SequentialReader extends LogFile.SequentialReader {
+
+    SequentialReader(File file) throws EOFException, IOException {
+      super(file);
+      RandomAccessFile fileHandle = getFileHandle();
+      int version = fileHandle.readInt();
+      if(version != getVersion()) {
+        throw new IOException("Version is " + Integer.toHexString(version) +
+            " expected " + Integer.toHexString(getVersion())
+            + " file: " + file.getCanonicalPath());
+      }
+      setLogFileID(fileHandle.readInt());
+      setLastCheckpointPosition(fileHandle.readLong());
+      setLastCheckpointWriteOrderID(fileHandle.readLong());
+    }
+    @Override
+    public int getVersion() {
+      return Serialization.VERSION_2;
+    }
+    @Override
+    LogRecord doNext(int offset) throws IOException {
+      TransactionEventRecord event =
+          TransactionEventRecord.fromDataInputV2(getFileHandle());
+      return new LogRecord(getLogFileID(), offset, event);
+    }
+  }
+}
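
The fixed V2 header that MetaDataWriter seeks back into is version(int), logFileID(int), checkpoint position(long), checkpoint writeOrderID(long) — which is why OFFSET_CHECKPOINT is 2 * Serialization.SIZE_OF_INT. A runnable sketch of writing that header and then overwriting the checkpoint fields in place; the temp file and sample values are hypothetical:

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

// Sketch of the fixed V2 header: version(int) fileID(int)
// checkpointPosition(long) checkpointWriteOrderID(long).
public class V2HeaderDemo {
  static final long OFFSET_CHECKPOINT = 2 * 4; // skip the two leading ints

  public static void main(String[] args) throws IOException {
    File file = File.createTempFile("log-v2", ".tmp"); // hypothetical file
    RandomAccessFile handle = new RandomAccessFile(file, "rw");
    try {
      handle.writeInt(2);        // version
      handle.writeInt(7);        // logFileID
      handle.writeLong(0L);      // checkpoint position placeholder
      handle.writeLong(0L);      // logWriteOrderID placeholder
      // markCheckpoint(): seek past the two ints and overwrite both longs
      handle.seek(OFFSET_CHECKPOINT);
      handle.writeLong(1024L);   // currentPosition
      handle.writeLong(99L);     // logWriteOrderID
      handle.getChannel().force(true);
    } finally {
      handle.close();
    }
  }
}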

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
new file mode 100644
index 0000000..502dfbd
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.channels.Channels;
+
+import org.apache.flume.channel.file.proto.ProtosFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents a single data file on disk. Has methods to write,
+ * read sequentially (replay), and read randomly (channel takes).
+ */
+class LogFileV3 extends LogFile {
+  protected static final Logger LOGGER =
+      LoggerFactory.getLogger(LogFileV3.class);
+
+  private LogFileV3() {}
+
+  static class MetaDataWriter extends LogFile.MetaDataWriter {
+    private final ProtosFactory.LogFileMetaData logFileMetaData;
+    private final File metaDataFile;
+    protected MetaDataWriter(File logFile, int logFileID) throws IOException {
+      super(logFile, logFileID);
+      metaDataFile = Serialization.getMetaDataFile(logFile);
+      MetaDataReader metaDataReader = new MetaDataReader(logFile, logFileID);
+      logFileMetaData = metaDataReader.read();
+      int version = logFileMetaData.getVersion();
+      if(version != getVersion()) {
+        throw new IOException("Version is " + Integer.toHexString(version) +
+            " expected " + Integer.toHexString(getVersion())
+            + " file: " + logFile);
+      }
+      setLastCheckpointOffset(logFileMetaData.getCheckpointPosition());
+      setLastCheckpointWriteOrderID(logFileMetaData.getCheckpointWriteOrderID());
+    }
+
+    @Override
+    int getVersion() {
+      return Serialization.VERSION_3;
+    }
+
+    @Override
+    void markCheckpoint(long currentPosition, long logWriteOrderID)
+        throws IOException {
+      ProtosFactory.LogFileMetaData.Builder metaDataBuilder =
+          ProtosFactory.LogFileMetaData.newBuilder(logFileMetaData);
+      metaDataBuilder.setCheckpointPosition(currentPosition);
+      metaDataBuilder.setCheckpointWriteOrderID(logWriteOrderID);
+      LOGGER.info("Updating " + metaDataFile.getName()  + " currentPosition = "
+          + currentPosition + ", logWriteOrderID = " + logWriteOrderID);
+      FileOutputStream outputStream = new FileOutputStream(metaDataFile);
+      try {
+        metaDataBuilder.build().writeDelimitedTo(outputStream);
+        outputStream.getChannel().force(true);
+      } finally {
+        try {
+          outputStream.close();
+        } catch(IOException e) {
+          LOGGER.warn("Unable to close " + metaDataFile, e);
+        }
+      }
+    }
+  }
+
+  static class MetaDataReader {
+    private final File logFile;
+    private final File metaDataFile;
+    private final int logFileID;
+    protected MetaDataReader(File logFile, int logFileID) throws IOException {
+      this.logFile = logFile;
+      metaDataFile = Serialization.getMetaDataFile(logFile);
+      this.logFileID = logFileID;
+    }
+    ProtosFactory.LogFileMetaData read() throws IOException {
+      FileInputStream inputStream = new FileInputStream(metaDataFile);
+      try {
+        ProtosFactory.LogFileMetaData metaData =
+            ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream);
+        if (metaData.getLogFileID() != logFileID) {
+          throw new IOException("The file id of log file: "
+              + logFile + " is different from expected"
+              + " id: expected = " + logFileID + ", found = "
+              + metaData.getLogFileID());
+        }
+        return metaData;
+      } finally {
+        try {
+          inputStream.close();
+        } catch(IOException e) {
+          LOGGER.warn("Unable to close " + metaDataFile, e);
+        }
+      }
+    }
+  }
+
+  static class Writer extends LogFile.Writer {
+    Writer(File file, int logFileID, long maxFileSize)
+        throws IOException {
+      super(file, logFileID, maxFileSize);
+      ProtosFactory.LogFileMetaData.Builder metaDataBuilder =
+          ProtosFactory.LogFileMetaData.newBuilder();
+      metaDataBuilder.setVersion(getVersion());
+      metaDataBuilder.setLogFileID(logFileID);
+      metaDataBuilder.setCheckpointPosition(0L);
+      metaDataBuilder.setCheckpointWriteOrderID(0L);
+      File metaDataFile = Serialization.getMetaDataFile(file);
+      FileOutputStream outputStream = new FileOutputStream(metaDataFile);
+      try {
+        metaDataBuilder.build().writeDelimitedTo(outputStream);
+        outputStream.getChannel().force(true);
+      } finally {
+        try {
+          outputStream.close();
+        } catch(IOException e) {
+          LOGGER.warn("Unable to close " + metaDataFile, e);
+        }
+      }
+    }
+    @Override
+    int getVersion() {
+      return Serialization.VERSION_3;
+    }
+  }
+
+  static class RandomReader extends LogFile.RandomReader {
+    private volatile boolean initialized;
+    RandomReader(File file) throws IOException {
+      super(file);
+    }
+    private void initialize() throws IOException {
+      File metaDataFile = Serialization.getMetaDataFile(getFile());
+      FileInputStream inputStream = new FileInputStream(metaDataFile);
+      try {
+        ProtosFactory.LogFileMetaData metaData =
+            ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream);
+        int version = metaData.getVersion();
+        if(version != getVersion()) {
+          throw new IOException("Version is " + Integer.toHexString(version) +
+              " expected " + Integer.toHexString(getVersion())
+              + " file: " + getFile().getCanonicalPath());
+        }
+      } finally {
+        try {
+          inputStream.close();
+        } catch(IOException e) {
+          LOGGER.warn("Unable to close " + metaDataFile, e);
+        }
+      }
+    }
+    @Override
+    int getVersion() {
+      return Serialization.VERSION_3;
+    }
+    @Override
+    protected TransactionEventRecord doGet(RandomAccessFile fileHandle)
+        throws IOException {
+      // readers are opened right when the file is created, and thus
+      // on an empty file. As such we wait until there is some data
+      // before we initialize
+      if(!initialized) {
+        initialize();
+        initialized = true;
+      }
+      return TransactionEventRecord.
+          fromInputStream(Channels.newInputStream(fileHandle.getChannel()));
+    }
+  }
+
+  static class SequentialReader extends LogFile.SequentialReader {
+
+    private InputStream inputStream;
+    SequentialReader(File file) throws EOFException, IOException {
+      super(file);
+      File metaDataFile = Serialization.getMetaDataFile(file);
+      FileInputStream metaDataStream = new FileInputStream(metaDataFile);
+      try {
+        ProtosFactory.LogFileMetaData metaData =
+            ProtosFactory.LogFileMetaData.parseDelimitedFrom(metaDataStream);
+        int version = metaData.getVersion();
+        if(version != getVersion()) {
+          throw new IOException("Version is " + Integer.toHexString(version) +
+              " expected " + Integer.toHexString(getVersion())
+              + " file: " + file.getCanonicalPath());
+        }
+        setLogFileID(metaData.getLogFileID());
+        setLastCheckpointPosition(metaData.getCheckpointPosition());
+        setLastCheckpointWriteOrderID(metaData.getCheckpointWriteOrderID());
+      } finally {
+        try {
+          metaDataStream.close();
+        } catch(IOException e) {
+          LOGGER.warn("Unable to close " + metaDataFile, e);
+        }
+      }
+      this.inputStream = Channels.newInputStream(getFileHandle().getChannel());
+    }
+
+    @Override
+    public int getVersion() {
+      return Serialization.VERSION_3;
+    }
+    @Override
+    LogRecord doNext(int offset) throws IOException {
+      TransactionEventRecord event =
+          TransactionEventRecord.fromInputStream(inputStream);
+      return new LogRecord(getLogFileID(), offset, event);
+    }
+  }
+}
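
In V3 the same metadata moves out of the data file into a protobuf sidecar, written and read with the length-delimited writeDelimitedTo/parseDelimitedFrom pair. A sketch of that round trip, assuming the generated ProtosFactory classes from filechannel.proto are on the classpath; the file name and values are hypothetical:

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.flume.channel.file.proto.ProtosFactory;

// Sketch of the V3 metadata round trip, assuming the generated
// ProtosFactory classes from filechannel.proto are available.
public class V3MetaDemo {
  public static void main(String[] args) throws IOException {
    File metaDataFile = File.createTempFile("log-7", ".meta"); // hypothetical
    ProtosFactory.LogFileMetaData.Builder builder =
        ProtosFactory.LogFileMetaData.newBuilder();
    builder.setVersion(3);
    builder.setLogFileID(7);
    builder.setCheckpointPosition(0L);
    builder.setCheckpointWriteOrderID(0L);
    FileOutputStream out = new FileOutputStream(metaDataFile);
    try {
      builder.build().writeDelimitedTo(out); // length-prefixed, readable back
      out.getChannel().force(true);          // durability before use
    } finally {
      out.close();
    }
    FileInputStream in = new FileInputStream(metaDataFile);
    try {
      ProtosFactory.LogFileMetaData metaData =
          ProtosFactory.LogFileMetaData.parseDelimitedFrom(in);
      System.out.println("logFileID = " + metaData.getLogFileID());
    } finally {
      in.close();
    }
  }
}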

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
index 09956a4..c14a6f0 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
@@ -61,7 +61,8 @@ public class LogUtils {
     List<File> result = Lists.newArrayList();
     for (File file : logDir.listFiles()) {
       String name = file.getName();
-      if (name.startsWith(Log.PREFIX)) {
+      if (name.startsWith(Log.PREFIX) &&
+          !name.endsWith(Serialization.METADATA_FILENAME)) {
         result.add(file);
       }
     }
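
The LogUtils change above keeps the new sidecars out of replay: a metadata file sits in the same directory and shares its data file's name, so prefix matching alone would now pick it up too. A sketch of the filter, where the literals "log-" and ".meta" stand in for Log.PREFIX and Serialization.METADATA_FILENAME:

import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Sketch of the listing filter: data files share their name prefix with
// their sidecars, so ".meta" files must be excluded explicitly.
public class ListLogsDemo {
  static List<File> dataFiles(File logDir) {
    List<File> result = new ArrayList<File>();
    for (File file : logDir.listFiles()) {
      String name = file.getName();
      // "log-" / ".meta" are hypothetical stand-ins for the real constants
      if (name.startsWith("log-") && !name.endsWith(".meta")) {
        result.add(file);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(dataFiles(new File(".")));
  }
}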

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
index c3fda6b..884ebde 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
@@ -21,6 +21,14 @@ package org.apache.flume.channel.file;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+import org.apache.flume.channel.file.proto.ProtosFactory;
+
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
 
 /**
  * Represents a Put on disk
@@ -28,12 +36,12 @@ import java.io.IOException;
 class Put extends TransactionEventRecord {
   private FlumeEvent event;
 
-  Put(Long transactionID) {
-    super(transactionID);
+  Put(Long transactionID, Long logWriteOrderID) {
+    super(transactionID, logWriteOrderID);
   }
 
-  Put(Long transactionID, FlumeEvent event) {
-    this(transactionID);
+  Put(Long transactionID, Long logWriteOrderID, FlumeEvent event) {
+    this(transactionID, logWriteOrderID);
     this.event = event;
   }
 
@@ -53,6 +61,37 @@ class Put extends TransactionEventRecord {
     event.write(out);
   }
   @Override
+  void writeProtos(OutputStream out) throws IOException {
+    ProtosFactory.Put.Builder putBuilder = ProtosFactory.Put.newBuilder();
+    ProtosFactory.FlumeEvent.Builder eventBuilder =
+        ProtosFactory.FlumeEvent.newBuilder();
+    Map<String, String> headers = event.getHeaders();
+    ProtosFactory.FlumeEventHeader.Builder headerBuilder =
+        ProtosFactory.FlumeEventHeader.newBuilder();
+    if(headers != null) {
+      for(String key : headers.keySet()) {
+        String value = headers.get(key);
+        headerBuilder.clear();
+        eventBuilder.addHeaders(headerBuilder.setKey(key)
+            .setValue(value).build());
+      }
+    }
+    eventBuilder.setBody(ByteString.copyFrom(event.getBody()));
+    putBuilder.setEvent(eventBuilder.build());
+    putBuilder.build().writeDelimitedTo(out);
+  }
+  @Override
+  void readProtos(InputStream in) throws IOException {
+    ProtosFactory.Put put = ProtosFactory.Put.parseDelimitedFrom(in);
+    Map<String, String> headers = Maps.newHashMap();
+    ProtosFactory.FlumeEvent protosEvent = put.getEvent();
+    for(ProtosFactory.FlumeEventHeader header : protosEvent.getHeadersList()) {
+      headers.put(header.getKey(), header.getValue());
+    }
+    // TODO when we remove v2, remove FlumeEvent and use EventBuilder here
+    event = new FlumeEvent(headers, protosEvent.getBody().toByteArray());
+  }
+  @Override
   public short getRecordType() {
     return Type.PUT.get();
   }
@@ -69,4 +108,5 @@ class Put extends TransactionEventRecord {
     builder.append("]");
     return builder.toString();
   }
+
 }
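
Put is the only record type with a payload, so its writeProtos/readProtos pair converts the event's header map to and from the repeated FlumeEventHeader field. A standalone sketch of that conversion, assuming the generated ProtosFactory classes; the sample header and body are hypothetical:

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.channel.file.proto.ProtosFactory;

import com.google.protobuf.ByteString;

// Sketch of the header-map conversion performed by Put.writeProtos and
// Put.readProtos above; assumes the generated ProtosFactory classes.
public class PutHeadersDemo {
  public static void main(String[] args) {
    Map<String, String> headers = new HashMap<String, String>();
    headers.put("host", "node-1");                 // hypothetical header

    // Map -> repeated FlumeEventHeader
    ProtosFactory.FlumeEvent.Builder eventBuilder =
        ProtosFactory.FlumeEvent.newBuilder();
    for (Map.Entry<String, String> entry : headers.entrySet()) {
      eventBuilder.addHeaders(ProtosFactory.FlumeEventHeader.newBuilder()
          .setKey(entry.getKey()).setValue(entry.getValue()).build());
    }
    eventBuilder.setBody(ByteString.copyFrom(new byte[] {1, 2, 3}));
    ProtosFactory.FlumeEvent event = eventBuilder.build();

    // repeated FlumeEventHeader -> Map
    Map<String, String> roundTrip = new HashMap<String, String>();
    for (ProtosFactory.FlumeEventHeader header : event.getHeadersList()) {
      roundTrip.put(header.getKey(), header.getValue());
    }
    System.out.println(roundTrip);
  }
}

Iterating entrySet() rather than keySet() also avoids the extra per-key lookup done in the patch's loop.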

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
index 4e908ec..17550ec 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
@@ -67,15 +67,12 @@ class ReplayHandler {
   private final List<Long> pendingTakes;
   private final boolean useFastReplay;
   private final File cpDir;
-  private final long maxFileSize;
 
-  ReplayHandler(FlumeEventQueue queue, boolean useFastReplay, File cpDir,
-          long maxFileSize) {
+  ReplayHandler(FlumeEventQueue queue, boolean useFastReplay, File cpDir) {
     this.queue = queue;
     this.useFastReplay = useFastReplay;
     this.lastCheckpoint = queue.getLogWriteOrderID();
     this.cpDir = cpDir;
-    this.maxFileSize = maxFileSize;
     pendingTakes = Lists.newArrayList();
     readers = Maps.newHashMap();
     logRecordBuffer = new PriorityQueue<LogRecord>();
@@ -88,7 +85,7 @@ class ReplayHandler {
   void replayLogv1(List<File> logs) throws Exception {
     if(useFastReplay) {
       CheckpointRebuilder rebuilder = new CheckpointRebuilder(cpDir, logs,
-              maxFileSize, queue);
+              queue);
       if(rebuilder.rebuild()){
         LOG.info("Fast replay successful.");
         return;
@@ -112,7 +109,7 @@ class ReplayHandler {
       LOG.info("Replaying " + log);
       LogFile.SequentialReader reader = null;
       try {
-        reader = new LogFile.SequentialReader(log);
+        reader = LogFileFactory.getSequentialReader(log);
         reader.skipToLastCheckpointPosition(queue.getLogWriteOrderID());
         LogRecord entry;
         FlumeEventPointer ptr;
@@ -227,7 +224,7 @@ class ReplayHandler {
   void replayLog(List<File> logs) throws Exception {
     if (useFastReplay) {
       CheckpointRebuilder rebuilder = new CheckpointRebuilder(cpDir, logs,
-              maxFileSize, queue);
+              queue);
       if (rebuilder.rebuild()) {
         LOG.info("Fast replay successful.");
         return;
@@ -252,7 +249,8 @@ class ReplayHandler {
       for (File log : logs) {
         LOG.info("Replaying " + log);
         try {
-          LogFile.SequentialReader reader = new LogFile.SequentialReader(log);
+          LogFile.SequentialReader reader =
+              LogFileFactory.getSequentialReader(log);
           reader.skipToLastCheckpointPosition(queue.getLogWriteOrderID());
           Preconditions.checkState(!readers.containsKey(reader.getLogFileID()),
               "Readers " + readers + " already contains "

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
index 6e4e1fc..1a8ddff 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
@@ -21,13 +21,17 @@ package org.apache.flume.channel.file;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.flume.channel.file.proto.ProtosFactory;
 
 /**
  * Represents a Rollback on disk
  */
 class Rollback extends TransactionEventRecord {
-  Rollback(Long transactionID) {
-    super(transactionID);
+  Rollback(Long transactionID, Long logWriteOrderID) {
+    super(transactionID, logWriteOrderID);
   }
   @Override
   public void readFields(DataInput in) throws IOException {
@@ -39,6 +43,18 @@ class Rollback extends TransactionEventRecord {
     super.write(out);
   }
   @Override
+  void writeProtos(OutputStream out) throws IOException {
+    ProtosFactory.Rollback.Builder rollbackBuilder =
+        ProtosFactory.Rollback.newBuilder();
+    rollbackBuilder.build().writeDelimitedTo(out);
+  }
+  @Override
+  void readProtos(InputStream in) throws IOException {
+    @SuppressWarnings("unused")
+    ProtosFactory.Rollback rollback =
+      ProtosFactory.Rollback.parseDelimitedFrom(in);
+  }
+  @Override
   short getRecordType() {
     return Type.ROLLBACK.get();
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
new file mode 100644
index 0000000..ad2a645
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.File;
+
+class Serialization {
+  private Serialization() {}
+
+  static final int SIZE_OF_INT = 4;
+  static final int SIZE_OF_LONG = 8;
+
+
+  static final int VERSION_2 = 2;
+  static final int VERSION_3 = 3;
+
+  static final String METADATA_FILENAME = ".meta";
+
+
+  static File getMetaDataFile(File file) {
+    String metaDataFileName = file.getName() + METADATA_FILENAME;
+    return new File(file.getParentFile(), metaDataFileName);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
index cb575f9..cfbe0c9 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
@@ -21,6 +21,10 @@ package org.apache.flume.channel.file;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.flume.channel.file.proto.ProtosFactory;
 
 /**
  * Represents a Take on disk
@@ -28,11 +32,11 @@ import java.io.IOException;
 class Take extends TransactionEventRecord {
   private int offset;
   private int fileID;
-  Take(Long transactionID) {
-    super(transactionID);
+  Take(Long transactionID, Long logWriteOrderID) {
+    super(transactionID, logWriteOrderID);
   }
-  Take(Long transactionID, int offset, int fileID) {
-    this(transactionID);
+  Take(Long transactionID, Long logWriteOrderID, int offset, int fileID) {
+    this(transactionID, logWriteOrderID);
     this.offset = offset;
     this.fileID = fileID;
   }
@@ -58,6 +62,19 @@ class Take extends TransactionEventRecord {
     out.writeInt(fileID);
   }
   @Override
+  void writeProtos(OutputStream out) throws IOException {
+    ProtosFactory.Take.Builder takeBuilder = ProtosFactory.Take.newBuilder();
+    takeBuilder.setFileID(fileID);
+    takeBuilder.setOffset(offset);
+    takeBuilder.build().writeDelimitedTo(out);
+  }
+  @Override
+  void readProtos(InputStream in) throws IOException {
+    ProtosFactory.Take take = ProtosFactory.Take.parseDelimitedFrom(in);
+    fileID = take.getFileID();
+    offset = take.getOffset();
+  }
+  @Override
   short getRecordType() {
     return Type.TAKE.get();
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
index 67c68f8..0488e90 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
@@ -23,10 +23,15 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.reflect.Constructor;
 import java.nio.ByteBuffer;
 
+import org.apache.flume.channel.file.proto.ProtosFactory;
 import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -36,26 +41,30 @@ import com.google.common.collect.ImmutableMap;
  * Base class for records in data file: Put, Take, Rollback, Commit
  */
 abstract class TransactionEventRecord implements Writable {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TransactionEventRecord.class);
   private final long transactionID;
   private long logWriteOrderID;
 
-  protected TransactionEventRecord(long transactionID) {
+  protected TransactionEventRecord(long transactionID, long logWriteOrderID) {
     this.transactionID = transactionID;
+    this.logWriteOrderID = logWriteOrderID;
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    logWriteOrderID = in.readLong();
+
   }
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeLong(logWriteOrderID);
-  }
 
-  public void setLogWriteOrderID(long logWriteOrderID) {
-    this.logWriteOrderID = logWriteOrderID;
   }
-  public long getLogWriteOrderID() {
+
+  abstract void writeProtos(OutputStream out) throws IOException;
+
+  abstract void readProtos(InputStream in) throws IOException;
+
+  long getLogWriteOrderID() {
     return logWriteOrderID;
   }
   long getTransactionID() {
@@ -91,46 +100,49 @@ abstract class TransactionEventRecord implements Writable {
         ImmutableMap.<Short, Constructor<? extends TransactionEventRecord>>builder();
     try {
       builder.put(Type.PUT.get(),
-          Put.class.getDeclaredConstructor(Long.class));
+          Put.class.getDeclaredConstructor(Long.class, Long.class));
       builder.put(Type.TAKE.get(),
-          Take.class.getDeclaredConstructor(Long.class));
+          Take.class.getDeclaredConstructor(Long.class, Long.class));
       builder.put(Type.ROLLBACK.get(),
-          Rollback.class.getDeclaredConstructor(Long.class));
+          Rollback.class.getDeclaredConstructor(Long.class, Long.class));
       builder.put(Type.COMMIT.get(),
-          Commit.class.getDeclaredConstructor(Long.class));
+          Commit.class.getDeclaredConstructor(Long.class, Long.class));
     } catch (Exception e) {
       Throwables.propagate(e);
     }
     TYPES = builder.build();
   }
 
-
-  static ByteBuffer toByteBuffer(TransactionEventRecord record) {
+  @Deprecated
+  static ByteBuffer toByteBufferV2(TransactionEventRecord record) {
     ByteArrayOutputStream byteOutput = new ByteArrayOutputStream(512);
     DataOutputStream dataOutput = new DataOutputStream(byteOutput);
     try {
       dataOutput.writeInt(MAGIC_HEADER);
       dataOutput.writeShort(record.getRecordType());
       dataOutput.writeLong(record.getTransactionID());
+      dataOutput.writeLong(record.getLogWriteOrderID());
       record.write(dataOutput);
       dataOutput.flush();
       // TODO toByteArray does an unneeded copy
       return ByteBuffer.wrap(byteOutput.toByteArray());
     } catch(IOException e) {
       // near impossible
-      Throwables.propagate(e);
+      throw Throwables.propagate(e);
     } finally {
       if(dataOutput != null) {
         try {
           dataOutput.close();
-        } catch (IOException e) {}
+        } catch (IOException e) {
+          LOG.warn("Error closing byte array output stream", e);
+        }
       }
     }
-    throw new IllegalStateException(
-        "Should not occur as method should return or throw an exception");
   }
 
-  static TransactionEventRecord fromDataInput(DataInput in) throws IOException {
+  @Deprecated
+  static TransactionEventRecord fromDataInputV2(DataInput in)
+      throws IOException {
     int header = in.readInt();
     if(header != MAGIC_HEADER) {
       throw new IOException("Header " + Integer.toHexString(header) +
@@ -138,11 +150,56 @@ abstract class TransactionEventRecord implements Writable {
     }
     short type = in.readShort();
     long transactionID = in.readLong();
-    TransactionEventRecord entry = newRecordForType(type, transactionID);
+    long writeOrderID = in.readLong();
+    TransactionEventRecord entry = newRecordForType(type, transactionID,
+        writeOrderID);
     entry.readFields(in);
     return entry;
   }
 
+  static ByteBuffer toByteBuffer(TransactionEventRecord record) {
+    ByteArrayOutputStream byteOutput = new ByteArrayOutputStream(512);
+    try {
+      ProtosFactory.TransactionEventHeader.Builder headerBuilder =
+          ProtosFactory.TransactionEventHeader.newBuilder();
+      headerBuilder.setType(record.getRecordType());
+      headerBuilder.setTransactionID(record.getTransactionID());
+      headerBuilder.setWriteOrderID(record.getLogWriteOrderID());
+      headerBuilder.build().writeDelimitedTo(byteOutput);
+      record.writeProtos(byteOutput);
+      ProtosFactory.TransactionEventFooter footer =
+          ProtosFactory.TransactionEventFooter.newBuilder().build();
+      footer.writeDelimitedTo(byteOutput);
+      return ByteBuffer.wrap(byteOutput.toByteArray());
+    } catch(IOException e) {
+      throw Throwables.propagate(e);
+    } finally {
+      if(byteOutput != null) {
+        try {
+          byteOutput.close();
+        } catch (IOException e) {
+          LOG.warn("Error closing byte array output stream", e);
+        }
+      }
+    }
+  }
+
+  static TransactionEventRecord fromInputStream(InputStream in)
+      throws IOException {
+    ProtosFactory.TransactionEventHeader header =
+        ProtosFactory.TransactionEventHeader.parseDelimitedFrom(in);
+    short type = (short)header.getType();
+    long transactionID = header.getTransactionID();
+    long writeOrderID = header.getWriteOrderID();
+    TransactionEventRecord transactionEvent =
+        newRecordForType(type, transactionID, writeOrderID);
+    transactionEvent.readProtos(in);
+    @SuppressWarnings("unused")
+    ProtosFactory.TransactionEventFooter footer =
+        ProtosFactory.TransactionEventFooter.parseDelimitedFrom(in);
+    return transactionEvent;
+  }
+
   static String getName(short type) {
     Constructor<? extends TransactionEventRecord> constructor = TYPES.get(type);
     Preconditions.checkNotNull(constructor, "Unknown action " +
@@ -150,17 +207,15 @@ abstract class TransactionEventRecord implements Writable {
     return constructor.getDeclaringClass().getSimpleName();
   }
 
-  private static TransactionEventRecord newRecordForType(short type, long transactionID) {
+  private static TransactionEventRecord newRecordForType(short type,
+      long transactionID, long writeOrderID) {
     Constructor<? extends TransactionEventRecord> constructor = TYPES.get(type);
     Preconditions.checkNotNull(constructor, "Unknown action " +
         Integer.toHexString(type));
     try {
-      return constructor.newInstance(transactionID);
+      return constructor.newInstance(transactionID, writeOrderID);
     } catch (Exception e) {
-      Throwables.propagate(e);
+      throw Throwables.propagate(e);
     }
-    throw new IllegalStateException(
-        "Should not occur as method should return or throw an exception");
   }
-
 }
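
For context, here is a minimal, self-contained sketch of the constructor-registry pattern the hunks above rely on: each record type maps to a (Long, Long) constructor keyed by a short type code, so deserialization can instantiate the right subclass from the header alone. All class and method names below are illustrative, not Flume's.

import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

abstract class Record {
  final long transactionID;
  final long writeOrderID;
  protected Record(Long transactionID, Long writeOrderID) {
    this.transactionID = transactionID;
    this.writeOrderID = writeOrderID;
  }
}

class PutRecord extends Record {
  PutRecord(Long transactionID, Long writeOrderID) {
    super(transactionID, writeOrderID);
  }
}

class RecordRegistry {
  private static final Map<Short, Constructor<? extends Record>> TYPES =
      new HashMap<Short, Constructor<? extends Record>>();
  static {
    try {
      // Each record type registers a (Long, Long) constructor, mirroring
      // the transactionID/writeOrderID pair this change threads through.
      TYPES.put((short) 1,
          PutRecord.class.getDeclaredConstructor(Long.class, Long.class));
    } catch (NoSuchMethodException e) {
      throw new ExceptionInInitializerError(e);
    }
  }
  static Record newRecordForType(short type, long transactionID,
      long writeOrderID) throws Exception {
    Constructor<? extends Record> constructor = TYPES.get(type);
    if (constructor == null) {
      throw new IllegalArgumentException("Unknown type " + type);
    }
    // Primitive longs autobox to match the (Long, Long) signature.
    return constructor.newInstance(transactionID, writeOrderID);
  }
}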

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
new file mode 100644
index 0000000..91567a6
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+option java_package = "org.apache.flume.channel.file.proto";
+option java_outer_classname = "ProtosFactory";
+option java_generate_equals_and_hash = true;
+
+message Checkpoint {
+  required sfixed32 version = 1;
+  required sfixed64 writeOrderID = 2;
+  required sfixed32 queueSize = 3;
+  required sfixed32 queueHead = 4;
+  repeated ActiveLog activeLogs = 5;
+}
+
+message ActiveLog {
+  required sfixed32 logFileID = 1;
+  required sfixed32 count = 2;
+}
+
+message LogFileMetaData {
+  required sfixed32 version = 1;
+  required sfixed32 logFileID = 2;
+  required sfixed64 checkpointPosition = 3;
+  required sfixed64 checkpointWriteOrderID = 4;
+}
+
+message TransactionEventHeader {
+  required sfixed32 type = 1;
+  required sfixed64 transactionID = 2;
+  required sfixed64 writeOrderID = 3;
+}
+
+message Put {
+  required FlumeEvent event = 1;
+}
+
+message Take {
+  required sfixed32 fileID = 1;
+  required sfixed32 offset = 2;
+}
+
+message Rollback {
+
+}
+
+message Commit {
+  required sfixed32 type = 1;
+}
+
+message TransactionEventFooter {
+
+}
+
+message FlumeEvent {
+  repeated FlumeEventHeader headers = 1;
+  required bytes body = 2;
+}
+
+message FlumeEventHeader {
+  required string key = 1;
+  required string value = 2;
+}
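
A hedged sketch of the on-disk layout implied by the new toByteBuffer()/fromInputStream() pair: a varint-length-delimited TransactionEventHeader, a type-specific protobuf body, then a length-delimited (currently empty) TransactionEventFooter. This assumes the ProtosFactory classes generated from the .proto above are on the classpath; the class name here is illustrative.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.flume.channel.file.proto.ProtosFactory;

public class DelimitedRoundTrip {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Header carries the same triple the V2 format wrote with DataOutput.
    ProtosFactory.TransactionEventHeader.newBuilder()
        .setType(1)
        .setTransactionID(42L)
        .setWriteOrderID(7L)
        .build().writeDelimitedTo(out);
    // Footer is empty today; because every message is length-delimited, it
    // can grow fields later without breaking old readers, which is the
    // extensibility FLUME-1487 is after.
    ProtosFactory.TransactionEventFooter.newBuilder().build()
        .writeDelimitedTo(out);

    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
    ProtosFactory.TransactionEventHeader header =
        ProtosFactory.TransactionEventHeader.parseDelimitedFrom(in);
    System.out.println(header.getTransactionID()); // prints 42
  }
}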

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
index 2893538..1e0230d 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
@@ -46,17 +46,19 @@ public class TestCheckpoint {
   }
   @Test
   public void testSerialization() throws Exception {
+    EventQueueBackingStore backingStore =
+        new EventQueueBackingStoreFileV2(file, 1, "test");
     FlumeEventPointer ptrIn = new FlumeEventPointer(10, 20);
-    FlumeEventQueue queueIn = new FlumeEventQueue(1, file, inflightTakes,
-            inflightPuts, "test");
+    FlumeEventQueue queueIn = new FlumeEventQueue(backingStore,
+        inflightTakes, inflightPuts);
     queueIn.addHead(ptrIn);
-    FlumeEventQueue queueOut = new FlumeEventQueue(1, file, inflightTakes,
-            inflightPuts, "test");
+    FlumeEventQueue queueOut = new FlumeEventQueue(backingStore,
+        inflightTakes, inflightPuts);
     Assert.assertEquals(0, queueOut.getLogWriteOrderID());
     queueIn.checkpoint(false);
-    FlumeEventQueue queueOut2 = new FlumeEventQueue(1, file, inflightTakes,
-            inflightPuts, "test");
-    FlumeEventPointer ptrOut = queueOut2.removeHead(0);
+    FlumeEventQueue queueOut2 = new FlumeEventQueue(backingStore,
+        inflightTakes, inflightPuts);
+    FlumeEventPointer ptrOut = queueOut2.removeHead(0L);
     Assert.assertEquals(ptrIn, ptrOut);
     Assert.assertTrue(queueOut2.getLogWriteOrderID() > 0);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
new file mode 100644
index 0000000..1fc9b49
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+public class TestEventQueueBackingStoreFactory {
+  static final List<Long> pointersInTestCheckpoint = Arrays.asList(new Long[] {
+      8589936804L,
+      4294969563L,
+      12884904153L,
+      8589936919L,
+      4294969678L,
+      12884904268L,
+      8589937034L,
+      4294969793L,
+      12884904383L
+  });
+
+  File baseDir;
+  File checkpoint;
+  File inflightTakes;
+  File inflightPuts;
+  @Before
+  public void setup() throws IOException {
+    baseDir = Files.createTempDir();
+    checkpoint = new File(baseDir, "checkpoint");
+    inflightTakes = new File(baseDir, "takes");
+    inflightPuts = new File(baseDir, "puts");
+    TestUtils.copyDecompressed("fileformat-v2-checkpoint.gz", checkpoint);
+
+  }
+  @After
+  public void teardown() {
+    FileUtils.deleteQuietly(baseDir);
+  }
+  @Test
+  public void testWithNoFlag() throws Exception {
+    verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test"),
+        Serialization.VERSION_3, pointersInTestCheckpoint);
+  }
+  @Test
+  public void testWithFlag() throws Exception {
+    verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", true),
+        Serialization.VERSION_3, pointersInTestCheckpoint);
+  }
+  @Test
+  public void testNoUpgrade() throws Exception {
+    verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", false),
+        Serialization.VERSION_2, pointersInTestCheckpoint);
+  }
+  @Test
+  public void testNewCheckpoint() throws Exception {
+    Assert.assertTrue(checkpoint.delete());
+    verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", false),
+        Serialization.VERSION_3, Collections.<Long>emptyList());
+  }
+
+  private void verify(EventQueueBackingStore backingStore, long expectedVersion,
+      List<Long> expectedPointers)
+      throws Exception {
+    FlumeEventQueue queue = new FlumeEventQueue(backingStore, inflightTakes,
+        inflightPuts);
+    List<Long> actualPointers = Lists.newArrayList();
+    FlumeEventPointer ptr;
+    while((ptr = queue.removeHead(0L)) != null) {
+      actualPointers.add(ptr.toLong());
+    }
+    Assert.assertEquals(expectedPointers, actualPointers);
+    Assert.assertEquals(10, backingStore.getCapacity());
+    DataInputStream in = new DataInputStream(new FileInputStream(checkpoint));
+    long actualVersion = in.readLong();
+    Assert.assertEquals(expectedVersion, actualVersion);
+    in.close();
+  }
+}
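
The verify() helper above reads the serialization version straight off the checkpoint file. As a standalone illustration: the first eight bytes of the checkpoint are a big-endian long holding the version, which is how the test tells VERSION_2 from VERSION_3 on disk. The file path below is illustrative.

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

public class CheckpointVersionProbe {
  public static void main(String[] args) throws IOException {
    // The version stamp is the first long in the checkpoint file.
    DataInputStream in =
        new DataInputStream(new FileInputStream("checkpoint"));
    try {
      System.out.println("Checkpoint version: " + in.readLong());
    } finally {
      in.close();
    }
  }
}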

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index 35521d1..43c7ae3 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -18,14 +18,16 @@
  */
 package org.apache.flume.channel.file;
 
+import static org.apache.flume.channel.file.TestUtils.*;
+
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,7 +38,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.zip.GZIPInputStream;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flume.ChannelException;
@@ -57,13 +58,7 @@ import com.google.common.base.Charsets;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
-import com.google.common.io.Resources;
-import java.util.HashSet;
-import java.util.Iterator;
-
-import static org.apache.flume.channel.file.TestUtils.*;
 
 public class TestFileChannel {
 
@@ -221,30 +216,42 @@ public class TestFileChannel {
 
   @Test
   public void testRestartLogReplayV1() throws Exception {
-    doTestRestart(true, false, false);
+    doTestRestart(true, false, false, false);
   }
   @Test
   public void testRestartLogReplayV2() throws Exception {
-    doTestRestart(false, false, false);
+    doTestRestart(false, false, false, false);
   }
 
   @Test
   public void testFastReplayV1() throws Exception {
-    doTestRestart(true, true, true);
+    doTestRestart(true, true, true, true);
   }
 
   @Test
   public void testFastReplayV2() throws Exception {
-    doTestRestart(false, true, true);
+    doTestRestart(false, true, true, true);
+  }
+
+  @Test
+  public void testNormalReplayV1() throws Exception {
+    doTestRestart(true, true, true, false);
   }
+
+  @Test
+  public void testNormalReplayV2() throws Exception {
+    doTestRestart(false, true, true, false);
+  }
+
   public void doTestRestart(boolean useLogReplayV1,
-          boolean forceCheckpoint, boolean deleteCheckpoint) throws Exception {
+          boolean forceCheckpoint, boolean deleteCheckpoint,
+          boolean useFastReplay) throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.USE_LOG_REPLAY_V1,
             String.valueOf(useLogReplayV1));
     overrides.put(
             FileChannelConfiguration.USE_FAST_REPLAY,
-            String.valueOf(deleteCheckpoint));
+            String.valueOf(useFastReplay));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -263,7 +270,9 @@ public class TestFileChannel {
     channel.stop();
     if(deleteCheckpoint) {
       File checkpoint = new File(checkpointDir, "checkpoint");
-      checkpoint.delete();
+      Assert.assertTrue(checkpoint.delete());
+      File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
+      Assert.assertTrue(checkpointMetaData.delete());
     }
     channel = createFileChannel(overrides);
     channel.start();
@@ -272,6 +281,42 @@ public class TestFileChannel {
     compareInputAndOut(in, out);
   }
   @Test
+  public void testRestartFailsWhenMetaDataExistsButCheckpointDoesNot()
+      throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Assert.assertEquals(1, putEvents(channel, "restart", 1, 1).size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    Assert.assertTrue(checkpoint.delete());
+    File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
+    Assert.assertTrue(checkpointMetaData.exists());
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertFalse(channel.isOpen());
+  }
+  @Test
+  public void testRestartFailsWhenCheckpointExistsButMetaDoesNot()
+      throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Assert.assertEquals(1, putEvents(channel, "restart", 1, 1).size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
+    Assert.assertTrue(checkpointMetaData.delete());
+    Assert.assertTrue(checkpoint.exists());
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertFalse(channel.isOpen());
+  }
+  @Test
   public void testReconfigure() throws Exception {
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -586,11 +631,11 @@ public class TestFileChannel {
   @Test
   public void testFileFormatV2postFLUME1432()
           throws Exception {
-    copyDecompressed("fileformat-v2-checkpoint.gz",
+    TestUtils.copyDecompressed("fileformat-v2-checkpoint.gz",
             new File(checkpointDir, "checkpoint"));
     for (int i = 0; i < dataDirs.length; i++) {
       int fileIndex = i + 1;
-      copyDecompressed("fileformat-v2-log-"+fileIndex+".gz",
+      TestUtils.copyDecompressed("fileformat-v2-log-"+fileIndex+".gz",
               new File(dataDirs[i], "log-" + fileIndex));
     }
     Map<String, String> overrides = Maps.newHashMap();
@@ -623,12 +668,12 @@ public class TestFileChannel {
   }
   public void doTestFileFormatV2PreFLUME1432(boolean useLogReplayV1)
           throws Exception {
-    copyDecompressed("fileformat-v2-pre-FLUME-1432-checkpoint.gz",
+    TestUtils.copyDecompressed("fileformat-v2-pre-FLUME-1432-checkpoint.gz",
             new File(checkpointDir, "checkpoint"));
     for (int i = 0; i < dataDirs.length; i++) {
       int fileIndex = i + 1;
-      copyDecompressed("fileformat-v2-pre-FLUME-1432-log-" + fileIndex + ".gz",
-              new File(dataDirs[i], "log-" + fileIndex));
+      TestUtils.copyDecompressed("fileformat-v2-pre-FLUME-1432-log-" + fileIndex
+          + ".gz", new File(dataDirs[i], "log-" + fileIndex));
     }
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000));
@@ -757,14 +802,4 @@ public class TestFileChannel {
     }).get();
     Assert.assertEquals(15, takenEvents.size());
   }
-
-
-  private static void copyDecompressed(String resource, File output)
-          throws IOException {
-    URL input =  Resources.getResource(resource);
-    long copied = ByteStreams.copy(new GZIPInputStream(input.openStream()),
-            new FileOutputStream(output));
-    LOG.info("Copied " + copied + " bytes from " + input + " to " + output);
-  }
-
 }
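
A hedged sketch of the invariant the two new restart tests assert: the checkpoint and its metadata file must be present or absent together, and the channel should refuse to open on a mismatch rather than replay from an inconsistent state. The metadata naming below is an assumption for illustration; the tests themselves resolve it via Serialization.getMetaDataFile().

import java.io.File;

public class CheckpointPairCheck {
  // Illustrative stand-in for Serialization.getMetaDataFile().
  static File getMetaDataFile(File checkpoint) {
    return new File(checkpoint.getParentFile(), checkpoint.getName() + ".meta");
  }
  public static void main(String[] args) {
    File checkpoint = new File("checkpoint-dir", "checkpoint");
    File meta = getMetaDataFile(checkpoint);
    // Both present or both absent is consistent; anything else means the
    // channel should fail to open, as both new tests assert.
    boolean consistent = checkpoint.exists() == meta.exists();
    System.out.println(consistent ? "OK to open" : "Refuse to open channel");
  }
}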

