flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [10/11] flume git commit: FLUME-2937. Integrate checkstyle for non-test classes
Date Thu, 30 Jun 2016 02:21:36 GMT
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
index 9dfa0d1..f1a892a 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
@@ -18,6 +18,12 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.flume.channel.file.proto.ProtosFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -26,50 +32,42 @@ import java.io.RandomAccessFile;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.flume.channel.file.proto.ProtosFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
-
 final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
-  private static final Logger LOG = LoggerFactory
-      .getLogger(EventQueueBackingStoreFileV3.class);
+  private static final Logger LOG = LoggerFactory.getLogger(EventQueueBackingStoreFileV3.class);
   private final File metaDataFile;
 
   EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
-      String name) throws IOException, BadCheckpointException {
+                               String name) throws IOException, BadCheckpointException {
     this(checkpointFile, capacity, name, null, false, false);
   }
 
   EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
-      String name, File checkpointBackupDir,
-      boolean backupCheckpoint, boolean compressBackup)
+                               String name, File checkpointBackupDir,
+                               boolean backupCheckpoint, boolean compressBackup)
       throws IOException, BadCheckpointException {
     super(capacity, name, checkpointFile, checkpointBackupDir, backupCheckpoint,
-      compressBackup);
+        compressBackup);
     Preconditions.checkArgument(capacity > 0,
         "capacity must be greater than 0 " + capacity);
     metaDataFile = Serialization.getMetaDataFile(checkpointFile);
     LOG.info("Starting up with " + checkpointFile + " and " + metaDataFile);
