flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject [9/9] git commit: FLUME-1437: Checkpoint can miss pending takes
Date Fri, 24 Aug 2012 17:23:23 GMT
FLUME-1437: Checkpoint can miss pending takes

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/cf3c16f4
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/cf3c16f4
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/cf3c16f4

Branch: refs/heads/flume-1.3.0
Commit: cf3c16f4b08663d408f01506015e1cf5be6c3d73
Parents: ca636a3
Author: Brock Noland <brock@apache.org>
Authored: Tue Aug 21 09:34:22 2012 -0500
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Fri Aug 24 10:20:32 2012 -0700

----------------------------------------------------------------------
 .../org/apache/flume/channel/file/FileChannel.java |   73 ++-
 .../apache/flume/channel/file/FlumeEventQueue.java |  326 +++++++++++-
 .../java/org/apache/flume/channel/file/Log.java    |   12 +-
 .../apache/flume/channel/file/ReplayHandler.java   |   82 +++-
 .../apache/flume/channel/file/TestCheckpoint.java  |   17 +-
 .../apache/flume/channel/file/TestFileChannel.java |  401 +++++++++++++--
 .../flume/channel/file/TestFlumeEventQueue.java    |  108 +++-
 .../org/apache/flume/channel/file/TestLog.java     |   14 +-
 8 files changed, 908 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/cf3c16f4/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index e7735e8..b5a0b88 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -256,6 +256,7 @@ public class FileChannel extends BasicChannelSemantics {
     super.stop();
   }
 
+  @Override
   public String toString() {
     return "FileChannel " + getName() + " { dataDirs: " +
         Arrays.toString(dataDirs) + " }";
@@ -362,7 +363,7 @@ public class FileChannel extends BasicChannelSemantics {
             "increasing capacity, or increasing thread count. "
                + channelNameDescriptor);
       }
