flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [09/11] flume git commit: FLUME-2937. Integrate checkstyle for non-test classes
Date Thu, 30 Jun 2016 02:21:35 GMT
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
index 488dcf4..336aa2c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
@@ -50,20 +50,17 @@ import java.util.concurrent.atomic.AtomicLong;
 @InterfaceStability.Unstable
 public abstract class LogFile {
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(LogFile.class);
-
+  private static final Logger LOG = LoggerFactory.getLogger(LogFile.class);
 
   /**
    * This class preallocates the data files 1MB at time to avoid
    * the updating of the inode on each write and to avoid the disk
    * filling up during a write. It's also faster, so there.
    */
-  private static final ByteBuffer FILL = DirectMemoryUtils.
-      allocate(1024 * 1024); // preallocation, 1MB
+  private static final ByteBuffer FILL = DirectMemoryUtils.allocate(1024 * 1024);
 
   public static final byte OP_RECORD = Byte.MAX_VALUE;
-  public static final byte OP_NOOP = (Byte.MAX_VALUE + Byte.MIN_VALUE)/2;
+  public static final byte OP_NOOP = (Byte.MAX_VALUE + Byte.MIN_VALUE) / 2;
   public static final byte OP_EOF = Byte.MIN_VALUE;
 
   static {
@@ -73,7 +70,7 @@ public abstract class LogFile {
   }
 
   protected static void skipRecord(RandomAccessFile fileHandle,
-    int offset) throws IOException {
+                                   int offset) throws IOException {
     fileHandle.seek(offset);
     int length = fileHandle.readInt();
     fileHandle.skipBytes(length);
@@ -93,31 +90,40 @@ public abstract class LogFile {
       writeFileHandle = new RandomAccessFile(file, "rw");
 
     }
+
     protected RandomAccessFile getFileHandle() {
       return writeFileHandle;
     }
+
     protected void setLastCheckpointOffset(long lastCheckpointOffset) {
       this.lastCheckpointOffset = lastCheckpointOffset;
     }
+
     protected void setLastCheckpointWriteOrderID(long lastCheckpointWriteOrderID) {
       this.lastCheckpointWriteOrderID = lastCheckpointWriteOrderID;
     }
+
     protected long getLastCheckpointOffset() {
       return lastCheckpointOffset;
     }
+
     protected long getLastCheckpointWriteOrderID() {
       return lastCheckpointWriteOrderID;
     }
+
     protected File getFile() {
       return file;
     }
+
     protected int getLogFileID() {
       return logFileID;
     }
+
     void markCheckpoint(long logWriteOrderID)
         throws IOException {
       markCheckpoint(lastCheckpointOffset, logWriteOrderID);
     }
+
     abstract void markCheckpoint(long currentPosition, long logWriteOrderID)
         throws IOException;
 
@@ -150,9 +156,10 @@ public abstract class LogFile {
       Preconditions.checkArgument(numBytes >= 0, "numBytes less than zero");
       value.addAndGet(-numBytes);
     }
+
     long getUsableSpace() {
       long now = System.currentTimeMillis();
-      if(now - interval > lastRefresh.get()) {
+      if (now - interval > lastRefresh.get()) {
         value.set(fs.getUsableSpace());
         lastRefresh.set(now);
       }
@@ -160,7 +167,7 @@ public abstract class LogFile {
     }
   }
 
-  static abstract class Writer {
+  abstract static class Writer {
     private final int logFileID;
     private final File file;
     private final long maxFileSize;
@@ -180,10 +187,9 @@ public abstract class LogFile {
     // To ensure we can count the number of fsyncs.
     private long syncCount;
 
-
     Writer(File file, int logFileID, long maxFileSize,
-        CipherProvider.Encryptor encryptor, long usableSpaceRefreshInterval,
-        boolean fsyncPerTransaction, int fsyncInterval) throws IOException {
+           CipherProvider.Encryptor encryptor, long usableSpaceRefreshInterval,
+           boolean fsyncPerTransaction, int fsyncInterval) throws IOException {
       this.file = file;
       this.logFileID = logFileID;
       this.maxFileSize = Math.min(maxFileSize,
@@ -193,7 +199,7 @@ public abstract class LogFile {
       writeFileChannel = writeFileHandle.getChannel();
       this.fsyncPerTransaction = fsyncPerTransaction;
       this.fsyncInterval = fsyncInterval;
-      if(!fsyncPerTransaction) {
+      if (!fsyncPerTransaction) {
         LOG.info("Sync interval = " + fsyncInterval);
         syncExecutor = Executors.newSingleThreadScheduledExecutor();
         syncExecutor.scheduleWithFixedDelay(new Runnable() {
@@ -203,7 +209,7 @@ public abstract class LogFile {
               sync();
             } catch (Throwable ex) {
               LOG.error("Data file, " + getFile().toString() + " could not " +
-                "be synced to disk due to an error.", ex);
+                  "be synced to disk due to an error.", ex);
             }
           }
         }, fsyncInterval, fsyncInterval, TimeUnit.SECONDS);
@@ -220,6 +226,7 @@ public abstract class LogFile {
     protected CipherProvider.Encryptor getEncryptor() {
       return encryptor;
     }
+
     int getLogFileID() {
       return logFileID;
     }
@@ -227,6 +234,7 @@ public abstract class LogFile {
     File getFile() {
       return file;
     }
+
     String getParent() {
       return file.getParent();
     }
@@ -240,7 +248,7 @@ public abstract class LogFile {
     }
 
     @VisibleForTesting
-    long getLastCommitPosition(){
+    long getLastCommitPosition() {
       return lastCommitPosition;
     }
 
@@ -253,6 +261,7 @@ public abstract class LogFile {
     long getSyncCount() {
       return syncCount;
     }
+
     synchronized long position() throws IOException {
       return getFileChannel().position();
     }
@@ -261,20 +270,22 @@ public abstract class LogFile {
     // methods, so all methods need to be synchronized.
 
     synchronized FlumeEventPointer put(ByteBuffer buffer) throws IOException {
-      if(encryptor != null) {
+      if (encryptor != null) {
         buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array()));
       }
       Pair<Integer, Integer> pair = write(buffer);
       return new FlumeEventPointer(pair.getLeft(), pair.getRight());
     }
+
     synchronized void take(ByteBuffer buffer) throws IOException {
-      if(encryptor != null) {
+      if (encryptor != null) {
         buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array()));
       }
       write(buffer);
     }
+
     synchronized void rollback(ByteBuffer buffer) throws IOException {
-      if(encryptor != null) {
+      if (encryptor != null) {
         buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array()));
       }
       write(buffer);
@@ -290,20 +301,20 @@ public abstract class LogFile {
     }
 
     private Pair<Integer, Integer> write(ByteBuffer buffer)
-      throws IOException {
-      if(!isOpen()) {
+        throws IOException {
+      if (!isOpen()) {
         throw new LogFileRetryableIOException("File closed " + file);
       }
       long length = position();
       long expectedLength = length + (long) buffer.limit();
-      if(expectedLength > maxFileSize) {
+      if (expectedLength > maxFileSize) {
         throw new LogFileRetryableIOException(expectedLength + " > " +
             maxFileSize);
       }
-      int offset = (int)length;
+      int offset = (int) length;
       Preconditions.checkState(offset >= 0, String.valueOf(offset));
       // OP_RECORD + size + buffer
-      int recordLength = 1 + (int)Serialization.SIZE_OF_INT + buffer.limit();
+      int recordLength = 1 + (int) Serialization.SIZE_OF_INT + buffer.limit();
       usableSpace.decrement(recordLength);
       preallocate(recordLength);
       ByteBuffer toWrite = ByteBuffer.allocate(recordLength);
@@ -323,15 +334,16 @@ public abstract class LogFile {
      * Sync the underlying log file to disk. Expensive call,
      * should be used only on commits. If a sync has already happened after
      * the last commit, this method is a no-op
+     *
      * @throws IOException
      * @throws LogFileRetryableIOException - if this log file is closed.
      */
     synchronized void sync() throws IOException {
       if (!fsyncPerTransaction && !dirty) {
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
           LOG.debug(
-            "No events written to file, " + getFile().toString() +
-              " in last " + fsyncInterval + " or since last commit.");
+              "No events written to file, " + getFile().toString() +
+                  " in last " + fsyncInterval + " or since last commit.");
         }
         return;
       }
@@ -346,27 +358,29 @@ public abstract class LogFile {
       }
     }
 
-
     protected boolean isOpen() {
       return open;
     }
+
     protected RandomAccessFile getFileHandle() {
       return writeFileHandle;
     }
+
     protected FileChannel getFileChannel() {
       return writeFileChannel;
     }
+
     synchronized void close() {
-      if(open) {
+      if (open) {
         open = false;
         if (!fsyncPerTransaction) {
           // Shutdown the executor before attempting to close.
-          if(syncExecutor != null) {
+          if (syncExecutor != null) {
             // No need to wait for it to shutdown.
             syncExecutor.shutdown();
           }
         }
-        if(writeFileChannel.isOpen()) {
+        if (writeFileChannel.isOpen()) {
           LOG.info("Closing " + file);
           try {
             writeFileChannel.force(true);
@@ -381,9 +395,10 @@ public abstract class LogFile {
         }
       }
     }
+
     protected void preallocate(int size) throws IOException {
       long position = position();
-      if(position + size > getFileChannel().size()) {
+      if (position + size > getFileChannel().size()) {
         LOG.debug("Preallocating at position " + position);
         synchronized (FILL) {
           FILL.position(0);
@@ -404,7 +419,7 @@ public abstract class LogFile {
 
     public OperationRecordUpdater(File file) throws FileNotFoundException {
       Preconditions.checkState(file.exists(), "File to update, " +
-        file.toString() + " does not exist.");
+          file.toString() + " does not exist.");
       this.file = file;
       fileHandle = new RandomAccessFile(file, "rw");
     }
@@ -417,10 +432,10 @@ public abstract class LogFile {
       fileHandle.seek(offset);
       byte byteRead = fileHandle.readByte();
       Preconditions.checkState(byteRead == OP_RECORD || byteRead == OP_NOOP,
-        "Expected to read a record but the byte read indicates EOF");
+          "Expected to read a record but the byte read indicates EOF");
       fileHandle.seek(offset);
       LOG.info("Marking event as " + OP_NOOP + " at " + offset + " for file " +
-        file.toString());
+          file.toString());
       fileHandle.writeByte(OP_NOOP);
     }
 
@@ -430,20 +445,21 @@ public abstract class LogFile {
         fileHandle.close();
       } catch (IOException e) {
         LOG.error("Could not close file handle to file " +
-          fileHandle.toString(), e);
+            fileHandle.toString(), e);
       }
     }
   }
 
-  static abstract class RandomReader {
+  abstract static class RandomReader {
     private final File file;
     private final BlockingQueue<RandomAccessFile> readFileHandles =
         new ArrayBlockingQueue<RandomAccessFile>(50, true);
     private final KeyProvider encryptionKeyProvider;
     private final boolean fsyncPerTransaction;
     private volatile boolean open;
+
     public RandomReader(File file, @Nullable KeyProvider
-      encryptionKeyProvider, boolean fsyncPerTransaction)
+        encryptionKeyProvider, boolean fsyncPerTransaction)
         throws IOException {
       this.file = file;
       this.encryptionKeyProvider = encryptionKeyProvider;
@@ -466,31 +482,31 @@ public abstract class LogFile {
     }
 
     FlumeEvent get(int offset) throws IOException, InterruptedException,
-      CorruptEventException, NoopRecordException {
+        CorruptEventException, NoopRecordException {
       Preconditions.checkState(open, "File closed");
       RandomAccessFile fileHandle = checkOut();
       boolean error = true;
       try {
         fileHandle.seek(offset);
         byte operation = fileHandle.readByte();
-        if(operation == OP_NOOP) {
+        if (operation == OP_NOOP) {
           throw new NoopRecordException("No op record found. Corrupt record " +
-            "may have been repaired by File Channel Integrity tool");
+              "may have been repaired by File Channel Integrity tool");
         }
         if (operation != OP_RECORD) {
           throw new CorruptEventException(
-            "Operation code is invalid. File " +
-              "is corrupt. Please run File Channel Integrity tool.");
+              "Operation code is invalid. File " +
+                  "is corrupt. Please run File Channel Integrity tool.");
         }
         TransactionEventRecord record = doGet(fileHandle);
-        if(!(record instanceof Put)) {
+        if (!(record instanceof Put)) {
           Preconditions.checkState(false, "Record is " +
               record.getClass().getSimpleName());
         }
         error = false;
-        return ((Put)record).getEvent();
+        return ((Put) record).getEvent();
       } finally {
-        if(error) {
+        if (error) {
           close(fileHandle, file);
         } else {
           checkIn(fileHandle);
@@ -499,12 +515,12 @@ public abstract class LogFile {
     }
 
     synchronized void close() {
-      if(open) {
+      if (open) {
         open = false;
         LOG.info("Closing RandomReader " + file);
         List<RandomAccessFile> fileHandles = Lists.newArrayList();
-        while(readFileHandles.drainTo(fileHandles) > 0) {
-          for(RandomAccessFile fileHandle : fileHandles) {
+        while (readFileHandles.drainTo(fileHandles) > 0) {
+          for (RandomAccessFile fileHandle : fileHandles) {
             synchronized (fileHandle) {
               try {
                 fileHandle.close();
@@ -528,7 +544,7 @@ public abstract class LogFile {
     }
 
     private void checkIn(RandomAccessFile fileHandle) {
-      if(!readFileHandles.offer(fileHandle)) {
+      if (!readFileHandles.offer(fileHandle)) {
         close(fileHandle, file);
       }
     }
@@ -536,19 +552,20 @@ public abstract class LogFile {
     private RandomAccessFile checkOut()
         throws IOException, InterruptedException {
       RandomAccessFile fileHandle = readFileHandles.poll();
-      if(fileHandle != null) {
+      if (fileHandle != null) {
         return fileHandle;
       }
       int remaining = readFileHandles.remainingCapacity();
-      if(remaining > 0) {
+      if (remaining > 0) {
         LOG.info("Opening " + file + " for read, remaining number of file " +
-          "handles available for reads of this file is " + remaining);
+            "handles available for reads of this file is " + remaining);
         return open();
       }
       return readFileHandles.take();
     }
+
     private static void close(RandomAccessFile fileHandle, File file) {
-      if(fileHandle != null) {
+      if (fileHandle != null) {
         try {
           fileHandle.close();
         } catch (IOException e) {
@@ -558,7 +575,7 @@ public abstract class LogFile {
     }
   }
 
-  public static abstract class SequentialReader {
+  public abstract static class SequentialReader {
 
     private final RandomAccessFile fileHandle;
     private final FileChannel fileChannel;
@@ -573,8 +590,9 @@ public abstract class LogFile {
 
     /**
      * Construct a Sequential Log Reader object
+     *
      * @param file
-     * @throws IOException if an I/O error occurs
+     * @throws IOException  if an I/O error occurs
      * @throws EOFException if the file is empty
      */
     SequentialReader(File file, @Nullable KeyProvider encryptionKeyProvider)
@@ -584,6 +602,7 @@ public abstract class LogFile {
       fileHandle = new RandomAccessFile(file, "r");
       fileChannel = fileHandle.getChannel();
     }
+
     abstract LogRecord doNext(int offset) throws IOException, CorruptEventException;
 
     abstract int getVersion();
@@ -591,50 +610,57 @@ public abstract class LogFile {
     protected void setLastCheckpointPosition(long lastCheckpointPosition) {
       this.lastCheckpointPosition = lastCheckpointPosition;
     }
+
     protected void setLastCheckpointWriteOrderID(long lastCheckpointWriteOrderID) {
       this.lastCheckpointWriteOrderID = lastCheckpointWriteOrderID;
     }
+
     protected void setPreviousCheckpointPosition(
-      long backupCheckpointPosition) {
+        long backupCheckpointPosition) {
       this.backupCheckpointPosition = backupCheckpointPosition;
     }
+
     protected void setPreviousCheckpointWriteOrderID(
-      long backupCheckpointWriteOrderID) {
+        long backupCheckpointWriteOrderID) {
       this.backupCheckpointWriteOrderID = backupCheckpointWriteOrderID;
     }
+
     protected void setLogFileID(int logFileID) {
       this.logFileID = logFileID;
       Preconditions.checkArgument(logFileID >= 0, "LogFileID is not positive: "
           + Integer.toHexString(logFileID));
 
     }
+
     protected KeyProvider getKeyProvider() {
       return encryptionKeyProvider;
     }
+
     protected RandomAccessFile getFileHandle() {
       return fileHandle;
     }
+
     int getLogFileID() {
       return logFileID;
     }
 
     void skipToLastCheckpointPosition(long checkpointWriteOrderID)
-      throws IOException {
+        throws IOException {
       if (lastCheckpointPosition > 0L) {
         long position = 0;
         if (lastCheckpointWriteOrderID <= checkpointWriteOrderID) {
           position = lastCheckpointPosition;
         } else if (backupCheckpointWriteOrderID <= checkpointWriteOrderID
-          && backupCheckpointPosition > 0) {
+            && backupCheckpointPosition > 0) {
           position = backupCheckpointPosition;
         }
         fileChannel.position(position);
         LOG.info("fast-forward to checkpoint position: " + position);
       } else {
         LOG.info("Checkpoint for file(" + file.getAbsolutePath() + ") "
-          + "is: " + lastCheckpointWriteOrderID + ", which is beyond the "
-          + "requested checkpoint time: " + checkpointWriteOrderID
-          + " and position " + lastCheckpointPosition);
+            + "is: " + lastCheckpointWriteOrderID + ", which is beyond the "
+            + "requested checkpoint time: " + checkpointWriteOrderID
+            + " and position " + lastCheckpointPosition);
       }
     }
 
@@ -644,8 +670,8 @@ public abstract class LogFile {
         long position = fileChannel.position();
         if (position > FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE) {
           LOG.info("File position exceeds the threshold: "
-                + FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE
-                + ", position: " + position);
+              + FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE
+              + ", position: " + position);
         }
         offset = (int) position;
         Preconditions.checkState(offset >= 0);
@@ -658,21 +684,21 @@ public abstract class LogFile {
             return null;
           } else if (operation == OP_NOOP) {
             LOG.info("No op event found in file: " + file.toString() +
-              " at " + offset + ". Skipping event.");
+                " at " + offset + ". Skipping event.");
             skipRecord(fileHandle, offset + 1);
             offset = (int) fileHandle.getFilePointer();
             continue;
           } else {
             LOG.error("Encountered non op-record at " + offset + " " +
-              Integer.toHexString(operation) + " in " + file);
+                Integer.toHexString(operation) + " in " + file);
             return null;
           }
         }
-        if(offset >= fileHandle.length()) {
+        if (offset >= fileHandle.length()) {
           return null;
         }
         return doNext(offset);
-      } catch(EOFException e) {
+      } catch (EOFException e) {
         return null;
       } catch (IOException e) {
         throw new IOException("Unable to read next Transaction from log file " +
@@ -683,33 +709,36 @@ public abstract class LogFile {
     public long getPosition() throws IOException {
       return fileChannel.position();
     }
+
     public void close() {
-      if(fileHandle != null) {
+      if (fileHandle != null) {
         try {
           fileHandle.close();
-        } catch (IOException e) {}
+        } catch (IOException e) {
+        }
       }
     }
   }
+
   protected static void writeDelimitedBuffer(ByteBuffer output, ByteBuffer buffer)
       throws IOException {
     output.putInt(buffer.limit());
     output.put(buffer);
   }
+
   protected static byte[] readDelimitedBuffer(RandomAccessFile fileHandle)
       throws IOException, CorruptEventException {
     int length = fileHandle.readInt();
     if (length < 0) {
-      throw new CorruptEventException("Length of event is: " + String.valueOf
-        (length) + ". Event must have length >= 0. Possible corruption of " +
-        "data or partial fsync.");
+      throw new CorruptEventException("Length of event is: " + String.valueOf(length) +
+          ". Event must have length >= 0. Possible corruption of data or partial fsync.");
     }
     byte[] buffer = new byte[length];
     try {
       fileHandle.readFully(buffer);
     } catch (EOFException ex) {
       throw new CorruptEventException("Remaining data in file less than " +
-        "expected size of event.", ex);
+                                      "expected size of event.", ex);
     }
     return buffer;
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
index 7d7fd85..f2fcad6 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
@@ -18,24 +18,23 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.base.Preconditions;
+import org.apache.flume.channel.file.encryption.KeyProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.security.Key;
 
-import javax.annotation.Nullable;
-
-import org.apache.flume.channel.file.encryption.KeyProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
 @SuppressWarnings("deprecation")
 class LogFileFactory {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(LogFileFactory.class);
+
   private LogFileFactory() {}
 
   static LogFile.MetaDataWriter getMetaDataWriter(File file, int logFileID)
@@ -43,21 +42,21 @@ class LogFileFactory {
     RandomAccessFile logFile = null;
     try {
       File metaDataFile = Serialization.getMetaDataFile(file);
-      if(metaDataFile.exists()) {
+      if (metaDataFile.exists()) {
         return new LogFileV3.MetaDataWriter(file, logFileID);
       }
       logFile = new RandomAccessFile(file, "r");
       int version = logFile.readInt();
-      if(Serialization.VERSION_2 == version) {
+      if (Serialization.VERSION_2 == version) {
         return new LogFileV2.MetaDataWriter(file, logFileID);
       }
       throw new IOException("File " + file + " has bad version " +
           Integer.toHexString(version));
     } finally {
-      if(logFile != null) {
+      if (logFile != null) {
         try {
           logFile.close();
-        } catch(IOException e) {
+        } catch (IOException e) {
           LOGGER.warn("Unable to close " + file, e);
         }
       }
@@ -65,13 +64,13 @@ class LogFileFactory {
   }
 
   static LogFile.Writer getWriter(File file, int logFileID,
-      long maxFileSize, @Nullable Key encryptionKey,
-      @Nullable String encryptionKeyAlias,
-      @Nullable String encryptionCipherProvider,
-      long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
-      int fsyncInterval) throws IOException {
-    Preconditions.checkState(!file.exists(), "File already exists "  +
-      file.getAbsolutePath());
+                                  long maxFileSize, @Nullable Key encryptionKey,
+                                  @Nullable String encryptionKeyAlias,
+                                  @Nullable String encryptionCipherProvider,
+                                  long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
+                                  int fsyncInterval) throws IOException {
+    Preconditions.checkState(!file.exists(), "File already exists " +
+        file.getAbsolutePath());
     Preconditions.checkState(file.createNewFile(), "File could not be created "
         + file.getAbsolutePath());
     return new LogFileV3.Writer(file, logFileID, maxFileSize, encryptionKey,
@@ -80,28 +79,29 @@ class LogFileFactory {
   }
 
   static LogFile.RandomReader getRandomReader(File file,
-      @Nullable KeyProvider encryptionKeyProvider, boolean fsyncPerTransaction)
+                                              @Nullable KeyProvider encryptionKeyProvider,
+                                              boolean fsyncPerTransaction)
       throws IOException {
     RandomAccessFile logFile = new RandomAccessFile(file, "r");
     try {
       File metaDataFile = Serialization.getMetaDataFile(file);
       // either this is a rr for a just created file or
       // the metadata file exists and as such it's V3
-      if(logFile.length() == 0L || metaDataFile.exists()) {
+      if (logFile.length() == 0L || metaDataFile.exists()) {
         return new LogFileV3.RandomReader(file, encryptionKeyProvider,
-          fsyncPerTransaction);
+            fsyncPerTransaction);
       }
       int version = logFile.readInt();
-      if(Serialization.VERSION_2 == version) {
+      if (Serialization.VERSION_2 == version) {
         return new LogFileV2.RandomReader(file);
       }
       throw new IOException("File " + file + " has bad version " +
           Integer.toHexString(version));
     } finally {
-      if(logFile != null) {
+      if (logFile != null) {
         try {
           logFile.close();
-        } catch(IOException e) {
+        } catch (IOException e) {
           LOGGER.warn("Unable to close " + file, e);
         }
       }
@@ -109,7 +109,8 @@ class LogFileFactory {
   }
 
   static LogFile.SequentialReader getSequentialReader(File file,
-      @Nullable KeyProvider encryptionKeyProvider, boolean fsyncPerTransaction)
+                                                      @Nullable KeyProvider encryptionKeyProvider,
+                                                      boolean fsyncPerTransaction)
       throws IOException {
     RandomAccessFile logFile = null;
     try {
@@ -134,27 +135,27 @@ class LogFileFactory {
           hasMeta = true;
         } else {
           throw new IOException("Renaming of " + tempMetadataFile.getName()
-                  + " to " + metaDataFile.getName() + " failed");
+              + " to " + metaDataFile.getName() + " failed");
         }
       } else if (oldMetadataFile.exists()) {
         if (oldMetadataFile.renameTo(metaDataFile)) {
           hasMeta = true;
         } else {
           throw new IOException("Renaming of " + oldMetadataFile.getName()
-                  + " to " + metaDataFile.getName() + " failed");
+              + " to " + metaDataFile.getName() + " failed");
         }
       }
       if (hasMeta) {
         // Now the metadata file has been found, delete old or temp files
         // so it does not interfere with normal operation.
-        if(oldMetadataFile.exists()) {
+        if (oldMetadataFile.exists()) {
           oldMetadataFile.delete();
         }
-        if(tempMetadataFile.exists()) {
+        if (tempMetadataFile.exists()) {
           tempMetadataFile.delete();
         }
-        if(metaDataFile.length() == 0L) {
-          if(file.length() != 0L) {
+        if (metaDataFile.length() == 0L) {
+          if (file.length() != 0L) {
             String msg = String.format("MetaData file %s is empty, but log %s" +
                 " is of size %d", metaDataFile, file, file.length());
             throw new IllegalStateException(msg);
@@ -163,20 +164,20 @@ class LogFileFactory {
               metaDataFile));
         }
         return new LogFileV3.SequentialReader(file, encryptionKeyProvider,
-          fsyncPerTransaction);
+            fsyncPerTransaction);
       }
       logFile = new RandomAccessFile(file, "r");
       int version = logFile.readInt();
-      if(Serialization.VERSION_2 == version) {
+      if (Serialization.VERSION_2 == version) {
         return new LogFileV2.SequentialReader(file);
       }
       throw new IOException("File " + file + " has bad version " +
           Integer.toHexString(version));
     } finally {
-      if(logFile != null) {
+      if (logFile != null) {
         try {
           logFile.close();
-        } catch(IOException e) {
+        } catch (IOException e) {
           LOGGER.warn("Unable to close " + file, e);
         }
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java
index 9447652..b0377ab 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java
@@ -22,12 +22,15 @@ import java.io.IOException;
 
 public class LogFileRetryableIOException extends IOException {
   private static final long serialVersionUID = -2747112999806160431L;
+
   public LogFileRetryableIOException() {
     super();
   }
+
   public LogFileRetryableIOException(String msg) {
     super(msg);
   }
+
   public LogFileRetryableIOException(String msg, Throwable t) {
     super(msg, t);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
index bb25e95..62b8cb9 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
@@ -65,7 +65,7 @@ class LogFileV2 extends LogFile {
             + ", logWriteOrderID: " + getLastCheckpointWriteOrderID());
         error = false;
       } finally {
-        if(error) {
+        if (error) {
           close();
         }
       }
@@ -108,6 +108,7 @@ class LogFileV2 extends LogFile {
       getFileChannel().force(true);
 
     }
+
     @Override
     int getVersion() {
       return Serialization.VERSION_2;
@@ -115,29 +116,27 @@ class LogFileV2 extends LogFile {
   }
 
   static class RandomReader extends LogFile.RandomReader {
-    RandomReader(File file)
-        throws IOException {
+    RandomReader(File file) throws IOException {
       super(file, null, true);
     }
+
     @Override
     int getVersion() {
       return Serialization.VERSION_2;
     }
+
     @Override
-    protected TransactionEventRecord doGet(RandomAccessFile fileHandle)
-        throws IOException {
+    protected TransactionEventRecord doGet(RandomAccessFile fileHandle) throws IOException {
       return TransactionEventRecord.fromDataInputV2(fileHandle);
     }
   }
 
   static class SequentialReader extends LogFile.SequentialReader {
-
-    SequentialReader(File file)
-        throws EOFException, IOException {
+    SequentialReader(File file) throws EOFException, IOException {
       super(file, null);
       RandomAccessFile fileHandle = getFileHandle();
       int version = fileHandle.readInt();
-      if(version != getVersion()) {
+      if (version != getVersion()) {
         throw new IOException("Version is " + Integer.toHexString(version) +
             " expected " + Integer.toHexString(getVersion())
             + " file: " + file.getCanonicalPath());
@@ -146,10 +145,12 @@ class LogFileV2 extends LogFile {
       setLastCheckpointPosition(fileHandle.readLong());
       setLastCheckpointWriteOrderID(fileHandle.readLong());
     }
+
     @Override
     public int getVersion() {
       return Serialization.VERSION_2;
     }
+
     @Override
     LogRecord doNext(int offset) throws IOException {
       TransactionEventRecord event =

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
index 9b0ef93..b459947 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
@@ -21,7 +21,6 @@ package org.apache.flume.channel.file;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.GeneratedMessage;
-import org.apache.flume.Transaction;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.channel.file.encryption.CipherProvider;
@@ -58,13 +57,14 @@ public class LogFileV3 extends LogFile {
   static class MetaDataWriter extends LogFile.MetaDataWriter {
     private ProtosFactory.LogFileMetaData logFileMetaData;
     private final File metaDataFile;
+
     protected MetaDataWriter(File logFile, int logFileID) throws IOException {
       super(logFile, logFileID);
       metaDataFile = Serialization.getMetaDataFile(logFile);
       MetaDataReader metaDataReader = new MetaDataReader(logFile, logFileID);
       logFileMetaData = metaDataReader.read();
       int version = logFileMetaData.getVersion();
-      if(version != getVersion()) {
+      if (version != getVersion()) {
         throw new IOException("Version is " + Integer.toHexString(version) +
             " expected " + Integer.toHexString(getVersion())
             + " file: " + logFile);
@@ -90,9 +90,9 @@ public class LogFileV3 extends LogFile {
        * would be possible to recover from a backup.
        */
       metaDataBuilder.setBackupCheckpointPosition(logFileMetaData
-        .getCheckpointPosition());
+          .getCheckpointPosition());
       metaDataBuilder.setBackupCheckpointWriteOrderID(logFileMetaData
-        .getCheckpointWriteOrderID());
+          .getCheckpointWriteOrderID());
       logFileMetaData = metaDataBuilder.build();
       writeDelimitedTo(logFileMetaData, metaDataFile);
     }
@@ -102,17 +102,19 @@ public class LogFileV3 extends LogFile {
     private final File logFile;
     private final File metaDataFile;
     private final int logFileID;
+
     protected MetaDataReader(File logFile, int logFileID) throws IOException {
       this.logFile = logFile;
       metaDataFile = Serialization.getMetaDataFile(logFile);
       this.logFileID = logFileID;
     }
+
     ProtosFactory.LogFileMetaData read() throws IOException {
       FileInputStream inputStream = new FileInputStream(metaDataFile);
       try {
         ProtosFactory.LogFileMetaData metaData = Preconditions.checkNotNull(
-          ProtosFactory.LogFileMetaData.
-            parseDelimitedFrom(inputStream), "Metadata cannot be null");
+            ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream),
+            "Metadata cannot be null");
         if (metaData.getLogFileID() != logFileID) {
           throw new IOException("The file id of log file: "
               + logFile + " is different from expected "
@@ -123,7 +125,7 @@ public class LogFileV3 extends LogFile {
       } finally {
         try {
           inputStream.close();
-        } catch(IOException e) {
+        } catch (IOException e) {
           LOGGER.warn("Unable to close " + metaDataFile, e);
         }
       }
@@ -133,13 +135,14 @@ public class LogFileV3 extends LogFile {
   /**
    * Writes a GeneratedMessage to a temp file, synchronizes it to disk
    * and then renames the file over file.
-   * @param msg GeneratedMessage to write to the file
+   *
+   * @param msg  GeneratedMessage to write to the file
    * @param file destination file
    * @throws IOException if a write error occurs or the File.renameTo
-   * method returns false meaning the file could not be overwritten.
+   *                     method returns false meaning the file could not be overwritten.
    */
   public static void writeDelimitedTo(GeneratedMessage msg, File file)
-  throws IOException {
+      throws IOException {
     File tmp = Serialization.getMetaDataTempFile(file);
     FileOutputStream outputStream = new FileOutputStream(tmp);
     boolean closed = false;
@@ -148,26 +151,26 @@ public class LogFileV3 extends LogFile {
       outputStream.getChannel().force(true);
       outputStream.close();
       closed = true;
-      if(!tmp.renameTo(file)) {
+      if (!tmp.renameTo(file)) {
         //Some platforms don't support moving over an existing file.
         //So:
         //log.meta -> log.meta.old
         //log.meta.tmp -> log.meta
         //delete log.meta.old
         File oldFile = Serialization.getOldMetaDataFile(file);
-        if(!file.renameTo(oldFile)){
+        if (!file.renameTo(oldFile)) {
           throw new IOException("Unable to rename " + file + " to " + oldFile);
         }
-        if(!tmp.renameTo(file)) {
+        if (!tmp.renameTo(file)) {
           throw new IOException("Unable to rename " + tmp + " over " + file);
         }
         oldFile.delete();
       }
     } finally {
-      if(!closed) {
+      if (!closed) {
         try {
           outputStream.close();
-        } catch(IOException e) {
+        } catch (IOException e) {
           LOGGER.warn("Unable to close " + tmp, e);
         }
       }
@@ -177,17 +180,17 @@ public class LogFileV3 extends LogFile {
 
   static class Writer extends LogFile.Writer {
     Writer(File file, int logFileID, long maxFileSize,
-        @Nullable Key encryptionKey,
-        @Nullable String encryptionKeyAlias,
-        @Nullable String encryptionCipherProvider,
-        long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
-        int fsyncInterval) throws IOException {
-      super(file, logFileID, maxFileSize, CipherProviderFactory.
-          getEncrypter(encryptionCipherProvider, encryptionKey),
-          usableSpaceRefreshInterval, fsyncPerTransaction, fsyncInterval);
+           @Nullable Key encryptionKey,
+           @Nullable String encryptionKeyAlias,
+           @Nullable String encryptionCipherProvider,
+           long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
+           int fsyncInterval) throws IOException {
+      super(file, logFileID, maxFileSize,
+            CipherProviderFactory.getEncrypter(encryptionCipherProvider, encryptionKey),
+            usableSpaceRefreshInterval, fsyncPerTransaction, fsyncInterval);
       ProtosFactory.LogFileMetaData.Builder metaDataBuilder =
           ProtosFactory.LogFileMetaData.newBuilder();
-      if(encryptionKey != null) {
+      if (encryptionKey != null) {
         Preconditions.checkNotNull(encryptionKeyAlias, "encryptionKeyAlias");
         Preconditions.checkNotNull(encryptionCipherProvider,
             "encryptionCipherProvider");
@@ -208,6 +211,7 @@ public class LogFileV3 extends LogFile {
       File metaDataFile = Serialization.getMetaDataFile(file);
       writeDelimitedTo(metaDataBuilder.build(), metaDataFile);
     }
+
     @Override
     int getVersion() {
       return Serialization.VERSION_3;
@@ -221,28 +225,29 @@ public class LogFileV3 extends LogFile {
     private volatile String cipherProvider;
     private volatile byte[] parameters;
     private BlockingQueue<CipherProvider.Decryptor> decryptors =
-      new LinkedBlockingDeque<CipherProvider.Decryptor>();
+        new LinkedBlockingDeque<CipherProvider.Decryptor>();
 
     RandomReader(File file, @Nullable KeyProvider encryptionKeyProvider,
-      boolean fsyncPerTransaction) throws IOException {
+                 boolean fsyncPerTransaction) throws IOException {
       super(file, encryptionKeyProvider, fsyncPerTransaction);
     }
+
     private void initialize() throws IOException {
       File metaDataFile = Serialization.getMetaDataFile(getFile());
       FileInputStream inputStream = new FileInputStream(metaDataFile);
       try {
-        ProtosFactory.LogFileMetaData metaData =
-            Preconditions.checkNotNull(ProtosFactory.LogFileMetaData.
-                parseDelimitedFrom(inputStream), "MetaData cannot be null");
+        ProtosFactory.LogFileMetaData metaData = Preconditions.checkNotNull(
+            ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream),
+            "MetaData cannot be null");
         int version = metaData.getVersion();
-        if(version != getVersion()) {
+        if (version != getVersion()) {
           throw new IOException("Version is " + Integer.toHexString(version) +
               " expected " + Integer.toHexString(getVersion())
               + " file: " + getFile().getCanonicalPath());
         }
         encryptionEnabled = false;
-        if(metaData.hasEncryption()) {
-          if(getKeyProvider() == null) {
+        if (metaData.hasEncryption()) {
+          if (getKeyProvider() == null) {
             throw new IllegalStateException("Data file is encrypted but no " +
                 " provider was specified");
           }
@@ -255,23 +260,26 @@ public class LogFileV3 extends LogFile {
       } finally {
         try {
           inputStream.close();
-        } catch(IOException e) {
+        } catch (IOException e) {
           LOGGER.warn("Unable to close " + metaDataFile, e);
         }
       }
     }
+
     private CipherProvider.Decryptor getDecryptor() {
       CipherProvider.Decryptor decryptor = decryptors.poll();
-      if(decryptor == null) {
+      if (decryptor == null) {
         decryptor = CipherProviderFactory.getDecrypter(cipherProvider, key,
             parameters);
       }
       return decryptor;
     }
+
     @Override
     int getVersion() {
       return Serialization.VERSION_3;
     }
+
     @Override
     protected TransactionEventRecord doGet(RandomAccessFile fileHandle)
         throws IOException, CorruptEventException {
@@ -279,7 +287,7 @@ public class LogFileV3 extends LogFile {
       // empty. As such we wait to initialize until there is some
       // data before we we initialize
       synchronized (this) {
-        if(!initialized) {
+        if (!initialized) {
           initialized = true;
           initialize();
         }
@@ -288,18 +296,17 @@ public class LogFileV3 extends LogFile {
       CipherProvider.Decryptor decryptor = null;
       try {
         byte[] buffer = readDelimitedBuffer(fileHandle);
-        if(encryptionEnabled) {
+        if (encryptionEnabled) {
           decryptor = getDecryptor();
           buffer = decryptor.decrypt(buffer);
         }
-        TransactionEventRecord event = TransactionEventRecord.
-            fromByteArray(buffer);
+        TransactionEventRecord event = TransactionEventRecord.fromByteArray(buffer);
         success = true;
         return event;
-      } catch(DecryptionFailureException ex) {
+      } catch (DecryptionFailureException ex) {
         throw new CorruptEventException("Error decrypting event", ex);
       } finally {
-        if(success && encryptionEnabled && decryptor != null) {
+        if (success && encryptionEnabled && decryptor != null) {
           decryptors.offer(decryptor);
         }
       }
@@ -309,9 +316,10 @@ public class LogFileV3 extends LogFile {
   public static class SequentialReader extends LogFile.SequentialReader {
     private CipherProvider.Decryptor decryptor;
     private final boolean fsyncPerTransaction;
+
     public SequentialReader(File file, @Nullable KeyProvider
-      encryptionKeyProvider, boolean fsyncPerTransaction) throws EOFException,
-      IOException {
+        encryptionKeyProvider, boolean fsyncPerTransaction) throws EOFException,
+        IOException {
       super(file, encryptionKeyProvider);
       this.fsyncPerTransaction = fsyncPerTransaction;
       File metaDataFile = Serialization.getMetaDataFile(file);
@@ -321,32 +329,31 @@ public class LogFileV3 extends LogFile {
             ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream),
             "MetaData cannot be null");
         int version = metaData.getVersion();
-        if(version != getVersion()) {
+        if (version != getVersion()) {
           throw new IOException("Version is " + Integer.toHexString(version) +
               " expected " + Integer.toHexString(getVersion())
               + " file: " + file.getCanonicalPath());
         }
-        if(metaData.hasEncryption()) {
-          if(getKeyProvider() == null) {
+        if (metaData.hasEncryption()) {
+          if (getKeyProvider() == null) {
             throw new IllegalStateException("Data file is encrypted but no " +
                 " provider was specified");
           }
           ProtosFactory.LogFileEncryption encryption = metaData.getEncryption();
           Key key = getKeyProvider().getKey(encryption.getKeyAlias());
-          decryptor = CipherProviderFactory.
-              getDecrypter(encryption.getCipherProvider(), key,
-                  encryption.getParameters().toByteArray());
+          decryptor = CipherProviderFactory.getDecrypter(
+              encryption.getCipherProvider(), key, encryption.getParameters().toByteArray());
         }
         setLogFileID(metaData.getLogFileID());
         setLastCheckpointPosition(metaData.getCheckpointPosition());
         setLastCheckpointWriteOrderID(metaData.getCheckpointWriteOrderID());
         setPreviousCheckpointPosition(metaData.getBackupCheckpointPosition());
         setPreviousCheckpointWriteOrderID(
-          metaData.getBackupCheckpointWriteOrderID());
+            metaData.getBackupCheckpointWriteOrderID());
       } finally {
         try {
           inputStream.close();
-        } catch(IOException e) {
+        } catch (IOException e) {
           LOGGER.warn("Unable to close " + metaDataFile, e);
         }
       }
@@ -359,7 +366,7 @@ public class LogFileV3 extends LogFile {
 
     @Override
     LogRecord doNext(int offset) throws IOException, CorruptEventException,
-      DecryptionFailureException {
+        DecryptionFailureException {
       byte[] buffer = null;
       TransactionEventRecord event = null;
       try {
@@ -370,7 +377,7 @@ public class LogFileV3 extends LogFile {
         event = TransactionEventRecord.fromByteArray(buffer);
       } catch (CorruptEventException ex) {
         LOGGER.warn("Corrupt file found. File id: log-" + this.getLogFileID(),
-          ex);
+            ex);
         // Return null so that replay handler thinks all events in this file
         // have been taken.
         if (!fsyncPerTransaction) {
@@ -380,7 +387,7 @@ public class LogFileV3 extends LogFile {
       } catch (DecryptionFailureException ex) {
         if (!fsyncPerTransaction) {
           LOGGER.warn("Could not decrypt even read from channel. Skipping " +
-            "event.", ex);
+              "event.", ex);
           return null;
         }
         throw ex;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogRecord.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogRecord.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogRecord.java
index 19ad0d6..5a75627 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogRecord.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogRecord.java
@@ -20,15 +20,17 @@ package org.apache.flume.channel.file;
 
 import java.util.Arrays;
 
-
 public class LogRecord implements Comparable<LogRecord> {
-  private int fileID, offset;
+  private int fileID;
+  private int offset;
   private TransactionEventRecord event;
+
   public LogRecord(int fileID, int offset, TransactionEventRecord event) {
     this.fileID = fileID;
     this.offset = offset;
     this.event = event;
   }
+
   public int getFileID() {
     return fileID;
   }
@@ -41,20 +43,16 @@ public class LogRecord implements Comparable<LogRecord> {
 
   @Override
   public int compareTo(LogRecord o) {
-    int result = new Long(event.getLogWriteOrderID())
-      .compareTo(o.getEvent().getLogWriteOrderID());
-    if(result == 0) {
+    int result = new Long(event.getLogWriteOrderID()).compareTo(o.getEvent().getLogWriteOrderID());
+    if (result == 0) {
       // oops we have hit a flume-1.2 bug. let's try and use the txid
       // to replay the events
-      result = new Long(event.getTransactionID())
-        .compareTo(o.getEvent().getTransactionID());
-      if(result == 0) {
+      result = new Long(event.getTransactionID()).compareTo(o.getEvent().getTransactionID());
+      if (result == 0) {
         // events are within the same transaction. Basically we want commit
         // and rollback to come after take and put
-        Integer thisIndex = Arrays.binarySearch(replaySortOrder,
-            event.getRecordType());
-        Integer thatIndex = Arrays.binarySearch(replaySortOrder,
-            o.getEvent().getRecordType());
+        Integer thisIndex = Arrays.binarySearch(replaySortOrder, event.getRecordType());
+        Integer thatIndex = Arrays.binarySearch(replaySortOrder, o.getEvent().getRecordType());
         return thisIndex.compareTo(thatIndex);
       }
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
index d1498c2..48177d0 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
@@ -64,7 +64,7 @@ public class LogUtils {
   static List<File> getLogs(File logDir) {
     List<File> result = Lists.newArrayList();
     File[] files = logDir.listFiles();
-    if(files == null) {
+    if (files == null) {
       String msg = logDir + ".listFiles() returned null: ";
       msg += "File = " + logDir.isFile() + ", ";
       msg += "Exists = " + logDir.exists() + ", ";

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java
index dfcdd73..b74ff7b 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Pair.java
@@ -19,19 +19,21 @@
 package org.apache.flume.channel.file;
 
 class Pair<L,R> {
-
   private final L left;
   private final R right;
+
   Pair(L l, R r) {
     left = l;
     right = r;
   }
+
   L getLeft() {
     return left;
   }
   R getRight() {
     return right;
   }
+
   static <L, R> Pair<L, R> of(L left, R right) {
     return new Pair<L, R>(left, right);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
index f08f024..0a70a24 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
@@ -78,8 +78,8 @@ class Put extends TransactionEventRecord {
     Map<String, String> headers = event.getHeaders();
     ProtosFactory.FlumeEventHeader.Builder headerBuilder =
         ProtosFactory.FlumeEventHeader.newBuilder();
-    if(headers != null) {
-      for(String key : headers.keySet()) {
+    if (headers != null) {
+      for (String key : headers.keySet()) {
         String value = headers.get(key);
         headerBuilder.clear();
         eventBuilder.addHeaders(headerBuilder.setKey(key)
@@ -93,13 +93,12 @@ class Put extends TransactionEventRecord {
     putBuilder.build().writeDelimitedTo(out);
   }
   @Override
-  void readProtos(InputStream in) throws IOException,
-    CorruptEventException {
-    ProtosFactory.Put put = Preconditions.checkNotNull(ProtosFactory.
-        Put.parseDelimitedFrom(in), "Put cannot be null");
+  void readProtos(InputStream in) throws IOException, CorruptEventException {
+    ProtosFactory.Put put = Preconditions.checkNotNull(
+        ProtosFactory.Put.parseDelimitedFrom(in), "Put cannot be null");
     Map<String, String> headers = Maps.newHashMap();
     ProtosFactory.FlumeEvent protosEvent = put.getEvent();
-    for(ProtosFactory.FlumeEventHeader header : protosEvent.getHeadersList()) {
+    for (ProtosFactory.FlumeEventHeader header : protosEvent.getHeadersList()) {
       headers.put(header.getKey(), header.getValue());
     }
     byte[] eventBody = protosEvent.getBody().toByteArray();

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
index a559503..662fd42 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
@@ -99,8 +99,8 @@ class ReplayHandler {
   }
 
   ReplayHandler(FlumeEventQueue queue,
-    @Nullable KeyProvider encryptionKeyProvider,
-    boolean fsyncPerTransaction) {
+                @Nullable KeyProvider encryptionKeyProvider,
+                boolean fsyncPerTransaction) {
     this.queue = queue;
     this.lastCheckpoint = queue.getLogWriteOrderID();
     pendingTakes = Lists.newArrayList();
@@ -109,6 +109,7 @@ class ReplayHandler {
     this.encryptionKeyProvider = encryptionKeyProvider;
     this.fsyncPerTransaction = fsyncPerTransaction;
   }
+
   /**
    * Replay logic from Flume1.2 which can be activated if the v2 logic
    * is failing on ol logs for some reason.
@@ -165,9 +166,8 @@ class ReplayHandler {
               commitCount++;
               @SuppressWarnings("unchecked")
               Collection<FlumeEventPointer> pointers =
-                (Collection<FlumeEventPointer>) transactionMap.remove(trans);
-              if (((Commit) record).getType()
-                      == TransactionEventRecord.Type.TAKE.get()) {
+                  (Collection<FlumeEventPointer>) transactionMap.remove(trans);
+              if (((Commit) record).getType() == TransactionEventRecord.Type.TAKE.get()) {
                 if (inflightTakes.containsKey(trans)) {
                   if (pointers == null) {
                     pointers = Sets.newHashSet();
@@ -185,8 +185,8 @@ class ReplayHandler {
                 count += pointers.size();
               }
             } else {
-              Preconditions.checkArgument(false, "Unknown record type: "
-                + Integer.toHexString(type));
+              Preconditions.checkArgument(false,
+                                          "Unknown record type: " + Integer.toHexString(type));
             }
 
           } else {
@@ -196,8 +196,8 @@ class ReplayHandler {
         LOG.info("Replayed " + count + " from " + log);
         if (LOG.isDebugEnabled()) {
           LOG.debug("read: " + readCount + ", put: " + putCount + ", take: "
-            + takeCount + ", rollback: " + rollbackCount + ", commit: "
-            + commitCount + ", skipp: " + skipCount);
+              + takeCount + ", rollback: " + rollbackCount + ", commit: "
+              + commitCount + ", skipp: " + skipCount);
         }
       } catch (EOFException e) {
         LOG.warn("Hit EOF on " + log);
@@ -262,21 +262,20 @@ class ReplayHandler {
         LOG.info("Replaying " + log);
         try {
           LogFile.SequentialReader reader =
-            LogFileFactory.getSequentialReader(log, encryptionKeyProvider,
-              fsyncPerTransaction);
+              LogFileFactory.getSequentialReader(log, encryptionKeyProvider, fsyncPerTransaction);
           reader.skipToLastCheckpointPosition(queue.getLogWriteOrderID());
           Preconditions.checkState(!readers.containsKey(reader.getLogFileID()),
               "Readers " + readers + " already contains "
                   + reader.getLogFileID());
           readers.put(reader.getLogFileID(), reader);
           LogRecord logRecord = reader.next();
-          if(logRecord == null) {
+          if (logRecord == null) {
             readers.remove(reader.getLogFileID());
             reader.close();
           } else {
             logRecordBuffer.add(logRecord);
           }
-        } catch(EOFException e) {
+        } catch (EOFException e) {
           LOG.warn("Ignoring " + log + " due to EOF", e);
         }
       }
@@ -294,7 +293,7 @@ class ReplayHandler {
         writeOrderIDSeed = Math.max(writeOrderIDSeed,
             record.getLogWriteOrderID());
         readCount++;
-        if(readCount % 10000 == 0 && readCount > 0) {
+        if (readCount % 10000 == 0 && readCount > 0) {
           LOG.info("read: " + readCount + ", put: " + putCount + ", take: "
               + takeCount + ", rollback: " + rollbackCount + ", commit: "
               + commitCount + ", skip: " + skipCount + ", eventCount:" + count);
@@ -316,11 +315,11 @@ class ReplayHandler {
             commitCount++;
             @SuppressWarnings("unchecked")
             Collection<FlumeEventPointer> pointers =
-              (Collection<FlumeEventPointer>) transactionMap.remove(trans);
+                (Collection<FlumeEventPointer>) transactionMap.remove(trans);
             if (((Commit) record).getType()
                     == TransactionEventRecord.Type.TAKE.get()) {
               if (inflightTakes.containsKey(trans)) {
-                if(pointers == null){
+                if (pointers == null) {
                   pointers = Sets.newHashSet();
                 }
                 Set<Long> takes = inflightTakes.removeAll(trans);
@@ -350,8 +349,8 @@ class ReplayHandler {
     } finally {
       TransactionIDOracle.setSeed(transactionIDSeed);
       WriteOrderOracle.setSeed(writeOrderIDSeed);
-      for(LogFile.SequentialReader reader : readers.values()) {
-        if(reader != null) {
+      for (LogFile.SequentialReader reader : readers.values()) {
+        if (reader != null) {
           reader.close();
         }
       }
@@ -378,11 +377,11 @@ class ReplayHandler {
   }
   private LogRecord next() throws IOException, CorruptEventException {
     LogRecord resultLogRecord = logRecordBuffer.poll();
-    if(resultLogRecord != null) {
+    if (resultLogRecord != null) {
       // there is more log records to read
       LogFile.SequentialReader reader = readers.get(resultLogRecord.getFileID());
       LogRecord nextLogRecord;
-      if((nextLogRecord = reader.next()) != null) {
+      if ((nextLogRecord = reader.next()) != null) {
         logRecordBuffer.add(nextLogRecord);
       }
     }
@@ -391,7 +390,7 @@ class ReplayHandler {
   private void processCommit(short type, Collection<FlumeEventPointer> pointers) {
     if (type == TransactionEventRecord.Type.PUT.get()) {
       for (FlumeEventPointer pointer : pointers) {
-        if(!queue.addTail(pointer)) {
+        if (!queue.addTail(pointer)) {
           throw new IllegalStateException("Unable to add "
               + pointer + ". Queue depth = " + queue.getSize()
               + ", Capacity = " + queue.getCapacity());

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
index 335ad0b..2fca755 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Rollback.java
@@ -35,6 +35,7 @@ class Rollback extends TransactionEventRecord {
   Rollback(Long transactionID, Long logWriteOrderID) {
     super(transactionID, logWriteOrderID);
   }
+
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
@@ -44,22 +45,26 @@ class Rollback extends TransactionEventRecord {
   public void write(DataOutput out) throws IOException {
     super.write(out);
   }
+
   @Override
   void writeProtos(OutputStream out) throws IOException {
     ProtosFactory.Rollback.Builder rollbackBuilder =
         ProtosFactory.Rollback.newBuilder();
     rollbackBuilder.build().writeDelimitedTo(out);
   }
+
   @Override
   void readProtos(InputStream in) throws IOException {
     @SuppressWarnings("unused")
-    ProtosFactory.Rollback rollback = Preconditions.checkNotNull(ProtosFactory.
-        Rollback.parseDelimitedFrom(in), "Rollback cannot be null");
+    ProtosFactory.Rollback rollback = Preconditions.checkNotNull(
+        ProtosFactory.Rollback.parseDelimitedFrom(in), "Rollback cannot be null");
   }
+
   @Override
   short getRecordType() {
     return Type.ROLLBACK.get();
   }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
index a6eda75..19303cc 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
@@ -46,14 +46,12 @@ public class Serialization {
   static final long SIZE_OF_INT = 4;
   static final int SIZE_OF_LONG = 8;
 
-
   static final int VERSION_2 = 2;
   static final int VERSION_3 = 3;
 
   public static final String METADATA_FILENAME = ".meta";
   public static final String METADATA_TMP_FILENAME = ".tmp";
-  public static final String OLD_METADATA_FILENAME = METADATA_FILENAME +
-    ".old";
+  public static final String OLD_METADATA_FILENAME = METADATA_FILENAME + ".old";
 
   // 64 K buffer to copy and compress files.
   private static final int FILE_BUFFER_SIZE = 64 * 1024;
@@ -63,12 +61,11 @@ public class Serialization {
   static File getMetaDataTempFile(File metaDataFile) {
     String metaDataFileName = metaDataFile.getName() + METADATA_TMP_FILENAME;
     return new File(metaDataFile.getParentFile(), metaDataFileName);
-
   }
+
   static File getMetaDataFile(File file) {
     String metaDataFileName = file.getName() + METADATA_FILENAME;
     return new File(file.getParentFile(), metaDataFileName);
-
   }
 
   // Support platforms that cannot do atomic renames - FLUME-1699
@@ -79,19 +76,20 @@ public class Serialization {
 
   /**
    * Deletes all files in given directory.
+   *
    * @param checkpointDir - The directory whose files are to be deleted
-   * @param excludes - Names of files which should not be deleted from this
-   *                 directory.
+   * @param excludes      - Names of files which should not be deleted from this
+   *                      directory.
    * @return - true if all files were successfully deleted, false otherwise.
    */
   static boolean deleteAllFiles(File checkpointDir,
-    @Nullable Set<String> excludes) {
+                                @Nullable Set<String> excludes) {
     if (!checkpointDir.isDirectory()) {
       return false;
     }
 
     File[] files = checkpointDir.listFiles();
-    if(files == null) {
+    if (files == null) {
       return false;
     }
     StringBuilder builder;
@@ -100,13 +98,13 @@ public class Serialization {
     } else {
       builder = new StringBuilder("Deleted the following files: ");
     }
-    if(excludes == null) {
+    if (excludes == null) {
       excludes = Collections.emptySet();
     }
     for (File file : files) {
-      if(excludes.contains(file.getName())) {
+      if (excludes.contains(file.getName())) {
         LOG.info("Skipping " + file.getName() + " because it is in excludes " +
-          "set");
+            "set");
         continue;
       }
       if (!FileUtils.deleteQuietly(file)) {
@@ -125,18 +123,19 @@ public class Serialization {
   /**
    * Copy a file using a 64K size buffer. This method will copy the file and
    * then fsync to disk
+   *
    * @param from File to copy - this file should exist
-   * @param to Destination file - this file should not exist
+   * @param to   Destination file - this file should not exist
    * @return true if the copy was successful
    */
   public static boolean copyFile(File from, File to) throws IOException {
     Preconditions.checkNotNull(from, "Source file is null, file copy failed.");
     Preconditions.checkNotNull(to, "Destination file is null, " +
-      "file copy failed.");
+        "file copy failed.");
     Preconditions.checkState(from.exists(), "Source file: " + from.toString() +
-      " does not exist.");
+        " does not exist.");
     Preconditions.checkState(!to.exists(), "Destination file: "
-      + to.toString() + " unexpectedly exists.");
+        + to.toString() + " unexpectedly exists.");
 
     BufferedInputStream in = null;
     RandomAccessFile out = null; //use a RandomAccessFile for easy fsync
@@ -145,7 +144,7 @@ public class Serialization {
       out = new RandomAccessFile(to, "rw");
       byte[] buf = new byte[FILE_BUFFER_SIZE];
       int total = 0;
-      while(true) {
+      while (true) {
         int read = in.read(buf);
         if (read == -1) {
           break;
@@ -155,11 +154,11 @@ public class Serialization {
       }
       out.getFD().sync();
       Preconditions.checkState(total == from.length(),
-        "The size of the origin file and destination file are not equal.");
+          "The size of the origin file and destination file are not equal.");
       return true;
     } catch (Exception ex) {
       LOG.error("Error while attempting to copy " + from.toString() + " to "
-        + to.toString() + ".", ex);
+          + to.toString() + ".", ex);
       Throwables.propagate(ex);
     } finally {
       Throwable th = null;
@@ -185,26 +184,26 @@ public class Serialization {
     }
     // Should never reach here.
     throw new IOException("Copying file: " + from.toString() + " to: " + to
-      .toString() + " may have failed.");
+        .toString() + " may have failed.");
   }
 
   /**
    * Compress file using Snappy
+   *
    * @param uncompressed File to compress - this file should exist
-   * @param compressed Compressed file - this file should not exist
+   * @param compressed   Compressed file - this file should not exist
    * @return true if compression was successful
    */
   public static boolean compressFile(File uncompressed, File compressed)
-    throws IOException {
+      throws IOException {
     Preconditions.checkNotNull(uncompressed,
-      "Source file is null, compression failed.");
+        "Source file is null, compression failed.");
     Preconditions.checkNotNull(compressed,
-      "Destination file is null, compression failed.");
+        "Destination file is null, compression failed.");
     Preconditions.checkState(uncompressed.exists(), "Source file: " +
-      uncompressed.toString() + " does not exist.");
+        uncompressed.toString() + " does not exist.");
     Preconditions.checkState(!compressed.exists(),
-      "Compressed file: " + compressed.toString() + " unexpectedly " +
-        "exists.");
+        "Compressed file: " + compressed.toString() + " unexpectedly " + "exists.");
 
     BufferedInputStream in = null;
     FileOutputStream out = null;
@@ -215,7 +214,7 @@ public class Serialization {
       snappyOut = new SnappyOutputStream(out);
 
       byte[] buf = new byte[FILE_BUFFER_SIZE];
-      while(true) {
+      while (true) {
         int read = in.read(buf);
         if (read == -1) {
           break;
@@ -226,8 +225,7 @@ public class Serialization {
       return true;
     } catch (Exception ex) {
       LOG.error("Error while attempting to compress " +
-        uncompressed.toString() + " to " + compressed.toString()
-        + ".", ex);
+                uncompressed.toString() + " to " + compressed.toString() + ".", ex);
       Throwables.propagate(ex);
     } finally {
       Throwable th = null;
@@ -253,26 +251,24 @@ public class Serialization {
     }
     // Should never reach here.
     throw new IOException("Copying file: " + uncompressed.toString()
-      + " to: " + compressed.toString() + " may have failed.");
+        + " to: " + compressed.toString() + " may have failed.");
   }
 
   /**
    * Decompress file using Snappy
-   * @param compressed File to compress - this file should exist
+   *
+   * @param compressed   File to decompress - this file should exist
    * @param decompressed Compressed file - this file should not exist
    * @return true if decompression was successful
    */
-  public static boolean decompressFile(File compressed, File decompressed)
-    throws IOException {
-    Preconditions.checkNotNull(compressed,
-      "Source file is null, decompression failed.");
+  public static boolean decompressFile(File compressed, File decompressed) throws IOException {
+    Preconditions.checkNotNull(compressed, "Source file is null, decompression failed.");
     Preconditions.checkNotNull(decompressed, "Destination file is " +
-      "null, decompression failed.");
+        "null, decompression failed.");
     Preconditions.checkState(compressed.exists(), "Source file: " +
-      compressed.toString() + " does not exist.");
+        compressed.toString() + " does not exist.");
     Preconditions.checkState(!decompressed.exists(),
-      "Decompressed file: " + decompressed.toString() +
-        " unexpectedly exists.");
+        "Decompressed file: " + decompressed.toString() + " unexpectedly exists.");
 
     BufferedInputStream in = null;
     SnappyInputStream snappyIn = null;
@@ -283,7 +279,7 @@ public class Serialization {
       out = new FileOutputStream(decompressed);
 
       byte[] buf = new byte[FILE_BUFFER_SIZE];
-      while(true) {
+      while (true) {
         int read = snappyIn.read(buf);
         if (read == -1) {
           break;
@@ -294,8 +290,8 @@ public class Serialization {
       return true;
     } catch (Exception ex) {
       LOG.error("Error while attempting to compress " +
-        compressed.toString() + " to " + decompressed.toString() +
-        ".", ex);
+          compressed.toString() + " to " + decompressed.toString() +
+          ".", ex);
       Throwables.propagate(ex);
     } finally {
       Throwable th = null;
@@ -321,7 +317,7 @@ public class Serialization {
     }
     // Should never reach here.
     throw new IOException("Decompressing file: " +
-      compressed.toString() + " to: " + decompressed.toString() +
-      " may have failed.");
+        compressed.toString() + " to: " + decompressed.toString() +
+        " may have failed.");
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
index 143143a..ee7fcc8 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Take.java
@@ -34,14 +34,17 @@ import com.google.common.base.Preconditions;
 class Take extends TransactionEventRecord {
   private int offset;
   private int fileID;
+
   Take(Long transactionID, Long logWriteOrderID) {
     super(transactionID, logWriteOrderID);
   }
+
   Take(Long transactionID, Long logWriteOrderID, int offset, int fileID) {
     this(transactionID, logWriteOrderID);
     this.offset = offset;
     this.fileID = fileID;
   }
+
   int getOffset() {
     return offset;
   }
@@ -70,17 +73,20 @@ class Take extends TransactionEventRecord {
     takeBuilder.setOffset(offset);
     takeBuilder.build().writeDelimitedTo(out);
   }
+
   @Override
   void readProtos(InputStream in) throws IOException {
-    ProtosFactory.Take take = Preconditions.checkNotNull(ProtosFactory.
-        Take.parseDelimitedFrom(in), "Take cannot be null");
+    ProtosFactory.Take take = Preconditions.checkNotNull(
+        ProtosFactory.Take.parseDelimitedFrom(in), "Take cannot be null");
     fileID = take.getFileID();
     offset = take.getOffset();
   }
+
   @Override
   short getRecordType() {
     return Type.TAKE.get();
   }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
index 1eb3f4f..0f7c3c8 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java
@@ -91,13 +91,16 @@ public abstract class TransactionEventRecord implements Writable {
     COMMIT((short)4);
 
     private short id;
+
     Type(short id) {
       this.id = id;
     }
+
     public short get() {
       return id;
     }
   }
+
   private static final ImmutableMap<Short, Constructor<? extends TransactionEventRecord>> TYPES;
 
   static {
@@ -131,11 +134,11 @@ public abstract class TransactionEventRecord implements Writable {
       dataOutput.flush();
       // TODO toByteArray does an unneeded copy
       return ByteBuffer.wrap(byteOutput.toByteArray());
-    } catch(IOException e) {
+    } catch (IOException e) {
       // near impossible
       throw Throwables.propagate(e);
     } finally {
-      if(dataOutput != null) {
+      if (dataOutput != null) {
         try {
           dataOutput.close();
         } catch (IOException e) {
@@ -149,7 +152,7 @@ public abstract class TransactionEventRecord implements Writable {
   static TransactionEventRecord fromDataInputV2(DataInput in)
       throws IOException {
     int header = in.readInt();
-    if(header != MAGIC_HEADER) {
+    if (header != MAGIC_HEADER) {
       throw new IOException("Header " + Integer.toHexString(header) +
           " is not the required value: " + Integer.toHexString(MAGIC_HEADER));
     }
@@ -176,10 +179,10 @@ public abstract class TransactionEventRecord implements Writable {
           ProtosFactory.TransactionEventFooter.newBuilder().build();
       footer.writeDelimitedTo(byteOutput);
       return ByteBuffer.wrap(byteOutput.toByteArray());
-    } catch(IOException e) {
+    } catch (IOException e) {
       throw Throwables.propagate(e);
     } finally {
-      if(byteOutput != null) {
+      if (byteOutput != null) {
         try {
           byteOutput.close();
         } catch (IOException e) {
@@ -194,23 +197,19 @@ public abstract class TransactionEventRecord implements Writable {
       throws IOException, CorruptEventException {
     ByteArrayInputStream in = new ByteArrayInputStream(buffer);
     try {
-      ProtosFactory.TransactionEventHeader header = Preconditions.
-          checkNotNull(ProtosFactory.TransactionEventHeader.
-              parseDelimitedFrom(in), "Header cannot be null");
+      ProtosFactory.TransactionEventHeader header = Preconditions.checkNotNull(
+          ProtosFactory.TransactionEventHeader.parseDelimitedFrom(in), "Header cannot be null");
       short type = (short)header.getType();
       long transactionID = header.getTransactionID();
       long writeOrderID = header.getWriteOrderID();
-      TransactionEventRecord transactionEvent =
-          newRecordForType(type, transactionID, writeOrderID);
+      TransactionEventRecord transactionEvent = newRecordForType(type, transactionID, writeOrderID);
       transactionEvent.readProtos(in);
       @SuppressWarnings("unused")
       ProtosFactory.TransactionEventFooter footer = Preconditions.checkNotNull(
-          ProtosFactory.TransactionEventFooter.
-          parseDelimitedFrom(in), "Footer cannot be null");
+          ProtosFactory.TransactionEventFooter.parseDelimitedFrom(in), "Footer cannot be null");
       return transactionEvent;
     } catch (InvalidProtocolBufferException ex) {
-      throw new CorruptEventException(
-        "Could not parse event from data file.", ex);
+      throw new CorruptEventException("Could not parse event from data file.", ex);
     } finally {
       try {
         in.close();

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionIDOracle.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionIDOracle.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionIDOracle.java
index a9f6be6..12e5c7d 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionIDOracle.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionIDOracle.java
@@ -23,15 +23,17 @@ import java.util.concurrent.atomic.AtomicLong;
 public final class TransactionIDOracle {
 
   private TransactionIDOracle() {}
+
   private static final AtomicLong TRANSACTION_ID =
       new AtomicLong(System.currentTimeMillis());
 
   public static void setSeed(long highest) {
     long previous;
-    while(highest > (previous = TRANSACTION_ID.get())) {
+    while (highest > (previous = TRANSACTION_ID.get())) {
       TRANSACTION_ID.compareAndSet(previous, highest);
     }
   }
+
   public static long next() {
     return TRANSACTION_ID.incrementAndGet();
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java
index 69072db..2ebd42d 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WritableUtils.java
@@ -75,7 +75,7 @@ class WritableUtils {
     long tmp = i;
     while (tmp != 0) {
       tmp = tmp >> 8;
-    len--;
+      len--;
     }
 
     stream.writeByte((byte)len);
@@ -92,8 +92,8 @@ class WritableUtils {
   /**
    * Reads a zero-compressed encoded long from input stream and returns it.
    * @param stream Binary input stream
-   * @throws java.io.IOException
    * @return deserialized long from stream.
+   * @throws java.io.IOException
    */
   public static long readVLong(DataInput stream) throws IOException {
     byte firstByte = stream.readByte();
@@ -102,7 +102,7 @@ class WritableUtils {
       return firstByte;
     }
     long i = 0;
-    for (int idx = 0; idx < len-1; idx++) {
+    for (int idx = 0; idx < len - 1; idx++) {
       byte b = stream.readByte();
       i = i << 8;
       i = i | (b & 0xFF);
@@ -113,8 +113,8 @@ class WritableUtils {
   /**
    * Reads a zero-compressed encoded integer from input stream and returns it.
    * @param stream Binary input stream
-   * @throws java.io.IOException
    * @return deserialized integer from stream.
+   * @throws java.io.IOException
    */
   public static int readVInt(DataInput stream) throws IOException {
     long n = readVLong(stream);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WriteOrderOracle.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WriteOrderOracle.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WriteOrderOracle.java
index dbf1c1e..b26cbb4 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WriteOrderOracle.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/WriteOrderOracle.java
@@ -23,15 +23,17 @@ import java.util.concurrent.atomic.AtomicLong;
 public final class WriteOrderOracle {
 
   private WriteOrderOracle() {}
+
   private static final AtomicLong WRITER_ORDERER =
       new AtomicLong(System.currentTimeMillis());
 
   public static void setSeed(long highest) {
     long previous;
-    while(highest > (previous = WRITER_ORDERER.get())) {
+    while (highest > (previous = WRITER_ORDERER.get())) {
       WRITER_ORDERER.compareAndSet(previous, highest);
     }
   }
+
   public static long next() {
     return WRITER_ORDERER.incrementAndGet();
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java
index 9ee4245..e1116d2 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/AESCTRNoPaddingProvider.java
@@ -46,7 +46,8 @@ public class AESCTRNoPaddingProvider extends CipherProvider {
   }
 
   public static class EncryptorBuilder
-    extends CipherProvider.Encryptor.Builder<AESCTRNoPaddingEncryptor> {
+      extends CipherProvider.Encryptor.Builder<AESCTRNoPaddingEncryptor> {
+
     @Override
     public AESCTRNoPaddingEncryptor build() {
       ByteBuffer buffer = ByteBuffer.allocate(16);
@@ -58,9 +59,8 @@ public class AESCTRNoPaddingProvider extends CipherProvider {
     }
   }
 
-
   public static class DecryptorBuilder
-    extends CipherProvider.Decryptor.Builder<AESCTRNoPaddingDecryptor> {
+      extends CipherProvider.Decryptor.Builder<AESCTRNoPaddingDecryptor> {
     @Override
     public AESCTRNoPaddingDecryptor build() {
       return new AESCTRNoPaddingDecryptor(key, parameters);
@@ -70,18 +70,22 @@ public class AESCTRNoPaddingProvider extends CipherProvider {
   private static class AESCTRNoPaddingEncryptor extends Encryptor {
     private byte[] parameters;
     private Cipher cipher;
+
     private AESCTRNoPaddingEncryptor(Key key, byte[] parameters) {
       this.parameters = parameters;
       cipher = getCipher(key, Cipher.ENCRYPT_MODE, parameters);
     }
+
     @Override
     public byte[] getParameters() {
       return parameters;
     }
+
     @Override
     public String getCodec() {
       return TYPE;
     }
+
     @Override
     public byte[] encrypt(byte[] clearText) {
       return doFinal(cipher, clearText);
@@ -90,21 +94,23 @@ public class AESCTRNoPaddingProvider extends CipherProvider {
 
   private static class AESCTRNoPaddingDecryptor extends Decryptor {
     private Cipher cipher;
+
     private AESCTRNoPaddingDecryptor(Key key, byte[] parameters) {
       cipher = getCipher(key, Cipher.DECRYPT_MODE, parameters);
     }
+
     @Override
     public byte[] decrypt(byte[] cipherText) {
       return doFinal(cipher, cipherText);
     }
+
     @Override
     public String getCodec() {
       return TYPE;
     }
   }
 
-  private static byte[] doFinal(Cipher cipher, byte[] input)
-    throws DecryptionFailureException{
+  private static byte[] doFinal(Cipher cipher, byte[] input) throws DecryptionFailureException {
     try {
       return cipher.doFinal(input);
     } catch (Exception e) {


Mime
View raw message