flume-commits mailing list archives

From hshreedharan@apache.org
Subject [2/50] [abbrv] git commit: FLUME-1487. FileChannel format needs to be extensible.
Date Fri, 07 Sep 2012 23:28:52 GMT
FLUME-1487. FileChannel format needs to be extensible.

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2b26f364
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2b26f364
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2b26f364

Branch: refs/heads/cdh-1.2.0+24_intuit
Commit: 2b26f36414924bdf96d50458cc256daf681e6753
Parents: 6b54976
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Fri Sep 7 14:51:16 2012 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Fri Sep 7 16:13:11 2012 -0700

----------------------------------------------------------------------
 flume-ng-channels/flume-file-channel/pom.xml       |   71 +++
 .../flume/channel/file/CheckpointRebuilder.java    |   34 +-
 .../java/org/apache/flume/channel/file/Commit.java |   24 +-
 .../flume/channel/file/EventQueueBackingStore.java |   76 +++
 .../file/EventQueueBackingStoreFactory.java        |  110 ++++
 .../channel/file/EventQueueBackingStoreFile.java   |  278 ++++++++++
 .../channel/file/EventQueueBackingStoreFileV2.java |  101 ++++
 .../channel/file/EventQueueBackingStoreFileV3.java |  194 +++++++
 .../flume/channel/file/FlumeEventPointer.java      |   11 +-
 .../apache/flume/channel/file/FlumeEventQueue.java |  389 +++------------
 .../java/org/apache/flume/channel/file/Log.java    |   96 ++--
 .../org/apache/flume/channel/file/LogFile.java     |  309 ++++++------
 .../apache/flume/channel/file/LogFileFactory.java  |  120 +++++
 .../org/apache/flume/channel/file/LogFileV2.java   |  156 ++++++
 .../org/apache/flume/channel/file/LogFileV3.java   |  234 +++++++++
 .../org/apache/flume/channel/file/LogUtils.java    |    3 +-
 .../java/org/apache/flume/channel/file/Put.java    |   48 ++-
 .../apache/flume/channel/file/ReplayHandler.java   |   14 +-
 .../org/apache/flume/channel/file/Rollback.java    |   20 +-
 .../apache/flume/channel/file/Serialization.java   |   41 ++
 .../java/org/apache/flume/channel/file/Take.java   |   25 +-
 .../flume/channel/file/TransactionEventRecord.java |  105 +++-
 .../src/main/proto/filechannel.proto               |   78 +++
 .../apache/flume/channel/file/TestCheckpoint.java  |   16 +-
 .../file/TestEventQueueBackingStoreFactory.java    |  108 ++++
 .../apache/flume/channel/file/TestFileChannel.java |   97 +++--
 .../flume/channel/file/TestFlumeEventQueue.java    |  183 ++++++--
 .../org/apache/flume/channel/file/TestLog.java     |    4 +-
 .../org/apache/flume/channel/file/TestLogFile.java |   34 +-
 .../apache/flume/channel/file/TestLogRecord.java   |    7 +-
 .../channel/file/TestTransactionEventRecord.java   |  145 ------
 .../channel/file/TestTransactionEventRecordV2.java |  153 ++++++
 .../channel/file/TestTransactionEventRecordV3.java |  140 ++++++
 .../org/apache/flume/channel/file/TestUtils.java   |   27 +-
 pom.xml                                            |    7 +
 35 files changed, 2632 insertions(+), 826 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/pom.xml b/flume-ng-channels/flume-file-channel/pom.xml
index 934e69a..f70b4d9 100644
--- a/flume-ng-channels/flume-file-channel/pom.xml
+++ b/flume-ng-channels/flume-file-channel/pom.xml
@@ -37,6 +37,71 @@
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
       </plugin>
+
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>${project.build.directory}/generated-sources/java</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <configuration>
+          <skipTests>false</skipTests>
+        </configuration>
+        <executions>
+          <execution>
+            <id>compile-proto</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <target>
+                <echo file="target/compile-proto.sh">
+                    PROTO_DIR=src/main/proto
+                    JAVA_DIR=target/generated-sources/java
+                    which cygpath 2&gt; /dev/null
+                    if [ $? = 1 ]; then
+                      IS_WIN=false
+                    else
+                      IS_WIN=true
+                      WIN_PROTO_DIR=`cygpath --windows $PROTO_DIR`
+                      WIN_JAVA_DIR=`cygpath --windows $JAVA_DIR`
+                    fi
+                    mkdir -p $JAVA_DIR 2&gt; /dev/null
+                    for PROTO_FILE in `ls $PROTO_DIR/*.proto 2&gt; /dev/null`
+                    do
+                        if [ "$IS_WIN" = "true" ]; then
+                          protoc -I$WIN_PROTO_DIR --java_out=$WIN_JAVA_DIR $PROTO_FILE
+                        else
+                          protoc -I$PROTO_DIR --java_out=$JAVA_DIR $PROTO_FILE
+                        fi
+                    done
+                </echo>
+                <exec executable="sh" dir="${basedir}" failonerror="true">
+                  <arg line="target/compile-proto.sh"/>
+                </exec>
+              </target>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
     </plugins>
   </build>
 
@@ -99,6 +164,12 @@
       <optional>true</optional>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <scope>compile</scope>
+    </dependency>
+
   </dependencies>
 
   <profiles>

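The antrun step above shells out to protoc so that the new src/main/proto/filechannel.proto (see the diffstat) is compiled into target/generated-sources/java, which the build-helper execution adds back to the source roots. The .proto file itself is not reproduced in this message; the following is a plausible reconstruction of its checkpoint-related messages from the generated ProtosFactory accessors used later in the patch, with field numbers and wire types as guesses:

    // Hypothetical reconstruction; see src/main/proto/filechannel.proto
    // in the commit for the authoritative schema.
    option java_package = "org.apache.flume.channel.file.proto";
    option java_outer_classname = "ProtosFactory";

    message Checkpoint {
      required int32 version = 1;
      required int64 writeOrderID = 2;
      required int32 queueSize = 3;
      required int32 queueHead = 4;
      repeated ActiveLog activeLogs = 5;
    }

    message ActiveLog {
      required int32 logFileID = 1;
      required int32 count = 2;
    }

    message Commit {
      required int32 type = 1;
    }

Because every message is written length-delimited, later format versions can append optional fields without breaking existing readers, which is the extensibility FLUME-1487 asks for.
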
http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
index 32b5324..1ba5d1c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
@@ -40,7 +40,6 @@ public class CheckpointRebuilder {
 
   private final File checkpointDir;
   private final List<File> logFiles;
-  private final long maxFileSize;
   private final FlumeEventQueue queue;
   private final Set<ComparableFlumeEventPointer> committedPuts =
           Sets.newHashSet();
@@ -55,12 +54,10 @@ public class CheckpointRebuilder {
           LoggerFactory.getLogger(CheckpointRebuilder.class);
 
   public CheckpointRebuilder(File checkpointDir, List<File> logFiles,
-          long maxFileSize,
           FlumeEventQueue queue) throws IOException {
     this.checkpointDir = checkpointDir;
     this.logFiles = logFiles;
     this.queue = queue;
-    this.maxFileSize = maxFileSize;
   }
 
   public boolean rebuild() throws IOException, Exception {
@@ -72,7 +69,7 @@ public class CheckpointRebuilder {
     LOG.info("Attempting to fast replay the log files.");
     List<LogFile.SequentialReader> logReaders = Lists.newArrayList();
     for (File logFile : logFiles) {
-      logReaders.add(new LogFile.SequentialReader(logFile));
+      logReaders.add(LogFileFactory.getSequentialReader(logFile));
     }
     long transactionIDSeed = 0;
     long writeOrderIDSeed = 0;
@@ -158,26 +155,25 @@ public class CheckpointRebuilder {
 
   private void writeCheckpoint() throws IOException {
     long checkpointLogOrderID = 0;
-    List<LogFile.Writer> logWriters = Lists.newArrayList();
+    List<LogFile.MetaDataWriter> metaDataWriters = Lists.newArrayList();
     for (File logFile : logFiles) {
         String name = logFile.getName();
-        logWriters.add(new LogFile.Writer(logFile,
-                Integer.parseInt(name.substring(name.lastIndexOf('-') + 1)),
-                maxFileSize));
+        metaDataWriters.add(LogFileFactory.getMetaDataWriter(logFile,
+            Integer.parseInt(name.substring(name.lastIndexOf('-') + 1))));
     }
     try {
       if (queue.checkpoint(true)) {
         checkpointLogOrderID = queue.getLogWriteOrderID();
-        for (LogFile.Writer logWriter : logWriters) {
-          logWriter.markCheckpoint(checkpointLogOrderID);
+        for (LogFile.MetaDataWriter metaDataWriter : metaDataWriters) {
+          metaDataWriter.markCheckpoint(checkpointLogOrderID);
         }
       }
     } catch (Exception e) {
       LOG.warn("Error while generating checkpoint "
               + "using fast generation logic", e);
     } finally {
-      for (LogFile.Writer logWriter : logWriters) {
-        logWriter.close();
+      for (LogFile.MetaDataWriter metaDataWriter : metaDataWriters) {
+        metaDataWriter.close();
       }
     }
   }
@@ -233,8 +229,6 @@ public class CheckpointRebuilder {
     opt = new Option("l", true, "comma-separated list of log directories");
     opt.setRequired(true);
     options.addOption(opt);
-    opt = new Option("s", true, "maximum size of log files");
-    opt.setRequired(true);
     options.addOption(opt);
     opt = new Option("t", true, "capacity of the channel");
     opt.setRequired(true);
@@ -249,14 +243,14 @@ public class CheckpointRebuilder {
       logFiles.addAll(Arrays.asList(files));
     }
     int capacity = Integer.parseInt(cli.getOptionValue("t"));
-    long maxFileSize = Long.parseLong(cli.getOptionValue("s"));
-    boolean isReplayV1 = cli.hasOption("v");
-    FlumeEventQueue queue = new FlumeEventQueue(capacity,
-            new File(checkpointDir, "checkpoint"),
+    EventQueueBackingStore backingStore =
+        EventQueueBackingStoreFactory.get(new File(checkpointDir, "checkpoint"),
+            capacity, "channel");
+    FlumeEventQueue queue = new FlumeEventQueue(backingStore,
             new File(checkpointDir, "inflighttakes"),
-            new File(checkpointDir, "inflightputs"), "channel");
+            new File(checkpointDir, "inflightputs"));
     CheckpointRebuilder rebuilder = new CheckpointRebuilder(checkpointDir,
-            logFiles, maxFileSize, queue);
+            logFiles, queue);
     if(rebuilder.rebuild()) {
       rebuilder.writeCheckpoint();
     } else {

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java
index 2c92d28..7d57bb8 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java
@@ -21,6 +21,10 @@ package org.apache.flume.channel.file;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.flume.channel.file.proto.ProtosFactory;
 
 /**
  * Represents a Commit on disk
@@ -30,11 +34,11 @@ class Commit extends TransactionEventRecord {
    * Type of Commit Take|Put
    */
   private short type;
-  Commit(Long transactionID) {
-    super(transactionID);
+  Commit(Long transactionID, Long logWriteOrderID) {
+    super(transactionID, logWriteOrderID);
   }
-  Commit(Long transactionID, short type) {
-    this(transactionID);
+  Commit(Long transactionID, Long logWriteOrderID, short type) {
+    this(transactionID, logWriteOrderID);
     this.type = type;
   }
   @Override
@@ -42,6 +46,18 @@ class Commit extends TransactionEventRecord {
     super.readFields(in);
     type = in.readShort();
   }
+  @Override
+  void writeProtos(OutputStream out) throws IOException {
+    ProtosFactory.Commit.Builder commitBuilder =
+        ProtosFactory.Commit.newBuilder();
+    commitBuilder.setType(type);
+    commitBuilder.build().writeDelimitedTo(out);
+  }
+  @Override
+  void readProtos(InputStream in) throws IOException {
+    ProtosFactory.Commit commit = ProtosFactory.Commit.parseDelimitedFrom(in);
+    type = (short) commit.getType();
+  }
 
   short getType() {
     return type;

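The new writeProtos/readProtos pair sits alongside the legacy readFields path: V3 record bodies are length-delimited protobuf messages rather than hand-rolled binary fields. A minimal round-trip sketch, assuming the ProtosFactory class generated from filechannel.proto:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.apache.flume.channel.file.proto.ProtosFactory;

    public class CommitRoundTrip {
      public static void main(String[] args) throws Exception {
        // Serialize a Commit body the way Commit.writeProtos does.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ProtosFactory.Commit.newBuilder()
            .setType(1)  // illustrative type code; the channel stores a short
            .build()
            .writeDelimitedTo(out);
        // Parse it back the way Commit.readProtos does.
        ProtosFactory.Commit commit = ProtosFactory.Commit.parseDelimitedFrom(
            new ByteArrayInputStream(out.toByteArray()));
        System.out.println((short) commit.getType());  // prints 1
      }
    }
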
http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
new file mode 100644
index 0000000..13b50da
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.IOException;
+
+import com.google.common.collect.ImmutableSortedSet;
+
+abstract class EventQueueBackingStore {
+  protected static final int EMPTY = 0;
+  private int queueSize;
+  private int queueHead;
+  private long logWriteOrderID;
+  private final int capacity;
+  private final String name;
+
+  protected EventQueueBackingStore(int capacity, String name) {
+    this.capacity = capacity;
+    this.name = name;
+  }
+
+
+  abstract void checkpoint() throws IOException;
+  abstract void incrementFileID(int fileID);
+  abstract void decrementFileID(int fileID);
+  abstract ImmutableSortedSet<Integer> getReferenceCounts();
+  abstract long get(int index);
+  abstract void put(int index, long value);
+  abstract boolean syncRequired();
+  abstract void close() throws IOException;
+
+  protected abstract int getVersion();
+
+  int getSize() {
+    return queueSize;
+  }
+  void setSize(int size) {
+    queueSize = size;
+  }
+  int getHead() {
+    return queueHead;
+  }
+  void setHead(int head) {
+    queueHead = head;
+  }
+  int getCapacity() {
+    return capacity;
+  }
+
+  String getName() {
+    return name;
+  }
+  protected void setLogWriteOrderID(long logWriteOrderID) {
+    this.logWriteOrderID = logWriteOrderID;
+  }
+  long getLogWriteOrderID() {
+    return logWriteOrderID;
+  }
+
+}

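This abstract class is the extension point named in the JIRA title: each on-disk format version subclasses it and identifies itself via getVersion(). As a skeletal sketch, a hypothetical future version built on the EventQueueBackingStoreFile base introduced below would only need a version number and its own metadata layout (V4 is purely illustrative; it does not exist in this patch):

    package org.apache.flume.channel.file;  // the superclass is package-private

    import java.io.File;
    import java.io.IOException;

    // Hypothetical illustration only: no V4 exists in this commit.
    final class EventQueueBackingStoreFileV4 extends EventQueueBackingStoreFile {
      EventQueueBackingStoreFileV4(File checkpointFile, int capacity, String name)
          throws IOException {
        super(capacity, name, checkpointFile);
      }

      @Override
      protected int getVersion() {
        return 4;  // a Serialization.VERSION_4 constant would accompany this
      }

      @Override
      protected void writeCheckpointMetaData() throws IOException {
        // persist queue size, head, and active-log reference counts
        // in whatever layout V4 defines
      }
    }

EventQueueBackingStoreFactory.get (next file) would then need one extra branch mapping the new version number to this class.
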
http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
new file mode 100644
index 0000000..6c07152
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Files;
+
+class EventQueueBackingStoreFactory {
+  private static final Logger LOG = LoggerFactory
+  .getLogger(EventQueueBackingStoreFactory.class);
+  private EventQueueBackingStoreFactory() {}
+  static EventQueueBackingStore get(File checkpointFile, int capacity,
+      String name) throws Exception {
+    return get(checkpointFile, capacity, name, true);
+  }
+  static EventQueueBackingStore get(File checkpointFile, int capacity,
+      String name, boolean upgrade) throws Exception {
+    File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
+    RandomAccessFile checkpointFileHandle = null;
+    try {
+      boolean checkpointExists = checkpointFile.exists();
+      boolean metaDataExists = metaDataFile.exists();
+      if(metaDataExists) {
+        // if we have a metadata file but no checkpoint file, we have a problem
+        if(!checkpointExists || checkpointFile.length() == 0) {
+          LOG.error("MetaData file for checkpoint " +
+              " exists but checkpoint does not. Checkpoint = " + checkpointFile +
+              ", metaDataFile = " + metaDataFile);
+          throw new IllegalStateException(
+              "The last checkpoint was not completed correctly. Please delete "
+                  + "the checkpoint files: " + checkpointFile + " and "
+                  + Serialization.getMetaDataFile(checkpointFile)
+                  + " to rebuild the checkpoint and start again. " + name);
+        }
+      }
+      // brand new, use v3
+      if(!checkpointExists) {
+        if(!checkpointFile.createNewFile()) {
+          throw new IOException("Cannot create " + checkpointFile);
+        }
+        return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name);
+      }
+      // v3 due to meta file, version will be checked by backing store
+      if(metaDataExists) {
+        return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name);
+      }
+      checkpointFileHandle = new RandomAccessFile(checkpointFile, "r");
+      int version = (int)checkpointFileHandle.readLong();
+      if(Serialization.VERSION_2 == version) {
+        if(upgrade) {
+          return upgrade(checkpointFile, capacity, name);
+        }
+        return new EventQueueBackingStoreFileV2(checkpointFile, capacity, name);
+      }
+      LOG.error("Found version " + Integer.toHexString(version) + " in " +
+          checkpointFile);
+      throw new IllegalStateException(
+          "The last checkpoint was not completed correctly. Please delete "
+              + "the checkpoint files: " + checkpointFile + " and "
+              + Serialization.getMetaDataFile(checkpointFile)
+              + " to rebuild the checkpoint and start again. " + name);
+    } finally {
+      if(checkpointFileHandle != null) {
+        try {
+          checkpointFileHandle.close();
+        } catch(IOException e) {
+          LOG.warn("Unable to close " + checkpointFile, e);
+        }
+      }
+    }
+  }
+
+  private static EventQueueBackingStore upgrade(File checkpointFile,
+      int capacity, String name)
+          throws Exception {
+    LOG.info("Attempting upgrade of " + checkpointFile + " for " + name);
+    EventQueueBackingStoreFileV2 backingStoreV2 =
+        new EventQueueBackingStoreFileV2(checkpointFile, capacity, name);
+    String backupName = checkpointFile.getName() + "-backup-"
+        + System.currentTimeMillis();
+    Files.copy(checkpointFile,
+        new File(checkpointFile.getParentFile(), backupName));
+    File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
+    EventQueueBackingStoreFileV3.upgrade(backingStoreV2, checkpointFile,
+        metaDataFile);
+    return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name);
+  }
+}

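With the factory in place, callers obtain a version-appropriate backing store and hand it to FlumeEventQueue rather than pointing the queue at a raw checkpoint file, as CheckpointRebuilder.main does above. A usage sketch with an illustrative directory and capacity:

    package org.apache.flume.channel.file;  // these classes are package-private

    import java.io.File;

    public class BackingStoreUsage {
      public static void main(String[] args) throws Exception {
        File checkpointDir = new File("/tmp/flume-channel");  // illustrative path
        // Chooses V2 or V3 from what is on disk; a bare V2 checkpoint is
        // upgraded to V3 (after a timestamped backup copy) by default.
        EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.get(
            new File(checkpointDir, "checkpoint"), 1000000, "channel");
        FlumeEventQueue queue = new FlumeEventQueue(backingStore,
            new File(checkpointDir, "inflighttakes"),
            new File(checkpointDir, "inflightputs"));
        // ... addTail/removeHead events, then force a checkpoint:
        queue.checkpoint(true);
      }
    }
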
http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
new file mode 100644
index 0000000..4717055
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.LongBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+
+
+abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(EventQueueBackingStoreFile.class);
+  private static final int MAX_ALLOC_BUFFER_SIZE = 2*1024*1024; // 2MB
+  protected static final int HEADER_SIZE = 1029;
+  protected static final int INDEX_VERSION = 0;
+  protected static final int INDEX_WRITE_ORDER_ID = 1;
+  protected static final int INDEX_CHECKPOINT_MARKER = 4;
+  protected static final int CHECKPOINT_COMPLETE = 0;
+  protected static final int CHECKPOINT_INCOMPLETE = 1;
+
+  protected LongBuffer elementsBuffer;
+  protected final Map<Integer, Long> overwriteMap = new HashMap<Integer, Long>();
+  protected final Map<Integer, AtomicInteger> logFileIDReferenceCounts = Maps.newHashMap();
+  protected final MappedByteBuffer mappedBuffer;
+  protected final RandomAccessFile checkpointFileHandle;
+  protected final File checkpointFile;
+
+  protected EventQueueBackingStoreFile(int capacity, String name,
+      File checkpointFile) throws IOException {
+    super(capacity, name);
+    this.checkpointFile = checkpointFile;
+    checkpointFileHandle = new RandomAccessFile(checkpointFile, "rw");
+
+    if(checkpointFileHandle.length() == 0) {
+      int totalBytes = (capacity + HEADER_SIZE) * Serialization.SIZE_OF_LONG;
+      allocate(checkpointFile, totalBytes);
+      checkpointFileHandle.seek(INDEX_VERSION * Serialization.SIZE_OF_LONG);
+      checkpointFileHandle.writeLong(getVersion());
+      checkpointFileHandle.getChannel().force(true);
+      LOG.info("Preallocated " + checkpointFile + " to " + checkpointFileHandle.length()
+          + " for capacity " + capacity);
+    }
+    mappedBuffer = checkpointFileHandle.getChannel().map(MapMode.READ_WRITE, 0,
+        checkpointFile.length());
+    elementsBuffer = mappedBuffer.asLongBuffer();
+
+    int version = (int) elementsBuffer.get(INDEX_VERSION);
+    Preconditions.checkState(version == getVersion(),
+        "Invalid version: " + version + " " + name + ", expected " + getVersion());
+
+    long checkpointComplete =
+        (int) elementsBuffer.get(INDEX_CHECKPOINT_MARKER);
+    Preconditions.checkState(checkpointComplete == CHECKPOINT_COMPLETE,
+        "The last checkpoint was not completed correctly. Please delete "
+            + "the checkpoint files: " + checkpointFile + " and "
+            + Serialization.getMetaDataFile(checkpointFile)
+            + " to rebuild the checkpoint and start again. " + name);
+  }
+
+  protected long getCheckpointLogWriteOrderID() {
+    return elementsBuffer.get(INDEX_WRITE_ORDER_ID);
+  }
+
+  protected abstract void writeCheckpointMetaData() throws IOException;
+
+  @Override
+  void checkpoint()  throws IOException {
+
+    LOG.info("Start checkpoint for " + checkpointFile +
+        ", elements to sync = " + overwriteMap.size());
+
+    // Start checkpoint
+    elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE);
+
+    setLogWriteOrderID(WriteOrderOracle.next());
+    LOG.info("Updating checkpoint metadata: logWriteOrderID: "
+        + getLogWriteOrderID() + ", queueSize: " + getSize() + ", queueHead: "
+          + getHead());
+    elementsBuffer.put(INDEX_WRITE_ORDER_ID, getLogWriteOrderID());
+    try {
+      writeCheckpointMetaData();
+    } catch (IOException e) {
+      throw new IOException("Error writing metadata", e);
+    }
+
+    Iterator<Integer> it = overwriteMap.keySet().iterator();
+    while (it.hasNext()) {
+      int index = it.next();
+      long value = overwriteMap.get(index);
+      elementsBuffer.put(index, value);
+      it.remove();
+    }
+
+    Preconditions.checkState(overwriteMap.isEmpty(),
+        "concurrent update detected ");
+
+    // Finish checkpoint
+    elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_COMPLETE);
+    mappedBuffer.force();
+  }
+
+
+  @Override
+  void close() {
+    mappedBuffer.force();
+    try {
+      checkpointFileHandle.close();
+    } catch (IOException e) {
+      LOG.info("Error closing " + checkpointFile, e);
+    }
+  }
+
+  @Override
+  long get(int index) {
+    int realIndex = getPhysicalIndex(index);
+    long result = EMPTY;
+    if (overwriteMap.containsKey(realIndex)) {
+      result = overwriteMap.get(realIndex);
+    } else {
+      result = elementsBuffer.get(realIndex);
+    }
+    return result;
+  }
+
+  @Override
+  ImmutableSortedSet<Integer> getReferenceCounts() {
+    return ImmutableSortedSet.copyOf(logFileIDReferenceCounts.keySet());
+  }
+
+  @Override
+  void put(int index, long value) {
+    int realIndex = getPhysicalIndex(index);
+    overwriteMap.put(realIndex, value);
+  }
+
+  @Override
+  boolean syncRequired() {
+    return overwriteMap.size() > 0;
+  }
+
+  @Override
+  protected void incrementFileID(int fileID) {
+    AtomicInteger counter = logFileIDReferenceCounts.get(fileID);
+    if(counter == null) {
+      counter = new AtomicInteger(0);
+      logFileIDReferenceCounts.put(fileID, counter);
+    }
+    counter.incrementAndGet();
+  }
+  @Override
+  protected void decrementFileID(int fileID) {
+    AtomicInteger counter = logFileIDReferenceCounts.get(fileID);
+    Preconditions.checkState(counter != null, "null counter ");
+    int count = counter.decrementAndGet();
+    if(count == 0) {
+      logFileIDReferenceCounts.remove(fileID);
+    }
+  }
+
+  protected int getPhysicalIndex(int index) {
+    return HEADER_SIZE + (getHead() + index) % getCapacity();
+  }
+
+  protected static void allocate(File file, long totalBytes) throws IOException {
+    RandomAccessFile checkpointFile = new RandomAccessFile(file, "rw");
+    boolean success = false;
+    try {
+      if (totalBytes <= MAX_ALLOC_BUFFER_SIZE) {
+        checkpointFile.write(new byte[(int)totalBytes]);
+      } else {
+        byte[] initBuffer = new byte[MAX_ALLOC_BUFFER_SIZE];
+        long remainingBytes = totalBytes;
+        while (remainingBytes >= MAX_ALLOC_BUFFER_SIZE) {
+          checkpointFile.write(initBuffer);
+          remainingBytes -= MAX_ALLOC_BUFFER_SIZE;
+        }
+        if (remainingBytes > 0) {
+          checkpointFile.write(initBuffer, 0, (int)remainingBytes);
+        }
+      }
+      success = true;
+    } finally {
+      try {
+        checkpointFile.close();
+      } catch (IOException e) {
+        if(success) {
+          throw e;
+        }
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    File file = new File(args[0]);
+    File inflightTakesFile = new File(args[1]);
+    File inflightPutsFile = new File(args[2]);
+    if (!file.exists()) {
+      throw new IOException("File " + file + " does not exist");
+    }
+    if (file.length() == 0) {
+      throw new IOException("File " + file + " is empty");
+    }
+    int capacity = (int) ((file.length() - (HEADER_SIZE * 8L)) / 8L);
+    EventQueueBackingStoreFile backingStore = (EventQueueBackingStoreFile)
+        EventQueueBackingStoreFactory.get(file,capacity, "debug", false);
+    System.out.println("File Reference Counts"
+            + backingStore.logFileIDReferenceCounts);
+    System.out.println("Queue Capacity " + backingStore.getCapacity());
+    System.out.println("Queue Size " + backingStore.getSize());
+    System.out.println("Queue Head " + backingStore.getHead());
+    for (int index = 0; index < backingStore.getCapacity(); index++) {
+      long value = backingStore.get(backingStore.getPhysicalIndex(index));
+      int fileID = (int) (value >>> 32);
+      int offset = (int) value;
+      System.out.println(index + ":" + Long.toHexString(value) + " fileID = "
+              + fileID + ", offset = " + offset);
+    }
+    FlumeEventQueue queue =
+        new FlumeEventQueue(backingStore, inflightTakesFile, inflightPutsFile);
+    SetMultimap<Long, Long> putMap = queue.deserializeInflightPuts();
+    System.out.println("Inflight Puts:");
+
+    for (Long txnID : putMap.keySet()) {
+      Set<Long> puts = putMap.get(txnID);
+      System.out.println("Transaction ID: " + String.valueOf(txnID));
+      for (long value : puts) {
+        int fileID = (int) (value >>> 32);
+        int offset = (int) value;
+        System.out.println(Long.toHexString(value) + " fileID = "
+                + fileID + ", offset = " + offset);
+      }
+    }
+    SetMultimap<Long, Long> takeMap = queue.deserializeInflightTakes();
+    System.out.println("Inflight takes:");
+    for (Long txnID : takeMap.keySet()) {
+      Set<Long> takes = takeMap.get(txnID);
+      System.out.println("Transaction ID: " + String.valueOf(txnID));
+      for (long value : takes) {
+        int fileID = (int) (value >>> 32);
+        int offset = (int) value;
+        System.out.println(Long.toHexString(value) + " fileID = "
+                + fileID + ", offset = " + offset);
+      }
+    }
+  }
+}

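The checkpoint file is a fixed header of HEADER_SIZE longs followed by a ring buffer of capacity longs, so getPhysicalIndex maps a logical queue index by offsetting from the head modulo capacity. A small worked example of that mapping:

    public class PhysicalIndexDemo {
      static final int HEADER_SIZE = 1029;  // matches EventQueueBackingStoreFile

      static int physicalIndex(int head, int capacity, int index) {
        return HEADER_SIZE + (head + index) % capacity;
      }

      public static void main(String[] args) {
        // Capacity 10, head at slot 8: logical index 3 wraps to buffer
        // slot (8 + 3) % 10 = 1, i.e. checkpoint element 1029 + 1.
        System.out.println(physicalIndex(8, 10, 3));  // prints 1030
      }
    }

Note also the two-phase write in checkpoint(): the marker slot is set to CHECKPOINT_INCOMPLETE before any elements are rewritten and back to CHECKPOINT_COMPLETE only afterwards, so a crash mid-checkpoint is detectable at the next startup.
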
http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
new file mode 100644
index 0000000..8bbc081
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+
+final class EventQueueBackingStoreFileV2 extends EventQueueBackingStoreFile {
+
+
+  private static final int INDEX_SIZE = 2;
+  private static final int INDEX_HEAD = 3;
+  private static final int INDEX_ACTIVE_LOG = 5;
+  private static final int MAX_ACTIVE_LOGS = 1024;
+
+  EventQueueBackingStoreFileV2(File checkpointFile, int capacity, String name)
+      throws IOException {
+    super(capacity, name, checkpointFile);
+    Preconditions.checkArgument(capacity > 0,
+        "capacity must be greater than 0 " + capacity);
+
+    setLogWriteOrderID(elementsBuffer.get(INDEX_WRITE_ORDER_ID));
+    setSize((int) elementsBuffer.get(INDEX_SIZE));
+    setHead((int) elementsBuffer.get(INDEX_HEAD));
+
+    int indexMaxLog = INDEX_ACTIVE_LOG + MAX_ACTIVE_LOGS;
+    for (int i = INDEX_ACTIVE_LOG; i < indexMaxLog; i++) {
+      long nextFileCode = elementsBuffer.get(i);
+      if (nextFileCode  != EMPTY) {
+        Pair<Integer, Integer> idAndCount =
+            deocodeActiveLogCounter(nextFileCode);
+        logFileIDReferenceCounts.put(idAndCount.getLeft(),
+            new AtomicInteger(idAndCount.getRight()));
+      }
+    }
+  }
+  @Override
+  protected int getVersion() {
+    return Serialization.VERSION_2;
+  }
+
+  @Override
+  protected void incrementFileID(int fileID) {
+    super.incrementFileID(fileID);
+    Preconditions.checkState(logFileIDReferenceCounts.size() < MAX_ACTIVE_LOGS,
+        "Too many active logs ");
+  }
+
+
+  private Pair<Integer, Integer> deocodeActiveLogCounter(long value) {
+    int fileId = (int) (value >>> 32);
+    int count = (int) value;
+    return Pair.of(fileId, count);
+  }
+  private long encodeActiveLogCounter(int fileId, int count) {
+    long result = fileId;
+    result = (long)fileId << 32;
+    result += (long) count;
+    return result;
+  }
+  @Override
+  protected void writeCheckpointMetaData() {
+    elementsBuffer.put(INDEX_SIZE, getSize());
+    elementsBuffer.put(INDEX_HEAD, getHead());
+    List<Long> fileIdAndCountEncoded = new ArrayList<Long>();
+    for (Integer fileId : logFileIDReferenceCounts.keySet()) {
+      Integer count = logFileIDReferenceCounts.get(fileId).get();
+      long value = encodeActiveLogCounter(fileId, count);
+      fileIdAndCountEncoded.add(value);
+    }
+
+    int emptySlots = MAX_ACTIVE_LOGS - fileIdAndCountEncoded.size();
+    for (int i = 0; i < emptySlots; i++)  {
+      fileIdAndCountEncoded.add(0L);
+    }
+    for (int i = 0; i < MAX_ACTIVE_LOGS; i++) {
+      elementsBuffer.put(i + INDEX_ACTIVE_LOG, fileIdAndCountEncoded.get(i));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
new file mode 100644
index 0000000..c766d09
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flume.channel.file.proto.ProtosFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(EventQueueBackingStoreFileV3.class);
+  private final File metaDataFile;
+
+  EventQueueBackingStoreFileV3(File checkpointFile, int capacity, String name)
+      throws IOException {
+    super(capacity, name, checkpointFile);
+    Preconditions.checkArgument(capacity > 0,
+        "capacity must be greater than 0 " + capacity);
+    metaDataFile = Serialization.getMetaDataFile(checkpointFile);
+    LOG.info("Starting up with " + checkpointFile + " and " + metaDataFile);
+    if(metaDataFile.exists()) {
+      FileInputStream inputStream = new FileInputStream(metaDataFile);
+      try {
+        LOG.info("Reading checkpoint metadata from " + metaDataFile);
+        ProtosFactory.Checkpoint checkpoint =
+            ProtosFactory.Checkpoint.parseDelimitedFrom(inputStream);
+        int version = checkpoint.getVersion();
+        Preconditions.checkState(version == getVersion(),
+            "Invalid version: " + version + " " + name + ", expected "
+                + getVersion());
+        long logWriteOrderID = checkpoint.getWriteOrderID();
+        if(logWriteOrderID != getCheckpointLogWriteOrderID()) {
+          LOG.error("Checkpoint and Meta files have differing " +
+              "logWriteOrderIDs " + getCheckpointLogWriteOrderID() + ", and "
+              + logWriteOrderID);
+          throw new IllegalStateException(
+              "The last checkpoint was not completed correctly. Please delete "
+                  + "the checkpoint files: " + checkpointFile + " and "
+                  + Serialization.getMetaDataFile(checkpointFile)
+                  + " to rebuild the checkpoint and start again. " + name);
+        }
+        WriteOrderOracle.setSeed(logWriteOrderID);
+        setLogWriteOrderID(logWriteOrderID);
+        setSize(checkpoint.getQueueSize());
+        setHead(checkpoint.getQueueHead());
+        for(ProtosFactory.ActiveLog activeLog : checkpoint.getActiveLogsList()) {
+          Integer logFileID = activeLog.getLogFileID();
+          Integer count = activeLog.getCount();
+          logFileIDReferenceCounts.put(logFileID, new AtomicInteger(count));
+        }
+      } finally {
+        try {
+          inputStream.close();
+        } catch (IOException e) {
+          LOG.warn("Unable to close " + metaDataFile, e);
+        }
+      }
+    } else {
+      ProtosFactory.Checkpoint.Builder checkpointBuilder =
+          ProtosFactory.Checkpoint.newBuilder();
+      checkpointBuilder.setVersion(getVersion());
+      checkpointBuilder.setQueueHead(getHead());
+      checkpointBuilder.setQueueSize(getSize());
+      checkpointBuilder.setWriteOrderID(getLogWriteOrderID());
+      FileOutputStream outputStream = new FileOutputStream(metaDataFile);
+      try {
+        checkpointBuilder.build().writeDelimitedTo(outputStream);
+        outputStream.getChannel().force(true);
+      } finally {
+        try {
+          outputStream.close();
+        } catch (IOException e) {
+          LOG.warn("Unable to close " + metaDataFile, e);
+        }
+      }
+    }
+  }
+  File getMetaDataFile() {
+    return metaDataFile;
+  }
+
+  @Override
+  protected int getVersion() {
+    return Serialization.VERSION_3;
+  }
+  @Override
+  protected void writeCheckpointMetaData() throws IOException {
+    ProtosFactory.Checkpoint.Builder checkpointBuilder =
+        ProtosFactory.Checkpoint.newBuilder();
+    checkpointBuilder.setVersion(getVersion());
+    checkpointBuilder.setQueueHead(getHead());
+    checkpointBuilder.setQueueSize(getSize());
+    checkpointBuilder.setWriteOrderID(getLogWriteOrderID());
+    for(Integer logFileID : logFileIDReferenceCounts.keySet()) {
+      int count = logFileIDReferenceCounts.get(logFileID).get();
+      if(count != 0) {
+         ProtosFactory.ActiveLog.Builder activeLogBuilder =
+             ProtosFactory.ActiveLog.newBuilder();
+         activeLogBuilder.setLogFileID(logFileID);
+         activeLogBuilder.setCount(count);
+         checkpointBuilder.addActiveLogs(activeLogBuilder.build());
+      }
+    }
+    FileOutputStream outputStream = new FileOutputStream(metaDataFile);
+    try {
+      checkpointBuilder.build().writeDelimitedTo(outputStream);
+      outputStream.getChannel().force(true);
+    } finally {
+      try {
+        outputStream.close();
+      } catch (IOException e) {
+        LOG.warn("Unable to close " + metaDataFile, e);
+      }
+    }
+  }
+
+  static void upgrade(EventQueueBackingStoreFileV2 backingStoreV2,
+      File checkpointFile, File metaDataFile)
+          throws IOException {
+
+    int head = backingStoreV2.getHead();
+    int size = backingStoreV2.getSize();
+    long writeOrderID = backingStoreV2.getLogWriteOrderID();
+    Map<Integer, AtomicInteger> referenceCounts =
+        backingStoreV2.logFileIDReferenceCounts;
+
+    ProtosFactory.Checkpoint.Builder checkpointBuilder =
+        ProtosFactory.Checkpoint.newBuilder();
+    checkpointBuilder.setVersion(Serialization.VERSION_3);
+    checkpointBuilder.setQueueHead(head);
+    checkpointBuilder.setQueueSize(size);
+    checkpointBuilder.setWriteOrderID(writeOrderID);
+    for(Integer logFileID : referenceCounts.keySet()) {
+      int count = referenceCounts.get(logFileID).get();
+      if(count > 0) {
+         ProtosFactory.ActiveLog.Builder activeLogBuilder =
+             ProtosFactory.ActiveLog.newBuilder();
+         activeLogBuilder.setLogFileID(logFileID);
+         activeLogBuilder.setCount(count);
+         checkpointBuilder.addActiveLogs(activeLogBuilder.build());
+      }
+    }
+    FileOutputStream outputStream = new FileOutputStream(metaDataFile);
+    try {
+      checkpointBuilder.build().writeDelimitedTo(outputStream);
+      outputStream.getChannel().force(true);
+    } finally {
+      try {
+        outputStream.close();
+      } catch (IOException e) {
+        LOG.warn("Unable to close " + metaDataFile, e);
+      }
+    }
+    RandomAccessFile checkpointFileHandle =
+        new RandomAccessFile(checkpointFile, "rw");
+    try {
+      checkpointFileHandle.seek(INDEX_VERSION * Serialization.SIZE_OF_LONG);
+      checkpointFileHandle.writeLong(Serialization.VERSION_3);
+      checkpointFileHandle.getChannel().force(true);
+    } finally {
+      try {
+        checkpointFileHandle.close();
+      } catch (IOException e) {
+        LOG.warn("Unable to close " + checkpointFile, e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java
index e40cd8c..5f06ab7 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java
@@ -18,7 +18,6 @@
  */
 package org.apache.flume.channel.file;
 
-import com.google.common.base.Preconditions;
 
 /**
  * Pointer to an Event on disk. This is represented in memory
@@ -31,7 +30,15 @@ class FlumeEventPointer {
   FlumeEventPointer(int fileID, int offset) {
     this.fileID = fileID;
     this.offset = offset;
-    Preconditions.checkArgument(offset > 0);
+    /*
+     * Log files used to have a header, now metadata is in
+     * a separate file so data starts at offset 0.
+     */
+    if(offset < 0) {
+      throw new IllegalArgumentException("offset = " + offset + "(" +
+          Integer.toHexString(offset) + ")" + ", fileID = " + fileID
+            + "(" + Integer.toHexString(fileID) + ")");
+    }
   }
   int getFileID() {
     return fileID;

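A pointer round-trips through a single long with the fileID in the high 32 bits and the offset in the low 32 bits, matching the decoding in EventQueueBackingStoreFile.main above. The actual toLong/fromLong methods fall outside this hunk; a sketch of the implied packing:

    public class PointerPacking {
      static long toLong(int fileID, int offset) {
        return ((long) fileID << 32) | (offset & 0xFFFFFFFFL);
      }

      static int fileID(long value) {
        return (int) (value >>> 32);
      }

      static int offset(long value) {
        return (int) value;
      }

      public static void main(String[] args) {
        long value = toLong(42, 0);  // offset 0 is now legal: no file header
        System.out.println(fileID(value) + ", " + offset(value));  // 42, 0
      }
    }
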
http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index 8085d22..a8df042 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -21,25 +21,9 @@ package org.apache.flume.channel.file;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.nio.LongBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel.MapMode;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.SetMultimap;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
 import java.security.MessageDigest;
 import java.util.Arrays;
 import java.util.Collection;
@@ -49,7 +33,14 @@ import java.util.TreeSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
 
 /**
  * Queue of events in the channel. This queue stores only
@@ -63,120 +54,22 @@ import org.apache.commons.lang.ArrayUtils;
 final class FlumeEventQueue {
   private static final Logger LOG = LoggerFactory
   .getLogger(FlumeEventQueue.class);
-  private static final long VERSION = 2;
   private static final int EMPTY = 0;
-  private static final int INDEX_VERSION = 0;
-  private static final int INDEX_WRITE_ORDER_ID = 1;
-  private static final int INDEX_SIZE = 2;
-  private static final int INDEX_HEAD = 3;
-  private static final int INDEX_CHECKPOINT_MARKER = 4;
-  private static final int CHECKPOINT_COMPLETE = EMPTY;
-  private static final int CHECKPOINT_INCOMPLETE = 1;
-  private static final int INDEX_ACTIVE_LOG = 5;
-  private static final int MAX_ACTIVE_LOGS = 1024;
-  private static final int HEADER_SIZE = 1029;
-  private static final int MAX_ALLOC_BUFFER_SIZE = 2*1024*1024; // 2MB
-  private final Map<Integer, AtomicInteger> fileIDCounts = Maps.newHashMap();
-  private final MappedByteBuffer mappedBuffer;
-  private final LongBuffer elementsBuffer;
-  private LongBufferWrapper elements;
-  private final RandomAccessFile checkpointFile;
-  private final java.nio.channels.FileChannel checkpointFileHandle;
-  private final int queueCapacity;
+  private final EventQueueBackingStore backingStore;
   private final String channelNameDescriptor;
   private final InflightEventWrapper inflightTakes;
   private final InflightEventWrapper inflightPuts;
 
-  private int queueSize;
-  private int queueHead;
-  private long logWriteOrderID;
-
   /**
    * @param capacity max event capacity of queue
    * @throws IOException
    */
-  FlumeEventQueue(int capacity, File file, File inflightTakesFile,
-          File inflightPutsFile, String name) throws Exception {
-    Preconditions.checkArgument(capacity > 0,
+  FlumeEventQueue(EventQueueBackingStore backingStore, File inflightTakesFile,
+          File inflightPutsFile) throws Exception {
+    Preconditions.checkArgument(backingStore.getCapacity() > 0,
         "Capacity must be greater than zero");
-    this.channelNameDescriptor = "[channel=" + name + "]";
-    this.queueCapacity = capacity;
-
-    if (!file.exists()) {
-      Preconditions.checkState(file.createNewFile(), "Unable to create file: "
-          + file.getCanonicalPath() + " " + channelNameDescriptor);
-    }
-
-    boolean freshlyAllocated = false;
-    checkpointFile = new RandomAccessFile(file, "rw");
-    if (checkpointFile.length() == 0) {
-      // Allocate
-      LOG.info("Event queue has zero allocation. Initializing to capacity. "
-          + "Please wait...");
-      int totalBytes = (capacity + HEADER_SIZE)*8;
-      if (totalBytes <= MAX_ALLOC_BUFFER_SIZE) {
-        checkpointFile.write(new byte[totalBytes]);
-      } else {
-        byte[] initBuffer = new byte[MAX_ALLOC_BUFFER_SIZE];
-        int remainingBytes = totalBytes;
-        while (remainingBytes >= MAX_ALLOC_BUFFER_SIZE) {
-          checkpointFile.write(initBuffer);
-          remainingBytes -= MAX_ALLOC_BUFFER_SIZE;
-        }
-        if (remainingBytes > 0) {
-          checkpointFile.write(initBuffer, 0, remainingBytes);
-        }
-      }
-
-      LOG.info("Event queue allocation complete");
-      freshlyAllocated = true;
-    } else {
-      int fileCapacity = (int) checkpointFile.length() / 8;
-      int expectedCapacity = capacity + HEADER_SIZE;
-
-      Preconditions.checkState(fileCapacity == expectedCapacity,
-          "Capacity cannot be changed once the channel is initialized "
-              + channelNameDescriptor + ": fileCapacity = " + fileCapacity
-              + ", expectedCapacity = " + expectedCapacity);
-    }
-
-    checkpointFileHandle = checkpointFile.getChannel();
-
-    mappedBuffer = checkpointFileHandle.map(MapMode.READ_WRITE, 0,
-        file.length());
-
-    elementsBuffer = mappedBuffer.asLongBuffer();
-    if (freshlyAllocated) {
-      elementsBuffer.put(INDEX_VERSION, VERSION);
-    } else {
-      int version = (int) elementsBuffer.get(INDEX_VERSION);
-      Preconditions.checkState(version == VERSION,
-          "Invalid version: " + version + channelNameDescriptor);
-      logWriteOrderID = elementsBuffer.get(INDEX_WRITE_ORDER_ID);
-      queueSize = (int) elementsBuffer.get(INDEX_SIZE);
-      queueHead = (int) elementsBuffer.get(INDEX_HEAD);
-
-      long checkpointComplete =
-          (int) elementsBuffer.get(INDEX_CHECKPOINT_MARKER);
-      Preconditions.checkState(checkpointComplete == CHECKPOINT_COMPLETE,
-          "The last checkpoint was not completed correctly. Please delete "
-          + "the checkpoint file: " + file.getCanonicalPath() + " to rebuild "
-          + "the checkpoint and start again. " + channelNameDescriptor);
-
-      int indexMaxLog = INDEX_ACTIVE_LOG + MAX_ACTIVE_LOGS;
-      for (int i = INDEX_ACTIVE_LOG; i < indexMaxLog; i++) {
-        long nextFileCode = elementsBuffer.get(i);
-        if (nextFileCode  != EMPTY) {
-          Pair<Integer, Integer> idAndCount =
-              deocodeActiveLogCounter(nextFileCode);
-          fileIDCounts.put(idAndCount.getLeft(),
-              new AtomicInteger(idAndCount.getRight()));
-        }
-      }
-    }
-
-    elements = new LongBufferWrapper(elementsBuffer, channelNameDescriptor);
-    //TODO: Support old code paths with no inflight files.
+    this.channelNameDescriptor = "[channel=" + backingStore.getName() + "]";
+    this.backingStore = backingStore;
     try {
       inflightPuts = new InflightEventWrapper(inflightPutsFile);
       inflightTakes = new InflightEventWrapper(inflightTakesFile);
@@ -186,20 +79,6 @@ final class FlumeEventQueue {
     }
   }
 
-  private Pair<Integer, Integer> deocodeActiveLogCounter(long value) {
-    int fileId = (int) (value >>> 32);
-    int count = (int) value;
-
-    return Pair.of(fileId, count);
-  }
-
-  private long encodeActiveLogCounter(int fileId, int count) {
-    long result = fileId;
-    result = (long)fileId << 32;
-    result += (long) count;
-    return result;
-  }
-
   SetMultimap<Long, Long> deserializeInflightPuts() throws IOException{
     return inflightPuts.deserialize();
   }
@@ -209,46 +88,20 @@ final class FlumeEventQueue {
   }
 
   synchronized long getLogWriteOrderID() {
-    return logWriteOrderID;
+    return backingStore.getLogWriteOrderID();
   }
 
   synchronized boolean checkpoint(boolean force) throws Exception {
-    if (!elements.syncRequired()
+    if (!backingStore.syncRequired()
             && !inflightTakes.syncRequired()
             && !force) { //No need to check inflight puts, since that would
                          //cause elements.syncRequired() to return true.
       LOG.debug("Checkpoint not required");
       return false;
     }
-
-    // Start checkpoint
-    elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE);
-
-    updateHeaders();
-
-    List<Long> fileIdAndCountEncoded = new ArrayList<Long>();
-    for (Integer fileId : fileIDCounts.keySet()) {
-      Integer count = fileIDCounts.get(fileId).get();
-      long value = encodeActiveLogCounter(fileId, count);
-      fileIdAndCountEncoded.add(value);
-    }
-
-    int emptySlots = MAX_ACTIVE_LOGS - fileIdAndCountEncoded.size();
-    for (int i = 0; i < emptySlots; i++)  {
-      fileIdAndCountEncoded.add(0L);
-    }
-    for (int i = 0; i < MAX_ACTIVE_LOGS; i++) {
-      elementsBuffer.put(i + INDEX_ACTIVE_LOG, fileIdAndCountEncoded.get(i));
-    }
-
-    elements.sync();
-
     inflightPuts.serializeAndWrite();
     inflightTakes.serializeAndWrite();
-    // Finish checkpoint
-    elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_COMPLETE);
-    mappedBuffer.force();
-
+    backingStore.checkpoint();
     return true;
   }
 
@@ -258,7 +111,7 @@ final class FlumeEventQueue {
    * @return FlumeEventPointer or null if queue is empty
    */
   synchronized FlumeEventPointer removeHead(long transactionID) {
-    if(queueSize  == 0) {
+    if(backingStore.getSize() == 0) {
       return null;
     }
 
@@ -267,7 +120,7 @@ final class FlumeEventQueue {
           + channelNameDescriptor);
 
     FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
-    decrementFileID(ptr.getFileID());
+    backingStore.decrementFileID(ptr.getFileID());
     return ptr;
   }
 
@@ -284,7 +137,7 @@ final class FlumeEventQueue {
     //events since they are in the inflight takes. So puts will not happen
     //in such a way that these takes cannot go back in. If this condition is
     //ever true, there is a bug!
-    if (queueSize == queueCapacity) {
+    if (backingStore.getSize() == backingStore.getCapacity()) {
       LOG.error("Could not reinsert to queue, events which were taken but "
               + "not committed. Please report this issue.");
       return false;
@@ -292,7 +145,7 @@ final class FlumeEventQueue {
 
     long value = e.toLong();
     Preconditions.checkArgument(value != EMPTY);
-    incrementFileID(e.getFileID());
+    backingStore.incrementFileID(e.getFileID());
 
     add(0, value);
     return true;
@@ -306,15 +159,15 @@ final class FlumeEventQueue {
    * was added to the queue
    */
   synchronized boolean addTail(FlumeEventPointer e) {
-    if ((queueSize + inflightTakes.getSize()) == queueCapacity) {
+    if (getSize() == backingStore.getCapacity()) {
       return false;
     }
 
     long value = e.toLong();
     Preconditions.checkArgument(value != EMPTY);
-    incrementFileID(e.getFileID());
+    backingStore.incrementFileID(e.getFileID());
 
-    add(queueSize, value);
+    add(backingStore.getSize(), value);
     return true;
   }
 
@@ -338,11 +191,11 @@ final class FlumeEventQueue {
   synchronized boolean remove(FlumeEventPointer e) {
     long value = e.toLong();
     Preconditions.checkArgument(value != EMPTY);
-    for (int i = 0; i < queueSize; i++) {
+    for (int i = 0; i < backingStore.getSize(); i++) {
       if(get(i) == value) {
         remove(i, 0);
         FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
-        decrementFileID(ptr.getFileID());
+        backingStore.decrementFileID(ptr.getFileID());
         return true;
       }
     }
@@ -357,75 +210,53 @@ final class FlumeEventQueue {
     //Java implements clone pretty well. The main place this is used is
     //in checkpointing and deleting old files, so it is best
     //to use a sorted set implementation.
-    SortedSet<Integer> fileIDs = new TreeSet(fileIDCounts.keySet());
+    SortedSet<Integer> fileIDs =
+        new TreeSet<Integer>(backingStore.getReferenceCounts());
     fileIDs.addAll(inflightPuts.getFileIDs());
     fileIDs.addAll(inflightTakes.getFileIDs());
     return fileIDs;
   }
 
-  protected void incrementFileID(int fileID) {
-    AtomicInteger counter = fileIDCounts.get(fileID);
-    if(counter == null) {
-      Preconditions.checkState(fileIDCounts.size() < MAX_ACTIVE_LOGS,
-          "Too many active logs " + channelNameDescriptor);
-      counter = new AtomicInteger(0);
-      fileIDCounts.put(fileID, counter);
-    }
-    counter.incrementAndGet();
-  }
-
-  protected void decrementFileID(int fileID) {
-    AtomicInteger counter = fileIDCounts.get(fileID);
-    Preconditions.checkState(counter != null, "null counter "
-        + channelNameDescriptor);
-    int count = counter.decrementAndGet();
-    if(count == 0) {
-      fileIDCounts.remove(fileID);
-    }
-  }
-
   protected long get(int index) {
-    if (index < 0 || index > queueSize - 1) {
+    if (index < 0 || index > backingStore.getSize() - 1) {
       throw new IndexOutOfBoundsException(String.valueOf(index)
           + channelNameDescriptor);
     }
-
-    return elements.get(getPhysicalIndex(index));
+    return backingStore.get(index);
   }
 
   private void set(int index, long value) {
-    if (index < 0 || index > queueSize - 1) {
+    if (index < 0 || index > backingStore.getSize() - 1) {
       throw new IndexOutOfBoundsException(String.valueOf(index)
           + channelNameDescriptor);
     }
-
-    elements.put(getPhysicalIndex(index), value);
+    backingStore.put(index, value);
   }
 
   protected boolean add(int index, long value) {
-    if (index < 0 || index > queueSize) {
+    if (index < 0 || index > backingStore.getSize()) {
       throw new IndexOutOfBoundsException(String.valueOf(index)
           + channelNameDescriptor);
     }
 
-    if (queueSize == queueCapacity) {
+    if (backingStore.getSize() == backingStore.getCapacity()) {
       return false;
     }
 
-    queueSize++;
+    backingStore.setSize(backingStore.getSize() + 1);
 
-    if (index <= queueSize/2) {
+    if (index <= backingStore.getSize()/2) {
       // Shift left
-      queueHead--;
-      if (queueHead < 0) {
-        queueHead = queueCapacity - 1;
+      backingStore.setHead(backingStore.getHead() - 1);
+      if (backingStore.getHead() < 0) {
+        backingStore.setHead(backingStore.getCapacity() - 1);
       }
       for (int i = 0; i < index; i++) {
         set(i, get(i+1));
       }
     } else {
       // Shift right
-      for (int i = queueSize - 1; i > index; i--) {
+      for (int i = backingStore.getSize() - 1; i > index; i--) {
         set(i, get(i-1));
       }
     }
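
The add() and remove() paths above treat the checkpoint array as a circular
buffer: a logical index is translated to a physical slot via
(head + index) % capacity (the removed getPhysicalIndex additionally skipped
the header words), and an insert near the front moves the head backwards
instead of shifting the whole tail. A small self-contained sketch of that
indexing, with invented names and no capacity checks:

    // Sketch: fixed-capacity circular buffer with head-relative indexing.
    final class RingDemo {
      private final long[] slots;
      private int head, size;

      RingDemo(int capacity) { slots = new long[capacity]; }

      private int physical(int logical) {
        return (head + logical) % slots.length;  // wrap around the array
      }
      void addHead(long v) {                     // O(1): move head left
        head = (head - 1 + slots.length) % slots.length;
        slots[head] = v;
        size++;
      }
      void addTail(long v) {
        slots[physical(size)] = v;
        size++;
      }
      long get(int logical) { return slots[physical(logical)]; }
    }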
@@ -444,22 +275,22 @@ final class FlumeEventQueue {
   }
 
   protected synchronized long remove(int index, long transactionID) {
-    if (index < 0 || index > queueSize - 1) {
+    if (index < 0 || index > backingStore.getSize() - 1) {
       throw new IndexOutOfBoundsException("index = " + index
-          + ", queueSize " + queueSize +" " + channelNameDescriptor);
+          + ", queueSize " + backingStore.getSize() +" " + channelNameDescriptor);
     }
     long value = get(index);
     //if txn id = 0, we are recovering from a crash.
     if(transactionID != 0) {
       inflightTakes.addEvent(transactionID, value);
     }
-    if (index > queueSize/2) {
+    if (index > backingStore.getSize()/2) {
       // Move tail part to left
-      for (int i = index; i < queueSize - 1; i++) {
+      for (int i = index; i < backingStore.getSize() - 1; i++) {
         long rightValue = get(i+1);
         set(i, rightValue);
       }
-      set(queueSize - 1, EMPTY);
+      set(backingStore.getSize() - 1, EMPTY);
     } else {
       // Move head part to right
       for (int i = index - 1; i >= 0; i--) {
@@ -467,88 +298,34 @@ final class FlumeEventQueue {
         set(i+1, leftValue);
       }
       set(0, EMPTY);
-      queueHead++;
-      if (queueHead == queueCapacity) {
-        queueHead = 0;
+      backingStore.setHead(backingStore.getHead() + 1);
+      if (backingStore.getHead() == backingStore.getCapacity()) {
+        backingStore.setHead(0);
       }
     }
-
-    queueSize--;
+    backingStore.setSize(backingStore.getSize() - 1);
     return value;
   }
 
-  private synchronized void updateHeaders() {
-    logWriteOrderID = WriteOrderOracle.next();
-    elementsBuffer.put(INDEX_WRITE_ORDER_ID, logWriteOrderID);
-    elementsBuffer.put(INDEX_SIZE, queueSize);
-    elementsBuffer.put(INDEX_HEAD, queueHead);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Updating checkpoint headers: ts: " + logWriteOrderID + ", queueSize: "
-          + queueSize + ", queueHead: " + queueHead + " " + channelNameDescriptor);
-    }
-  }
-
-
-  private int getPhysicalIndex(int index) {
-    return HEADER_SIZE + (queueHead + index) % queueCapacity;
-  }
 
   protected synchronized int getSize() {
-    return queueSize + inflightTakes.getSize();
+    return backingStore.getSize() + inflightTakes.getSize();
   }
 
   /**
    * @return max capacity of the queue
    */
   public int getCapacity() {
-    return queueCapacity;
+    return backingStore.getCapacity();
   }
 
-  static class LongBufferWrapper {
-    private final LongBuffer buffer;
-    private final String channelNameDescriptor;
-
-    Map<Integer, Long> overwriteMap = new HashMap<Integer, Long>();
-
-    LongBufferWrapper(LongBuffer lb, String nameDescriptor) {
-      buffer = lb;
-      channelNameDescriptor = nameDescriptor;
-    }
-
-    long get(int index) {
-      long result = EMPTY;
-      if (overwriteMap.containsKey(index)) {
-        result = overwriteMap.get(index);
-      } else {
-        result = buffer.get(index);
-      }
-
-      return result;
-    }
-
-    void put(int index, long value) {
-      overwriteMap.put(index, value);
-    }
-
-    boolean syncRequired() {
-      return overwriteMap.size() > 0;
-    }
-
-    void sync() {
-      Iterator<Integer> it = overwriteMap.keySet().iterator();
-      while (it.hasNext()) {
-        int index = it.next();
-        long value = overwriteMap.get(index);
-
-        buffer.put(index, value);
-        it.remove();
-      }
-
-      Preconditions.checkState(overwriteMap.size() == 0,
-          "concurrent update detected " + channelNameDescriptor);
+  synchronized void close() {
+    try {
+      backingStore.close();
+    } catch (IOException e) {
+      LOG.warn("Error closing backing store", e);
     }
   }
-
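
The LongBufferWrapper deleted above implemented a small write-back overlay for
the memory-mapped checkpoint: put() only recorded the value in an in-heap map,
get() consulted that map before the mapped buffer, and sync() drained the map
into the buffer at checkpoint time, so the on-disk image never saw partial
updates between checkpoints. A compact sketch of the overlay pattern, with
hypothetical names over a plain LongBuffer:

    import java.nio.LongBuffer;
    import java.util.HashMap;
    import java.util.Map;

    // Sketch of a write-back overlay over a LongBuffer (names invented).
    final class OverlayBuffer {
      private final LongBuffer backing;
      private final Map<Integer, Long> pending = new HashMap<Integer, Long>();

      OverlayBuffer(LongBuffer backing) { this.backing = backing; }

      long get(int index) {
        Long v = pending.get(index);      // a dirty value wins over disk
        return v != null ? v : backing.get(index);
      }
      void put(int index, long value) {
        pending.put(index, value);        // defer the real write until sync()
      }
      boolean syncRequired() { return !pending.isEmpty(); }
      void sync() {                       // drain the overlay into the buffer
        for (Map.Entry<Integer, Long> e : pending.entrySet()) {
          backing.put(e.getKey(), e.getValue());
        }
        pending.clear();
      }
    }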
   /**
    * A representation of in flight events which have not yet been committed.
    * None of the methods are thread safe, and should be called from thread
@@ -740,56 +517,4 @@ final class FlumeEventQueue {
       return inflightFileIDs.values();
     }
   }
-
-  public static void main(String[] args) throws Exception {
-    File file = new File(args[0]);
-    File inflightTakesFile = new File(args[1]);
-    File inflightPutsFile = new File(args[2]);
-    if (!file.exists()) {
-      throw new IOException("File " + file + " does not exist");
-    }
-    if (file.length() == 0) {
-      throw new IOException("File " + file + " is empty");
-    }
-    int capacity = (int) ((file.length() - (HEADER_SIZE * 8L)) / 8L);
-    FlumeEventQueue queue = new FlumeEventQueue(
-            capacity, file, inflightTakesFile, inflightPutsFile, "debug");
-    System.out.println("File Reference Counts" + queue.fileIDCounts);
-    System.out.println("Queue Capacity " + queue.getCapacity());
-    System.out.println("Queue Size " + queue.getSize());
-    System.out.println("Queue Head " + queue.queueHead);
-    for (int index = 0; index < queue.getCapacity(); index++) {
-      long value = queue.elements.get(queue.getPhysicalIndex(index));
-      int fileID = (int) (value >>> 32);
-      int offset = (int) value;
-      System.out.println(index + ":" + Long.toHexString(value) + " fileID = "
-              + fileID + ", offset = " + offset);
-    }
-
-    SetMultimap<Long, Long> putMap = queue.deserializeInflightPuts();
-    System.out.println("Inflight Puts:");
-
-    for (Long txnID : putMap.keySet()) {
-      Set<Long> puts = putMap.get(txnID);
-      System.out.println("Transaction ID: " + String.valueOf(txnID));
-      for (long value : puts) {
-        int fileID = (int) (value >>> 32);
-        int offset = (int) value;
-        System.out.println(Long.toHexString(value) + " fileID = "
-                + fileID + ", offset = " + offset);
-      }
-    }
-    SetMultimap<Long, Long> takeMap = queue.deserializeInflightTakes();
-    System.out.println("Inflight takes:");
-    for (Long txnID : takeMap.keySet()) {
-      Set<Long> takes = takeMap.get(txnID);
-      System.out.println("Transaction ID: " + String.valueOf(txnID));
-      for (long value : takes) {
-        int fileID = (int) (value >>> 32);
-        int offset = (int) value;
-        System.out.println(Long.toHexString(value) + " fileID = "
-                + fileID + ", offset = " + offset);
-      }
-    }
-  }
-}
\ No newline at end of file
+}
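
Taken together, these FlumeEventQueue changes replace direct buffer
manipulation with calls on an EventQueueBackingStore. Reconstructed purely
from the call sites in the hunks above (the literal declaration in the commit
may differ), the contract looks roughly like:

    import java.io.IOException;
    import java.util.Collection;

    // Reconstruction from call sites; not the commit's actual declaration.
    abstract class EventQueueBackingStoreSketch {
      abstract String getName();
      abstract int getSize();
      abstract void setSize(int size);
      abstract int getHead();
      abstract void setHead(int head);
      abstract int getCapacity();
      abstract long get(int index);
      abstract void put(int index, long value);
      abstract void incrementFileID(int fileID);
      abstract void decrementFileID(int fileID);
      abstract Collection<Integer> getReferenceCounts();
      abstract boolean syncRequired();
      abstract void checkpoint() throws IOException;
      abstract long getLogWriteOrderID();
      abstract void close() throws IOException;
    }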

http://git-wip-us.apache.org/repos/asf/flume/blob/2b26f364/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index e13ecc4..6e8e3d0 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -46,7 +47,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import java.util.SortedSet;
 
 /**
  * Stores FlumeEvents on disk and pointers to the events in an in-memory queue.
@@ -91,7 +91,6 @@ class Log {
    */
   private final WriteLock checkpointWriterLock = checkpointLock.writeLock();
   private int logWriteTimeout;
-  private final String channelName;
   private final String channelNameDescriptor;
   private int checkpointWriteTimeout;
   private boolean useLogReplayV1;
@@ -184,7 +183,6 @@ class Log {
     Preconditions.checkArgument(name != null && !name.trim().isEmpty(),
             "channel name should be specified");
 
-    this.channelName = name;
     this.channelNameDescriptor = "[channel=" + name + "]";
     this.useLogReplayV1 = useLogReplayV1;
     this.useFastReplay = useFastReplay;
@@ -225,6 +223,7 @@ class Log {
    * directly before the shutdown or crash.
    * @throws IOException
    */
+  @SuppressWarnings("deprecation")
   void replay() throws IOException {
     Preconditions.checkState(!open, "Cannot replay after Log has been opened");
 
@@ -248,7 +247,7 @@ class Log {
           int id = LogUtils.getIDForFile(file);
           dataFiles.add(file);
           nextFileID.set(Math.max(nextFileID.get(), id));
-          idLogFileMap.put(id, new LogFile.RandomReader(new File(logDir, PREFIX
+          idLogFileMap.put(id, LogFileFactory.getRandomReader(new File(logDir, PREFIX
               + id)));
         }
       }
@@ -268,8 +267,11 @@ class Log {
       File checkpointFile = new File(checkpointDir, "checkpoint");
       File inflightTakesFile = new File(checkpointDir, "inflighttakes");
       File inflightPutsFile = new File(checkpointDir, "inflightputs");
-      queue = new FlumeEventQueue(queueCapacity, checkpointFile,
-              inflightTakesFile, inflightPutsFile, channelName);
+      EventQueueBackingStore backingStore =
+          EventQueueBackingStoreFactory.get(checkpointFile, queueCapacity,
+              channelNameDescriptor);
+      queue = new FlumeEventQueue(backingStore, inflightTakesFile,
+            inflightPutsFile);
       LOGGER.info("Last Checkpoint " + new Date(checkpointFile.lastModified())
         + ", queue depth = " + queue.getSize());
 
@@ -279,7 +281,7 @@ class Log {
        * the list of data files.
        */
       ReplayHandler replayHandler = new ReplayHandler(queue, useFastReplay,
-              checkpointFile, maxFileSize);
+              checkpointFile);
       if(useLogReplayV1) {
         LOGGER.info("Replaying logs with v1 replay logic");
         replayHandler.replayLogv1(dataFiles);
@@ -288,7 +290,6 @@ class Log {
         replayHandler.replayLog(dataFiles);
       }
 
-
       for (int index = 0; index < logDirs.length; index++) {
         LOGGER.info("Rolling " + logDirs[index]);
         roll(index);
@@ -354,8 +355,7 @@ class Log {
     Preconditions.checkState(open, "Log is closed");
     FlumeEvent flumeEvent = new FlumeEvent(
         event.getHeaders(), event.getBody());
-    Put put = new Put(transactionID, flumeEvent);
-    put.setLogWriteOrderID(WriteOrderOracle.next());
+    Put put = new Put(transactionID, WriteOrderOracle.next(), flumeEvent);
     ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put);
     int logFileIndex = nextLogWriter(transactionID);
     if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
@@ -384,9 +384,8 @@ class Log {
   void take(long transactionID, FlumeEventPointer pointer)
       throws IOException {
     Preconditions.checkState(open, "Log is closed");
-    Take take = new Take(transactionID, pointer.getOffset(),
-        pointer.getFileID());
-    take.setLogWriteOrderID(WriteOrderOracle.next());
+    Take take = new Take(transactionID, WriteOrderOracle.next(),
+        pointer.getOffset(), pointer.getFileID());
     ByteBuffer buffer = TransactionEventRecord.toByteBuffer(take);
     int logFileIndex = nextLogWriter(transactionID);
     if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
@@ -416,8 +415,7 @@ class Log {
     if(LOGGER.isDebugEnabled()) {
       LOGGER.debug("Rolling back " + transactionID);
     }
-    Rollback rollback = new Rollback(transactionID);
-    rollback.setLogWriteOrderID(WriteOrderOracle.next());
+    Rollback rollback = new Rollback(transactionID, WriteOrderOracle.next());
     ByteBuffer buffer = TransactionEventRecord.toByteBuffer(rollback);
     int logFileIndex = nextLogWriter(transactionID);
     if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
@@ -526,6 +524,7 @@ class Log {
           }
         }
       }
+      queue.close();
       try {
         unlock(checkpointDir);
       } catch (IOException ex) {
@@ -564,8 +563,7 @@ class Log {
   private void commit(long transactionID, short type) throws IOException {
 
     Preconditions.checkState(open, "Log is closed");
-    Commit commit = new Commit(transactionID, type);
-    commit.setLogWriteOrderID(WriteOrderOracle.next());
+    Commit commit = new Commit(transactionID, WriteOrderOracle.next(), type);
     ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit);
     int logFileIndex = nextLogWriter(transactionID);
     if (logFiles.get(logFileIndex).isRollRequired(buffer)) {
@@ -634,9 +632,9 @@ class Log {
               "File already exists "  + file);
           Preconditions.checkState(file.createNewFile(),
               "File could not be created " + file);
-          idLogFileMap.put(fileID, new LogFile.RandomReader(file));
+          idLogFileMap.put(fileID, LogFileFactory.getRandomReader(file));
           // writer from this point on will get new reference
-          logFiles.set(index, new LogFile.Writer(file, fileID, maxFileSize));
+          logFiles.set(index, LogFileFactory.getWriter(file, fileID, maxFileSize));
           // close out old log
           if (oldLogFile != null) {
             oldLogFile.close();
@@ -670,9 +668,9 @@ class Log {
     if(!lockAcquired) {
       return false;
     }
-    SortedSet<Integer> idSet = null;
+    SortedSet<Integer> logFileRefCountsAll = null, logFileRefCountsActive = null;
     try {
-      if (queue.checkpoint(force) || force) {
+      if (queue.checkpoint(force)) {
         long logWriteOrderID = queue.getLogWriteOrderID();
 
         //Since the active files might also be in the queue's fileIDs,
@@ -681,42 +679,52 @@ class Log {
         //fileID set from the queue have been updated.
         //Since clone is smarter than insert, it is better to make
         //a copy of the set first so that we can use it later.
-        idSet = queue.getFileIDs();
-        SortedSet<Integer> idSetToCompare = new TreeSet<Integer>(idSet);
+        logFileRefCountsAll = queue.getFileIDs();
+        logFileRefCountsActive = new TreeSet<Integer>(logFileRefCountsAll);
 
         int numFiles = logFiles.length();
         for (int i = 0; i < numFiles; i++) {
-          LogFile.Writer writer = logFiles.get(i);
-          writer.markCheckpoint(logWriteOrderID);
-          int id = writer.getFileID();
-          idSet.remove(id);
-          LOGGER.debug("Updated checkpoint for file: " + writer.getFile());
+          LogFile.Writer logWriter = logFiles.get(i);
+          int logFileID = logWriter.getLogFileID();
+          File logFile = logWriter.getFile();
+          LogFile.MetaDataWriter writer =
+              LogFileFactory.getMetaDataWriter(logFile, logFileID);
+          try {
+            writer.markCheckpoint(logWriter.position(), logWriteOrderID);
+          } finally {
+            writer.close();
+          }
+          logFileRefCountsAll.remove(logFileID);
+          LOGGER.info("Updated checkpoint for file: " + logFile + " position: "
+              + logWriter.position() + " logWriteOrderID: " + logWriteOrderID);
         }
 
         // Update any inactive data files as well
-        Iterator<Integer> idIterator = idSet.iterator();
+        Iterator<Integer> idIterator = logFileRefCountsAll.iterator();
         while (idIterator.hasNext()) {
           int id = idIterator.next();
           LogFile.RandomReader reader = idLogFileMap.remove(id);
           File file = reader.getFile();
           reader.close();
-          // Open writer in inactive mode
-          LogFile.Writer writer =
-              new LogFile.Writer(file, id, maxFileSize, false);
-          writer.markCheckpoint(logWriteOrderID);
-          writer.close();
-          reader = new LogFile.RandomReader(file);
+          LogFile.MetaDataWriter writer =
+              LogFileFactory.getMetaDataWriter(file, id);
+          try {
+            writer.markCheckpoint(logWriteOrderID);
+          } finally {
+            writer.close();
+          }
+          reader = LogFileFactory.getRandomReader(file);
           idLogFileMap.put(id, reader);
-          LOGGER.debug("Updated checkpoint for file: " + file);
+          LOGGER.debug("Updated checkpoint for file: " + file
+              + " logWriteOrderID " + logWriteOrderID);
           idIterator.remove();
         }
-        Preconditions.checkState(idSet.size() == 0,
-                "Could not update all data file timestamps: " + idSet);
+        Preconditions.checkState(logFileRefCountsAll.size() == 0,
+                "Could not update all data file timestamps: " + logFileRefCountsAll);
         //Add files from all log directories
         for (int index = 0; index < logDirs.length; index++) {
-          idSetToCompare.add(logFiles.get(index).getFileID());
+          logFileRefCountsActive.add(logFiles.get(index).getLogFileID());
         }
-        idSet = idSetToCompare;
         checkpointCompleted = true;
       }
     } finally {
@@ -725,7 +733,7 @@ class Log {
     //Do the deletes outside the checkpointWriterLock
     //Delete logic is expensive.
     if (open && checkpointCompleted) {
-      removeOldLogs(idSet);
+      removeOldLogs(logFileRefCountsActive);
     }
     //Since the exception is not caught, this will not be returned if
     //an exception is thrown from the try.
@@ -755,6 +763,11 @@ class Log {
           LOGGER.info("Removing old log " + logFile +
               ", result = " + logFile.delete() + ", minFileID "
               + minFileID);
+          File metaDataFile = Serialization.getMetaDataFile(logFile);
+          if(metaDataFile.exists() && !metaDataFile.delete()) {
+            LOGGER.warn("Could not remove metadata file "
+                + metaDataFile + " for " + logFile);
+          }
         }
       }
     }
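
Because each data file now has a companion metadata file (obtained via
Serialization.getMetaDataFile), removeOldLogs deletes that sidecar alongside
the data file itself. A small hedged helper showing the pattern; the ".meta"
suffix below is an assumption for illustration, not the commit's naming
scheme:

    import java.io.File;

    // Sketch: delete a data file together with its metadata sidecar.
    final class SidecarCleanupDemo {
      static void deleteWithSidecar(File dataFile) {
        boolean dataGone = dataFile.delete();
        File meta = new File(dataFile.getPath() + ".meta"); // assumed suffix
        if (meta.exists() && !meta.delete()) {
          System.err.println("Could not remove metadata file " + meta
              + " for " + dataFile + " (data file deleted: " + dataGone + ")");
        }
      }
    }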
@@ -795,6 +808,7 @@ class Log {
    * <code>null</code> if directory is already locked.
    * @throws IOException if locking fails.
    */
+  @SuppressWarnings("resource")
   private FileLock tryLock(File dir) throws IOException {
     File lockF = new File(dir, FILE_LOCK);
     lockF.deleteOnExit();

