flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject git commit: FLUME-2181 - Optionally disable File Channel fsyncs (Hari via Brock)
Date Fri, 02 May 2014 14:33:44 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk a94594dd2 -> 6115e7d6d


FLUME-2181 - Optionally disable File Channel fsyncs (Hari via Brock)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/6115e7d6
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/6115e7d6
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/6115e7d6

Branch: refs/heads/trunk
Commit: 6115e7d6d611d2b82dc2583b95a13d4c0886a93f
Parents: a94594d
Author: Brock Noland <brock@apache.org>
Authored: Fri May 2 07:32:33 2014 -0700
Committer: Brock Noland <brock@apache.org>
Committed: Fri May 2 07:32:33 2014 -0700

----------------------------------------------------------------------
 .../flume/channel/file/CheckpointRebuilder.java | 11 ++-
 .../apache/flume/channel/file/FileChannel.java  | 26 ++++++-
 .../channel/file/FileChannelConfiguration.java  |  7 ++
 .../java/org/apache/flume/channel/file/Log.java | 54 ++++++++++---
 .../org/apache/flume/channel/file/LogFile.java  | 81 +++++++++++++++++---
 .../flume/channel/file/LogFileFactory.java      | 16 ++--
 .../apache/flume/channel/file/LogFileV2.java    |  5 +-
 .../apache/flume/channel/file/LogFileV3.java    | 61 +++++++++++----
 .../flume/channel/file/ReplayHandler.java       | 12 ++-
 .../channel/file/TransactionEventRecord.java    |  4 +
 .../encryption/AESCTRNoPaddingProvider.java     |  5 +-
 .../encryption/DecryptionFailureException.java  | 38 +++++++++
 .../channel/file/TestCheckpointRebuilder.java   |  2 +-
 .../flume/channel/file/TestFileChannel.java     | 35 +++++++--
 .../org/apache/flume/channel/file/TestLog.java  | 20 ++---
 .../apache/flume/channel/file/TestLogFile.java  | 18 ++---
 .../apache/flume/channel/file/TestUtils.java    |  4 +-
 .../flume/tools/FileChannelIntegrityTool.java   |  2 +-
 .../tools/TestFileChannelIntegrityTool.java     |  2 +-
 19 files changed, 314 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