-    if(metaDataFile.exists()) {
+    if (metaDataFile.exists()) {
       FileInputStream inputStream = new FileInputStream(metaDataFile);
       try {
         LOG.info("Reading checkpoint metadata from " + metaDataFile);
         ProtosFactory.Checkpoint checkpoint =
             ProtosFactory.Checkpoint.parseDelimitedFrom(inputStream);
-        if(checkpoint == null) {
+        if (checkpoint == null) {
           throw new BadCheckpointException("The checkpoint metadata file does "
-                  + "not exist or has zero length");
+              + "not exist or has zero length");
         }
         int version = checkpoint.getVersion();
-        if(version != getVersion()) {
+        if (version != getVersion()) {
           throw new BadCheckpointException("Invalid version: " + version +
-                  " " + name + ", expected " + getVersion());
+              " " + name + ", expected " + getVersion());
         }
         long logWriteOrderID = checkpoint.getWriteOrderID();
-        if(logWriteOrderID != getCheckpointLogWriteOrderID()) {
+        if (logWriteOrderID != getCheckpointLogWriteOrderID()) {
           String msg = "Checkpoint and Meta files have differing " +
               "logWriteOrderIDs " + getCheckpointLogWriteOrderID() + ", and "
               + logWriteOrderID;
@@ -80,15 +78,15 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
         setLogWriteOrderID(logWriteOrderID);
         setSize(checkpoint.getQueueSize());
         setHead(checkpoint.getQueueHead());
-        for(ProtosFactory.ActiveLog activeLog : checkpoint.getActiveLogsList()) {
+        for (ProtosFactory.ActiveLog activeLog : checkpoint.getActiveLogsList()) {
           Integer logFileID = activeLog.getLogFileID();
           Integer count = activeLog.getCount();
           logFileIDReferenceCounts.put(logFileID, new AtomicInteger(count));
         }
       } catch (InvalidProtocolBufferException ex) {
         throw new BadCheckpointException("Checkpoint metadata file is invalid. "
-                + "The agent might have been stopped while it was being "
-                + "written", ex);
+            + "The agent might have been stopped while it was being "
+            + "written", ex);
       } finally {
         try {
           inputStream.close();
@@ -97,7 +95,7 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
         }
       }
     } else {
-      if(backupExists(checkpointBackupDir) && shouldBackup) {
+      if (backupExists(checkpointBackupDir) && shouldBackup) {
         // If a backup exists, then throw an exception to recover checkpoint
         throw new BadCheckpointException("The checkpoint metadata file does " +
             "not exist, but a backup exists");
@@ -121,6 +119,7 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
       }
     }
   }
+
   File getMetaDataFile() {
     return metaDataFile;
   }
@@ -129,6 +128,7 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
   protected int getVersion() {
     return Serialization.VERSION_3;
   }
+
   @Override
   protected void writeCheckpointMetaData() throws IOException {
     ProtosFactory.Checkpoint.Builder checkpointBuilder =
@@ -137,14 +137,14 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
     checkpointBuilder.setQueueHead(getHead());
     checkpointBuilder.setQueueSize(getSize());
     checkpointBuilder.setWriteOrderID(getLogWriteOrderID());
-    for(Integer logFileID : logFileIDReferenceCounts.keySet()) {
+    for (Integer logFileID : logFileIDReferenceCounts.keySet()) {
       int count = logFileIDReferenceCounts.get(logFileID).get();
-      if(count != 0) {
-         ProtosFactory.ActiveLog.Builder activeLogBuilder =
-             ProtosFactory.ActiveLog.newBuilder();
-         activeLogBuilder.setLogFileID(logFileID);
-         activeLogBuilder.setCount(count);
-         checkpointBuilder.addActiveLogs(activeLogBuilder.build());
+      if (count != 0) {
+        ProtosFactory.ActiveLog.Builder activeLogBuilder =
+            ProtosFactory.ActiveLog.newBuilder();
+        activeLogBuilder.setLogFileID(logFileID);
+        activeLogBuilder.setCount(count);
+        checkpointBuilder.addActiveLogs(activeLogBuilder.build());
       }
     }
     FileOutputStream outputStream = new FileOutputStream(metaDataFile);
@@ -161,8 +161,8 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
   }
 
   static void upgrade(EventQueueBackingStoreFileV2 backingStoreV2,
-      File checkpointFile, File metaDataFile)
-          throws IOException {
+                      File checkpointFile, File metaDataFile)
+      throws IOException {
 
     int head = backingStoreV2.getHead();
     int size = backingStoreV2.getSize();
@@ -176,14 +176,14 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
     checkpointBuilder.setQueueHead(head);
     checkpointBuilder.setQueueSize(size);
     checkpointBuilder.setWriteOrderID(writeOrderID);
-    for(Integer logFileID : referenceCounts.keySet()) {
+    for (Integer logFileID : referenceCounts.keySet()) {
       int count = referenceCounts.get(logFileID).get();
-      if(count > 0) {
-         ProtosFactory.ActiveLog.Builder activeLogBuilder =
-             ProtosFactory.ActiveLog.newBuilder();
-         activeLogBuilder.setLogFileID(logFileID);
-         activeLogBuilder.setCount(count);
-         checkpointBuilder.addActiveLogs(activeLogBuilder.build());
+      if (count > 0) {
+        ProtosFactory.ActiveLog.Builder activeLogBuilder =
+            ProtosFactory.ActiveLog.newBuilder();
+        activeLogBuilder.setLogFileID(logFileID);
+        activeLogBuilder.setCount(count);
+        checkpointBuilder.addActiveLogs(activeLogBuilder.build());
       }
     }
     FileOutputStream outputStream = new FileOutputStream(metaDataFile);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java
index ff5242a..4c0c96c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java
@@ -33,7 +33,7 @@ public class EventUtils {
    * @return Event if Put instance is present, null otherwise
    */
   public static Event getEventFromTransactionEvent(TransactionEventRecord transactionEventRecord) {
-    if(transactionEventRecord instanceof Put) {
+    if (transactionEventRecord instanceof Put) {
       return ((Put)transactionEventRecord).getEvent();
     }
     return null;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index ed2b996..9d82e43 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -25,7 +25,11 @@ import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
-import org.apache.flume.*;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.ChannelFullException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
 import org.apache.flume.annotations.Disposable;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
@@ -71,8 +75,7 @@ import java.util.concurrent.TimeUnit;
 @Disposable
 public class FileChannel extends BasicChannelSemantics {
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(FileChannel.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FileChannel.class);
 
   private Integer capacity = 0;
   private int keepAlive;
@@ -125,8 +128,8 @@ public class FileChannel extends BasicChannelSemantics {
         context.getString(FileChannelConfiguration.CHECKPOINT_DIR,
             homePath + "/.flume/file-channel/checkpoint").trim();
 
-    String strBackupCheckpointDir = context.getString
-      (FileChannelConfiguration.BACKUP_CHECKPOINT_DIR, "").trim();
+    String strBackupCheckpointDir =
+        context.getString(FileChannelConfiguration.BACKUP_CHECKPOINT_DIR, "").trim();
 
     String[] strDataDirs = Iterables.toArray(
         Splitter.on(",").trimResults().omitEmptyStrings().split(
@@ -137,9 +140,9 @@ public class FileChannel extends BasicChannelSemantics {
 
     if (useDualCheckpoints) {
       Preconditions.checkState(!strBackupCheckpointDir.isEmpty(),
-        "Dual checkpointing is enabled, but the backup directory is not set. " +
-          "Please set " + FileChannelConfiguration.BACKUP_CHECKPOINT_DIR + " " +
-          "to enable dual checkpointing");
+          "Dual checkpointing is enabled, but the backup directory is not set. " +
+              "Please set " + FileChannelConfiguration.BACKUP_CHECKPOINT_DIR + " " +
+              "to enable dual checkpointing");
       backupCheckpointDir = new File(strBackupCheckpointDir);
       /*
        * If the backup directory is the same as the checkpoint directory,
@@ -147,9 +150,9 @@ public class FileChannel extends BasicChannelSemantics {
        * channel.
        */
       Preconditions.checkState(!backupCheckpointDir.equals(checkpointDir),
-        "Could not configure " + getName() + ". The checkpoint backup " +
-          "directory and the checkpoint directory are " +
-          "configured to be the same.");
+          "Could not configure " + getName() + ". The checkpoint backup " +
+              "directory and the checkpoint directory are " +
+              "configured to be the same.");
     }
 
     dataDirs = new File[strDataDirs.length];
@@ -159,10 +162,10 @@ public class FileChannel extends BasicChannelSemantics {
 
     capacity = context.getInteger(FileChannelConfiguration.CAPACITY,
         FileChannelConfiguration.DEFAULT_CAPACITY);
-    if(capacity <= 0) {
+    if (capacity <= 0) {
       capacity = FileChannelConfiguration.DEFAULT_CAPACITY;
       LOG.warn("Invalid capacity specified, initializing channel to "
-              + "default capacity of {}", capacity);
+          + "default capacity of {}", capacity);
     }
 
     keepAlive =
@@ -172,48 +175,48 @@ public class FileChannel extends BasicChannelSemantics {
         context.getInteger(FileChannelConfiguration.TRANSACTION_CAPACITY,
             FileChannelConfiguration.DEFAULT_TRANSACTION_CAPACITY);
 
-    if(transactionCapacity <= 0) {
+    if (transactionCapacity <= 0) {
       transactionCapacity =
-              FileChannelConfiguration.DEFAULT_TRANSACTION_CAPACITY;
+          FileChannelConfiguration.DEFAULT_TRANSACTION_CAPACITY;
       LOG.warn("Invalid transaction capacity specified, " +
           "initializing channel to default " +
           "capacity of {}", transactionCapacity);
     }
 
     Preconditions.checkState(transactionCapacity <= capacity,
-      "File Channel transaction capacity cannot be greater than the " +
-        "capacity of the channel.");
+        "File Channel transaction capacity cannot be greater than the " +
+            "capacity of the channel.");
 
     checkpointInterval =
-            context.getLong(FileChannelConfiguration.CHECKPOINT_INTERVAL,
+        context.getLong(FileChannelConfiguration.CHECKPOINT_INTERVAL,
             FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL);
     if (checkpointInterval <= 0) {
       LOG.warn("Checkpoint interval is invalid: " + checkpointInterval
-              + ", using default: "
-              + FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL);
+          + ", using default: "
+          + FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL);
 
       checkpointInterval =
-              FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL;
+          FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL;
     }
 
     // cannot be over FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE
     maxFileSize = Math.min(
-      context.getLong(FileChannelConfiguration.MAX_FILE_SIZE,
-        FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE),
-      FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE);
+        context.getLong(FileChannelConfiguration.MAX_FILE_SIZE,
+            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE),
+        FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE);
 
     minimumRequiredSpace = Math.max(
-      context.getLong(FileChannelConfiguration.MINIMUM_REQUIRED_SPACE,
-        FileChannelConfiguration.DEFAULT_MINIMUM_REQUIRED_SPACE),
-      FileChannelConfiguration.FLOOR_MINIMUM_REQUIRED_SPACE);
+        context.getLong(FileChannelConfiguration.MINIMUM_REQUIRED_SPACE,
+            FileChannelConfiguration.DEFAULT_MINIMUM_REQUIRED_SPACE),
+        FileChannelConfiguration.FLOOR_MINIMUM_REQUIRED_SPACE);
 
     useLogReplayV1 = context.getBoolean(
         FileChannelConfiguration.USE_LOG_REPLAY_V1,
-          FileChannelConfiguration.DEFAULT_USE_LOG_REPLAY_V1);
+        FileChannelConfiguration.DEFAULT_USE_LOG_REPLAY_V1);
 
     useFastReplay = context.getBoolean(
-            FileChannelConfiguration.USE_FAST_REPLAY,
-            FileChannelConfiguration.DEFAULT_USE_FAST_REPLAY);
+        FileChannelConfiguration.USE_FAST_REPLAY,
+        FileChannelConfiguration.DEFAULT_USE_FAST_REPLAY);
 
     Context encryptionContext = new Context(
         context.getSubProperties(EncryptionConfiguration.ENCRYPTION_PREFIX +
@@ -224,41 +227,41 @@ public class FileChannel extends BasicChannelSemantics {
         EncryptionConfiguration.ACTIVE_KEY);
     encryptionCipherProvider = encryptionContext.getString(
         EncryptionConfiguration.CIPHER_PROVIDER);
-    if(encryptionKeyProviderName != null) {
+    if (encryptionKeyProviderName != null) {
       Preconditions.checkState(!Strings.isNullOrEmpty(encryptionActiveKey),
           "Encryption configuration problem: " +
               EncryptionConfiguration.ACTIVE_KEY + " is missing");
       Preconditions.checkState(!Strings.isNullOrEmpty(encryptionCipherProvider),
           "Encryption configuration problem: " +
               EncryptionConfiguration.CIPHER_PROVIDER + " is missing");
-      Context keyProviderContext = new Context(encryptionContext.
-          getSubProperties(EncryptionConfiguration.KEY_PROVIDER + "."));
-      encryptionKeyProvider = KeyProviderFactory.
-          getInstance(encryptionKeyProviderName, keyProviderContext);
+      Context keyProviderContext = new Context(
+          encryptionContext.getSubProperties(EncryptionConfiguration.KEY_PROVIDER + "."));
+      encryptionKeyProvider = KeyProviderFactory.getInstance(
+          encryptionKeyProviderName, keyProviderContext);
     } else {
       Preconditions.checkState(encryptionActiveKey == null,
           "Encryption configuration problem: " +
               EncryptionConfiguration.ACTIVE_KEY + " is present while key " +
-          "provider name is not.");
+              "provider name is not.");
       Preconditions.checkState(encryptionCipherProvider == null,
           "Encryption configuration problem: " +
               EncryptionConfiguration.CIPHER_PROVIDER + " is present while " +
-          "key provider name is not.");
+              "key provider name is not.");
     }
 
     fsyncPerTransaction = context.getBoolean(FileChannelConfiguration
-      .FSYNC_PER_TXN, FileChannelConfiguration.DEFAULT_FSYNC_PRE_TXN);
+        .FSYNC_PER_TXN, FileChannelConfiguration.DEFAULT_FSYNC_PRE_TXN);
 
     fsyncInterval = context.getInteger(FileChannelConfiguration
-      .FSYNC_INTERVAL, FileChannelConfiguration.DEFAULT_FSYNC_INTERVAL);
+        .FSYNC_INTERVAL, FileChannelConfiguration.DEFAULT_FSYNC_INTERVAL);
 
     checkpointOnClose = context.getBoolean(FileChannelConfiguration
-            .CHKPT_ONCLOSE, FileChannelConfiguration.DEFAULT_CHKPT_ONCLOSE);
+        .CHKPT_ONCLOSE, FileChannelConfiguration.DEFAULT_CHKPT_ONCLOSE);
 
-    if(queueRemaining == null) {
+    if (queueRemaining == null) {
       queueRemaining = new Semaphore(capacity, true);
     }
-    if(log != null) {
+    if (log != null) {
       log.setCheckpointInterval(checkpointInterval);
       log.setMaxFileSize(maxFileSize);
     }
@@ -299,7 +302,7 @@ public class FileChannel extends BasicChannelSemantics {
       Preconditions.checkState(queueRemaining.tryAcquire(depth),
           "Unable to acquire " + depth + " permits " + channelNameDescriptor);
       LOG.info("Queue Size after replay: " + depth + " "
-           + channelNameDescriptor);
+          + channelNameDescriptor);
     } catch (Throwable t) {
       open = false;
       startupError = t;
@@ -337,9 +340,9 @@ public class FileChannel extends BasicChannelSemantics {
 
   @Override
   protected BasicTransactionSemantics createTransaction() {
-    if(!open) {
+    if (!open) {
       String msg = "Channel closed " + channelNameDescriptor;
-      if(startupError != null) {
+      if (startupError != null) {
         msg += ". Due to " + startupError.getClass().getName() + ": " +
             startupError.getMessage();
         throw new IllegalStateException(msg, startupError);
@@ -348,27 +351,28 @@ public class FileChannel extends BasicChannelSemantics {
     }
 
     FileBackedTransaction trans = transactions.get();
-    if(trans != null && !trans.isClosed()) {
+    if (trans != null && !trans.isClosed()) {
       Preconditions.checkState(false,
           "Thread has transaction which is still open: " +
-              trans.getStateAsString()  + channelNameDescriptor);
+              trans.getStateAsString() + channelNameDescriptor);
     }
     trans = new FileBackedTransaction(log, TransactionIDOracle.next(),
-      transactionCapacity, keepAlive, queueRemaining, getName(),
-      fsyncPerTransaction, channelCounter);
+        transactionCapacity, keepAlive, queueRemaining, getName(),
+        fsyncPerTransaction, channelCounter);
     transactions.set(trans);
     return trans;
   }
 
   protected int getDepth() {
-    Preconditions.checkState(open, "Channel closed"  + channelNameDescriptor);
+    Preconditions.checkState(open, "Channel closed" + channelNameDescriptor);
     Preconditions.checkNotNull(log, "log");
     FlumeEventQueue queue = log.getFlumeEventQueue();
     Preconditions.checkNotNull(queue, "queue");
     return queue.getSize();
   }
+
   void close() {
-    if(open) {
+    if (open) {
       open = false;
       try {
         log.close();
@@ -398,11 +402,12 @@ public class FileChannel extends BasicChannelSemantics {
 
   /**
    * Did this channel recover a backup of the checkpoint to restart?
+   *
    * @return true if the channel recovered using a backup.
    */
   @VisibleForTesting
   boolean checkpointBackupRestored() {
-    if(log != null) {
+    if (log != null) {
       return log.backupRestored();
     }
     return false;
@@ -428,10 +433,11 @@ public class FileChannel extends BasicChannelSemantics {
     private final String channelNameDescriptor;
     private final ChannelCounter channelCounter;
     private final boolean fsyncPerTransaction;
+
     public FileBackedTransaction(Log log, long transactionID,
-        int transCapacity, int keepAlive, Semaphore queueRemaining,
-        String name, boolean fsyncPerTransaction, ChannelCounter
-      counter) {
+                                 int transCapacity, int keepAlive, Semaphore queueRemaining,
+                                 String name, boolean fsyncPerTransaction, ChannelCounter
+                                     counter) {
       this.log = log;
       queue = log.getFlumeEventQueue();
       this.transactionID = transactionID;
@@ -443,9 +449,11 @@ public class FileChannel extends BasicChannelSemantics {
       channelNameDescriptor = "[channel=" + name + "]";
       this.channelCounter = counter;
     }
+
     private boolean isClosed() {
       return State.CLOSED.equals(getState());
     }
+
     private String getStateAsString() {
       return String.valueOf(getState());
     }
@@ -453,7 +461,7 @@ public class FileChannel extends BasicChannelSemantics {
     @Override
     protected void doPut(Event event) throws InterruptedException {
       channelCounter.incrementEventPutAttemptCount();
-      if(putList.remainingCapacity() == 0) {
+      if (putList.remainingCapacity() == 0) {
         throw new ChannelException("Put queue for FileBackedTransaction " +
             "of capacity " + putList.size() + " full, consider " +
             "committing more frequently, increasing capacity or " +
@@ -461,7 +469,7 @@ public class FileChannel extends BasicChannelSemantics {
       }
       // this does not need to be in the critical section as it does not
       // modify the structure of the log or queue.
-      if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
+      if (!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
         throw new ChannelFullException("The channel has reached it's capacity. "
             + "This might be the result of a sink on the channel having too "
             + "low of batch size, a downstream system running slower than "
@@ -473,15 +481,15 @@ public class FileChannel extends BasicChannelSemantics {
       try {
         FlumeEventPointer ptr = log.put(transactionID, event);
         Preconditions.checkState(putList.offer(ptr), "putList offer failed "
-          + channelNameDescriptor);
+            + channelNameDescriptor);
         queue.addWithoutCommit(ptr, transactionID);
         success = true;
       } catch (IOException e) {
         throw new ChannelException("Put failed due to IO error "
-                + channelNameDescriptor, e);
+            + channelNameDescriptor, e);
       } finally {
         log.unlockShared();
-        if(!success) {
+        if (!success) {
           // release slot obtained in the case
           // the put fails for any reason
           queueRemaining.release();
@@ -492,11 +500,11 @@ public class FileChannel extends BasicChannelSemantics {
     @Override
     protected Event doTake() throws InterruptedException {
       channelCounter.incrementEventTakeAttemptCount();
-      if(takeList.remainingCapacity() == 0) {
+      if (takeList.remainingCapacity() == 0) {
         throw new ChannelException("Take list for FileBackedTransaction, capacity " +
             takeList.size() + " full, consider committing more frequently, " +
             "increasing capacity, or increasing thread count. "
-               + channelNameDescriptor);
+            + channelNameDescriptor);
       }
       log.lockShared();
       /*
@@ -517,24 +525,24 @@ public class FileChannel extends BasicChannelSemantics {
               // first add to takeList so that if write to disk
               // fails rollback actually does it's work
               Preconditions.checkState(takeList.offer(ptr),
-                "takeList offer failed "
-                  + channelNameDescriptor);
+                  "takeList offer failed "
+                      + channelNameDescriptor);
               log.take(transactionID, ptr); // write take to disk
               Event event = log.get(ptr);
               return event;
             } catch (IOException e) {
               throw new ChannelException("Take failed due to IO error "
-                + channelNameDescriptor, e);
+                  + channelNameDescriptor, e);
             } catch (NoopRecordException e) {
               LOG.warn("Corrupt record replaced by File Channel Integrity " +
-                "tool found. Will retrieve next event", e);
+                  "tool found. Will retrieve next event", e);
               takeList.remove(ptr);
             } catch (CorruptEventException ex) {
               if (fsyncPerTransaction) {
                 throw new ChannelException(ex);
               }
               LOG.warn("Corrupt record found. Event will be " +
-                "skipped, and next event will be read.", ex);
+                  "skipped, and next event will be read.", ex);
               takeList.remove(ptr);
             }
           }
@@ -543,20 +551,21 @@ public class FileChannel extends BasicChannelSemantics {
         log.unlockShared();
       }
     }
+
     @Override
     protected void doCommit() throws InterruptedException {
       int puts = putList.size();
       int takes = takeList.size();
-      if(puts > 0) {
+      if (puts > 0) {
         Preconditions.checkState(takes == 0, "nonzero puts and takes "
-                + channelNameDescriptor);
+            + channelNameDescriptor);
         log.lockShared();
         try {
           log.commitPut(transactionID);
           channelCounter.addToEventPutSuccessCount(puts);
           synchronized (queue) {
-            while(!putList.isEmpty()) {
-              if(!queue.addTail(putList.removeFirst())) {
+            while (!putList.isEmpty()) {
+              if (!queue.addTail(putList.removeFirst())) {
                 StringBuilder msg = new StringBuilder();
                 msg.append("Queue add failed, this shouldn't be able to ");
                 msg.append("happen. A portion of the transaction has been ");
@@ -572,7 +581,7 @@ public class FileChannel extends BasicChannelSemantics {
           }
         } catch (IOException e) {
           throw new ChannelException("Commit failed due to IO error "
-                  + channelNameDescriptor, e);
+              + channelNameDescriptor, e);
         } finally {
           log.unlockShared();
         }
@@ -595,13 +604,14 @@ public class FileChannel extends BasicChannelSemantics {
       takeList.clear();
       channelCounter.setChannelSize(queue.getSize());
     }
+
     @Override
     protected void doRollback() throws InterruptedException {
       int puts = putList.size();
       int takes = takeList.size();
       log.lockShared();
       try {
-        if(takes > 0) {
+        if (takes > 0) {
           Preconditions.checkState(puts == 0, "nonzero puts and takes "
               + channelNameDescriptor);
           synchronized (queue) {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
index 5c3c48f..c5678d4 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
@@ -87,10 +87,8 @@ public class FileChannelConfiguration {
   public static final String USE_DUAL_CHECKPOINTS = "useDualCheckpoints";
   public static final boolean DEFAULT_USE_DUAL_CHECKPOINTS = false;
 
-  public static final String COMPRESS_BACKUP_CHECKPOINT =
-    "compressBackupCheckpoint";
-  public static final boolean DEFAULT_COMPRESS_BACKUP_CHECKPOINT
-    = false;
+  public static final String COMPRESS_BACKUP_CHECKPOINT = "compressBackupCheckpoint";
+  public static final boolean DEFAULT_COMPRESS_BACKUP_CHECKPOINT = false;
 
   public static final String FSYNC_PER_TXN = "fsyncPerTransaction";
   public static final boolean DEFAULT_FSYNC_PRE_TXN = true;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java
index 53c1251..cd1b6d8 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java
@@ -37,15 +37,15 @@ import org.apache.flume.Event;
  */
 class FlumeEvent implements Event, Writable {
 
-  private static final byte EVENT_MAP_TEXT_WRITABLE_ID = Byte.valueOf(Integer.valueOf(-116).byteValue());
+  private static final byte EVENT_MAP_TEXT_WRITABLE_ID =
+      Byte.valueOf(Integer.valueOf(-116).byteValue());
 
-  private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY =
-      new ThreadLocal<CharsetEncoder>() {
+  private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal<CharsetEncoder>() {
     @Override
     protected CharsetEncoder initialValue() {
-      return Charset.forName("UTF-8").newEncoder().
-          onMalformedInput(CodingErrorAction.REPLACE).
-          onUnmappableCharacter(CodingErrorAction.REPLACE);
+      return Charset.forName("UTF-8").newEncoder()
+                                     .onMalformedInput(CodingErrorAction.REPLACE)
+                                     .onUnmappableCharacter(CodingErrorAction.REPLACE);
     }
   };
 
@@ -53,9 +53,9 @@ class FlumeEvent implements Event, Writable {
       new ThreadLocal<CharsetDecoder>() {
     @Override
     protected CharsetDecoder initialValue() {
-      return Charset.forName("UTF-8").newDecoder().
-          onMalformedInput(CodingErrorAction.REPLACE).
-          onUnmappableCharacter(CodingErrorAction.REPLACE);
+      return Charset.forName("UTF-8").newDecoder()
+                                     .onMalformedInput(CodingErrorAction.REPLACE)
+                                     .onUnmappableCharacter(CodingErrorAction.REPLACE);
     }
   };
 
@@ -65,6 +65,7 @@ class FlumeEvent implements Event, Writable {
   private FlumeEvent() {
     this(null, null);
   }
+
   FlumeEvent(Map<String, String> headers, byte[] body) {
     this.headers = headers;
     this.body = body;
@@ -116,13 +117,12 @@ class FlumeEvent implements Event, Writable {
         WritableUtils.writeVInt(out, valueLength );
         out.write(valueBytes.array(), 0, valueLength);
       }
-    }
-    else {
+    } else {
       out.writeInt( 0 );
     }
 
     byte[] body = getBody();
-    if(body == null) {
+    if (body == null) {
       out.writeInt(-1);
     } else {
       out.writeInt(body.length);
@@ -174,7 +174,7 @@ class FlumeEvent implements Event, Writable {
 
     byte[] body = null;
     int bodyLength = in.readInt();
-    if(bodyLength != -1) {
+    if (bodyLength != -1) {
       body = new byte[bodyLength];
       in.readFully(body);
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java
index 5f06ab7..ebf7843 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java
@@ -27,6 +27,7 @@ package org.apache.flume.channel.file;
 class FlumeEventPointer {
   private final int fileID;
   private final int offset;
+
   FlumeEventPointer(int fileID, int offset) {
     this.fileID = fileID;
     this.offset = offset;
@@ -34,24 +35,28 @@ class FlumeEventPointer {
      * Log files used to have a header, now metadata is in
      * a separate file so data starts at offset 0.
      */
-    if(offset < 0) {
+    if (offset < 0) {
       throw new IllegalArgumentException("offset = " + offset + "(" +
           Integer.toHexString(offset) + ")" + ", fileID = " + fileID
             + "(" + Integer.toHexString(fileID) + ")");
     }
   }
+
   int getFileID() {
     return fileID;
   }
+
   int getOffset() {
     return offset;
   }
+
   public long toLong() {
     long result = fileID;
     result = (long)fileID << 32;
     result += (long)offset;
     return result;
   }
+
   @Override
   public int hashCode() {
     final int prime = 31;
@@ -60,6 +65,7 @@ class FlumeEventPointer {
     result = prime * result + offset;
     return result;
   }
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
@@ -80,10 +86,12 @@ class FlumeEventPointer {
     }
     return true;
   }
+
   @Override
   public String toString() {
     return "FlumeEventPointer [fileID=" + fileID + ", offset=" + offset + "]";
   }
+
   public static FlumeEventPointer fromLong(long value) {
     int fileID = (int)(value >>> 32);
     int offset = (int)value;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index d305f4d..4311c7f 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -18,6 +18,17 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -31,18 +42,6 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.ArrayUtils;
-import org.mapdb.DB;
-import org.mapdb.DBMaker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.SetMultimap;
-
 /**
  * Queue of events in the channel. This queue stores only
  * {@link FlumeEventPointer} objects which are represented
@@ -54,7 +53,7 @@ import com.google.common.collect.SetMultimap;
  */
 final class FlumeEventQueue {
   private static final Logger LOG = LoggerFactory
-  .getLogger(FlumeEventQueue.class);
+      .getLogger(FlumeEventQueue.class);
   private static final int EMPTY = 0;
   private final EventQueueBackingStore backingStore;
   private final String channelNameDescriptor;
@@ -72,7 +71,7 @@ final class FlumeEventQueue {
    * @throws IOException
    */
   FlumeEventQueue(EventQueueBackingStore backingStore, File inflightTakesFile,
-          File inflightPutsFile, File queueSetDBDir) throws Exception {
+                  File inflightPutsFile, File queueSetDBDir) throws Exception {
     Preconditions.checkArgument(backingStore.getCapacity() > 0,
         "Capacity must be greater than zero");
     Preconditions.checkNotNull(backingStore, "backingStore");
@@ -88,13 +87,13 @@ final class FlumeEventQueue {
       LOG.error("Could not read checkpoint.", e);
       throw e;
     }
-    if(queueSetDBDir.isDirectory()) {
+    if (queueSetDBDir.isDirectory()) {
       FileUtils.deleteDirectory(queueSetDBDir);
-    } else if(queueSetDBDir.isFile() && !queueSetDBDir.delete()) {
+    } else if (queueSetDBDir.isFile() && !queueSetDBDir.delete()) {
       throw new IOException("QueueSetDir " + queueSetDBDir + " is a file and"
           + " could not be deleted");
     }
-    if(!queueSetDBDir.mkdirs()) {
+    if (!queueSetDBDir.mkdirs()) {
       throw new IllegalStateException("Could not create QueueSet Dir "
           + queueSetDBDir);
     }
@@ -108,7 +107,7 @@ final class FlumeEventQueue {
         .mmapFileEnableIfSupported()
         .make();
     queueSet =
-      db.createHashSet("QueueSet " + " - " + backingStore.getName()).make();
+        db.createHashSet("QueueSet " + " - " + backingStore.getName()).make();
     long start = System.currentTimeMillis();
     for (int i = 0; i < backingStore.getSize(); i++) {
       queueSet.add(get(i));
@@ -118,12 +117,12 @@ final class FlumeEventQueue {
   }
 
   SetMultimap<Long, Long> deserializeInflightPuts()
-          throws IOException, BadCheckpointException{
+      throws IOException, BadCheckpointException {
     return inflightPuts.deserialize();
   }
 
   SetMultimap<Long, Long> deserializeInflightTakes()
-          throws IOException, BadCheckpointException{
+      throws IOException, BadCheckpointException {
     return inflightTakes.deserialize();
   }
 
@@ -133,9 +132,9 @@ final class FlumeEventQueue {
 
   synchronized boolean checkpoint(boolean force) throws Exception {
     if (!backingStore.syncRequired()
-            && !inflightTakes.syncRequired()
-            && !force) { //No need to check inflight puts, since that would
-                         //cause elements.syncRequired() to return true.
+        && !inflightTakes.syncRequired()
+        && !force) { //No need to check inflight puts, since that would
+      //cause elements.syncRequired() to return true.
       LOG.debug("Checkpoint not required");
       return false;
     }
@@ -152,13 +151,13 @@ final class FlumeEventQueue {
    * @return FlumeEventPointer or null if queue is empty
    */
   synchronized FlumeEventPointer removeHead(long transactionID) {
-    if(backingStore.getSize()  == 0) {
+    if (backingStore.getSize() == 0) {
       return null;
     }
 
     long value = remove(0, transactionID);
     Preconditions.checkState(value != EMPTY, "Empty value "
-          + channelNameDescriptor);
+        + channelNameDescriptor);
 
     FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
     backingStore.decrementFileID(ptr.getFileID());
@@ -168,6 +167,7 @@ final class FlumeEventQueue {
   /**
    * Add a FlumeEventPointer to the head of the queue.
    * Called during rollbacks.
+   *
    * @param FlumeEventPointer to be added
    * @return true if space was available and pointer was
    * added to the queue
@@ -180,7 +180,7 @@ final class FlumeEventQueue {
     //there is a buuuuuuuug!
     if (backingStore.getSize() == backingStore.getCapacity()) {
       LOG.error("Could not reinsert to queue, events which were taken but "
-              + "not committed. Please report this issue.");
+          + "not committed. Please report this issue.");
       return false;
     }
 
@@ -195,6 +195,7 @@ final class FlumeEventQueue {
 
   /**
    * Add a FlumeEventPointer to the tail of the queue.
+   *
    * @param FlumeEventPointer to be added
    * @return true if space was available and pointer
    * was added to the queue
@@ -215,6 +216,7 @@ final class FlumeEventQueue {
   /**
    * Must be called when a put happens to the log. This ensures that put commits
    * after checkpoints will retrieve all events committed in that txn.
+   *
    * @param e
    * @param transactionID
    */
@@ -227,16 +229,19 @@ final class FlumeEventQueue {
    * only be used when recovering from a crash. It is not
    * legal to call this method after replayComplete has been
    * called.
+   *
    * @param FlumeEventPointer to be removed
    * @return true if the FlumeEventPointer was found
    * and removed
    */
+  // remove() overloads should not be split, according to checkstyle.
+  // CHECKSTYLE:OFF
   synchronized boolean remove(FlumeEventPointer e) {
     long value = e.toLong();
     Preconditions.checkArgument(value != EMPTY);
     if (queueSet == null) {
-     throw new IllegalStateException("QueueSet is null, thus replayComplete"
-         + " has been called which is illegal");
+      throw new IllegalStateException("QueueSet is null, thus replayComplete"
+          + " has been called which is illegal");
     }
     if (!queueSet.contains(value)) {
       return false;
@@ -244,7 +249,7 @@ final class FlumeEventQueue {
     searchCount++;
     long start = System.currentTimeMillis();
     for (int i = 0; i < backingStore.getSize(); i++) {
-      if(get(i) == value) {
+      if (get(i) == value) {
         remove(i, 0);
         FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
         backingStore.decrementFileID(ptr.getFileID());
@@ -255,6 +260,8 @@ final class FlumeEventQueue {
     searchTime += System.currentTimeMillis() - start;
     return false;
   }
+  // CHECKSTYLE:ON
+
   /**
    * @return a copy of the set of fileIDs which are currently on the queue
    * will be normally be used when deciding which data files can
@@ -299,19 +306,19 @@ final class FlumeEventQueue {
 
     backingStore.setSize(backingStore.getSize() + 1);
 
-    if (index <= backingStore.getSize()/2) {
+    if (index <= backingStore.getSize() / 2) {
       // Shift left
       backingStore.setHead(backingStore.getHead() - 1);
       if (backingStore.getHead() < 0) {
         backingStore.setHead(backingStore.getCapacity() - 1);
       }
       for (int i = 0; i < index; i++) {
-        set(i, get(i+1));
+        set(i, get(i + 1));
       }
     } else {
       // Sift right
       for (int i = backingStore.getSize() - 1; i > index; i--) {
-        set(i, get(i-1));
+        set(i, get(i - 1));
       }
     }
     set(index, value);
@@ -323,6 +330,7 @@ final class FlumeEventQueue {
 
   /**
    * Must be called when a transaction is being committed or rolled back.
+   *
    * @param transactionID
    */
   synchronized void completeTransaction(long transactionID) {
@@ -334,7 +342,7 @@ final class FlumeEventQueue {
   protected synchronized long remove(int index, long transactionID) {
     if (index < 0 || index > backingStore.getSize() - 1) {
       throw new IndexOutOfBoundsException("index = " + index
-          + ", queueSize " + backingStore.getSize() +" " + channelNameDescriptor);
+          + ", queueSize " + backingStore.getSize() + " " + channelNameDescriptor);
     }
     copyCount++;
     long start = System.currentTimeMillis();
@@ -343,13 +351,13 @@ final class FlumeEventQueue {
       queueSet.remove(value);
     }
     //if txn id = 0, we are recovering from a crash.
-    if(transactionID != 0) {
+    if (transactionID != 0) {
       inflightTakes.addEvent(transactionID, value);
     }
-    if (index > backingStore.getSize()/2) {
+    if (index > backingStore.getSize() / 2) {
       // Move tail part to left
       for (int i = index; i < backingStore.getSize() - 1; i++) {
-        long rightValue = get(i+1);
+        long rightValue = get(i + 1);
         set(i, rightValue);
       }
       set(backingStore.getSize() - 1, EMPTY);
@@ -357,7 +365,7 @@ final class FlumeEventQueue {
       // Move head part to right
       for (int i = index - 1; i >= 0; i--) {
         long leftValue = get(i);
-        set(i+1, leftValue);
+        set(i + 1, leftValue);
       }
       set(0, EMPTY);
       backingStore.setHead(backingStore.getHead() + 1);
@@ -386,7 +394,7 @@ final class FlumeEventQueue {
       if (db != null) {
         db.close();
       }
-    } catch(Exception ex) {
+    } catch (Exception ex) {
       LOG.warn("Error closing db", ex);
     }
     try {
@@ -407,7 +415,7 @@ final class FlumeEventQueue {
         searchTime + ", Copy Count = " + copyCount + ", Copy Time = " +
         copyTime;
     LOG.info(msg);
-    if(db != null) {
+    if (db != null) {
       db.close();
     }
     queueSet = null;
@@ -440,11 +448,11 @@ final class FlumeEventQueue {
     private volatile boolean syncRequired = false;
     private SetMultimap<Long, Integer> inflightFileIDs = HashMultimap.create();
 
-    public InflightEventWrapper(File inflightEventsFile) throws Exception{
-      if(!inflightEventsFile.exists()){
-        Preconditions.checkState(inflightEventsFile.createNewFile(),"Could not"
-                + "create inflight events file: "
-                + inflightEventsFile.getCanonicalPath());
+    public InflightEventWrapper(File inflightEventsFile) throws Exception {
+      if (!inflightEventsFile.exists()) {
+        Preconditions.checkState(inflightEventsFile.createNewFile(), "Could not"
+            + "create inflight events file: "
+            + inflightEventsFile.getCanonicalPath());
       }
       this.inflightEventsFile = inflightEventsFile;
       file = new RandomAccessFile(inflightEventsFile, "rw");
@@ -454,10 +462,11 @@ final class FlumeEventQueue {
 
     /**
      * Complete the transaction, and remove all events from inflight list.
+     *
      * @param transactionID
      */
     public boolean completeTransaction(Long transactionID) {
-      if(!inflightEvents.containsKey(transactionID)) {
+      if (!inflightEvents.containsKey(transactionID)) {
         return false;
       }
       inflightEvents.removeAll(transactionID);
@@ -468,28 +477,30 @@ final class FlumeEventQueue {
 
     /**
      * Add an event pointer to the inflights list.
+     *
      * @param transactionID
      * @param pointer
      */
-    public void addEvent(Long transactionID, Long pointer){
+    public void addEvent(Long transactionID, Long pointer) {
       inflightEvents.put(transactionID, pointer);
       inflightFileIDs.put(transactionID,
-              FlumeEventPointer.fromLong(pointer).getFileID());
+          FlumeEventPointer.fromLong(pointer).getFileID());
       syncRequired = true;
     }
 
     /**
      * Serialize the set of in flights into a byte longBuffer.
+     *
      * @return Returns the checksum of the buffer that is being
      * asynchronously written to disk.
      */
     public void serializeAndWrite() throws Exception {
       Collection<Long> values = inflightEvents.values();
-      if(!fileChannel.isOpen()){
+      if (!fileChannel.isOpen()) {
         file = new RandomAccessFile(inflightEventsFile, "rw");
         fileChannel = file.getChannel();
       }
-      if(values.isEmpty()){
+      if (values.isEmpty()) {
         file.setLength(0L);
       }
       //What is written out?
@@ -498,14 +509,15 @@ final class FlumeEventQueue {
       //transactionid numberofeventsforthistxn listofeventpointers
 
       try {
-        int expectedFileSize = (((inflightEvents.keySet().size() * 2) //For transactionIDs and events per txn ID
-                + values.size()) * 8) //Event pointers
-                + 16; //Checksum
+        int expectedFileSize = (((inflightEvents.keySet().size() * 2) //for transactionIDs and
+                                                                      //events per txn ID
+            + values.size()) * 8) //Event pointers
+            + 16; //Checksum
         //There is no real need of filling the channel with 0s, since we
         //will write the exact nummber of bytes as expected file size.
         file.setLength(expectedFileSize);
         Preconditions.checkState(file.length() == expectedFileSize,
-                "Expected File size of inflight events file does not match the "
+            "Expected File size of inflight events file does not match the "
                 + "current file size. Checkpoint is incomplete.");
         file.seek(0);
         final ByteBuffer buffer = ByteBuffer.allocate(expectedFileSize);
@@ -515,10 +527,10 @@ final class FlumeEventQueue {
           longBuffer.put(txnID);
           longBuffer.put((long) pointers.size());
           LOG.debug("Number of events inserted into "
-                  + "inflights file: " + String.valueOf(pointers.size())
-                  + " file: " + inflightEventsFile.getCanonicalPath());
+              + "inflights file: " + String.valueOf(pointers.size())
+              + " file: " + inflightEventsFile.getCanonicalPath());
           long[] written = ArrayUtils.toPrimitive(
-                  pointers.toArray(new Long[0]));
+              pointers.toArray(new Long[0]));
           longBuffer.put(written);
         }
         byte[] checksum = digest.digest(buffer.array());
@@ -539,56 +551,55 @@ final class FlumeEventQueue {
      * of transactionIDs to events that were inflight.
      *
      * @return - map of inflight events per txnID.
-     *
      */
     public SetMultimap<Long, Long> deserialize()
-            throws IOException, BadCheckpointException {
+        throws IOException, BadCheckpointException {
       SetMultimap<Long, Long> inflights = HashMultimap.create();
       if (!fileChannel.isOpen()) {
         file = new RandomAccessFile(inflightEventsFile, "rw");
         fileChannel = file.getChannel();
       }
-      if(file.length() == 0) {
+      if (file.length() == 0) {
         return inflights;
       }
       file.seek(0);
       byte[] checksum = new byte[16];
       file.read(checksum);
       ByteBuffer buffer = ByteBuffer.allocate(
-              (int)(file.length() - file.getFilePointer()));
+          (int) (file.length() - file.getFilePointer()));
       fileChannel.read(buffer);
       byte[] fileChecksum = digest.digest(buffer.array());
       if (!Arrays.equals(checksum, fileChecksum)) {
         throw new BadCheckpointException("Checksum of inflights file differs"
-                + " from the checksum expected.");
+            + " from the checksum expected.");
       }
       buffer.position(0);
       LongBuffer longBuffer = buffer.asLongBuffer();
       try {
         while (true) {
           long txnID = longBuffer.get();
-          int numEvents = (int)(longBuffer.get());
-          for(int i = 0; i < numEvents; i++) {
+          int numEvents = (int) (longBuffer.get());
+          for (int i = 0; i < numEvents; i++) {
             long val = longBuffer.get();
             inflights.put(txnID, val);
           }
         }
       } catch (BufferUnderflowException ex) {
         LOG.debug("Reached end of inflights buffer. Long buffer position ="
-                + String.valueOf(longBuffer.position()));
+            + String.valueOf(longBuffer.position()));
       }
-      return  inflights;
+      return inflights;
     }
 
     public int getSize() {
       return inflightEvents.size();
     }
 
-    public boolean syncRequired(){
+    public boolean syncRequired() {
       return syncRequired;
     }
 
-    public Collection<Integer> getFileIDs(){
+    public Collection<Integer> getFileIDs() {
       return inflightFileIDs.values();
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 247c287..02d8e7f 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -63,7 +63,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
  * Stores FlumeEvents on disk and pointers to the events in a in memory queue.
  * Once a log object is created the replay method should be called to reconcile
  * the on disk write ahead log with the last checkpoint of the queue.
- *
+ * <p>
  * Before calling any of commitPut/commitTake/get/put/rollback/take
  * {@linkplain org.apache.flume.channel.file.Log#lockShared()}
  * should be called. After
@@ -217,12 +217,12 @@ public class Log {
       return this;
     }
 
-    Builder setUseLogReplayV1(boolean useLogReplayV1){
+    Builder setUseLogReplayV1(boolean useLogReplayV1) {
       this.useLogReplayV1 = useLogReplayV1;
       return this;
     }
 
-    Builder setUseFastReplay(boolean useFastReplay){
+    Builder setUseFastReplay(boolean useFastReplay) {
       this.useFastReplay = useFastReplay;
       return this;
     }
@@ -264,46 +264,46 @@ public class Log {
 
     Log build() throws IOException {
       return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
-        bUseDualCheckpoints, bCompressBackupCheckpoint,bCheckpointDir,
-        bBackupCheckpointDir, bName, useLogReplayV1, useFastReplay,
-        bMinimumRequiredSpace, bEncryptionKeyProvider, bEncryptionKeyAlias,
-        bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
-        fsyncPerTransaction, fsyncInterval, checkpointOnClose, bLogDirs);
+          bUseDualCheckpoints, bCompressBackupCheckpoint, bCheckpointDir,
+          bBackupCheckpointDir, bName, useLogReplayV1, useFastReplay,
+          bMinimumRequiredSpace, bEncryptionKeyProvider, bEncryptionKeyAlias,
+          bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
+          fsyncPerTransaction, fsyncInterval, checkpointOnClose, bLogDirs);
     }
   }
 
   private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
-    boolean useDualCheckpoints, boolean compressBackupCheckpoint,
-    File checkpointDir, File backupCheckpointDir,
-    String name, boolean useLogReplayV1, boolean useFastReplay,
-    long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider,
-    @Nullable String encryptionKeyAlias,
-    @Nullable String encryptionCipherProvider,
-    long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
-    int fsyncInterval, boolean checkpointOnClose, File... logDirs)
-          throws IOException {
+              boolean useDualCheckpoints, boolean compressBackupCheckpoint,
+              File checkpointDir, File backupCheckpointDir,
+              String name, boolean useLogReplayV1, boolean useFastReplay,
+              long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider,
+              @Nullable String encryptionKeyAlias,
+              @Nullable String encryptionCipherProvider,
+              long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
+              int fsyncInterval, boolean checkpointOnClose, File... logDirs)
+      throws IOException {
     Preconditions.checkArgument(checkpointInterval > 0,
-      "checkpointInterval <= 0");
+        "checkpointInterval <= 0");
     Preconditions.checkArgument(queueCapacity > 0, "queueCapacity <= 0");
     Preconditions.checkArgument(maxFileSize > 0, "maxFileSize <= 0");
     Preconditions.checkNotNull(checkpointDir, "checkpointDir");
     Preconditions.checkArgument(usableSpaceRefreshInterval > 0,
         "usableSpaceRefreshInterval <= 0");
     Preconditions.checkArgument(
-      checkpointDir.isDirectory() || checkpointDir.mkdirs(), "CheckpointDir "
-      + checkpointDir + " could not be created");
+        checkpointDir.isDirectory() || checkpointDir.mkdirs(), "CheckpointDir "
+            + checkpointDir + " could not be created");
     if (useDualCheckpoints) {
       Preconditions.checkNotNull(backupCheckpointDir, "backupCheckpointDir is" +
-        " null while dual checkpointing is enabled.");
+          " null while dual checkpointing is enabled.");
       Preconditions.checkArgument(
-        backupCheckpointDir.isDirectory() || backupCheckpointDir.mkdirs(),
-        "Backup CheckpointDir " + backupCheckpointDir +
-          " could not be created");
+          backupCheckpointDir.isDirectory() || backupCheckpointDir.mkdirs(),
+          "Backup CheckpointDir " + backupCheckpointDir +
+              " could not be created");
     }
     Preconditions.checkNotNull(logDirs, "logDirs");
     Preconditions.checkArgument(logDirs.length > 0, "logDirs empty");
     Preconditions.checkArgument(name != null && !name.trim().isEmpty(),
-            "channel name should be specified");
+        "channel name should be specified");
 
     this.channelNameDescriptor = "[channel=" + name + "]";
     this.useLogReplayV1 = useLogReplayV1;
@@ -317,20 +317,20 @@ public class Log {
     locks = Maps.newHashMap();
     try {
       lock(checkpointDir);
-      if(useDualCheckpoints) {
+      if (useDualCheckpoints) {
         lock(backupCheckpointDir);
       }
       for (File logDir : logDirs) {
         lock(logDir);
       }
-    } catch(IOException e) {
+    } catch (IOException e) {
       unlock(checkpointDir);
       for (File logDir : logDirs) {
         unlock(logDir);
       }
       throw e;
     }
-    if(encryptionKeyProvider != null && encryptionKeyAlias != null &&
+    if (encryptionKeyProvider != null && encryptionKeyAlias != null &&
         encryptionCipherProvider != null) {
       LOGGER.info("Encryption is enabled with encryptionKeyProvider = " +
           encryptionKeyProvider + ", encryptionKeyAlias = " + encryptionKeyAlias
@@ -346,7 +346,7 @@ public class Log {
       throw new IllegalArgumentException("Encryption configuration must all " +
           "null or all not null: encryptionKeyProvider = " +
           encryptionKeyProvider + ", encryptionKeyAlias = " +
-          encryptionKeyAlias +  ", encryptionCipherProvider = " +
+          encryptionKeyAlias + ", encryptionCipherProvider = " +
           encryptionCipherProvider);
     }
     open = false;
@@ -364,7 +364,7 @@ public class Log {
 
     logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length);
     workerExecutor = Executors.newSingleThreadScheduledExecutor(new
-      ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name)
+        ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name)
         .build());
     workerExecutor.scheduleWithFixedDelay(new BackgroundWorker(this),
         this.checkpointInterval, this.checkpointInterval,
@@ -374,6 +374,7 @@ public class Log {
   /**
    * Read checkpoint and data files from disk replaying them to the state
    * directly before the shutdown or crash.
+   *
    * @throws IOException
    */
   void replay() throws IOException {
@@ -416,8 +417,8 @@ public class Log {
        * locations. We will read the last one written to disk.
        */
       File checkpointFile = new File(checkpointDir, "checkpoint");
-      if(shouldFastReplay) {
-        if(checkpointFile.exists()) {
+      if (shouldFastReplay) {
+        if (checkpointFile.exists()) {
           LOGGER.debug("Disabling fast full replay because checkpoint " +
               "exists: " + checkpointFile);
           shouldFastReplay = false;
@@ -434,14 +435,14 @@ public class Log {
 
       try {
         backingStore =
-          EventQueueBackingStoreFactory.get(checkpointFile,
-            backupCheckpointDir, queueCapacity, channelNameDescriptor,
-            true, this.useDualCheckpoints,
-            this.compressBackupCheckpoint);
+            EventQueueBackingStoreFactory.get(checkpointFile,
+                backupCheckpointDir, queueCapacity, channelNameDescriptor,
+                true, this.useDualCheckpoints,
+                this.compressBackupCheckpoint);
         queue = new FlumeEventQueue(backingStore, inflightTakesFile,
-                inflightPutsFile, queueSetDir);
+            inflightPutsFile, queueSetDir);
         LOGGER.info("Last Checkpoint " + new Date(checkpointFile.lastModified())
-                + ", queue depth = " + queue.getSize());
+            + ", queue depth = " + queue.getSize());
 
         /*
          * We now have everything we need to actually replay the log files
@@ -460,7 +461,7 @@ public class Log {
               + "Restoring checkpoint and starting up.", ex);
           if (EventQueueBackingStoreFile.backupExists(backupCheckpointDir)) {
             backupRestored = EventQueueBackingStoreFile.restoreBackup(
-              checkpointDir, backupCheckpointDir);
+                checkpointDir, backupCheckpointDir);
           }
         }
         if (!backupRestored) {
@@ -472,16 +473,16 @@ public class Log {
           }
         }
         backingStore = EventQueueBackingStoreFactory.get(
-          checkpointFile, backupCheckpointDir, queueCapacity,
-          channelNameDescriptor, true, useDualCheckpoints,
-          compressBackupCheckpoint);
+            checkpointFile, backupCheckpointDir, queueCapacity,
+            channelNameDescriptor, true, useDualCheckpoints,
+            compressBackupCheckpoint);
         queue = new FlumeEventQueue(backingStore, inflightTakesFile,
-                inflightPutsFile, queueSetDir);
+            inflightPutsFile, queueSetDir);
         // If the checkpoint was deleted due to BadCheckpointException, then
         // trigger fast replay if the channel is configured to.
         shouldFastReplay = this.useFastReplay;
         doReplay(queue, dataFiles, encryptionKeyProvider, shouldFastReplay);
-        if(!shouldFastReplay) {
+        if (!shouldFastReplay) {
           didFullReplayDueToBadCheckpointException = true;
         }
       }
@@ -514,13 +515,13 @@ public class Log {
                         KeyProvider encryptionKeyProvider,
                         boolean useFastReplay) throws Exception {
     CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles,
-            queue, fsyncPerTransaction);
+        queue, fsyncPerTransaction);
     if (useFastReplay && rebuilder.rebuild()) {
       didFastReplay = true;
       LOGGER.info("Fast replay successful.");
     } else {
       ReplayHandler replayHandler = new ReplayHandler(queue,
-              encryptionKeyProvider, fsyncPerTransaction);
+          encryptionKeyProvider, fsyncPerTransaction);
       if (useLogReplayV1) {
         LOGGER.info("Replaying logs with v1 replay logic");
         replayHandler.replayLogv1(dataFiles);
@@ -540,10 +541,12 @@ public class Log {
   boolean didFastReplay() {
     return didFastReplay;
   }
+
   @VisibleForTesting
   public int getReadCount() {
     return readCount;
   }
+
   @VisibleForTesting
   public int getPutCount() {
     return putCount;
@@ -553,10 +556,12 @@ public class Log {
   public int getTakeCount() {
     return takeCount;
   }
+
   @VisibleForTesting
   public int getCommittedCount() {
     return committedCount;
   }
+
   @VisibleForTesting
   public int getRollbackCount() {
     return rollbackCount;
@@ -564,6 +569,7 @@ public class Log {
 
   /**
    * Was a checkpoint backup used to replay?
+   *
    * @return true if a checkpoint backup was used to replay.
    */
   @VisibleForTesting
@@ -597,7 +603,7 @@ public class Log {
    * @throws InterruptedException
    */
   FlumeEvent get(FlumeEventPointer pointer) throws IOException,
-    InterruptedException, NoopRecordException, CorruptEventException {
+      InterruptedException, NoopRecordException, CorruptEventException {
     Preconditions.checkState(open, "Log is closed");
     int id = pointer.getFileID();
     LogFile.RandomReader logFile = idLogFileMap.get(id);
@@ -608,7 +614,7 @@ public class Log {
       if (fsyncPerTransaction) {
         open = false;
         throw new IOException("Corrupt event found. Please run File Channel " +
-          "Integrity tool.", ex);
+            "Integrity tool.", ex);
       }
       throw ex;
     }
@@ -616,8 +622,9 @@ public class Log {
 
   /**
    * Log a put of an event
-   *
+   * <p>
    * Synchronization not required as this method is atomic
+   *
    * @param transactionID
    * @param event
    * @return
@@ -633,7 +640,7 @@ public class Log {
     int logFileIndex = nextLogWriter(transactionID);
     long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
     long requiredSpace = minimumRequiredSpace + buffer.limit();
-    if(usableSpace <= requiredSpace) {
+    if (usableSpace <= requiredSpace) {
       throw new IOException("Usable space exhausted, only " + usableSpace +
           " bytes remaining, required " + requiredSpace + " bytes");
     }
@@ -644,7 +651,7 @@ public class Log {
         error = false;
         return ptr;
       } catch (LogFileRetryableIOException e) {
-        if(!open) {
+        if (!open) {
           throw e;
         }
         roll(logFileIndex, buffer);
@@ -653,7 +660,7 @@ public class Log {
         return ptr;
       }
     } finally {
-      if(error && open) {
+      if (error && open) {
         roll(logFileIndex);
       }
     }
@@ -661,8 +668,9 @@ public class Log {
 
   /**
    * Log a take of an event, pointer points at the corresponding put
-   *
+   * <p>
    * Synchronization not required as this method is atomic
+   *
    * @param transactionID
    * @param pointer
    * @throws IOException
@@ -676,7 +684,7 @@ public class Log {
     int logFileIndex = nextLogWriter(transactionID);
     long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
     long requiredSpace = minimumRequiredSpace + buffer.limit();
-    if(usableSpace <= requiredSpace) {
+    if (usableSpace <= requiredSpace) {
       throw new IOException("Usable space exhausted, only " + usableSpace +
           " bytes remaining, required " + requiredSpace + " bytes");
     }
@@ -686,7 +694,7 @@ public class Log {
         logFiles.get(logFileIndex).take(buffer);
         error = false;
       } catch (LogFileRetryableIOException e) {
-        if(!open) {
+        if (!open) {
           throw e;
         }
         roll(logFileIndex, buffer);
@@ -694,7 +702,7 @@ public class Log {
         error = false;
       }
     } finally {
-      if(error && open) {
+      if (error && open) {
         roll(logFileIndex);
       }
     }
@@ -702,15 +710,16 @@ public class Log {
 
   /**
    * Log a rollback of a transaction
-   *
+   * <p>
    * Synchronization not required as this method is atomic
+   *
    * @param transactionID
    * @throws IOException
    */
   void rollback(long transactionID) throws IOException {
     Preconditions.checkState(open, "Log is closed");
 
-    if(LOGGER.isDebugEnabled()) {
+    if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Rolling back " + transactionID);
     }
     Rollback rollback = new Rollback(transactionID, WriteOrderOracle.next());
@@ -718,7 +727,7 @@ public class Log {
     int logFileIndex = nextLogWriter(transactionID);
     long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
     long requiredSpace = minimumRequiredSpace + buffer.limit();
-    if(usableSpace <= requiredSpace) {
+    if (usableSpace <= requiredSpace) {
       throw new IOException("Usable space exhausted, only " + usableSpace +
           " bytes remaining, required " + requiredSpace + " bytes");
     }
@@ -728,7 +737,7 @@ public class Log {
         logFiles.get(logFileIndex).rollback(buffer);
         error = false;
       } catch (LogFileRetryableIOException e) {
-        if(!open) {
+        if (!open) {
           throw e;
         }
         roll(logFileIndex, buffer);
@@ -736,7 +745,7 @@ public class Log {
         error = false;
       }
     } finally {
-      if(error && open) {
+      if (error && open) {
         roll(logFileIndex);
       }
     }
@@ -747,14 +756,15 @@ public class Log {
    * so we know if the pointers corresponding to the events
    * should be added or removed from the flume queue. We
    * could infer but it's best to be explicit.
-   *
+   * <p>
    * Synchronization not required as this method is atomic
+   *
    * @param transactionID
    * @throws IOException
    * @throws InterruptedException
    */
   void commitPut(long transactionID) throws IOException,
-  InterruptedException {
+      InterruptedException {
     Preconditions.checkState(open, "Log is closed");
     commit(transactionID, TransactionEventRecord.Type.PUT.get());
   }
@@ -764,20 +774,21 @@ public class Log {
    * so we know if the pointers corresponding to the events
    * should be added or removed from the flume queue. We
    * could infer but it's best to be explicit.
-   *
+   * <p>
    * Synchronization not required as this method is atomic
+   *
    * @param transactionID
    * @throws IOException
    * @throws InterruptedException
    */
   void commitTake(long transactionID) throws IOException,
-  InterruptedException {
+      InterruptedException {
     Preconditions.checkState(open, "Log is closed");
     commit(transactionID, TransactionEventRecord.Type.TAKE.get());
   }
 
 
-  private void unlockExclusive()  {
+  private void unlockExclusive() {
     checkpointWriterLock.unlock();
   }
 
@@ -785,11 +796,11 @@ public class Log {
     checkpointReadLock.lock();
   }
 
-  void unlockShared()  {
+  void unlockShared() {
     checkpointReadLock.unlock();
   }
 
-  private void lockExclusive(){
+  private void lockExclusive() {
     checkpointWriterLock.lock();
   }
 
@@ -797,23 +808,23 @@ public class Log {
    * Synchronization not required since this method gets the write lock,
    * so checkpoint and this method cannot run at the same time.
    */
-  void close() throws IOException{
+  void close() throws IOException {
     lockExclusive();
     try {
       open = false;
       try {
-        if(checkpointOnClose) {
+        if (checkpointOnClose) {
           writeCheckpoint(true); // do this before acquiring exclusive lock
         }
       } catch (Exception err) {
         LOGGER.warn("Failed creating checkpoint on close of channel " + channelNameDescriptor +
-                "Replay will take longer next time channel is started.", err);
+            "Replay will take longer next time channel is started.", err);
       }
       shutdownWorker();
       if (logFiles != null) {
         for (int index = 0; index < logFiles.length(); index++) {
           LogFile.Writer writer = logFiles.get(index);
-          if(writer != null) {
+          if (writer != null) {
             writer.close();
           }
         }
@@ -862,9 +873,11 @@ public class Log {
       LOGGER.error("Interrupted while waiting for worker to die.");
     }
   }
+
   void setCheckpointInterval(long checkpointInterval) {
     this.checkpointInterval = checkpointInterval;
   }
+
   void setMaxFileSize(long maxFileSize) {
     this.maxFileSize = maxFileSize;
   }
@@ -883,7 +896,7 @@ public class Log {
     int logFileIndex = nextLogWriter(transactionID);
     long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
     long requiredSpace = minimumRequiredSpace + buffer.limit();
-    if(usableSpace <= requiredSpace) {
+    if (usableSpace <= requiredSpace) {
       throw new IOException("Usable space exhausted, only " + usableSpace +
           " bytes remaining, required " + requiredSpace + " bytes");
     }
@@ -898,7 +911,7 @@ public class Log {
         logFileWriter.sync();
         error = false;
       } catch (LogFileRetryableIOException e) {
-        if(!open) {
+        if (!open) {
           throw e;
         }
         roll(logFileIndex, buffer);
@@ -908,7 +921,7 @@ public class Log {
         error = false;
       }
     } finally {
-      if(error && open) {
+      if (error && open) {
         roll(logFileIndex);
       }
     }
@@ -917,11 +930,13 @@ public class Log {
 
   /**
    * Atomic so not synchronization required.
+   *
    * @return
    */
   private int nextLogWriter(long transactionID) {
-    return (int)Math.abs(transactionID % (long)logFiles.length());
+    return (int) Math.abs(transactionID % (long) logFiles.length());
   }
+
   /**
    * Unconditionally roll
    * Synchronization done internally
@@ -932,13 +947,14 @@ public class Log {
   private void roll(int index) throws IOException {
     roll(index, null);
   }
+
   /**
    * Roll a log if needed. Roll always occurs if the log at the index
    * does not exist (typically on startup), or buffer is null. Otherwise
    * LogFile.Writer.isRollRequired is checked again to ensure we don't
    * have threads pile up on this log resulting in multiple successive
    * rolls
-   *
+   * <p>
    * Synchronization required since both synchronized and unsynchronized
    * methods call this method, and this method acquires only a
    * read lock. The synchronization guarantees that multiple threads don't
@@ -948,7 +964,7 @@ public class Log {
    * @throws IOException
    */
   private synchronized void roll(int index, ByteBuffer buffer)
-    throws IOException {
+      throws IOException {
     lockShared();
 
     try {
@@ -956,17 +972,17 @@ public class Log {
       // check to make sure a roll is actually required due to
       // the possibility of multiple writes waiting on lock
       if (oldLogFile == null || buffer == null ||
-        oldLogFile.isRollRequired(buffer)) {
+          oldLogFile.isRollRequired(buffer)) {
         try {
           LOGGER.info("Roll start " + logDirs[index]);
           int fileID = nextFileID.incrementAndGet();
           File file = new File(logDirs[index], PREFIX + fileID);
           LogFile.Writer writer = LogFileFactory.getWriter(file, fileID,
-            maxFileSize, encryptionKey, encryptionKeyAlias,
-            encryptionCipherProvider, usableSpaceRefreshInterval,
-            fsyncPerTransaction, fsyncInterval);
+              maxFileSize, encryptionKey, encryptionKeyAlias,
+              encryptionCipherProvider, usableSpaceRefreshInterval,
+              fsyncPerTransaction, fsyncInterval);
           idLogFileMap.put(fileID, LogFileFactory.getRandomReader(file,
-            encryptionKeyProvider, fsyncPerTransaction));
+              encryptionKeyProvider, fsyncPerTransaction));
           // writer from this point on will get new reference
           logFiles.set(index, writer);
           // close out old log
@@ -989,22 +1005,24 @@ public class Log {
   /**
    * Write the current checkpoint object and then swap objects so that
    * the next checkpoint occurs on the other checkpoint directory.
-   *
+   * <p>
    * Synchronization is not required because this method acquires a
    * write lock. So this method gets exclusive access to all the
    * data structures this method accesses.
-   * @param force  a flag to force the writing of checkpoint
+   *
+   * @param force a flag to force the writing of checkpoint
    * @throws IOException if we are unable to write the checkpoint out to disk
    */
   private Boolean writeCheckpoint(Boolean force) throws Exception {
     boolean checkpointCompleted = false;
     long usableSpace = checkpointDir.getUsableSpace();
-    if(usableSpace <= minimumRequiredSpace) {
+    if (usableSpace <= minimumRequiredSpace) {
       throw new IOException("Usable space exhausted, only " + usableSpace +
           " bytes remaining, required " + minimumRequiredSpace + " bytes");
     }
     lockExclusive();
-    SortedSet<Integer> logFileRefCountsAll = null, logFileRefCountsActive = null;
+    SortedSet<Integer> logFileRefCountsAll = null;
+    SortedSet<Integer> logFileRefCountsActive = null;
     try {
       if (queue.checkpoint(force)) {
         long logWriteOrderID = queue.getLogWriteOrderID();
@@ -1048,7 +1066,7 @@ public class Log {
             writer.markCheckpoint(logWriteOrderID);
           } finally {
             reader = LogFileFactory.getRandomReader(file,
-                    encryptionKeyProvider, fsyncPerTransaction);
+                encryptionKeyProvider, fsyncPerTransaction);
             idLogFileMap.put(id, reader);
             writer.close();
           }
@@ -1058,7 +1076,7 @@ public class Log {
           idIterator.remove();
         }
         Preconditions.checkState(logFileRefCountsAll.size() == 0,
-                "Could not update all data file timestamps: " + logFileRefCountsAll);
+            "Could not update all data file timestamps: " + logFileRefCountsAll);
         //Add files from all log directories
         for (int index = 0; index < logDirs.length; index++) {
           logFileRefCountsActive.add(logFiles.get(index).getLogFileID());
@@ -1086,7 +1104,7 @@ public class Log {
     // these files) and delete them only after the next (since the current
     // checkpoint will become the backup at that time,
     // and thus these files are no longer needed).
-    for(File fileToDelete : pendingDeletes) {
+    for (File fileToDelete : pendingDeletes) {
       LOGGER.info("Removing old file: " + fileToDelete);
       FileUtils.deleteQuietly(fileToDelete);
     }
@@ -1095,7 +1113,7 @@ public class Log {
     // won't delete any files with an id larger than the min
     int minFileID = fileIDs.first();
     LOGGER.debug("Files currently in use: " + fileIDs);
-    for(File logDir : logDirs) {
+    for (File logDir : logDirs) {
       List<File> logs = LogUtils.getLogs(logDir);
       // sort oldset to newest
       LogUtils.sort(logs);
@@ -1104,9 +1122,9 @@ public class Log {
       for (int index = 0; index < size; index++) {
         File logFile = logs.get(index);
         int logFileID = LogUtils.getIDForFile(logFile);
-        if(logFileID < minFileID) {
+        if (logFileID < minFileID) {
           LogFile.RandomReader reader = idLogFileMap.remove(logFileID);
-          if(reader != null) {
+          if (reader != null) {
             reader.close();
           }
           File metaDataFile = Serialization.getMetaDataFile(logFile);
@@ -1116,12 +1134,13 @@ public class Log {
       }
     }
   }
+
   /**
    * Lock storage to provide exclusive access.
-   *
+   * <p>
    * <p> Locking is not supported by all file systems.
    * E.g., NFS does not consistently support exclusive locks.
-   *
+   * <p>
    * <p> If locking is supported we guarantee exculsive access to the
    * storage directory. Otherwise, no guarantee is given.
    *
@@ -1137,8 +1156,8 @@ public class Log {
       throw new IOException(msg);
     }
     FileLock secondLock = tryLock(dir);
-    if(secondLock != null) {
-      LOGGER.warn("Directory "+dir+" does not support locking");
+    if (secondLock != null) {
+      LOGGER.warn("Directory " + dir + " does not support locking");
       secondLock.release();
       secondLock.channel().close();
     }
@@ -1160,10 +1179,10 @@ public class Log {
     FileLock res = null;
     try {
       res = file.getChannel().tryLock();
-    } catch(OverlappingFileLockException oe) {
+    } catch (OverlappingFileLockException oe) {
       file.close();
       return null;
-    } catch(IOException e) {
+    } catch (IOException e) {
       LOGGER.error("Cannot create lock on " + lockF, e);
       file.close();
       throw e;
@@ -1178,13 +1197,14 @@ public class Log {
    */
   private void unlock(File dir) throws IOException {
     FileLock lock = locks.remove(dir.getAbsolutePath());
-    if(lock == null) {
+    if (lock == null) {
       return;
     }
     lock.release();
     lock.channel().close();
     lock = null;
   }
+
   static class BackgroundWorker implements Runnable {
     private static final Logger LOG = LoggerFactory
         .getLogger(BackgroundWorker.class);


Mime
View raw message