-      FlumeEventPointer ptr = queue.removeHead();
+      FlumeEventPointer ptr = queue.removeHead(transactionID);
       if(ptr != null) {
         try {
           // first add to takeList so that if write to disk
@@ -387,6 +388,23 @@ public class FileChannel extends BasicChannelSemantics {
       if(puts > 0) {
         Preconditions.checkState(takes == 0, "nonzero puts and takes "
                 + channelNameDescriptor);
+        /*
+         * OK to not put this in synchronized(queue) block, because if a
+         * checkpoint occurs after the commit it is fine.
+         * The puts will be in the inflightputs file in the checkpoint.
+         * The commit did not return, so previous hop would not get success
+         * for the commit.
+         * The replay will not see the commit in the log file (since the
+         * commit is before the checkpoint in the logs) - and hence the events
+         * are not added back to the queue, so no duplicates or data loss.
+         */
+        try {
+          log.commitPut(transactionID);
+          channelCounter.addToEventPutSuccessCount(puts);
+        } catch (IOException e) {
+          throw new ChannelException("Commit failed due to IO error "
+                  + channelNameDescriptor, e);
+        }
         synchronized (queue) {
           while(!putList.isEmpty()) {
             if(!queue.addTail(putList.removeFirst())) {
@@ -401,21 +419,24 @@ public class FileChannel extends BasicChannelSemantics {
               Preconditions.checkState(false, msg.toString());
             }
           }
+          queue.completeTransaction(transactionID);
         }
+      } else if (takes > 0) {
         try {
-          log.commitPut(transactionID);
-          channelCounter.addToEventPutSuccessCount(puts);
-        } catch (IOException e) {
-          throw new ChannelException("Commit failed due to IO error "
-              + channelNameDescriptor, e);
-        }
-      } else if(takes > 0) {
-        try {
+          /*
+           * OK to not have the commit take in synchronized(queue) block.
+           * If a checkpoint happens in between the commitTake and
+           * the completeTxn call, the takes will be in the inflightTakes file.
+           * When the channel replays the events, these takes will be put
+           * back into the channel - and will cause duplicates, but the
+           * number of duplicates will be pretty limited.
+           */
           log.commitTake(transactionID);
+          queue.completeTransaction(transactionID);
           channelCounter.addToEventTakeSuccessCount(takes);
         } catch (IOException e) {
           throw new ChannelException("Commit failed due to IO error "
-               + channelNameDescriptor, e);
+                  + channelNameDescriptor, e);
         }
         queueRemaining.release(takes);
       }
@@ -428,26 +449,36 @@ public class FileChannel extends BasicChannelSemantics {
     protected void doRollback() throws InterruptedException {
       int puts = putList.size();
       int takes = takeList.size();
+      /*
+       * OK to not have the rollback within the synchronized(queue) block.
+       * If a checkpoint occurs between the rollback and the synchronized(queue)
+       * block, the takes are kept in the inflighttakes file in the checkpoint.
+       * During a replay the commit or rollback for the takes are not seen,
+       * so the takes are re-inserted into the queue - which is a rollback
+       * anyway.
+       */
+      try {
+        log.rollback(transactionID);
+      } catch (IOException e) {
+        throw new ChannelException("Commit failed due to IO error "
+                + channelNameDescriptor, e);
+      }
       if(takes > 0) {
         Preconditions.checkState(puts == 0, "nonzero puts and takes "
             + channelNameDescriptor);
-        while(!takeList.isEmpty()) {
-          Preconditions.checkState(queue.addHead(takeList.removeLast()),
-              "Queue add failed, this shouldn't be able to happen "
-                   + channelNameDescriptor);
+        synchronized (queue) {
+          while (!takeList.isEmpty()) {
+            Preconditions.checkState(queue.addHead(takeList.removeLast()),
+                    "Queue add failed, this shouldn't be able to happen "
+                    + channelNameDescriptor);
+          }
+          queue.completeTransaction(transactionID);
         }
       }
       queueRemaining.release(puts);
-      try {
-        log.rollback(transactionID);
-      } catch (IOException e) {
-        throw new ChannelException("Commit failed due to IO error "
-             + channelNameDescriptor, e);
-      }
       putList.clear();
       takeList.clear();
       channelCounter.setChannelSize(queue.getSize());
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/cf3c16f4/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index 9bfee2d..766c59a 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -35,9 +35,21 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.ArrayUtils;
 
 /**
  * Queue of events in the channel. This queue stores only
@@ -48,7 +60,7 @@ import java.util.TreeSet;
  * contains the timestamp of last sync, the queue size and
  * the head position.
  */
-class FlumeEventQueue {
+final class FlumeEventQueue {
   private static final Logger LOG = LoggerFactory
   .getLogger(FlumeEventQueue.class);
   private static final long VERSION = 2;
@@ -72,6 +84,8 @@ class FlumeEventQueue {
   private final java.nio.channels.FileChannel checkpointFileHandle;
   private final int queueCapacity;
   private final String channelNameDescriptor;
+  private final InflightEventWrapper inflightTakes;
+  private final InflightEventWrapper inflightPuts;
 
   private int queueSize;
   private int queueHead;
@@ -81,7 +95,8 @@ class FlumeEventQueue {
    * @param capacity max event capacity of queue
    * @throws IOException
    */
-  FlumeEventQueue(int capacity, File file, String name) throws IOException {
+  FlumeEventQueue(int capacity, File file, File inflightTakesFile,
+          File inflightPutsFile, String name) throws Exception {
     Preconditions.checkArgument(capacity > 0,
         "Capacity must be greater than zero");
     this.channelNameDescriptor = "[channel=" + name + "]";
@@ -161,6 +176,14 @@ class FlumeEventQueue {
     }
 
     elements = new LongBufferWrapper(elementsBuffer, channelNameDescriptor);
+    //TODO: Support old code paths with no inflight files.
+    try {
+      inflightPuts = new InflightEventWrapper(inflightPutsFile);
+      inflightTakes = new InflightEventWrapper(inflightTakesFile);
+    } catch (Exception e) {
+      LOG.error("Could not read checkpoint.", e);
+      throw e;
+    }
   }
 
   private Pair<Integer, Integer> deocodeActiveLogCounter(long value) {
@@ -177,12 +200,23 @@ class FlumeEventQueue {
     return result;
   }
 
+  SetMultimap<Long, Long> deserializeInflightPuts() throws IOException{
+    return inflightPuts.deserialize();
+  }
+
+  SetMultimap<Long, Long> deserializeInflightTakes() throws IOException{
+    return inflightTakes.deserialize();
+  }
+
   synchronized long getLogWriteOrderID() {
     return logWriteOrderID;
   }
 
-  synchronized boolean checkpoint(boolean force) {
-    if (!elements.syncRequired() && !force) {
+  synchronized boolean checkpoint(boolean force) throws Exception {
+    if (!elements.syncRequired()
+            && !inflightTakes.syncRequired()
+            && !force) { //No need to check inflight puts, since that would
+                         //cause elements.syncRequired() to return true.
       LOG.debug("Checkpoint not required");
       return false;
     }
@@ -209,6 +243,8 @@ class FlumeEventQueue {
 
     elements.sync();
 
+    inflightPuts.serializeAndWrite();
+    inflightTakes.serializeAndWrite();
     // Finish checkpoint
     elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_COMPLETE);
     mappedBuffer.force();
@@ -221,12 +257,12 @@ class FlumeEventQueue {
    *
    * @return FlumeEventPointer or null if queue is empty
    */
-  synchronized FlumeEventPointer removeHead() {
-    if(queueSize == 0) {
+  synchronized FlumeEventPointer removeHead(long transactionID) {
+    if(queueSize  == 0) {
       return null;
     }
 
-    long value = remove(0);
+    long value = remove(0, transactionID);
     Preconditions.checkState(value != EMPTY, "Empty value "
           + channelNameDescriptor);
 
@@ -236,13 +272,21 @@ class FlumeEventQueue {
   }
 
   /**
-   * Add a FlumeEventPointer to the head of the queue
+   * Add a FlumeEventPointer to the head of the queue.
+   * Called during rollbacks.
    * @param FlumeEventPointer to be added
    * @return true if space was available and pointer was
    * added to the queue
    */
   synchronized boolean addHead(FlumeEventPointer e) {
+    //Called only during rollback, so should not consider inflight takes' size,
+    //because normal puts through addTail method already account for these
+    //events since they are in the inflight takes. So puts will not happen
+    //in such a way that these takes cannot go back in. If this if returns true,
+    //there is a buuuuuuuug!
     if (queueSize == queueCapacity) {
+      LOG.error("Could not reinsert to queue, events which were taken but "
+              + "not committed. Please report this issue.");
       return false;
     }
 
@@ -256,15 +300,13 @@ class FlumeEventQueue {
 
 
   /**
-   * Add a FlumeEventPointer to the tail of the queue
-   * this will normally be used when recovering from a
-   * crash
+   * Add a FlumeEventPointer to the tail of the queue.
    * @param FlumeEventPointer to be added
    * @return true if space was available and pointer
    * was added to the queue
    */
   synchronized boolean addTail(FlumeEventPointer e) {
-    if (queueSize == queueCapacity) {
+    if ((queueSize + inflightTakes.getSize()) == queueCapacity) {
       return false;
     }
 
@@ -277,6 +319,16 @@ class FlumeEventQueue {
   }
 
   /**
+   * Must be called when a put happens to the log. This ensures that put commits
+   * after checkpoints will retrieve all events committed in that txn.
+   * @param e
+   * @param transactionID
+   */
+  synchronized void addWithoutCommit(FlumeEventPointer e, long transactionID) {
+    inflightPuts.addEvent(transactionID, e.toLong());
+  }
+
+  /**
    * Remove FlumeEventPointer from queue, will normally
    * only be used when recovering from a crash
    * @param FlumeEventPointer to be removed
@@ -288,7 +340,7 @@ class FlumeEventQueue {
     Preconditions.checkArgument(value != EMPTY);
     for (int i = 0; i < queueSize; i++) {
       if(get(i) == value) {
-        remove(i);
+        remove(i, 0);
         FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
         decrementFileID(ptr.getFileID());
         return true;
@@ -378,13 +430,26 @@ class FlumeEventQueue {
     return true;
   }
 
-  protected synchronized long remove(int index) {
+  /**
+   * Must be called when a transaction is being committed or rolled back.
+   * @param transactionID
+   */
+  synchronized void completeTransaction(long transactionID) {
+    if (!inflightPuts.completeTransaction(transactionID)) {
+      inflightTakes.completeTransaction(transactionID);
+    }
+  }
+
+  protected synchronized long remove(int index, long transactionID) {
     if (index < 0 || index > queueSize - 1) {
       throw new IndexOutOfBoundsException("index = " + index
           + ", queueSize " + queueSize +" " + channelNameDescriptor);
     }
     long value = get(index);
-
+    //if txn id = 0, we are recovering from a crash.
+    if(transactionID != 0) {
+      inflightTakes.addEvent(transactionID, value);
+    }
     if (index > queueSize/2) {
       // Move tail part to left
       for (int i = index; i < queueSize - 1; i++) {
@@ -426,7 +491,7 @@ class FlumeEventQueue {
   }
 
   protected synchronized int getSize() {
-    return queueSize;
+    return queueSize + inflightTakes.getSize();
   }
 
   /**
@@ -481,26 +546,239 @@ class FlumeEventQueue {
     }
   }
 
-  public static void main(String[] args) throws IOException {
+  /**
+   * A representation of in flight events which have not yet been committed.
+   * None of the methods are thread safe, and should be called from thread
+   * safe methods only.
+   */
+  private class InflightEventWrapper {
+    private SetMultimap<Long, Long> inflightEvents = HashMultimap.create();
+    private RandomAccessFile file;
+    private volatile java.nio.channels.FileChannel fileChannel;
+    private final MessageDigest digest;
+    private volatile Future<?> future;
+    private final File inflightEventsFile;
+    private volatile boolean syncRequired = false;
+
+    public InflightEventWrapper(File inflightEventsFile) throws Exception{
+      if(!inflightEventsFile.exists()){
+        Preconditions.checkState(inflightEventsFile.createNewFile(),"Could not"
+                + "create inflight events file: "
+                + inflightEventsFile.getCanonicalPath());
+      }
+      this.inflightEventsFile = inflightEventsFile;
+      file = new RandomAccessFile(inflightEventsFile, "rw");
+      fileChannel = file.getChannel();
+      digest = MessageDigest.getInstance("MD5");
+    }
+
+    /**
+     * Complete the transaction, and remove all events from inflight list.
+     * @param transactionID
+     */
+    public boolean completeTransaction(Long transactionID) {
+      if(!inflightEvents.containsKey(transactionID)) {
+        return false;
+      }
+      inflightEvents.removeAll(transactionID);
+      syncRequired = true;
+      return true;
+    }
+
+    /**
+     * Add an event pointer to the inflights list.
+     * @param transactionID
+     * @param pointer
+     */
+    public void addEvent(Long transactionID, Long pointer){
+      inflightEvents.put(transactionID, pointer);
+      syncRequired = true;
+    }
+
+    /**
+     * Serialize the set of in flights into a byte longBuffer.
+     * Returns nothing; the checksum is written first and the buffer is then
+     * asynchronously written to disk.
+     */
+    public void serializeAndWrite() throws Exception {
+      //Check if there is a current write happening, if there is abort it.
+      if (future != null) {
+        try {
+          future.cancel(true);
+        } catch (Exception e) {
+          LOG.warn("Interrupted a write to inflights "
+                  + "file: " + inflightEventsFile.getName()
+                  + " to start a new write.");
+        }
+        while (!future.isDone()) {
+          TimeUnit.MILLISECONDS.sleep(100);
+        }
+      }
+      Collection<Long> values = inflightEvents.values();
+      if(values.isEmpty()){
+        file.setLength(0L);
+      }
+      if(!fileChannel.isOpen()){
+        file = new RandomAccessFile(inflightEventsFile, "rw");
+        fileChannel = file.getChannel();
+      }
+      //What is written out?
+      //Checksum - 16 bytes
+      //and then each key-value pair from the map:
+      //transactionid numberofeventsforthistxn listofeventpointers
+
+      try {
+        int expectedFileSize = (((inflightEvents.keySet().size() * 2) //For transactionIDs and events per txn ID
+                + values.size()) * 8) //Event pointers
+                + 16; //Checksum
+        //There is no real need of filling the channel with 0s, since we
+        //will write the exact number of bytes as expected file size.
+        file.setLength(expectedFileSize);
+        Preconditions.checkState(file.length() == expectedFileSize,
+                "Expected File size of inflight events file does not match the "
+                + "current file size. Checkpoint is incomplete.");
+        file.seek(0);
+        final ByteBuffer buffer = ByteBuffer.allocate(expectedFileSize);
+        LongBuffer longBuffer = buffer.asLongBuffer();
+        for (Long txnID : inflightEvents.keySet()) {
+          Set<Long> pointers = inflightEvents.get(txnID);
+          longBuffer.put(txnID);
+          longBuffer.put((long) pointers.size());
+          LOG.debug("Number of events inserted into "
+                  + "inflights file: " + String.valueOf(pointers.size())
+                  + " file: " + inflightEventsFile.getCanonicalPath());
+          long[] written = ArrayUtils.toPrimitive(
+                  pointers.toArray(new Long[0]));
+          longBuffer.put(written);
+        }
+        byte[] checksum = digest.digest(buffer.array());
+        file.write(checksum);
+        future = Executors.newSingleThreadExecutor().submit(
+                new Runnable() {
+                  @Override
+                  public void run() {
+                    try {
+                      buffer.position(0);
+                      fileChannel.write(buffer);
+                      fileChannel.force(true);
+                    } catch (IOException ex) {
+                      LOG.error("Error while writing inflight events to "
+                              + "inflights file: "
+                              + inflightEventsFile.getName());
+                    }
+                  }
+                });
+        syncRequired = false;
+      } catch (IOException ex) {
+        LOG.error("Error while writing checkpoint to disk.", ex);
+        throw ex;
+      }
+    }
+
+    /**
+     * Read the inflights file and return a
+     * {@link com.google.common.collect.SetMultimap}
+     * of transactionIDs to events that were inflight.
+     *
+     * @return - map of inflight events per txnID.
+     *
+     */
+    public SetMultimap<Long, Long> deserialize() throws IOException {
+      SetMultimap<Long, Long> inflights = HashMultimap.create();
+      if (!fileChannel.isOpen()) {
+        file = new RandomAccessFile(inflightEventsFile, "rw");
+        fileChannel = file.getChannel();
+      }
+      if(file.length() == 0) {
+        return inflights;
+      }
+      file.seek(0);
+      byte[] checksum = new byte[16];
+      file.read(checksum);
+      ByteBuffer buffer = ByteBuffer.allocate(
+              (int)(file.length() - file.getFilePointer()));
+      fileChannel.read(buffer);
+      byte[] fileChecksum = digest.digest(buffer.array());
+      if (!Arrays.equals(checksum, fileChecksum)) {
+        throw new IllegalStateException("Checksum of inflights file differs"
+                + " from the checksum expected.");
+      }
+      buffer.position(0);
+      LongBuffer longBuffer = buffer.asLongBuffer();
+      try {
+        while (true) {
+          long txnID = longBuffer.get();
+          int numEvents = (int)(longBuffer.get());
+          for(int i = 0; i < numEvents; i++) {
+            long val = longBuffer.get();
+            inflights.put(txnID, val);
+          }
+        }
+      } catch (BufferUnderflowException ex) {
+        LOG.debug("Reached end of inflights buffer. Long buffer position ="
+                + String.valueOf(longBuffer.position()));
+      }
+      return  inflights;
+    }
+
+    public int getSize() {
+      return inflightEvents.size();
+    }
+
+    public boolean syncRequired(){
+      return syncRequired;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
     File file = new File(args[0]);
-    if(!file.exists()) {
+    File inflightTakesFile = new File(args[1]);
+    File inflightPutsFile = new File(args[2]);
+    if (!file.exists()) {
       throw new IOException("File " + file + " does not exist");
     }
-    if(file.length() == 0) {
+    if (file.length() == 0) {
       throw new IOException("File " + file + " is empty");
     }
-    int capacity = (int)((file.length() - (HEADER_SIZE * 8L)) / 8L);
-    FlumeEventQueue queue = new FlumeEventQueue(capacity, file, "debug");
+    int capacity = (int) ((file.length() - (HEADER_SIZE * 8L)) / 8L);
+    FlumeEventQueue queue = new FlumeEventQueue(
+            capacity, file, inflightTakesFile, inflightPutsFile, "debug");
     System.out.println("File Reference Counts" + queue.fileIDCounts);
     System.out.println("Queue Capacity " + queue.getCapacity());
     System.out.println("Queue Size " + queue.getSize());
     System.out.println("Queue Head " + queue.queueHead);
     for (int index = 0; index < queue.getCapacity(); index++) {
       long value = queue.elements.get(queue.getPhysicalIndex(index));
-      int fileID = (int)(value >>> 32);
-      int offset = (int)value;
+      int fileID = (int) (value >>> 32);
+      int offset = (int) value;
       System.out.println(index + ":" + Long.toHexString(value) + " fileID = "
-          + fileID + ", offset = " + offset);
+              + fileID + ", offset = " + offset);
+    }
+
+    SetMultimap<Long, Long> putMap = queue.deserializeInflightPuts();
+    System.out.println("Inflight Puts:");
+
+    for (Long txnID : putMap.keySet()) {
+      Set<Long> puts = putMap.get(txnID);
+      System.out.println("Transaction ID: " + String.valueOf(txnID));
+      for (long value : puts) {
+        int fileID = (int) (value >>> 32);
+        int offset = (int) value;
+        System.out.println(Long.toHexString(value) + " fileID = "
+                + fileID + ", offset = " + offset);
+      }
+    }
+    SetMultimap<Long, Long> takeMap = queue.deserializeInflightTakes();
+    System.out.println("Inflight takes:");
+    for (Long txnID : takeMap.keySet()) {
+      Set<Long> takes = takeMap.get(txnID);
+      System.out.println("Transaction ID: " + String.valueOf(txnID));
+      for (long value : takes) {
+        int fileID = (int) (value >>> 32);
+        int offset = (int) value;
+        System.out.println(Long.toHexString(value) + " fileID = "
+                + fileID + ", offset = " + offset);
+      }
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/cf3c16f4/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 11f1e1f..c356ca4 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -245,8 +245,10 @@ class Log {
        * locations. We will read the last one written to disk.
        */
       File checkpointFile = new File(checkpointDir, "checkpoint");
-      queue = new FlumeEventQueue(queueCapacity,
-                        checkpointFile, channelName);
+      File inflightTakesFile = new File(checkpointDir, "inflighttakes");
+      File inflightPutsFile = new File(checkpointDir, "inflightputs");
+      queue = new FlumeEventQueue(queueCapacity, checkpointFile,
+              inflightTakesFile, inflightPutsFile, channelName);
       LOGGER.info("Last Checkpoint " + new Date(checkpointFile.lastModified())
         + ", queue depth = " + queue.getSize());
 
@@ -378,6 +380,7 @@ class Log {
       boolean error = true;
       try {
         FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer);
+        queue.addWithoutCommit(ptr, transactionID);
         error = false;
         return ptr;
       } finally {
@@ -704,7 +707,7 @@ class Log {
     }
   }
 
-  private boolean writeCheckpoint() throws IOException {
+  private boolean writeCheckpoint() throws Exception {
     return writeCheckpoint(false);
   }
 
@@ -718,8 +721,7 @@ class Log {
    * @param force  a flag to force the writing of checkpoint
    * @throws IOException if we are unable to write the checkpoint out to disk
    */
-  private boolean writeCheckpoint(boolean force)
-      throws IOException {
+  private boolean writeCheckpoint(boolean force) throws Exception {
     boolean lockAcquired = false;
     boolean checkpointCompleted = false;
     try {

http://git-wip-us.apache.org/repos/asf/flume/blob/cf3c16f4/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
index bbca62c..14e2ff7 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
@@ -26,14 +26,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 
-import org.apache.commons.collections.MultiMap;
-import org.apache.commons.collections.map.MultiValueMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.commons.collections.MultiMap;
+import org.apache.commons.collections.map.MultiValueMap;
 
 /**
  * Processes a set of data logs, replaying said logs into the queue.
@@ -78,6 +82,16 @@ class ReplayHandler {
     int total = 0;
     int count = 0;
     MultiMap transactionMap = new MultiValueMap();
+    //Read inflight puts to see if they were committed
+    SetMultimap<Long, Long> inflightPuts = queue.deserializeInflightPuts();
+    for (Long txnID : inflightPuts.keySet()) {
+      Set<Long> eventPointers = inflightPuts.get(txnID);
+      for (Long eventPointer : eventPointers) {
+        transactionMap.put(txnID, FlumeEventPointer.fromLong(eventPointer));
+      }
+    }
+
+    SetMultimap<Long, Long> inflightTakes = queue.deserializeInflightTakes();
     LOG.info("Starting replay of " + logs);
     for (File log : logs) {
       LOG.info("Replaying " + log);
@@ -120,6 +134,20 @@ class ReplayHandler {
               @SuppressWarnings("unchecked")
               Collection<FlumeEventPointer> pointers =
                 (Collection<FlumeEventPointer>) transactionMap.remove(trans);
+              if (((Commit) record).getType()
+                      == TransactionEventRecord.Type.TAKE.get()) {
+                if (inflightTakes.containsKey(trans)) {
+                  if (pointers == null) {
+                    pointers = Sets.newHashSet();
+                  }
+                  Set<Long> takes = inflightTakes.removeAll(trans);
+                  Iterator<Long> it = takes.iterator();
+                  while (it.hasNext()) {
+                    Long take = it.next();
+                    pointers.add(FlumeEventPointer.fromLong(take));
+                  }
+                }
+              }
               if (pointers != null && pointers.size() > 0) {
                 processCommit(((Commit) record).getType(), pointers);
                 count += pointers.size();
@@ -149,6 +177,19 @@ class ReplayHandler {
         }
       }
     }
+    //re-insert the events in the take map,
+    //since the takes were not committed.
+    int uncommittedTakes = 0;
+    for (Long inflightTxnId : inflightTakes.keySet()) {
+      Set<Long> inflightUncommittedTakes =
+              inflightTakes.get(inflightTxnId);
+      for (Long inflightUncommittedTake : inflightUncommittedTakes) {
+        queue.addHead(FlumeEventPointer.fromLong(inflightUncommittedTake));
+        uncommittedTakes++;
+      }
+    }
+    inflightTakes.clear();
+    count += uncommittedTakes;
     int pendingTakesSize = pendingTakes.size();
     if (pendingTakesSize > 0) {
       String msg = "Pending takes " + pendingTakesSize
@@ -173,6 +214,16 @@ class ReplayHandler {
     MultiMap transactionMap = new MultiValueMap();
     long transactionIDSeed = 0, writeOrderIDSeed = 0;
     LOG.info("Starting replay of " + logs);
+    //Load the inflight puts into the transaction map to see if they were
+    //committed in one of the logs.
+    SetMultimap<Long, Long> inflightPuts = queue.deserializeInflightPuts();
+    for (Long txnID : inflightPuts.keySet()) {
+      Set<Long> eventPointers = inflightPuts.get(txnID);
+      for (Long eventPointer : eventPointers) {
+        transactionMap.put(txnID, FlumeEventPointer.fromLong(eventPointer));
+      }
+    }
+    SetMultimap<Long, Long> inflightTakes = queue.deserializeInflightTakes();
     try {
       for (File log : logs) {
         LOG.info("Replaying " + log);
@@ -232,6 +283,20 @@ class ReplayHandler {
             @SuppressWarnings("unchecked")
             Collection<FlumeEventPointer> pointers =
               (Collection<FlumeEventPointer>) transactionMap.remove(trans);
+            if (((Commit) record).getType()
+                    == TransactionEventRecord.Type.TAKE.get()) {
+              if (inflightTakes.containsKey(trans)) {
+                if(pointers == null){
+                  pointers = Sets.newHashSet();
+                }
+                Set<Long> takes = inflightTakes.removeAll(trans);
+                Iterator<Long> it = takes.iterator();
+                while (it.hasNext()) {
+                  Long take = it.next();
+                  pointers.add(FlumeEventPointer.fromLong(take));
+                }
+              }
+            }
             if (pointers != null && pointers.size() > 0) {
               processCommit(((Commit) record).getType(), pointers);
               count += pointers.size();
@@ -256,6 +321,19 @@ class ReplayHandler {
         }
       }
     }
+    //re-insert the events in the take map,
+    //since the takes were not committed.
+    int uncommittedTakes = 0;
+    for (Long inflightTxnId : inflightTakes.keySet()) {
+      Set<Long> inflightUncommittedTakes =
+              inflightTakes.get(inflightTxnId);
+      for (Long inflightUncommittedTake : inflightUncommittedTakes) {
+        queue.addHead(FlumeEventPointer.fromLong(inflightUncommittedTake));
+        uncommittedTakes++;
+      }
+    }
+    inflightTakes.clear();
+    count += uncommittedTakes;
     int pendingTakesSize = pendingTakes.size();
     if (pendingTakesSize > 0) {
       String msg = "Pending takes " + pendingTakesSize

http://git-wip-us.apache.org/repos/asf/flume/blob/cf3c16f4/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
index 7ec5916..2893538 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
@@ -30,9 +30,13 @@ import org.junit.Test;
 public class TestCheckpoint {
 
   File file;
+  File inflightPuts;
+  File inflightTakes;
   @Before
   public void setup() throws IOException {
     file = File.createTempFile("Checkpoint", "");
+    inflightPuts = File.createTempFile("inflightPuts", "");
+    inflightTakes = File.createTempFile("inflightTakes", "");
     Assert.assertTrue(file.isFile());
     Assert.assertTrue(file.canWrite());
   }
@@ -41,15 +45,18 @@ public class TestCheckpoint {
     file.delete();
   }
   @Test
-  public void testSerialization() throws IOException {
+  public void testSerialization() throws Exception {
     FlumeEventPointer ptrIn = new FlumeEventPointer(10, 20);
-    FlumeEventQueue queueIn = new FlumeEventQueue(1, file, "test");
+    FlumeEventQueue queueIn = new FlumeEventQueue(1, file, inflightTakes,
+            inflightPuts, "test");
     queueIn.addHead(ptrIn);
-    FlumeEventQueue queueOut = new FlumeEventQueue(1, file, "test");
+    FlumeEventQueue queueOut = new FlumeEventQueue(1, file, inflightTakes,
+            inflightPuts, "test");
     Assert.assertEquals(0, queueOut.getLogWriteOrderID());
     queueIn.checkpoint(false);
-    FlumeEventQueue queueOut2 = new FlumeEventQueue(1, file, "test");
-    FlumeEventPointer ptrOut = queueOut2.removeHead();
+    FlumeEventQueue queueOut2 = new FlumeEventQueue(1, file, inflightTakes,
+            inflightPuts, "test");
+    FlumeEventPointer ptrOut = queueOut2.removeHead(0);
     Assert.assertEquals(ptrIn, ptrOut);
     Assert.assertTrue(queueOut2.getLogWriteOrderID() > 0);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/cf3c16f4/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index 1d5a0f9..720fa27 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.flume.channel.file;
 
 import java.io.File;
@@ -56,17 +55,21 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 import com.google.common.io.Resources;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 
 public class TestFileChannel {
 
   private static final Logger LOG = LoggerFactory
-      .getLogger(TestFileChannel.class);
-
+          .getLogger(TestFileChannel.class);
   private FileChannel channel;
   private File baseDir;
   private File checkpointDir;
@@ -101,7 +104,7 @@ public class TestFileChannel {
   private Context createContext(Map<String, String> overrides) {
     Context context = new Context();
     context.put(FileChannelConfiguration.CHECKPOINT_DIR,
-        checkpointDir.getAbsolutePath());
+            checkpointDir.getAbsolutePath());
     context.put(FileChannelConfiguration.DATA_DIRS, dataDir);
     context.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000));
     // Set checkpoint for 5 seconds otherwise test will run out of memory
@@ -120,6 +123,176 @@ public class TestFileChannel {
     return channel;
   }
   @Test
+  public void testFailAfterTakeBeforeCommit() throws Throwable {
+    //Takes left uncommitted at shutdown must be available again after restart.
+    final FileChannel channel = createFileChannel();
+    channel.start();
+    final Set<String> eventSet = Sets.newHashSet();
+    eventSet.addAll(putEvents(channel, "testTakeFailBeforeCommit", 5, 5));
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    channel.take();
+    channel.take();
+    //Take from a second thread as well: per the original author, txns are
+    //thread local, so a new txn is only created in a different thread.
+    Executors.newSingleThreadExecutor().submit(new Runnable() {
+      @Override
+      public void run() {
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        channel.take();
+        channel.take();
+        channel.take();
+      }
+    }).get();
+    long lastTake = System.currentTimeMillis();
+    File inflightsFile = new File(checkpointDir, "inflighttakes");
+    //Block until the inflighttakes file is modified after the last take.
+    while (inflightsFile.lastModified() < lastTake) {
+      Thread.sleep(500);
+    }
+    channel.stop();
+    //Simulate a sink, so separate thread.
+    try {
+      Executors.newSingleThreadExecutor().submit(new Runnable() {
+        @Override
+        public void run() {
+          FileChannel channel = createFileChannel();
+          channel.start();
+          Transaction tx = channel.getTransaction();
+          tx.begin();
+          Event e;
+          /*
+           * Deliberately unrolled instead of looping, so that a failure
+           * points at the exact take that returned null.
+           */
+          e = channel.take();
+          Assert.assertNotNull(e);
+          Assert.assertTrue(eventSet.remove(new String(e.getBody())));
+          e = channel.take();
+          Assert.assertNotNull(e);
+          Assert.assertTrue(eventSet.remove(new String(e.getBody())));
+          e = channel.take();
+          Assert.assertNotNull(e);
+          Assert.assertTrue(eventSet.remove(new String(e.getBody())));
+          e = channel.take();
+          Assert.assertNotNull(e);
+          Assert.assertTrue(eventSet.remove(new String(e.getBody())));
+          e = channel.take();
+          Assert.assertNotNull(e);
+          Assert.assertTrue(eventSet.remove(new String(e.getBody())));
+          tx.commit();
+          tx.close();
+          channel.stop();
+        }
+      }).get();
+    } catch (ExecutionException e) {
+      throw e.getCause(); //surface the failure raised inside the sink thread
+    }
+  }
+
+  /** Puts that straddle a checkpoint must all be replayed after a restart. */
+  @Test
+  public void testFailAfterPutCheckpointCommit() throws Throwable {
+    final Set<String> set = Sets.newHashSet();
+    final Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000");
+    final FileChannel channel = createFileChannel(overrides);
+    channel.start();
+    Transaction tx = channel.getTransaction();
+    //Initially commit a put to make sure checkpoint is required.
+    tx.begin();
+    channel.put(EventBuilder.withBody(new byte[]{'1', '2'}));
+    set.add(new String(new byte[]{'1', '2'}));
+    tx.commit();
+    tx.close();
+    tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody(new byte[]{'a', 'b'}));
+    set.add(new String(new byte[]{'a', 'b'}));
+    channel.put(EventBuilder.withBody(new byte[]{'c', 'd'}));
+    set.add(new String(new byte[]{'c', 'd'}));
+    channel.put(EventBuilder.withBody(new byte[]{'e', 'f'}));
+    set.add(new String(new byte[]{'e', 'f'}));
+    //Simulate multiple sources, so separate thread - txns are thread local,
+    //so a new txn wont be created here unless it is in a different thread.
+    final CountDownLatch latch = new CountDownLatch(1);
+    Future<?> secondPut = Executors.newSingleThreadExecutor().submit(
+            new Runnable() {
+              @Override
+              public void run() {
+                Transaction tx = channel.getTransaction();
+                tx.begin();
+                channel.put(EventBuilder.withBody(new byte[]{'3', '4'}));
+                channel.put(EventBuilder.withBody(new byte[]{'5', '6'}));
+                channel.put(EventBuilder.withBody(new byte[]{'7', '8'}));
+                set.add(new String(new byte[]{'3', '4'}));
+                set.add(new String(new byte[]{'5', '6'}));
+                set.add(new String(new byte[]{'7', '8'}));
+                try {
+                  latch.await();
+                  tx.commit();
+                } catch (InterruptedException e) {
+                  tx.rollback();
+                  Throwables.propagate(e);
+                } finally {
+                  tx.close();
+                }
+              }
+            });
+    long lastPut = System.currentTimeMillis();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    //Wait for a checkpoint taken before either put transaction commits.
+    while (checkpoint.lastModified() < lastPut) {
+      Thread.sleep(500);
+    }
+    tx.commit();
+    tx.close();
+    latch.countDown();
+    //Wait for the second put txn to commit; a fixed sleep could stop too early.
+    secondPut.get();
+    channel.stop();
+    //Simulate a sink, so separate thread.
+    try {
+      Executors.newSingleThreadExecutor().submit(new Runnable() {
+        @Override
+        public void run() {
+          FileChannel channel = createFileChannel();
+          channel.start();
+          Transaction tx = channel.getTransaction();
+          tx.begin();
+          Event e = channel.take();
+          Assert.assertNotNull(e);
+          Assert.assertTrue(set.remove(new String(e.getBody())));
+          e = channel.take();
+          Assert.assertNotNull(e);
+          Assert.assertTrue(set.remove(new String(e.getBody())));
+          e = channel.take();
+          Assert.assertNotNull(e);
+          Assert.assertTrue(set.remove(new String(e.getBody())));
+          e = channel.take();
+          Assert.assertNotNull(e);
+          Assert.assertTrue(set.remove(new String(e.getBody())));
+          e = channel.take();
+          Assert.assertNotNull(e);
+          Assert.assertTrue(set.remove(new String(e.getBody())));
+          e = channel.take();
+          Assert.assertNotNull(e);
+          Assert.assertTrue(set.remove(new String(e.getBody())));
+          e = channel.take();
+          Assert.assertNotNull(e);
+          Assert.assertTrue(set.remove(new String(e.getBody())));
+          tx.commit();
+          tx.close();
+          channel.stop();
+        }
+      }).get();
+    } catch (ExecutionException e) {
+      throw e.getCause();
+    }
+  }
+
+  @Test
   public void testRestartLogReplayV1() throws Exception {
     doTestRestart(true);
   }
@@ -130,7 +303,7 @@ public class TestFileChannel {
   public void doTestRestart(boolean useLogReplayV1) throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.USE_LOG_REPLAY_V1,
-        String.valueOf(useLogReplayV1));
+            String.valueOf(useLogReplayV1));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -268,7 +441,7 @@ public class TestFileChannel {
       Assert.fail();
     } catch (ChannelException e) {
       Assert.assertEquals("Cannot acquire capacity. [channel="+channel.getName()+"]",
-          e.getMessage());
+              e.getMessage());
     }
     // take an event, roll it back, and
     // then make sure a put fails
@@ -285,7 +458,7 @@ public class TestFileChannel {
       Assert.fail();
     } catch (ChannelException e) {
       Assert.assertEquals("Cannot acquire capacity. [channel="+channel.getName()+"]",
-          e.getMessage());
+              e.getMessage());
     }
     // ensure we the events back
     Assert.assertEquals(5, takeEvents(channel, 1, 5).size());
@@ -316,11 +489,11 @@ public class TestFileChannel {
       Assert.fail();
     } catch (ChannelException e) {
       Assert.assertEquals("Cannot acquire capacity. [channel="+channel.getName()+"]",
-          e.getMessage());
+              e.getMessage());
     }
     // then do a put which will block but it will be assigned a tx id
     Future<String> put = Executors.newSingleThreadExecutor()
-        .submit(new Callable<String>() {
+            .submit(new Callable<String>() {
       @Override
       public String call() throws Exception {
         List<String> result = putEvents(channel, "blocked-put", 1, 1);
@@ -420,11 +593,11 @@ public class TestFileChannel {
     final CountDownLatch producerStopLatch = new CountDownLatch(numThreads);
     final CountDownLatch consumerStopLatch = new CountDownLatch(numThreads);
     final List<Exception> errors = Collections
-        .synchronizedList(new ArrayList<Exception>());
+            .synchronizedList(new ArrayList<Exception>());
     final List<String> expected = Collections
-        .synchronizedList(new ArrayList<String>());
+            .synchronizedList(new ArrayList<String>());
     final List<String> actual = Collections
-        .synchronizedList(new ArrayList<String>());
+            .synchronizedList(new ArrayList<String>());
     for (int i = 0; i < numThreads; i++) {
       final int id = i;
       Thread t = new Thread() {
@@ -479,9 +652,9 @@ public class TestFileChannel {
       t.start();
     }
     Assert.assertTrue("Timed out waiting for producers",
-        producerStopLatch.await(30, TimeUnit.SECONDS));
+            producerStopLatch.await(30, TimeUnit.SECONDS));
     Assert.assertTrue("Timed out waiting for consumer",
-        consumerStopLatch.await(30, TimeUnit.SECONDS));
+            consumerStopLatch.await(30, TimeUnit.SECONDS));
     Assert.assertEquals(Collections.EMPTY_LIST, errors);
     Collections.sort(expected);
     Collections.sort(actual);
@@ -501,9 +674,9 @@ public class TestFileChannel {
     // checkpoints and rolls occur during the test
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL,
-        String.valueOf(10L * 1000L));
+            String.valueOf(10L * 1000L));
     overrides.put(FileChannelConfiguration.MAX_FILE_SIZE,
-        String.valueOf(1024 * 1024 * 5));
+            String.valueOf(1024 * 1024 * 5));
     // do reconfiguration
     channel = createFileChannel(overrides);
     channel.start();
@@ -558,13 +731,13 @@ public class TestFileChannel {
    */
   @Test
   public void testFileFormatV2postFLUME1432()
-      throws Exception {
+          throws Exception {
     copyDecompressed("fileformat-v2-checkpoint.gz",
-        new File(checkpointDir, "checkpoint"));
+            new File(checkpointDir, "checkpoint"));
     for (int i = 0; i < dataDirs.length; i++) {
       int fileIndex = i + 1;
       copyDecompressed("fileformat-v2-log-"+fileIndex+".gz",
-          new File(dataDirs[i], "log-" + fileIndex));
+              new File(dataDirs[i], "log-" + fileIndex));
     }
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10));
@@ -573,7 +746,7 @@ public class TestFileChannel {
     Assert.assertTrue(channel.isOpen());
     List<String> events = takeEvents(channel, 1);
     List<String> expected = Arrays.asList(new String[] {
-        "2684", "2685", "2686", "2687", "2688", "2689", "2690", "2691"
+              "2684", "2685", "2686", "2687", "2688", "2689", "2690", "2691"
         }
     );
     Assert.assertEquals(expected, events);
@@ -584,27 +757,27 @@ public class TestFileChannel {
    */
   @Test
   public void testFileFormatV2PreFLUME1432LogReplayV1()
-      throws Exception {
+          throws Exception {
     doTestFileFormatV2PreFLUME1432(true);
   }
   @Test
   public void testFileFormatV2PreFLUME1432LogReplayV2()
-      throws Exception {
+          throws Exception {
     doTestFileFormatV2PreFLUME1432(false);
   }
   public void doTestFileFormatV2PreFLUME1432(boolean useLogReplayV1)
-      throws Exception {
+          throws Exception {
     copyDecompressed("fileformat-v2-pre-FLUME-1432-checkpoint.gz",
-        new File(checkpointDir, "checkpoint"));
+            new File(checkpointDir, "checkpoint"));
     for (int i = 0; i < dataDirs.length; i++) {
       int fileIndex = i + 1;
-      copyDecompressed("fileformat-v2-pre-FLUME-1432-log-"+fileIndex+".gz",
-          new File(dataDirs[i], "log-" + fileIndex));
+      copyDecompressed("fileformat-v2-pre-FLUME-1432-log-" + fileIndex + ".gz",
+              new File(dataDirs[i], "log-" + fileIndex));
     }
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000));
     overrides.put(FileChannelConfiguration.USE_LOG_REPLAY_V1,
-        String.valueOf(useLogReplayV1));
+            String.valueOf(useLogReplayV1));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -612,20 +785,184 @@ public class TestFileChannel {
     Assert.assertEquals(50, events.size());
   }
 
+  /**
+   * A take committed after a checkpoint must not be returned again on restart.
+   * Test contributed by Brock Noland during code review.
+   */
+  @Test
+  public void testTakeTransactionCrossingCheckpoint() throws Exception {
+    channel = createFileChannel();
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    List<String> in = Lists.newArrayList();
+    try {
+      while (true) { // keep putting until the channel refuses more events
+        in.addAll(putEvents(channel, "restart", 1, 1));
+      }
+    } catch (ChannelException e) {
+      Assert.assertEquals("Cannot acquire capacity. [channel="
+              + channel.getName() + "]", e.getMessage());
+    }
+    List<String> out = Lists.newArrayList();
+    // now take one item off the channel
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e = channel.take();
+    long takeTime = System.currentTimeMillis();
+    Assert.assertNotNull(e);
+    String s = new String(e.getBody(), Charsets.UTF_8);
+    out.add(s);
+    LOG.info("Slow take got " + s);
+    // wait until the checkpoint file is rewritten after the take, so the
+    // take happens before the checkpoint and the commit after it
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    while(checkpoint.lastModified() < takeTime){
+      TimeUnit.MILLISECONDS.sleep(500);
+    }
+    tx.commit();
+    tx.close();
+    channel.stop();
+    channel = createFileChannel();
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    // we should not get the item we took off the queue above
+    out.addAll(takeEvents(channel, 1, Integer.MAX_VALUE));
+    channel.stop();
+    Collections.sort(in);
+    Collections.sort(out);
+    if (!out.equals(in)) {
+      List<String> difference = new ArrayList<String>();
+      if (in.size() > out.size()) {
+        LOG.info("The channel shorted us");
+        difference.addAll(in);
+        difference.removeAll(out);
+      } else {
+        LOG.info("We got more events than expected, perhaps dups");
+        difference.addAll(out);
+        difference.removeAll(in);
+      }
+      LOG.error("difference = " + difference
+              + ", in.size = " + in.size() + ", out.size = " + out.size());
+      Assert.fail();
+    }
+  }
+
+  /** A put open during a checkpoint and committed later must survive replay. */
+  @Test
+  public void testPutForceCheckpointCommitReplay() throws Exception{
+    Set<String> set = Sets.newHashSet();
+    Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(2));
+    FileChannel channel = createFileChannel(overrides);
+    channel.start();
+    //Force a checkpoint by committing a transaction
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody(new byte[]{'a','b'}));
+    set.add(new String(new byte[]{'a','b'}));
+    tx.commit();
+    tx.close();
+    tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody(new byte[]{'c','d'}));
+    set.add(new String(new byte[]{'c', 'd'}));
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    //Wait up to 15s (measured from t1) for a checkpoint with the put in flight.
+    long t1 = System.currentTimeMillis();
+    while(checkpoint.lastModified() < t1) {
+      TimeUnit.MILLISECONDS.sleep(500);
+      if (System.currentTimeMillis() - t1 > 15000) {
+        throw new TimeoutException("Checkpoint did not happen");
+      }
+    }
+    tx.commit();
+    tx.close();
+    channel.stop();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    tx = channel.getTransaction();
+    tx.begin();
+    Event e = channel.take();
+    Assert.assertNotNull(e);
+    Assert.assertTrue(set.remove(new String(e.getBody())));
+    e = channel.take();
+    Assert.assertNotNull(e);
+    Assert.assertTrue(set.remove(new String(e.getBody())));
+    tx.commit();
+    tx.close();
+    channel.stop();
+  }
+
+  /** A put committed between two checkpoints must be replayed exactly once. */
+  @Test
+  public void testPutCheckpointCommitCheckpointReplay() throws Exception {
+    Set<String> set = Sets.newHashSet();
+    Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(2));
+    FileChannel channel = createFileChannel(overrides);
+    channel.start();
+    //Force a checkpoint by committing a transaction
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody(new byte[]{'a','b'}));
+    set.add(new String(new byte[]{'a','b'}));
+    tx.commit();
+    tx.close();
+    tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody(new byte[]{'c', 'd'}));
+    set.add(new String(new byte[]{'c','d'}));
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    //Wait up to 15s (measured from t1) for a checkpoint with the put in flight.
+    long t1 = System.currentTimeMillis();
+    while (checkpoint.lastModified() < t1) {
+      TimeUnit.MILLISECONDS.sleep(500);
+      if (System.currentTimeMillis() - t1 > 15000) {
+        throw new TimeoutException("Checkpoint was expected, but did not happen");
+      }
+    }
+    tx.commit();
+    tx.close();
+    //Wait up to 15s (measured from t2) for a second checkpoint after the commit.
+    long t2 = System.currentTimeMillis();
+    while (checkpoint.lastModified() < t2) {
+      TimeUnit.MILLISECONDS.sleep(500);
+      if (System.currentTimeMillis() - t2 > 15000) {
+        throw new TimeoutException("Checkpoint was expected, but did not happen");
+      }
+    }
+    channel.stop();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    tx = channel.getTransaction();
+    tx.begin();
+    Event e = channel.take();
+    Assert.assertNotNull(e);
+    Assert.assertTrue(set.remove(new String(e.getBody())));
+    e = channel.take();
+    Assert.assertNotNull(e);
+    Assert.assertTrue(set.remove(new String(e.getBody())));
+    tx.commit();
+    tx.close();
+    channel.stop();
+  }
+
   private static void copyDecompressed(String resource, File output)
-      throws IOException {
+          throws IOException {
     URL input =  Resources.getResource(resource);
     long copied = ByteStreams.copy(new GZIPInputStream(input.openStream()),
-        new FileOutputStream(output));
+            new FileOutputStream(output));
     LOG.info("Copied " + copied + " bytes from " + input + " to " + output);
   }
 
   private static List<String> takeEvents(Channel channel,
-      int batchSize) throws Exception {
+          int batchSize) throws Exception {
     return takeEvents(channel, batchSize, Integer.MAX_VALUE);
   }
   private static List<String> takeEvents(Channel channel,
-      int batchSize, int numEvents) throws Exception {
+          int batchSize, int numEvents) throws Exception {
     List<String> result = Lists.newArrayList();
     for (int i = 0; i < numEvents; i += batchSize) {
       for (int j = 0; j < batchSize; j++) {
@@ -650,7 +987,7 @@ public class TestFileChannel {
     return result;
   }
   private static List<String> putEvents(Channel channel, String prefix,
-      int batchSize, int numEvents) throws Exception {
+          int batchSize, int numEvents) throws Exception {
     List<String> result = Lists.newArrayList();
     for (int i = 0; i < numEvents; i += batchSize) {
       for (int j = 0; j < batchSize; j++) {

http://git-wip-us.apache.org/repos/asf/flume/blob/cf3c16f4/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
index 569b7c7..d09ddde 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
@@ -18,8 +18,8 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.collect.SetMultimap;
 import java.io.File;
-import java.io.IOException;
 import java.util.Set;
 
 import org.junit.Assert;
@@ -27,55 +27,61 @@ import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.collect.Sets;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 public class TestFlumeEventQueue {
 
   File file;
+  File inflightTakes;
+  File inflightPuts;
   FlumeEventPointer pointer1 = new FlumeEventPointer(1, 1);
   FlumeEventPointer pointer2 = new FlumeEventPointer(2, 2);
   FlumeEventQueue queue;
   @Before
   public void setup() throws Exception {
     file = File.createTempFile("Checkpoint", "");
+    inflightTakes = File.createTempFile("inflighttakes", "");
+    inflightPuts = File.createTempFile("inflightputs", "");
   }
   @Test
-  public void testQueueIsEmptyAfterCreation() throws IOException {
-    queue = new FlumeEventQueue(1000, file, "test");
-    Assert.assertNull(queue.removeHead());
+  public void testQueueIsEmptyAfterCreation() throws Exception {
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
+    Assert.assertNull(queue.removeHead(0));
   }
   @Test
   public void testCapacity() throws Exception {
-    queue = new FlumeEventQueue(1, file, "test");
+    queue = new FlumeEventQueue(1, file, inflightTakes, inflightPuts,"test");
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertFalse(queue.addTail(pointer2));
   }
   @Test(expected=IllegalArgumentException.class)
   public void testInvalidCapacityZero() throws Exception {
-    queue = new FlumeEventQueue(0, file, "test");
+    queue = new FlumeEventQueue(0, file, inflightTakes, inflightPuts,"test");
   }
   @Test(expected=IllegalArgumentException.class)
   public void testInvalidCapacityNegative() throws Exception {
-    queue = new FlumeEventQueue(-1, file, "test");
+    queue = new FlumeEventQueue(-1, file, inflightTakes, inflightPuts,"test");
   }
   @Test
   public void addTail1() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     Assert.assertTrue(queue.addTail(pointer1));
-    Assert.assertEquals(pointer1, queue.removeHead());
+    Assert.assertEquals(pointer1, queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
   }
   @Test
   public void addTail2() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertTrue(queue.addTail(pointer2));
     Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
-    Assert.assertEquals(pointer1, queue.removeHead());
+    Assert.assertEquals(pointer1, queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(2), queue.getFileIDs());
   }
   @Test
   public void addTailLarge() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     int size = 500;
     Set<Integer> fileIDs = Sets.newHashSet();
     for (int i = 1; i <= size; i++) {
@@ -84,7 +90,7 @@ public class TestFlumeEventQueue {
       Assert.assertEquals(fileIDs, queue.getFileIDs());
     }
     for (int i = 1; i <= size; i++) {
-      Assert.assertEquals(new FlumeEventPointer(i, i), queue.removeHead());
+      Assert.assertEquals(new FlumeEventPointer(i, i), queue.removeHead(0));
       fileIDs.remove(i);
       Assert.assertEquals(fileIDs, queue.getFileIDs());
     }
@@ -92,24 +98,24 @@ public class TestFlumeEventQueue {
   }
   @Test
   public void addHead1() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
-    Assert.assertEquals(pointer1, queue.removeHead());
+    Assert.assertEquals(pointer1, queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
   }
   @Test
   public void addHead2() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertTrue(queue.addHead(pointer2));
     Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
-    Assert.assertEquals(pointer2, queue.removeHead());
+    Assert.assertEquals(pointer2, queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
   }
   @Test
   public void addHeadLarge() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     int size = 500;
     Set<Integer> fileIDs = Sets.newHashSet();
     for (int i = 1; i <= size; i++) {
@@ -118,7 +124,7 @@ public class TestFlumeEventQueue {
       Assert.assertEquals(fileIDs, queue.getFileIDs());
     }
     for (int i = size; i > 0; i--) {
-      Assert.assertEquals(new FlumeEventPointer(i, i), queue.removeHead());
+      Assert.assertEquals(new FlumeEventPointer(i, i), queue.removeHead(0));
       fileIDs.remove(i);
       Assert.assertEquals(fileIDs, queue.getFileIDs());
     }
@@ -126,42 +132,42 @@ public class TestFlumeEventQueue {
   }
   @Test
   public void addTailRemove1() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
     Assert.assertTrue(queue.remove(pointer1));
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
-    Assert.assertNull(queue.removeHead());
+    Assert.assertNull(queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
   }
 
   @Test
   public void addTailRemove2() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertTrue(queue.addTail(pointer2));
     Assert.assertTrue(queue.remove(pointer1));
-    Assert.assertEquals(pointer2, queue.removeHead());
+    Assert.assertEquals(pointer2, queue.removeHead(0));
   }
 
   @Test
   public void addHeadRemove1() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     queue.addHead(pointer1);
     Assert.assertTrue(queue.remove(pointer1));
-    Assert.assertNull(queue.removeHead());
+    Assert.assertNull(queue.removeHead(0));
   }
   @Test
   public void addHeadRemove2() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertTrue(queue.addHead(pointer2));
     Assert.assertTrue(queue.remove(pointer1));
-    Assert.assertEquals(pointer2, queue.removeHead());
+    Assert.assertEquals(pointer2, queue.removeHead(0));
   }
   @Test
   public void testWrappingCorrectly() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     int size = Integer.MAX_VALUE;
     for (int i = 1; i <= size; i++) {
       if(!queue.addHead(new FlumeEventPointer(i, i))) {
@@ -169,7 +175,7 @@ public class TestFlumeEventQueue {
       }
     }
     for (int i = queue.getSize()/2; i > 0; i--) {
-      Assert.assertNotNull(queue.removeHead());
+      Assert.assertNotNull(queue.removeHead(0));
     }
     // addHead below would throw an IndexOOBounds with
     // bad version of FlumeEventQueue.convert
@@ -179,4 +185,48 @@ public class TestFlumeEventQueue {
       }
     }
   }
+  @Test
+  public void testInflightPuts() throws Exception{
+    queue = new FlumeEventQueue(10, file, inflightTakes, inflightPuts, "test");
+    long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
+    long txnID2 = txnID1 + 1;
+    queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
+    queue.addWithoutCommit(new FlumeEventPointer(2, 1), txnID1);
+    queue.addWithoutCommit(new FlumeEventPointer(2, 2), txnID2);
+    queue.checkpoint(true);
+    TimeUnit.SECONDS.sleep(3L);
+    queue = new FlumeEventQueue(10, file, inflightTakes, inflightPuts, "test");
+    SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightPuts();
+    Assert.assertTrue(deserializedMap.get(
+            txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
+    Assert.assertTrue(deserializedMap.get(
+            txnID1).contains(new FlumeEventPointer(2, 1).toLong()));
+    Assert.assertTrue(deserializedMap.get(
+            txnID2).contains(new FlumeEventPointer(2, 2).toLong()));
+  }
+
+  @Test
+  public void testInflightTakes() throws Exception {
+    queue = new FlumeEventQueue(10, file, inflightTakes, inflightPuts, "test");
+    long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
+    long txnID2 = txnID1 + 1;
+    queue.addTail(new FlumeEventPointer(1, 1));
+    queue.addTail(new FlumeEventPointer(2, 1));
+    queue.addTail(new FlumeEventPointer(2, 2));
+    queue.removeHead(txnID1);
+    queue.removeHead(txnID2);
+    queue.removeHead(txnID2);
+    queue.checkpoint(true);
+    TimeUnit.SECONDS.sleep(3L);
+    queue = new FlumeEventQueue(10, file, inflightTakes, inflightPuts, "test");
+        SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightTakes();
+    Assert.assertTrue(deserializedMap.get(
+            txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
+    Assert.assertTrue(deserializedMap.get(
+            txnID2).contains(new FlumeEventPointer(2, 1).toLong()));
+    Assert.assertTrue(deserializedMap.get(
+            txnID2).contains(new FlumeEventPointer(2, 2).toLong()));
+
+  }
 }
+

http://git-wip-us.apache.org/repos/asf/flume/blob/cf3c16f4/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
index e0b5e3f..e923a30 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
@@ -140,7 +140,7 @@ public class TestLog {
                 dataDirs).setChannelName("testlog").build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
-    Assert.assertNull(queue.removeHead());
+    Assert.assertNull(queue.removeHead(transactionID));
   }
 
   /**
@@ -164,7 +164,7 @@ public class TestLog {
             .setChannelName("testlog").build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
-    Assert.assertNull(queue.removeHead());
+    Assert.assertNull(queue.removeHead(0));
   }
 
   /**
@@ -212,7 +212,7 @@ public class TestLog {
             .setChannelName("testlog").build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
-    FlumeEventPointer eventPointerOut = queue.removeHead();
+    FlumeEventPointer eventPointerOut = queue.removeHead(0);
     Assert.assertNull(eventPointerOut);
   }
 
@@ -228,7 +228,7 @@ public class TestLog {
             .setChannelName("testlog").build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
-    FlumeEventPointer eventPointerOut = queue.removeHead();
+    FlumeEventPointer eventPointerOut = queue.removeHead(0);
     Assert.assertNull(eventPointerOut);
   }
 
@@ -244,16 +244,16 @@ public class TestLog {
             .setChannelName("testlog").build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
-    FlumeEventPointer eventPointerOut = queue.removeHead();
+    FlumeEventPointer eventPointerOut = queue.removeHead(0);
     Assert.assertNull(eventPointerOut);
   }
 
   private void takeAndVerify(FlumeEventPointer eventPointerIn,
       FlumeEvent eventIn) throws IOException, InterruptedException {
     FlumeEventQueue queue = log.getFlumeEventQueue();
-    FlumeEventPointer eventPointerOut = queue.removeHead();
+    FlumeEventPointer eventPointerOut = queue.removeHead(0);
     Assert.assertNotNull(eventPointerOut);
-    Assert.assertNull(queue.removeHead());
+    Assert.assertNull(queue.removeHead(0));
     Assert.assertEquals(eventPointerIn, eventPointerOut);
     Assert.assertEquals(eventPointerIn.hashCode(), eventPointerOut.hashCode());
     FlumeEvent eventOut = log.get(eventPointerOut);


Mime
View raw message