index 4388181..b961ae2 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
@@ -49,14 +49,17 @@ public class CheckpointRebuilder {
           HashMultimap.create();
   private final SetMultimap<Long, ComparableFlumeEventPointer>
           uncommittedTakes = HashMultimap.create();
+  private final boolean fsyncPerTransaction;
 
   private static Logger LOG =
           LoggerFactory.getLogger(CheckpointRebuilder.class);
 
   public CheckpointRebuilder(List<File> logFiles,
-          FlumeEventQueue queue) throws IOException {
+    FlumeEventQueue queue, boolean fsyncPerTransaction) throws
+    IOException {
     this.logFiles = logFiles;
     this.queue = queue;
+    this.fsyncPerTransaction = fsyncPerTransaction;
   }
 
   public boolean rebuild() throws IOException, Exception {
@@ -64,7 +67,8 @@ public class CheckpointRebuilder {
     List<LogFile.SequentialReader> logReaders = Lists.newArrayList();
     for (File logFile : logFiles) {
       try {
-        logReaders.add(LogFileFactory.getSequentialReader(logFile, null));
+        logReaders.add(LogFileFactory.getSequentialReader(logFile, null,
+          fsyncPerTransaction));
       } catch(EOFException e) {
         LOG.warn("Ignoring " + logFile + " due to EOF", e);
       }
@@ -252,7 +256,8 @@ public class CheckpointRebuilder {
               new File(checkpointDir, "inflighttakes"),
               new File(checkpointDir, "inflightputs"),
               new File(checkpointDir, Log.QUEUE_SET));
-      CheckpointRebuilder rebuilder = new CheckpointRebuilder(logFiles, queue);
+      CheckpointRebuilder rebuilder = new CheckpointRebuilder(logFiles,
+        queue, true);
       if(rebuilder.rebuild()) {
         rebuilder.writeCheckpoint();
       } else {

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index 5203ca1..0f242d2 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -95,6 +95,8 @@ public class FileChannel extends BasicChannelSemantics {
   private String encryptionActiveKey;
   private String encryptionCipherProvider;
   private boolean useDualCheckpoints;
+  private boolean fsyncPerTransaction;
+  private int fsyncInterval;
 
   @Override
   public synchronized void setName(String name) {
@@ -233,6 +235,12 @@ public class FileChannel extends BasicChannelSemantics {
           "key provider name is not.");
     }
 
+    fsyncPerTransaction = context.getBoolean(FileChannelConfiguration
+      .FSYNC_PER_TXN, FileChannelConfiguration.DEFAULT_FSYNC_PRE_TXN);
+
+    fsyncInterval = context.getInteger(FileChannelConfiguration
+      .FSYNC_INTERVAL, FileChannelConfiguration.DEFAULT_FSYNC_INTERVAL);
+
     if(queueRemaining == null) {
       queueRemaining = new Semaphore(capacity, true);
     }
@@ -265,6 +273,8 @@ public class FileChannel extends BasicChannelSemantics {
       builder.setEncryptionCipherProvider(encryptionCipherProvider);
       builder.setUseDualCheckpoints(useDualCheckpoints);
       builder.setBackupCheckpointDir(backupCheckpointDir);
+      builder.setFsyncPerTransaction(fsyncPerTransaction);
+      builder.setFsyncInterval(fsyncInterval);
       log = builder.build();
       log.replay();
       open = true;
@@ -328,8 +338,8 @@ public class FileChannel extends BasicChannelSemantics {
               trans.getStateAsString()  + channelNameDescriptor);
     }
     trans = new FileBackedTransaction(log, TransactionIDOracle.next(),
-        transactionCapacity, keepAlive, queueRemaining, getName(),
-        channelCounter);
+      transactionCapacity, keepAlive, queueRemaining, getName(),
+      fsyncPerTransaction, channelCounter);
     transactions.set(trans);
     return trans;
   }
@@ -401,9 +411,11 @@ public class FileChannel extends BasicChannelSemantics {
     private final Semaphore queueRemaining;
     private final String channelNameDescriptor;
     private final ChannelCounter channelCounter;
+    private final boolean fsyncPerTransaction;
     public FileBackedTransaction(Log log, long transactionID,
         int transCapacity, int keepAlive, Semaphore queueRemaining,
-        String name, ChannelCounter counter) {
+        String name, boolean fsyncPerTransaction, ChannelCounter
+      counter) {
       this.log = log;
       queue = log.getFlumeEventQueue();
       this.transactionID = transactionID;
@@ -411,6 +423,7 @@ public class FileChannel extends BasicChannelSemantics {
       this.queueRemaining = queueRemaining;
       putList = new LinkedBlockingDeque<FlumeEventPointer>(transCapacity);
       takeList = new LinkedBlockingDeque<FlumeEventPointer>(transCapacity);
+      this.fsyncPerTransaction = fsyncPerTransaction;
       channelNameDescriptor = "[channel=" + name + "]";
       this.channelCounter = counter;
     }
@@ -500,6 +513,13 @@ public class FileChannel extends BasicChannelSemantics {
               LOG.warn("Corrupt record replaced by File Channel Integrity " +
                 "tool found. Will retrieve next event", e);
               takeList.remove(ptr);
+            } catch (CorruptEventException ex) {
+              if (fsyncPerTransaction) {
+                throw new ChannelException(ex);
+              }
+              LOG.warn("Corrupt record found. Event will be " +
+                "skipped, and next event will be read.", ex);
+              takeList.remove(ptr);
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
index e4bc879..87dc653 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
@@ -87,4 +87,11 @@ public class FileChannelConfiguration {
   public static final String USE_DUAL_CHECKPOINTS = "useDualCheckpoints";
   public static final boolean DEFAULT_USE_DUAL_CHECKPOINTS = false;
 
+  public static final String FSYNC_PER_TXN = "fsyncPerTransaction";
+  public static final boolean DEFAULT_FSYNC_PRE_TXN = true;
+
+  public static final String FSYNC_INTERVAL = "fsyncInterval";
+  public static final int DEFAULT_FSYNC_INTERVAL = 5; // seconds.
+
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 579ee35..5bac0f4 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -125,6 +125,9 @@ public class Log {
   private final boolean useDualCheckpoints;
   private volatile boolean backupRestored = false;
 
+  private final boolean fsyncPerTransaction;
+  private final int fsyncInterval;
+
   private int readCount;
   private int putCount;
   private int takeCount;
@@ -150,6 +153,25 @@ public class Log {
     private boolean bUseDualCheckpoints = false;
     private File bBackupCheckpointDir = null;
 
+    private boolean fsyncPerTransaction = true;
+    private int fsyncInterval;
+
+    boolean isFsyncPerTransaction() {
+      return fsyncPerTransaction;
+    }
+
+    void setFsyncPerTransaction(boolean fsyncPerTransaction) {
+      this.fsyncPerTransaction = fsyncPerTransaction;
+    }
+
+    int getFsyncInterval() {
+      return fsyncInterval;
+    }
+
+    void setFsyncInterval(int fsyncInterval) {
+      this.fsyncInterval = fsyncInterval;
+    }
+
     Builder setUsableSpaceRefreshInterval(long usableSpaceRefreshInterval) {
       bUsableSpaceRefreshInterval = usableSpaceRefreshInterval;
       return this;
@@ -231,7 +253,7 @@ public class Log {
         useLogReplayV1, useFastReplay, bMinimumRequiredSpace,
         bEncryptionKeyProvider, bEncryptionKeyAlias,
         bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
-        bLogDirs);
+        fsyncPerTransaction, fsyncInterval, bLogDirs);
     }
   }
 
@@ -241,7 +263,8 @@ public class Log {
     long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider,
     @Nullable String encryptionKeyAlias,
     @Nullable String encryptionCipherProvider,
-    long usableSpaceRefreshInterval, File... logDirs)
+    long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
+    int fsyncInterval, File... logDirs)
           throws IOException {
     Preconditions.checkArgument(checkpointInterval > 0,
       "checkpointInterval <= 0");
@@ -318,6 +341,8 @@ public class Log {
     this.checkpointDir = checkpointDir;
     this.backupCheckpointDir = backupCheckpointDir;
     this.logDirs = logDirs;
+    this.fsyncPerTransaction = fsyncPerTransaction;
+    this.fsyncInterval = fsyncInterval;
     logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length);
     workerExecutor = Executors.newSingleThreadScheduledExecutor(new
       ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name)
@@ -354,7 +379,7 @@ public class Log {
           dataFiles.add(file);
           nextFileID.set(Math.max(nextFileID.get(), id));
           idLogFileMap.put(id, LogFileFactory.getRandomReader(new File(logDir,
-              PREFIX + id), encryptionKeyProvider));
+              PREFIX + id), encryptionKeyProvider, fsyncPerTransaction));
         }
       }
       LOGGER.info("Found NextFileID " + nextFileID +
@@ -468,13 +493,13 @@ public class Log {
                         KeyProvider encryptionKeyProvider,
                         boolean useFastReplay) throws Exception {
     CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles,
-            queue);
+            queue, fsyncPerTransaction);
     if (useFastReplay && rebuilder.rebuild()) {
       didFastReplay = true;
       LOGGER.info("Fast replay successful.");
     } else {
       ReplayHandler replayHandler = new ReplayHandler(queue,
-              encryptionKeyProvider);
+              encryptionKeyProvider, fsyncPerTransaction);
       if (useLogReplayV1) {
         LOGGER.info("Replaying logs with v1 replay logic");
         replayHandler.replayLogv1(dataFiles);
@@ -551,7 +576,7 @@ public class Log {
    * @throws InterruptedException
    */
   FlumeEvent get(FlumeEventPointer pointer) throws IOException,
-    InterruptedException, NoopRecordException {
+    InterruptedException, NoopRecordException, CorruptEventException {
     Preconditions.checkState(open, "Log is closed");
     int id = pointer.getFileID();
     LogFile.RandomReader logFile = idLogFileMap.get(id);
@@ -559,9 +584,12 @@ public class Log {
     try {
       return logFile.get(pointer.getOffset());
     } catch (CorruptEventException ex) {
-      open = false;
-      throw new IOException("Corrupt event found. Please run File Channel " +
-        "Integrity tool.", ex);
+      if (fsyncPerTransaction) {
+        open = false;
+        throw new IOException("Corrupt event found. Please run File Channel " +
+          "Integrity tool.", ex);
+      }
+      throw ex;
     }
   }
 
@@ -906,9 +934,10 @@ public class Log {
           File file = new File(logDirs[index], PREFIX + fileID);
           LogFile.Writer writer = LogFileFactory.getWriter(file, fileID,
             maxFileSize, encryptionKey, encryptionKeyAlias,
-            encryptionCipherProvider, usableSpaceRefreshInterval);
+            encryptionCipherProvider, usableSpaceRefreshInterval,
+            fsyncPerTransaction, fsyncInterval);
           idLogFileMap.put(fileID, LogFileFactory.getRandomReader(file,
-            encryptionKeyProvider));
+            encryptionKeyProvider, fsyncPerTransaction));
           // writer from this point on will get new reference
           logFiles.set(index, writer);
           // close out old log
@@ -991,7 +1020,8 @@ public class Log {
           } finally {
             writer.close();
           }
-          reader = LogFileFactory.getRandomReader(file, encryptionKeyProvider);
+          reader = LogFileFactory.getRandomReader(file,
+            encryptionKeyProvider, fsyncPerTransaction);
           idLogFileMap.put(id, reader);
           LOGGER.debug("Updated checkpoint for file: " + file
               + "logWriteOrderID " + logWriteOrderID);

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
index 26a24b1..488dcf4 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
@@ -41,6 +41,9 @@ import java.nio.channels.FileChannel;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 @InterfaceAudience.Private
@@ -169,13 +172,18 @@ public abstract class LogFile {
     private long lastCommitPosition;
     private long lastSyncPosition;
 
+    private final boolean fsyncPerTransaction;
+    private final int fsyncInterval;
+    private final ScheduledExecutorService syncExecutor;
+    private volatile boolean dirty = false;
+
     // To ensure we can count the number of fsyncs.
     private long syncCount;
 
 
     Writer(File file, int logFileID, long maxFileSize,
-        CipherProvider.Encryptor encryptor, long usableSpaceRefreshInterval)
-        throws IOException {
+        CipherProvider.Encryptor encryptor, long usableSpaceRefreshInterval,
+        boolean fsyncPerTransaction, int fsyncInterval) throws IOException {
       this.file = file;
       this.logFileID = logFileID;
       this.maxFileSize = Math.min(maxFileSize,
@@ -183,6 +191,25 @@ public abstract class LogFile {
       this.encryptor = encryptor;
       writeFileHandle = new RandomAccessFile(file, "rw");
       writeFileChannel = writeFileHandle.getChannel();
+      this.fsyncPerTransaction = fsyncPerTransaction;
+      this.fsyncInterval = fsyncInterval;
+      if(!fsyncPerTransaction) {
+        LOG.info("Sync interval = " + fsyncInterval);
+        syncExecutor = Executors.newSingleThreadScheduledExecutor();
+        syncExecutor.scheduleWithFixedDelay(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              sync();
+            } catch (Throwable ex) {
+              LOG.error("Data file, " + getFile().toString() + " could not " +
+                "be synced to disk due to an error.", ex);
+            }
+          }
+        }, fsyncInterval, fsyncInterval, TimeUnit.SECONDS);
+      } else {
+        syncExecutor = null;
+      }
       usableSpace = new CachedFSUsableSpace(file, usableSpaceRefreshInterval);
       LOG.info("Opened " + file);
       open = true;
@@ -258,6 +285,7 @@ public abstract class LogFile {
         buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array()));
       }
       write(buffer);
+      dirty = true;
       lastCommitPosition = position();
     }
 
@@ -299,6 +327,14 @@ public abstract class LogFile {
      * @throws LogFileRetryableIOException - if this log file is closed.
      */
     synchronized void sync() throws IOException {
+      if (!fsyncPerTransaction && !dirty) {
+        if(LOG.isDebugEnabled()) {
+          LOG.debug(
+            "No events written to file, " + getFile().toString() +
+              " in last " + fsyncInterval + " or since last commit.");
+        }
+        return;
+      }
       if (!isOpen()) {
         throw new LogFileRetryableIOException("File closed " + file);
       }
@@ -306,6 +342,7 @@ public abstract class LogFile {
         getFileChannel().force(false);
         lastSyncPosition = position();
         syncCount++;
+        dirty = false;
       }
     }
 
@@ -322,6 +359,13 @@ public abstract class LogFile {
     synchronized void close() {
       if(open) {
         open = false;
+        if (!fsyncPerTransaction) {
+          // Shutdown the executor before attempting to close.
+          if(syncExecutor != null) {
+            // No need to wait for it to shutdown.
+            syncExecutor.shutdown();
+          }
+        }
         if(writeFileChannel.isOpen()) {
           LOG.info("Closing " + file);
           try {
@@ -396,12 +440,15 @@ public abstract class LogFile {
     private final BlockingQueue<RandomAccessFile> readFileHandles =
         new ArrayBlockingQueue<RandomAccessFile>(50, true);
     private final KeyProvider encryptionKeyProvider;
+    private final boolean fsyncPerTransaction;
     private volatile boolean open;
-    public RandomReader(File file, @Nullable KeyProvider encryptionKeyProvider)
+    public RandomReader(File file, @Nullable KeyProvider
+      encryptionKeyProvider, boolean fsyncPerTransaction)
         throws IOException {
       this.file = file;
       this.encryptionKeyProvider = encryptionKeyProvider;
       readFileHandles.add(open());
+      this.fsyncPerTransaction = fsyncPerTransaction;
       open = true;
     }
 
@@ -430,8 +477,11 @@ public abstract class LogFile {
           throw new NoopRecordException("No op record found. Corrupt record " +
             "may have been repaired by File Channel Integrity tool");
         }
-        Preconditions.checkState(operation == OP_RECORD,
-            Integer.toHexString(operation));
+        if (operation != OP_RECORD) {
+          throw new CorruptEventException(
+            "Operation code is invalid. File " +
+              "is corrupt. Please run File Channel Integrity tool.");
+        }
         TransactionEventRecord record = doGet(fileHandle);
         if(!(record instanceof Put)) {
           Preconditions.checkState(false, "Record is " +
@@ -491,8 +541,8 @@ public abstract class LogFile {
       }
       int remaining = readFileHandles.remainingCapacity();
       if(remaining > 0) {
-        LOG.info("Opening " + file + " for read, remaining capacity is "
-            + remaining);
+        LOG.info("Opening " + file + " for read, remaining number of file " +
+          "handles available for reads of this file is " + remaining);
         return open();
       }
       return readFileHandles.take();
@@ -647,11 +697,20 @@ public abstract class LogFile {
     output.put(buffer);
   }
   protected static byte[] readDelimitedBuffer(RandomAccessFile fileHandle)
-      throws IOException {
+      throws IOException, CorruptEventException {
     int length = fileHandle.readInt();
-    Preconditions.checkState(length >= 0, Integer.toHexString(length));
+    if (length < 0) {
+      throw new CorruptEventException("Length of event is: " + String.valueOf
+        (length) + ". Event must have length >= 0. Possible corruption of " +
+        "data or partial fsync.");
+    }
     byte[] buffer = new byte[length];
-    fileHandle.readFully(buffer);
+    try {
+      fileHandle.readFully(buffer);
+    } catch (EOFException ex) {
+      throw new CorruptEventException("Remaining data in file less than " +
+        "expected size of event.", ex);
+    }
     return buffer;
   }
 
@@ -659,7 +718,7 @@ public abstract class LogFile {
     File file = new File(args[0]);
     LogFile.SequentialReader reader = null;
     try {
-      reader = LogFileFactory.getSequentialReader(file, null);
+      reader = LogFileFactory.getSequentialReader(file, null, false);
       LogRecord entry;
       FlumeEventPointer ptr;
       // for puts the fileId is the fileID of the file they exist in

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
index 9c98d8c..7d7fd85 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
@@ -68,17 +68,19 @@ class LogFileFactory {
       long maxFileSize, @Nullable Key encryptionKey,
       @Nullable String encryptionKeyAlias,
       @Nullable String encryptionCipherProvider,
-      long usableSpaceRefreshInterval) throws IOException {
+      long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
+      int fsyncInterval) throws IOException {
     Preconditions.checkState(!file.exists(), "File already exists "  +
       file.getAbsolutePath());
     Preconditions.checkState(file.createNewFile(), "File could not be created "
         + file.getAbsolutePath());
     return new LogFileV3.Writer(file, logFileID, maxFileSize, encryptionKey,
-        encryptionKeyAlias, encryptionCipherProvider, usableSpaceRefreshInterval);
+        encryptionKeyAlias, encryptionCipherProvider,
+        usableSpaceRefreshInterval, fsyncPerTransaction, fsyncInterval);
   }
 
   static LogFile.RandomReader getRandomReader(File file,
-      @Nullable KeyProvider encryptionKeyProvider)
+      @Nullable KeyProvider encryptionKeyProvider, boolean fsyncPerTransaction)
       throws IOException {
     RandomAccessFile logFile = new RandomAccessFile(file, "r");
     try {
@@ -86,7 +88,8 @@ class LogFileFactory {
       // either this is a rr for a just created file or
       // the metadata file exists and as such it's V3
       if(logFile.length() == 0L || metaDataFile.exists()) {
-        return new LogFileV3.RandomReader(file, encryptionKeyProvider);
+        return new LogFileV3.RandomReader(file, encryptionKeyProvider,
+          fsyncPerTransaction);
       }
       int version = logFile.readInt();
       if(Serialization.VERSION_2 == version) {
@@ -106,7 +109,7 @@ class LogFileFactory {
   }
 
   static LogFile.SequentialReader getSequentialReader(File file,
-      @Nullable KeyProvider encryptionKeyProvider)
+      @Nullable KeyProvider encryptionKeyProvider, boolean fsyncPerTransaction)
       throws IOException {
     RandomAccessFile logFile = null;
     try {
@@ -159,7 +162,8 @@ class LogFileFactory {
           throw new EOFException(String.format("MetaData file %s is empty",
               metaDataFile));
         }
-        return new LogFileV3.SequentialReader(file, encryptionKeyProvider);
+        return new LogFileV3.SequentialReader(file, encryptionKeyProvider,
+          fsyncPerTransaction);
       }
       logFile = new RandomAccessFile(file, "r");
       int version = logFile.readInt();

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
index f286c57..bb25e95 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
@@ -96,7 +96,8 @@ class LogFileV2 extends LogFile {
     Writer(File file, int logFileID, long maxFileSize,
         long usableSpaceRefreshInterval)
         throws IOException {
-      super(file, logFileID, maxFileSize, null, usableSpaceRefreshInterval);
+      super(file, logFileID, maxFileSize, null, usableSpaceRefreshInterval,
+        true, 0);
       RandomAccessFile writeFileHandle = getFileHandle();
       writeFileHandle.writeInt(getVersion());
       writeFileHandle.writeInt(logFileID);
@@ -116,7 +117,7 @@ class LogFileV2 extends LogFile {
   static class RandomReader extends LogFile.RandomReader {
     RandomReader(File file)
         throws IOException {
-      super(file, null);
+      super(file, null, true);
     }
     @Override
     int getVersion() {

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
index 38f6ecb..9b0ef93 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
@@ -21,10 +21,12 @@ package org.apache.flume.channel.file;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.GeneratedMessage;
+import org.apache.flume.Transaction;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.channel.file.encryption.CipherProvider;
 import org.apache.flume.channel.file.encryption.CipherProviderFactory;
+import org.apache.flume.channel.file.encryption.DecryptionFailureException;
 import org.apache.flume.channel.file.encryption.KeyProvider;
 import org.apache.flume.channel.file.proto.ProtosFactory;
 import org.slf4j.Logger;
@@ -178,11 +180,11 @@ public class LogFileV3 extends LogFile {
         @Nullable Key encryptionKey,
         @Nullable String encryptionKeyAlias,
         @Nullable String encryptionCipherProvider,
-        long usableSpaceRefreshInterval)
-        throws IOException {
+        long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
+        int fsyncInterval) throws IOException {
       super(file, logFileID, maxFileSize, CipherProviderFactory.
           getEncrypter(encryptionCipherProvider, encryptionKey),
-          usableSpaceRefreshInterval);
+          usableSpaceRefreshInterval, fsyncPerTransaction, fsyncInterval);
       ProtosFactory.LogFileMetaData.Builder metaDataBuilder =
           ProtosFactory.LogFileMetaData.newBuilder();
       if(encryptionKey != null) {
@@ -219,10 +221,11 @@ public class LogFileV3 extends LogFile {
     private volatile String cipherProvider;
     private volatile byte[] parameters;
     private BlockingQueue<CipherProvider.Decryptor> decryptors =
-        new LinkedBlockingDeque<CipherProvider.Decryptor>();
-    RandomReader(File file, @Nullable KeyProvider encryptionKeyProvider)
-        throws IOException {
-      super(file, encryptionKeyProvider);
+      new LinkedBlockingDeque<CipherProvider.Decryptor>();
+
+    RandomReader(File file, @Nullable KeyProvider encryptionKeyProvider,
+      boolean fsyncPerTransaction) throws IOException {
+      super(file, encryptionKeyProvider, fsyncPerTransaction);
     }
     private void initialize() throws IOException {
       File metaDataFile = Serialization.getMetaDataFile(getFile());
@@ -281,10 +284,10 @@ public class LogFileV3 extends LogFile {
           initialize();
         }
       }
-      byte[] buffer = readDelimitedBuffer(fileHandle);
-      CipherProvider.Decryptor decryptor = null;
       boolean success = false;
+      CipherProvider.Decryptor decryptor = null;
       try {
+        byte[] buffer = readDelimitedBuffer(fileHandle);
         if(encryptionEnabled) {
           decryptor = getDecryptor();
           buffer = decryptor.decrypt(buffer);
@@ -293,6 +296,8 @@ public class LogFileV3 extends LogFile {
             fromByteArray(buffer);
         success = true;
         return event;
+      } catch(DecryptionFailureException ex) {
+        throw new CorruptEventException("Error decrypting event", ex);
       } finally {
         if(success && encryptionEnabled && decryptor != null) {
           decryptors.offer(decryptor);
@@ -303,10 +308,12 @@ public class LogFileV3 extends LogFile {
 
   public static class SequentialReader extends LogFile.SequentialReader {
     private CipherProvider.Decryptor decryptor;
-
+    private final boolean fsyncPerTransaction;
     public SequentialReader(File file, @Nullable KeyProvider
-      encryptionKeyProvider) throws EOFException, IOException {
+      encryptionKeyProvider, boolean fsyncPerTransaction) throws EOFException,
+      IOException {
       super(file, encryptionKeyProvider);
+      this.fsyncPerTransaction = fsyncPerTransaction;
       File metaDataFile = Serialization.getMetaDataFile(file);
       FileInputStream inputStream = new FileInputStream(metaDataFile);
       try {
@@ -351,13 +358,33 @@ public class LogFileV3 extends LogFile {
     }
 
     @Override
-    LogRecord doNext(int offset) throws IOException, CorruptEventException {
-      byte[] buffer = readDelimitedBuffer(getFileHandle());
-      if(decryptor != null) {
-        buffer = decryptor.decrypt(buffer);
+    LogRecord doNext(int offset) throws IOException, CorruptEventException,
+      DecryptionFailureException {
+      byte[] buffer = null;
+      TransactionEventRecord event = null;
+      try {
+        buffer = readDelimitedBuffer(getFileHandle());
+        if (decryptor != null) {
+          buffer = decryptor.decrypt(buffer);
+        }
+        event = TransactionEventRecord.fromByteArray(buffer);
+      } catch (CorruptEventException ex) {
+        LOGGER.warn("Corrupt file found. File id: log-" + this.getLogFileID(),
+          ex);
+        // Return null so that replay handler thinks all events in this file
+        // have been taken.
+        if (!fsyncPerTransaction) {
+          return null;
+        }
+        throw ex;
+      } catch (DecryptionFailureException ex) {
+        if (!fsyncPerTransaction) {
+          LOGGER.warn("Could not decrypt even read from channel. Skipping " +
+            "event.", ex);
+          return null;
+        }
+        throw ex;
       }
-      TransactionEventRecord event =
-          TransactionEventRecord.fromByteArray(buffer);
       return new LogRecord(getLogFileID(), offset, event);
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
index e668c2e..a559503 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
@@ -52,6 +52,7 @@ class ReplayHandler {
   private final Map<Integer, LogFile.SequentialReader> readers;
   private final PriorityQueue<LogRecord> logRecordBuffer;
   private final KeyProvider encryptionKeyProvider;
+  private final boolean fsyncPerTransaction;
   /**
    * This data structure stores takes for which we found a commit in the log
    * files before we found a commit for the put. This can happen if the channel
@@ -91,19 +92,22 @@ class ReplayHandler {
   public int getCommitCount() {
     return commitCount;
   }
+
   @VisibleForTesting
   public int getRollbackCount() {
     return rollbackCount;
   }
 
   ReplayHandler(FlumeEventQueue queue,
-      @Nullable KeyProvider encryptionKeyProvider) {
+    @Nullable KeyProvider encryptionKeyProvider,
+    boolean fsyncPerTransaction) {
     this.queue = queue;
     this.lastCheckpoint = queue.getLogWriteOrderID();
     pendingTakes = Lists.newArrayList();
     readers = Maps.newHashMap();
     logRecordBuffer = new PriorityQueue<LogRecord>();
     this.encryptionKeyProvider = encryptionKeyProvider;
+    this.fsyncPerTransaction = fsyncPerTransaction;
   }
   /**
    * Replay logic from Flume1.2 which can be activated if the v2 logic
@@ -129,7 +133,8 @@ class ReplayHandler {
       LOG.info("Replaying " + log);
       LogFile.SequentialReader reader = null;
       try {
-        reader = LogFileFactory.getSequentialReader(log, encryptionKeyProvider);
+        reader = LogFileFactory.getSequentialReader(log,
+          encryptionKeyProvider, fsyncPerTransaction);
         reader.skipToLastCheckpointPosition(queue.getLogWriteOrderID());
         LogRecord entry;
         FlumeEventPointer ptr;
@@ -257,7 +262,8 @@ class ReplayHandler {
         LOG.info("Replaying " + log);
         try {
           LogFile.SequentialReader reader =
-              LogFileFactory.getSequentialReader(log, encryptionKeyProvider);
+            LogFileFactory.getSequentialReader(log, encryptionKeyProvider,
+              fsyncPerTransaction);
           reader.skipToLastCheckpointPosition(queue.getLogWriteOrderID());
           Preconditions.checkState(!readers.containsKey(reader.getLogFileID()),
               "Readers " + readers + " already contains "

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
index ea7f00c..1eb3f4f 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
@@ -29,6 +29,7 @@ import java.io.OutputStream;
 import java.lang.reflect.Constructor;
 import java.nio.ByteBuffer;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.channel.file.proto.ProtosFactory;
@@ -207,6 +208,9 @@ public abstract class TransactionEventRecord implements Writable {
           ProtosFactory.TransactionEventFooter.
           parseDelimitedFrom(in), "Footer cannot be null");
       return transactionEvent;
+    } catch (InvalidProtocolBufferException ex) {
+      throw new CorruptEventException(
+        "Could not parse event from data file.", ex);
     } finally {
       try {
         in.close();

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java
index d0a84fe..9ee4245 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java
@@ -103,14 +103,15 @@ public class AESCTRNoPaddingProvider extends CipherProvider {
     }
   }
 
-  private static byte[] doFinal(Cipher cipher, byte[] input) {
+  private static byte[] doFinal(Cipher cipher, byte[] input)
+    throws DecryptionFailureException{
     try {
       return cipher.doFinal(input);
     } catch (Exception e) {
       String msg = "Unable to encrypt or decrypt data " + TYPE
           + " input.length " + input.length;
       LOG.error(msg, e);
-      throw Throwables.propagate(e);
+      throw new DecryptionFailureException(msg, e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java
new file mode 100644
index 0000000..0155c39
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file.encryption;
+
+import org.apache.flume.FlumeException;
+
+/**
+ * Exception that is thrown when the channel is unable to decrypt an even
+ * read from the channel.
+ */
+public class DecryptionFailureException extends FlumeException {
+  private static final long serialVersionUID = 6646810195384793646L;
+
+
+  public DecryptionFailureException(String msg) {
+    super(msg);
+  }
+
+  public DecryptionFailureException(String msg, Throwable th) {
+    super(msg, th);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
index 621d445..c6c6ad3 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
@@ -74,7 +74,7 @@ public class TestCheckpointRebuilder extends TestFileChannelBase {
     FlumeEventQueue queue = new FlumeEventQueue(backingStore, inflightTakesFile,
           inflightPutsFile, queueSetDir);
     CheckpointRebuilder checkpointRebuilder =
-        new CheckpointRebuilder(getAllLogs(dataDirs), queue);
+        new CheckpointRebuilder(getAllLogs(dataDirs), queue, true);
     Assert.assertTrue(checkpointRebuilder.rebuild());
     channel = createFileChannel(overrides);
     channel.start();

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index 25765b5..bb22e26 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -573,8 +574,25 @@ public class TestFileChannel extends TestFileChannelBase {
   }
 
   @Test (expected = IllegalStateException.class)
-  public void testChannelDiesOnCorruptEvent() throws Exception {
-    final FileChannel channel = createFileChannel();
+  public void testChannelDiesOnCorruptEventFsync() throws Exception {
+    testChannelDiesOnCorruptEvent(true);
+  }
+
+
+  @Test
+  public void testChannelDiesOnCorruptEventNoFsync() throws
+    Exception {
+    testChannelDiesOnCorruptEvent(false);
+  }
+
+
+
+  private void testChannelDiesOnCorruptEvent(boolean fsyncPerTxn)
+    throws Exception {
+    Map<String, String> overrides = new HashMap<String, String>();
+    overrides.put(FileChannelConfiguration.FSYNC_PER_TXN,
+      String.valueOf(fsyncPerTxn));
+    final FileChannel channel = createFileChannel(overrides);
     channel.start();
     putEvents(channel,"test-corrupt-event",100,100);
     for(File dataDir : dataDirs) {
@@ -596,8 +614,9 @@ public class TestFileChannel extends TestFileChannelBase {
         }
       }
     }
+    Set<String> events;
     try {
-      consumeChannel(channel, true);
+      events = consumeChannel(channel, true);
     } catch (IllegalStateException ex) {
       // The rollback call in takeEvents() in TestUtils will cause an
       // IllegalArgumentException - and this should be tested to verify the
@@ -605,9 +624,13 @@ public class TestFileChannel extends TestFileChannelBase {
       Assert.assertTrue(ex.getMessage().contains("Log is closed"));
       throw ex;
     }
-    Assert.fail();
-
-
+    if(fsyncPerTxn) {
+      Assert.fail();
+    } else {
+      // The corrupt event must be missing, the rest should be
+      // returned
+      Assert.assertEquals(99, events.size());
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
index d1f51fc..c9a64ed 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
@@ -76,7 +76,7 @@ public class TestLog {
    */
   @Test
   public void testPutGet()
-    throws IOException, InterruptedException, NoopRecordException {
+    throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
     long transactionID = ++this.transactionID;
     FlumeEventPointer eventPointer = log.put(transactionID, eventIn);
@@ -89,7 +89,7 @@ public class TestLog {
   }
   @Test
   public void testRoll()
-    throws IOException, InterruptedException, NoopRecordException {
+    throws IOException, InterruptedException, NoopRecordException, CorruptEventException  {
     log.shutdownWorker();
     Thread.sleep(1000);
     for (int i = 0; i < 1000; i++) {
@@ -119,7 +119,7 @@ public class TestLog {
    */
   @Test
   public void testPutCommit()
-    throws IOException, InterruptedException, NoopRecordException {
+    throws IOException, InterruptedException, NoopRecordException, CorruptEventException  {
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
     long transactionID = ++this.transactionID;
     FlumeEventPointer eventPointerIn = log.put(transactionID, eventIn);
@@ -247,16 +247,16 @@ public class TestLog {
    */
   @Test
   public void testPutTakeRollbackLogReplayV1()
-    throws IOException, InterruptedException, NoopRecordException {
+    throws IOException, InterruptedException, NoopRecordException, CorruptEventException  {
     doPutTakeRollback(true);
   }
   @Test
   public void testPutTakeRollbackLogReplayV2()
-    throws IOException, InterruptedException, NoopRecordException {
+    throws IOException, InterruptedException, NoopRecordException, CorruptEventException  {
     doPutTakeRollback(false);
   }
   public void doPutTakeRollback(boolean useLogReplayV1)
-    throws IOException, InterruptedException, NoopRecordException {
+    throws IOException, InterruptedException, NoopRecordException, CorruptEventException  {
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
     long putTransactionID = ++transactionID;
     FlumeEventPointer eventPointerIn = log.put(putTransactionID, eventIn);
@@ -396,7 +396,7 @@ public class TestLog {
   }
   @Test
   public void testReplaySucceedsWithUnusedEmptyLogMetaDataNormalReplay()
-    throws IOException, InterruptedException, NoopRecordException {
+    throws IOException, InterruptedException, NoopRecordException, CorruptEventException  {
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
     long transactionID = ++this.transactionID;
     FlumeEventPointer eventPointer = log.put(transactionID, eventIn);
@@ -410,7 +410,7 @@ public class TestLog {
   }
   @Test
   public void testReplaySucceedsWithUnusedEmptyLogMetaDataFastReplay()
-    throws IOException, InterruptedException, NoopRecordException {
+    throws IOException, InterruptedException, NoopRecordException, CorruptEventException  {
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
     long transactionID = ++this.transactionID;
     FlumeEventPointer eventPointer = log.put(transactionID, eventIn);
@@ -427,7 +427,7 @@ public class TestLog {
   }
   public void doTestReplaySucceedsWithUnusedEmptyLogMetaData(FlumeEvent eventIn,
       FlumeEventPointer eventPointer) throws IOException,
-    InterruptedException, NoopRecordException {
+    InterruptedException, NoopRecordException, CorruptEventException  {
     for (int i = 0; i < dataDirs.length; i++) {
       for(File logFile : LogUtils.getLogs(dataDirs[i])) {
         if(logFile.length() == 0L) {
@@ -467,7 +467,7 @@ public class TestLog {
 
   private void takeAndVerify(FlumeEventPointer eventPointerIn,
       FlumeEvent eventIn)
-    throws IOException, InterruptedException, NoopRecordException {
+    throws IOException, InterruptedException, NoopRecordException, CorruptEventException  {
     FlumeEventQueue queue = log.getFlumeEventQueue();
     FlumeEventPointer eventPointerOut = queue.removeHead(0);
     Assert.assertNotNull(eventPointerOut);

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
index e5d830e..976a112 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
@@ -63,7 +63,7 @@ public class TestLogFile {
     dataFile = new File(dataDir, String.valueOf(fileID));
     Assert.assertTrue(dataDir.isDirectory());
     logFileWriter = LogFileFactory.getWriter(dataFile, fileID,
-        Integer.MAX_VALUE, null, null, null, Long.MAX_VALUE);
+        Integer.MAX_VALUE, null, null, null, Long.MAX_VALUE, true, 0);
   }
   @After
   public void cleanup() throws IOException {
@@ -80,7 +80,7 @@ public class TestLogFile {
     Assert.assertTrue(dataFile.isFile() || dataFile.createNewFile());
     try {
       LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null,
-          null, Long.MAX_VALUE);
+          null, Long.MAX_VALUE, true, 0);
       Assert.fail();
     } catch (IllegalStateException e) {
       Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(),
@@ -94,7 +94,7 @@ public class TestLogFile {
     Assert.assertTrue(dataFile.mkdirs());
     try {
       LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null,
-          null, Long.MAX_VALUE);
+          null, Long.MAX_VALUE, true, 0);
       Assert.fail();
     } catch (IllegalStateException e) {
       Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(),
@@ -109,7 +109,7 @@ public class TestLogFile {
     CompletionService<Void> completionService = new ExecutorCompletionService
       <Void>(executorService);
     final LogFile.RandomReader logFileReader =
-        LogFileFactory.getRandomReader(dataFile, null);
+        LogFileFactory.getRandomReader(dataFile, null, true);
     for (int i = 0; i < 1000; i++) {
       // first try and throw failures
       synchronized (errors) {
@@ -168,7 +168,7 @@ public class TestLogFile {
       puts.put(ptr.getOffset(), put);
     }
     LogFile.SequentialReader reader =
-        LogFileFactory.getSequentialReader(dataFile, null);
+        LogFileFactory.getSequentialReader(dataFile, null, true);
     LogRecord entry;
     while((entry = reader.next()) != null) {
       Integer offset = entry.getOffset();
@@ -202,7 +202,7 @@ public class TestLogFile {
       Assert.fail("Renaming to meta.old failed");
     }
     LogFile.SequentialReader reader =
-            LogFileFactory.getSequentialReader(dataFile, null);
+            LogFileFactory.getSequentialReader(dataFile, null, true);
     Assert.assertTrue(metadataFile.exists());
     Assert.assertFalse(oldMetadataFile.exists());
     LogRecord entry;
@@ -240,7 +240,7 @@ public class TestLogFile {
       Assert.fail("Renaming to meta.temp failed");
     }
     LogFile.SequentialReader reader =
-            LogFileFactory.getSequentialReader(dataFile, null);
+            LogFileFactory.getSequentialReader(dataFile, null, true);
     Assert.assertTrue(metadataFile.exists());
     Assert.assertFalse(tempMetadataFile.exists());
     Assert.assertFalse(oldMetadataFile.exists());
@@ -281,7 +281,7 @@ public class TestLogFile {
   @Test (expected = CorruptEventException.class)
   public void testPutGetCorruptEvent() throws Exception {
     final LogFile.RandomReader logFileReader =
-      LogFileFactory.getRandomReader(dataFile, null);
+      LogFileFactory.getRandomReader(dataFile, null, true);
     final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500);
     final Put put = new Put(++transactionID, WriteOrderOracle.next(),
       eventIn);
@@ -306,7 +306,7 @@ public class TestLogFile {
   @Test (expected = NoopRecordException.class)
   public void testPutGetNoopEvent() throws Exception {
     final LogFile.RandomReader logFileReader =
-      LogFileFactory.getRandomReader(dataFile, null);
+      LogFileFactory.getRandomReader(dataFile, null, true);
     final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500);
     final Put put = new Put(++transactionID, WriteOrderOracle.next(),
       eventIn);

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
index 0fb9bc2..61f38d2 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
@@ -190,9 +190,9 @@ public class TestUtils {
           result.add(new String(event.getBody(), Charsets.UTF_8));
         }
         transaction.commit();
-      } catch (Exception ex) {
+      } catch (Throwable ex) {
         transaction.rollback();
-        throw ex;
+        throw new RuntimeException(ex);
       } finally {
         transaction.close();
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
----------------------------------------------------------------------
diff --git a/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
index aa24fa5..d0753a6 100644
--- a/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
+++ b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
@@ -70,7 +70,7 @@ public class FileChannelIntegrityTool implements FlumeTool {
         for (File dataFile : dataFiles) {
           LOG.info("Checking for corruption in " + dataFile.toString());
           LogFile.SequentialReader reader =
-            new LogFileV3.SequentialReader(dataFile, null);
+            new LogFileV3.SequentialReader(dataFile, null, true);
           LogFile.OperationRecordUpdater updater = new LogFile
             .OperationRecordUpdater(dataFile);
           boolean fileDone = false;

http://git-wip-us.apache.org/repos/asf/flume/blob/6115e7d6/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
----------------------------------------------------------------------
diff --git a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
index d328671..f24ae56 100644
--- a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
+++ b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
@@ -120,7 +120,7 @@ public class TestFileChannelIntegrityTool {
     int corrupted = 0;
     for (File dataFile : files) {
       LogFile.SequentialReader reader =
-        new LogFileV3.SequentialReader(dataFile, null);
+        new LogFileV3.SequentialReader(dataFile, null, true);
       RandomAccessFile handle = new RandomAccessFile(dataFile, "rw");
       long eventPosition1 = reader.getPosition();
       LogRecord rec = reader.next();


Mime
View raw message