flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [8/9] flume git commit: FLUME-2941. Integrate checkstyle for test classes
Date Fri, 08 Jul 2016 22:51:00 GMT
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
index 1adb21a..f1700f9 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
@@ -55,6 +55,7 @@ public class TestFlumeEventQueue {
     File inflightTakes;
     File inflightPuts;
     File queueSetDir;
+
     EventQueueBackingStoreSupplier() {
       baseDir = Files.createTempDir();
       checkpoint = new File(baseDir, "checkpoint");
@@ -62,62 +63,73 @@ public class TestFlumeEventQueue {
       inflightPuts =  new File(baseDir, "inflighttakes");
       queueSetDir =  new File(baseDir, "queueset");
     }
+
     File getCheckpoint() {
       return checkpoint;
     }
+
     File getInflightPuts() {
       return inflightPuts;
     }
+
     File getInflightTakes() {
       return inflightTakes;
     }
+
     File getQueueSetDir() {
       return queueSetDir;
     }
+
     void delete() {
       FileUtils.deleteQuietly(baseDir);
     }
-    abstract EventQueueBackingStore get() throws Exception ;
+
+    abstract EventQueueBackingStore get() throws Exception;
   }
 
   @Parameters
   public static Collection<Object[]> data() throws Exception {
-    Object[][] data = new Object[][] { {
-      new EventQueueBackingStoreSupplier() {
-        @Override
-        public EventQueueBackingStore get() throws Exception {
-          Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
-          return new EventQueueBackingStoreFileV2(getCheckpoint(), 1000,
-              "test");
+    Object[][] data = new Object[][] {
+      {
+        new EventQueueBackingStoreSupplier() {
+          @Override
+          public EventQueueBackingStore get() throws Exception {
+            Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
+            return new EventQueueBackingStoreFileV2(getCheckpoint(), 1000,
+                                                    "test");
+          }
         }
-      }
-    }, {
-      new EventQueueBackingStoreSupplier() {
-        @Override
-        public EventQueueBackingStore get() throws Exception {
-          Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
-          return new EventQueueBackingStoreFileV3(getCheckpoint(), 1000,
-              "test");
+      },
+      {
+        new EventQueueBackingStoreSupplier() {
+          @Override
+          public EventQueueBackingStore get() throws Exception {
+            Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
+            return new EventQueueBackingStoreFileV3(getCheckpoint(), 1000, "test");
+          }
         }
       }
-    } };
+    };
     return Arrays.asList(data);
   }
 
   public TestFlumeEventQueue(EventQueueBackingStoreSupplier backingStoreSupplier) {
     this.backingStoreSupplier = backingStoreSupplier;
   }
+
   @Before
   public void setup() throws Exception {
     backingStore = backingStoreSupplier.get();
   }
+
   @After
   public void cleanup() throws IOException {
-    if(backingStore != null) {
+    if (backingStore != null) {
       backingStore.close();
     }
     backingStoreSupplier.delete();
   }
+
   @Test
   public void testCapacity() throws Exception {
     backingStore.close();
@@ -125,70 +137,76 @@ public class TestFlumeEventQueue {
     Assert.assertTrue(checkpoint.delete());
     backingStore = new EventQueueBackingStoreFileV2(checkpoint, 1, "test");
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertFalse(queue.addTail(pointer2));
   }
-  @Test(expected=IllegalArgumentException.class)
+
+  @Test(expected = IllegalArgumentException.class)
   public void testInvalidCapacityZero() throws Exception {
     backingStore.close();
     File checkpoint = backingStoreSupplier.getCheckpoint();
     Assert.assertTrue(checkpoint.delete());
     backingStore = new EventQueueBackingStoreFileV2(checkpoint, 0, "test");
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
   }
-  @Test(expected=IllegalArgumentException.class)
+
+  @Test(expected = IllegalArgumentException.class)
   public void testInvalidCapacityNegative() throws Exception {
     backingStore.close();
     File checkpoint = backingStoreSupplier.getCheckpoint();
     Assert.assertTrue(checkpoint.delete());
     backingStore = new EventQueueBackingStoreFileV2(checkpoint, -1, "test");
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
   }
+
   @Test
   public void testQueueIsEmptyAfterCreation() throws Exception {
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
     Assert.assertNull(queue.removeHead(0L));
   }
+
   @Test
   public void addTail1() throws Exception {
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertEquals(pointer1, queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
   }
+
   @Test
   public void addTail2() throws Exception {
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertTrue(queue.addTail(pointer2));
     Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
     Assert.assertEquals(pointer1, queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(2), queue.getFileIDs());
   }
+
   @Test
   public void addTailLarge() throws Exception {
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
     int size = 500;
     Set<Integer> fileIDs = Sets.newHashSet();
     for (int i = 1; i <= size; i++) {
@@ -203,23 +221,25 @@ public class TestFlumeEventQueue {
     }
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
   }
+
   @Test
   public void addHead1() throws Exception {
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
     Assert.assertEquals(pointer1, queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
   }
+
   @Test
   public void addHead2() throws Exception {
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
     queue.replayComplete();
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertTrue(queue.addHead(pointer2));
@@ -227,12 +247,13 @@ public class TestFlumeEventQueue {
     Assert.assertEquals(pointer2, queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
   }
+
   @Test
   public void addHeadLarge() throws Exception {
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
     queue.replayComplete();
     int size = 500;
     Set<Integer> fileIDs = Sets.newHashSet();
@@ -248,12 +269,13 @@ public class TestFlumeEventQueue {
     }
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
   }
+
   @Test
   public void addTailRemove1() throws Exception {
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
     Assert.assertTrue(queue.remove(pointer1));
@@ -266,9 +288,9 @@ public class TestFlumeEventQueue {
   @Test
   public void addTailRemove2() throws Exception {
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertTrue(queue.addTail(pointer2));
     Assert.assertTrue(queue.remove(pointer1));
@@ -279,31 +301,33 @@ public class TestFlumeEventQueue {
   @Test
   public void addHeadRemove1() throws Exception {
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
     queue.addHead(pointer1);
     Assert.assertTrue(queue.remove(pointer1));
     Assert.assertNull(queue.removeHead(0));
   }
+
   @Test
   public void addHeadRemove2() throws Exception {
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertTrue(queue.addHead(pointer2));
     Assert.assertTrue(queue.remove(pointer1));
     queue.replayComplete();
     Assert.assertEquals(pointer2, queue.removeHead(0));
   }
+
   @Test
   public void testUnknownPointerDoesNotCauseSearch() throws Exception {
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertTrue(queue.addHead(pointer2));
     Assert.assertFalse(queue.remove(pointer3)); // does search
@@ -312,44 +336,47 @@ public class TestFlumeEventQueue {
     queue.replayComplete();
     Assert.assertEquals(2, queue.getSearchCount());
   }
-  @Test(expected=IllegalStateException.class)
+
+  @Test(expected = IllegalStateException.class)
   public void testRemoveAfterReplayComplete() throws Exception {
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
     queue.replayComplete();
     queue.remove(pointer1);
   }
+
   @Test
   public void testWrappingCorrectly() throws Exception {
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
     int size = Integer.MAX_VALUE;
     for (int i = 1; i <= size; i++) {
-      if(!queue.addHead(new FlumeEventPointer(i, i))) {
+      if (!queue.addHead(new FlumeEventPointer(i, i))) {
         break;
       }
     }
-    for (int i = queue.getSize()/2; i > 0; i--) {
+    for (int i = queue.getSize() / 2; i > 0; i--) {
       Assert.assertNotNull(queue.removeHead(0));
     }
     // addHead below would throw an IndexOOBounds with
     // bad version of FlumeEventQueue.convert
     for (int i = 1; i <= size; i++) {
-      if(!queue.addHead(new FlumeEventPointer(i, i))) {
+      if (!queue.addHead(new FlumeEventPointer(i, i))) {
         break;
       }
     }
   }
+
   @Test
-  public void testInflightPuts() throws Exception{
+  public void testInflightPuts() throws Exception {
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
     long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
     long txnID2 = txnID1 + 1;
     queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
@@ -358,16 +385,13 @@ public class TestFlumeEventQueue {
     queue.checkpoint(true);
     TimeUnit.SECONDS.sleep(3L);
     queue = new FlumeEventQueue(backingStore,
-        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts(),
-        backingStoreSupplier.getQueueSetDir());
+                                backingStoreSupplier.getInflightTakes(),
+                                backingStoreSupplier.getInflightPuts(),
+                                backingStoreSupplier.getQueueSetDir());
     SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightPuts();
-    Assert.assertTrue(deserializedMap.get(
-            txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
-    Assert.assertTrue(deserializedMap.get(
-            txnID1).contains(new FlumeEventPointer(2, 1).toLong()));
-    Assert.assertTrue(deserializedMap.get(
-            txnID2).contains(new FlumeEventPointer(2, 2).toLong()));
+    Assert.assertTrue(deserializedMap.get(txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
+    Assert.assertTrue(deserializedMap.get(txnID1).contains(new FlumeEventPointer(2, 1).toLong()));
+    Assert.assertTrue(deserializedMap.get(txnID2).contains(new FlumeEventPointer(2, 2).toLong()));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java
index 2fbe116..a138ed4 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java
@@ -18,13 +18,8 @@
  */
 package org.apache.flume.channel.file;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
 import org.apache.flume.Context;
 import org.apache.flume.conf.Configurables;
@@ -37,8 +32,12 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 public class TestIntegration {
 
@@ -58,19 +57,21 @@ public class TestIntegration {
     dataDirs = new File[3];
     dataDir = "";
     for (int i = 0; i < dataDirs.length; i++) {
-      dataDirs[i] = new File(baseDir, "data" + (i+1));
+      dataDirs[i] = new File(baseDir, "data" + (i + 1));
       Assert.assertTrue(dataDirs[i].mkdirs() || dataDirs[i].isDirectory());
       dataDir += dataDirs[i].getAbsolutePath() + ",";
     }
     dataDir = dataDir.substring(0, dataDir.length() - 1);
   }
+
   @After
   public void teardown() {
-    if(channel != null && channel.isOpen()) {
+    if (channel != null && channel.isOpen()) {
       channel.stop();
     }
     FileUtils.deleteQuietly(baseDir);
   }
+
   @Test
   public void testIntegration() throws IOException, InterruptedException {
     // set shorter checkpoint and filesize to ensure
@@ -106,11 +107,11 @@ public class TestIntegration {
     TimeUnit.SECONDS.sleep(30);
     // shutdown source
     sourceRunner.shutdown();
-    while(sourceRunner.isAlive()) {
+    while (sourceRunner.isAlive()) {
       Thread.sleep(10L);
     }
     // wait for queue to clear
-    while(channel.getDepth() > 0) {
+    while (channel.getDepth() > 0) {
       Thread.sleep(10L);
     }
     // shutdown size
@@ -122,15 +123,15 @@ public class TestIntegration {
       logs.addAll(LogUtils.getLogs(dataDirs[i]));
     }
     LOG.info("Total Number of Logs = " + logs.size());
-    for(File logFile : logs) {
+    for (File logFile : logs) {
       LOG.info("LogFile = " + logFile);
     }
     LOG.info("Source processed " + sinkRunner.getCount());
     LOG.info("Sink processed " + sourceRunner.getCount());
-    for(Exception ex : sourceRunner.getErrors()) {
+    for (Exception ex : sourceRunner.getErrors()) {
       LOG.warn("Source had error", ex);
     }
-    for(Exception ex : sinkRunner.getErrors()) {
+    for (Exception ex : sinkRunner.getErrors()) {
       LOG.warn("Sink had error", ex);
     }
     Assert.assertEquals(sinkRunner.getCount(), sinkRunner.getCount());

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
index b1f59cd..f7f0950 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
@@ -18,14 +18,8 @@
  */
 package org.apache.flume.channel.file;
 
-import static org.mockito.Mockito.*;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.channels.*;
-import java.util.Collection;
-import java.util.List;
-
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -34,8 +28,13 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestLog {
   private static final Logger LOGGER = LoggerFactory.getLogger(TestLog.class);
@@ -45,6 +44,7 @@ public class TestLog {
   private File checkpointDir;
   private File[] dataDirs;
   private long transactionID;
+
   @Before
   public void setup() throws IOException {
     transactionID = 0;
@@ -56,15 +56,20 @@ public class TestLog {
       dataDirs[i] = Files.createTempDir();
       Assert.assertTrue(dataDirs[i].isDirectory());
     }
-    log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
-        MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
-            checkpointDir).setLogDirs(dataDirs).setCheckpointOnClose(false)
-            .setChannelName("testlog").build();
+    log = new Log.Builder().setCheckpointInterval(1L)
+                           .setMaxFileSize(MAX_FILE_SIZE)
+                           .setQueueSize(CAPACITY)
+                           .setCheckpointDir(checkpointDir)
+                           .setLogDirs(dataDirs)
+                           .setCheckpointOnClose(false)
+                           .setChannelName("testlog")
+                           .build();
     log.replay();
   }
+
   @After
-  public void cleanup() throws Exception{
-    if(log != null) {
+  public void cleanup() throws Exception {
+    if (log != null) {
       log.close();
     }
     FileUtils.deleteQuietly(checkpointDir);
@@ -72,13 +77,14 @@ public class TestLog {
       FileUtils.deleteQuietly(dataDirs[i]);
     }
   }
+
   /**
    * Test that we can put, commit and then get. Note that get is
    * not transactional so the commit is not required.
    */
   @Test
   public void testPutGet()
-    throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
+      throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
     long transactionID = ++this.transactionID;
     FlumeEventPointer eventPointer = log.put(transactionID, eventIn);
@@ -89,9 +95,10 @@ public class TestLog {
     Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders());
     Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody());
   }
+
   @Test
   public void testRoll()
-    throws IOException, InterruptedException, NoopRecordException, CorruptEventException  {
+      throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
     log.shutdownWorker();
     Thread.sleep(1000);
     for (int i = 0; i < 1000; i++) {
@@ -105,9 +112,9 @@ public class TestLog {
       Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody());
     }
     int logCount = 0;
-    for(File dataDir : dataDirs) {
-      for(File logFile : dataDir.listFiles()) {
-        if(logFile.getName().startsWith("log-")) {
+    for (File dataDir : dataDirs) {
+      for (File logFile : dataDir.listFiles()) {
+        if (logFile.getName().startsWith("log-")) {
           logCount++;
         }
       }
@@ -115,26 +122,30 @@ public class TestLog {
     // 93 (*2 for meta) files with TestLog.MAX_FILE_SIZE=1000
     Assert.assertEquals(186, logCount);
   }
+
   /**
    * After replay of the log, we should find the event because the put
    * was committed
    */
   @Test
   public void testPutCommit()
-    throws IOException, InterruptedException, NoopRecordException, CorruptEventException  {
+      throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
     long transactionID = ++this.transactionID;
     FlumeEventPointer eventPointerIn = log.put(transactionID, eventIn);
     log.commitPut(transactionID);
     log.close();
-    log = new Log.Builder().setCheckpointInterval(
-        Long.MAX_VALUE).setMaxFileSize(
-            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
-            CAPACITY).setCheckpointDir(checkpointDir).setLogDirs(
-                dataDirs).setChannelName("testlog").build();
+    log = new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+                           .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+                           .setQueueSize(CAPACITY)
+                           .setCheckpointDir(checkpointDir)
+                           .setLogDirs(dataDirs)
+                           .setChannelName("testlog")
+                           .build();
     log.replay();
     takeAndVerify(eventPointerIn, eventIn);
   }
+
   /**
    * After replay of the log, we should not find the event because the
    * put was rolled back
@@ -146,39 +157,44 @@ public class TestLog {
     log.put(transactionID, eventIn);
     log.rollback(transactionID); // rolled back so it should not be replayed
     log.close();
-    log = new Log.Builder().setCheckpointInterval(
-        Long.MAX_VALUE).setMaxFileSize(
-            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
-            CAPACITY).setCheckpointDir(checkpointDir).setLogDirs(
-                dataDirs).setChannelName("testlog").build();
+    log = new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+                           .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+                           .setQueueSize(CAPACITY)
+                           .setCheckpointDir(checkpointDir)
+                           .setLogDirs(dataDirs)
+                           .setChannelName("testlog")
+                           .build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
     Assert.assertNull(queue.removeHead(transactionID));
   }
+
   @Test
   public void testMinimumRequiredSpaceTooSmallOnStartup() throws IOException,
-    InterruptedException {
+                                                                     InterruptedException {
     log.close();
-    log = new Log.Builder().setCheckpointInterval(
-        Long.MAX_VALUE).setMaxFileSize(
-            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
-            CAPACITY).setCheckpointDir(checkpointDir).setLogDirs(
-                dataDirs).setChannelName("testlog").
-                setMinimumRequiredSpace(Long.MAX_VALUE).build();
+    log = new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+                           .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+                           .setQueueSize(CAPACITY)
+                           .setCheckpointDir(checkpointDir)
+                           .setLogDirs(dataDirs)
+                           .setChannelName("testlog")
+                           .setMinimumRequiredSpace(Long.MAX_VALUE)
+                           .build();
     try {
       log.replay();
       Assert.fail();
     } catch (IOException e) {
-      Assert.assertTrue(e.getMessage(), e.getMessage()
-          .startsWith("Usable space exhausted"));
+      Assert.assertTrue(e.getMessage(),
+                        e.getMessage().startsWith("Usable space exhausted"));
     }
   }
+
   /**
    * There is a race here in that someone could take up some space
    */
   @Test
-  public void testMinimumRequiredSpaceTooSmallForPut() throws IOException,
-    InterruptedException {
+  public void testMinimumRequiredSpaceTooSmallForPut() throws IOException, InterruptedException {
     try {
       doTestMinimumRequiredSpaceTooSmallForPut();
     } catch (IOException e) {
@@ -189,23 +205,26 @@ public class TestLog {
       doTestMinimumRequiredSpaceTooSmallForPut();
     }
   }
+
   public void doTestMinimumRequiredSpaceTooSmallForPut() throws IOException,
-    InterruptedException {
+                                                                    InterruptedException {
     long minimumRequiredSpace = checkpointDir.getUsableSpace() -
-        (10L* 1024L * 1024L);
+                                    (10L * 1024L * 1024L);
     log.close();
-    log = new Log.Builder().setCheckpointInterval(
-        Long.MAX_VALUE).setMaxFileSize(
-            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
-            CAPACITY).setCheckpointDir(checkpointDir).setLogDirs(
-                dataDirs).setChannelName("testlog").
-                setMinimumRequiredSpace(minimumRequiredSpace)
-                .setUsableSpaceRefreshInterval(1L).build();
+    log = new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+                           .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+                           .setQueueSize(CAPACITY)
+                           .setCheckpointDir(checkpointDir)
+                           .setLogDirs(dataDirs)
+                           .setChannelName("testlog")
+                           .setMinimumRequiredSpace(minimumRequiredSpace)
+                           .setUsableSpaceRefreshInterval(1L)
+                           .build();
     log.replay();
     File filler = new File(checkpointDir, "filler");
     byte[] buffer = new byte[64 * 1024];
     FileOutputStream out = new FileOutputStream(filler);
-    while(checkpointDir.getUsableSpace() > minimumRequiredSpace) {
+    while (checkpointDir.getUsableSpace() > minimumRequiredSpace) {
       out.write(buffer);
     }
     out.close();
@@ -215,10 +234,11 @@ public class TestLog {
       log.put(transactionID, eventIn);
       Assert.fail();
     } catch (IOException e) {
-      Assert.assertTrue(e.getMessage(), e.getMessage()
-          .startsWith("Usable space exhausted"));
+      Assert.assertTrue(e.getMessage(),
+                        e.getMessage().startsWith("Usable space exhausted"));
     }
   }
+
   /**
    * After replay of the log, we should not find the event because the take
    * was committed
@@ -233,11 +253,13 @@ public class TestLog {
     log.take(takeTransactionID, eventPointer);
     log.commitTake(takeTransactionID);
     log.close();
-    new Log.Builder().setCheckpointInterval(
-        Long.MAX_VALUE).setMaxFileSize(
-            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
-            1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs)
-            .setChannelName("testlog").build();
+    new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+                     .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+                     .setQueueSize(1)
+                     .setCheckpointDir(checkpointDir)
+                     .setLogDirs(dataDirs)
+                     .setChannelName("testlog")
+                     .build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
     Assert.assertNull(queue.removeHead(0));
@@ -249,16 +271,18 @@ public class TestLog {
    */
   @Test
   public void testPutTakeRollbackLogReplayV1()
-    throws IOException, InterruptedException, NoopRecordException, CorruptEventException  {
+      throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
     doPutTakeRollback(true);
   }
+
   @Test
   public void testPutTakeRollbackLogReplayV2()
-    throws IOException, InterruptedException, NoopRecordException, CorruptEventException  {
+      throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
     doPutTakeRollback(false);
   }
+
   public void doPutTakeRollback(boolean useLogReplayV1)
-    throws IOException, InterruptedException, NoopRecordException, CorruptEventException  {
+      throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
     long putTransactionID = ++transactionID;
     FlumeEventPointer eventPointerIn = log.put(putTransactionID, eventIn);
@@ -267,11 +291,14 @@ public class TestLog {
     log.take(takeTransactionID, eventPointerIn);
     log.rollback(takeTransactionID);
     log.close();
-    new Log.Builder().setCheckpointInterval(
-        Long.MAX_VALUE).setMaxFileSize(
-            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
-            1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs)
-            .setChannelName("testlog").setUseLogReplayV1(useLogReplayV1).build();
+    new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+                     .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+                     .setQueueSize(1)
+                     .setCheckpointDir(checkpointDir)
+                     .setLogDirs(dataDirs)
+                     .setChannelName("testlog")
+                     .setUseLogReplayV1(useLogReplayV1)
+                     .build();
     log.replay();
     takeAndVerify(eventPointerIn, eventIn);
   }
@@ -281,11 +308,13 @@ public class TestLog {
     long putTransactionID = ++transactionID;
     log.commitPut(putTransactionID);
     log.close();
-    new Log.Builder().setCheckpointInterval(
-        Long.MAX_VALUE).setMaxFileSize(
-            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
-            1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs)
-            .setChannelName("testlog").build();
+    new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+                     .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+                     .setQueueSize(1)
+                     .setCheckpointDir(checkpointDir)
+                     .setLogDirs(dataDirs)
+                     .setChannelName("testlog")
+                     .build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
     FlumeEventPointer eventPointerOut = queue.removeHead(0);
@@ -297,11 +326,13 @@ public class TestLog {
     long putTransactionID = ++transactionID;
     log.commitTake(putTransactionID);
     log.close();
-    new Log.Builder().setCheckpointInterval(
-        Long.MAX_VALUE).setMaxFileSize(
-            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
-            1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs)
-            .setChannelName("testlog").build();
+    new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+                     .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+                     .setQueueSize(1)
+                     .setCheckpointDir(checkpointDir)
+                     .setLogDirs(dataDirs)
+                     .setChannelName("testlog")
+                     .build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
     FlumeEventPointer eventPointerOut = queue.removeHead(0);
@@ -313,11 +344,13 @@ public class TestLog {
     long putTransactionID = ++transactionID;
     log.rollback(putTransactionID);
     log.close();
-    new Log.Builder().setCheckpointInterval(
-        Long.MAX_VALUE).setMaxFileSize(
-            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
-            1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs)
-            .setChannelName("testlog").build();
+    new Log.Builder().setCheckpointInterval(Long.MAX_VALUE)
+                     .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)
+                     .setQueueSize(1)
+                     .setCheckpointDir(checkpointDir)
+                     .setLogDirs(dataDirs)
+                     .setChannelName("testlog")
+                     .build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
     FlumeEventPointer eventPointerOut = queue.removeHead(0);
@@ -337,7 +370,7 @@ public class TestLog {
       File logGzip = new File(logDir, Log.PREFIX + i + ".gz");
       Assert.assertTrue(metaDataFile.isFile() || metaDataFile.createNewFile());
       Assert.assertTrue(metaDataTempFile.isFile() ||
-          metaDataTempFile.createNewFile());
+                            metaDataTempFile.createNewFile());
       Assert.assertTrue(log.isFile() || logGzip.createNewFile());
     }
     List<File> actual = LogUtils.getLogs(logDir);
@@ -345,31 +378,38 @@ public class TestLog {
     LogUtils.sort(expected);
     Assert.assertEquals(expected, actual);
   }
+
   @Test
   public void testReplayFailsWithAllEmptyLogMetaDataNormalReplay()
       throws IOException, InterruptedException {
     doTestReplayFailsWithAllEmptyLogMetaData(false);
   }
+
   @Test
   public void testReplayFailsWithAllEmptyLogMetaDataFastReplay()
       throws IOException, InterruptedException {
     doTestReplayFailsWithAllEmptyLogMetaData(true);
   }
+
   public void doTestReplayFailsWithAllEmptyLogMetaData(boolean useFastReplay)
       throws IOException, InterruptedException {
     // setup log with correct fast replay parameter
     log.close();
-    log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
-        MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
-            checkpointDir).setLogDirs(dataDirs)
-            .setChannelName("testlog").setUseFastReplay(useFastReplay).build();
+    log = new Log.Builder().setCheckpointInterval(1L)
+                           .setMaxFileSize(MAX_FILE_SIZE)
+                           .setQueueSize(CAPACITY)
+                           .setCheckpointDir(checkpointDir)
+                           .setLogDirs(dataDirs)
+                           .setChannelName("testlog")
+                           .setUseFastReplay(useFastReplay)
+                           .build();
     log.replay();
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
     long transactionID = ++this.transactionID;
     log.put(transactionID, eventIn);
     log.commitPut(transactionID);
     log.close();
-    if(useFastReplay) {
+    if (useFastReplay) {
       FileUtils.deleteQuietly(checkpointDir);
       Assert.assertTrue(checkpointDir.mkdir());
     }
@@ -378,41 +418,50 @@ public class TestLog {
       logFiles.addAll(LogUtils.getLogs(dataDirs[i]));
     }
     Assert.assertTrue(logFiles.size() > 0);
-    for(File logFile : logFiles) {
+    for (File logFile : logFiles) {
       File logFileMeta = Serialization.getMetaDataFile(logFile);
       Assert.assertTrue(logFileMeta.delete());
       Assert.assertTrue(logFileMeta.createNewFile());
     }
-    log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
-        MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
-            checkpointDir).setLogDirs(dataDirs)
-            .setChannelName("testlog").setUseFastReplay(useFastReplay).build();
+    log = new Log.Builder().setCheckpointInterval(1L)
+                           .setMaxFileSize(MAX_FILE_SIZE)
+                           .setQueueSize(CAPACITY)
+                           .setCheckpointDir(checkpointDir)
+                           .setLogDirs(dataDirs)
+                           .setChannelName("testlog")
+                           .setUseFastReplay(useFastReplay)
+                           .build();
     try {
       log.replay();
       Assert.fail();
-    } catch(IllegalStateException expected) {
+    } catch (IllegalStateException expected) {
       String msg = expected.getMessage();
       Assert.assertNotNull(msg);
       Assert.assertTrue(msg, msg.contains(".meta is empty, but log"));
     }
   }
+
   @Test
   public void testReplaySucceedsWithUnusedEmptyLogMetaDataNormalReplay()
-    throws IOException, InterruptedException, NoopRecordException, CorruptEventException  {
+      throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
     long transactionID = ++this.transactionID;
     FlumeEventPointer eventPointer = log.put(transactionID, eventIn);
     log.commitPut(transactionID); // this is not required since
     log.close();
-    log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
-        MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
-            checkpointDir).setLogDirs(dataDirs)
-            .setChannelName("testlog").build();
+    log = new Log.Builder().setCheckpointInterval(1L)
+                           .setMaxFileSize(MAX_FILE_SIZE)
+                           .setQueueSize(CAPACITY)
+                           .setCheckpointDir(checkpointDir)
+                           .setLogDirs(dataDirs)
+                           .setChannelName("testlog")
+                           .build();
     doTestReplaySucceedsWithUnusedEmptyLogMetaData(eventIn, eventPointer);
   }
+
   @Test
   public void testReplaySucceedsWithUnusedEmptyLogMetaDataFastReplay()
-    throws IOException, InterruptedException, NoopRecordException, CorruptEventException  {
+      throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
     long transactionID = ++this.transactionID;
     FlumeEventPointer eventPointer = log.put(transactionID, eventIn);
@@ -421,18 +470,23 @@ public class TestLog {
     checkpointDir = Files.createTempDir();
     FileUtils.forceDeleteOnExit(checkpointDir);
     Assert.assertTrue(checkpointDir.isDirectory());
-    log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
-        MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
-            checkpointDir).setLogDirs(dataDirs)
-            .setChannelName("testlog").setUseFastReplay(true).build();
+    log = new Log.Builder().setCheckpointInterval(1L)
+                           .setMaxFileSize(MAX_FILE_SIZE)
+                           .setQueueSize(CAPACITY)
+                           .setCheckpointDir(checkpointDir)
+                           .setLogDirs(dataDirs)
+                           .setChannelName("testlog")
+                           .setUseFastReplay(true)
+                           .build();
     doTestReplaySucceedsWithUnusedEmptyLogMetaData(eventIn, eventPointer);
   }
+
   public void doTestReplaySucceedsWithUnusedEmptyLogMetaData(FlumeEvent eventIn,
-      FlumeEventPointer eventPointer) throws IOException,
-    InterruptedException, NoopRecordException, CorruptEventException  {
+                                                             FlumeEventPointer eventPointer)
+      throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
     for (int i = 0; i < dataDirs.length; i++) {
-      for(File logFile : LogUtils.getLogs(dataDirs[i])) {
-        if(logFile.length() == 0L) {
+      for (File logFile : LogUtils.getLogs(dataDirs[i])) {
+        if (logFile.length() == 0L) {
           File logFileMeta = Serialization.getMetaDataFile(logFile);
           Assert.assertTrue(logFileMeta.delete());
           Assert.assertTrue(logFileMeta.createNewFile());
@@ -445,16 +499,15 @@ public class TestLog {
     Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders());
     Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody());
   }
+
   @Test
   public void testCachedFSUsableSpace() throws Exception {
     File fs = mock(File.class);
     when(fs.getUsableSpace()).thenReturn(Long.MAX_VALUE);
-    LogFile.CachedFSUsableSpace cachedFS =
-        new LogFile.CachedFSUsableSpace(fs, 1000L);
+    LogFile.CachedFSUsableSpace cachedFS = new LogFile.CachedFSUsableSpace(fs, 1000L);
     Assert.assertEquals(cachedFS.getUsableSpace(), Long.MAX_VALUE);
     cachedFS.decrement(Integer.MAX_VALUE);
-    Assert.assertEquals(cachedFS.getUsableSpace(),
-        Long.MAX_VALUE - Integer.MAX_VALUE);
+    Assert.assertEquals(cachedFS.getUsableSpace(), Long.MAX_VALUE - Integer.MAX_VALUE);
     try {
       cachedFS.decrement(-1);
       Assert.fail();
@@ -463,20 +516,22 @@ public class TestLog {
     }
     when(fs.getUsableSpace()).thenReturn(Long.MAX_VALUE - 1L);
     Thread.sleep(1100);
-    Assert.assertEquals(cachedFS.getUsableSpace(),
-        Long.MAX_VALUE - 1L);
+    Assert.assertEquals(cachedFS.getUsableSpace(), Long.MAX_VALUE - 1L);
   }
 
   @Test
   public void testCheckpointOnClose() throws Exception {
     log.close();
-    log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize(
-            MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir(
-            checkpointDir).setLogDirs(dataDirs).setCheckpointOnClose(true)
-            .setChannelName("testLog").build();
+    log = new Log.Builder().setCheckpointInterval(1L)
+                           .setMaxFileSize(MAX_FILE_SIZE)
+                           .setQueueSize(CAPACITY)
+                           .setCheckpointDir(checkpointDir)
+                           .setLogDirs(dataDirs)
+                           .setCheckpointOnClose(true)
+                           .setChannelName("testLog")
+                           .build();
     log.replay();
 
-
     // 1 Write One Event
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
     log.put(transactionID, eventIn);
@@ -484,20 +539,19 @@ public class TestLog {
 
     // 2 Check state of checkpoint before close
     File checkPointMetaFile =
-            FileUtils.listFiles(checkpointDir,new String[]{"meta"},false).iterator().next();
-    long before = FileUtils.checksumCRC32( checkPointMetaFile );
+        FileUtils.listFiles(checkpointDir, new String[] { "meta" }, false).iterator().next();
+    long before = FileUtils.checksumCRC32(checkPointMetaFile);
 
     // 3 Close Log
     log.close();
 
     // 4 Verify that checkpoint was modified on close
-    long after = FileUtils.checksumCRC32( checkPointMetaFile );
-    Assert.assertFalse( before == after );
+    long after = FileUtils.checksumCRC32(checkPointMetaFile);
+    Assert.assertFalse(before == after);
   }
 
-  private void takeAndVerify(FlumeEventPointer eventPointerIn,
-      FlumeEvent eventIn)
-    throws IOException, InterruptedException, NoopRecordException, CorruptEventException  {
+  private void takeAndVerify(FlumeEventPointer eventPointerIn, FlumeEvent eventIn)
+      throws IOException, InterruptedException, NoopRecordException, CorruptEventException {
     FlumeEventQueue queue = log.getFlumeEventQueue();
     FlumeEventPointer eventPointerOut = queue.removeHead(0);
     Assert.assertNotNull(eventPointerOut);

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
index 976a112..d945c7f 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
@@ -18,6 +18,16 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.channel.file.proto.ProtosFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -28,33 +38,21 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.flume.channel.file.proto.ProtosFactory;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
-
 public class TestLogFile {
   private int fileID;
   private long transactionID;
   private LogFile.Writer logFileWriter;
   private File dataDir;
   private File dataFile;
+
   @Before
   public void setup() throws IOException {
     fileID = 1;
@@ -65,28 +63,30 @@ public class TestLogFile {
     logFileWriter = LogFileFactory.getWriter(dataFile, fileID,
         Integer.MAX_VALUE, null, null, null, Long.MAX_VALUE, true, 0);
   }
+
   @After
   public void cleanup() throws IOException {
     try {
-      if(logFileWriter != null) {
+      if (logFileWriter != null) {
         logFileWriter.close();
       }
     } finally {
       FileUtils.deleteQuietly(dataDir);
     }
   }
+
   @Test
   public void testWriterRefusesToOverwriteFile() throws IOException {
     Assert.assertTrue(dataFile.isFile() || dataFile.createNewFile());
     try {
       LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null,
-          null, Long.MAX_VALUE, true, 0);
+                               null, Long.MAX_VALUE, true, 0);
       Assert.fail();
     } catch (IllegalStateException e) {
-      Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(),
-          e.getMessage());
+      Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(), e.getMessage());
     }
   }
+
   @Test
   public void testWriterFailsWithDirectory() throws IOException {
     FileUtils.deleteQuietly(dataFile);
@@ -94,30 +94,29 @@ public class TestLogFile {
     Assert.assertTrue(dataFile.mkdirs());
     try {
       LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null,
-          null, Long.MAX_VALUE, true, 0);
+                               null, Long.MAX_VALUE, true, 0);
       Assert.fail();
     } catch (IllegalStateException e) {
-      Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(),
-          e.getMessage());
+      Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(), e.getMessage());
     }
   }
+
   @Test
   public void testPutGet() throws InterruptedException, IOException {
     final List<Throwable> errors =
         Collections.synchronizedList(new ArrayList<Throwable>());
     ExecutorService executorService = Executors.newFixedThreadPool(10);
     CompletionService<Void> completionService = new ExecutorCompletionService
-      <Void>(executorService);
-    final LogFile.RandomReader logFileReader =
-        LogFileFactory.getRandomReader(dataFile, null, true);
+        <Void>(executorService);
+    final LogFile.RandomReader logFileReader = LogFileFactory.getRandomReader(dataFile, null, true);
     for (int i = 0; i < 1000; i++) {
       // first try and throw failures
       synchronized (errors) {
-        for(Throwable throwable : errors) {
+        for (Throwable throwable : errors) {
           Throwables.propagateIfInstanceOf(throwable, AssertionError.class);
         }
         // then throw errors
-        for(Throwable throwable : errors) {
+        for (Throwable throwable : errors) {
           Throwables.propagate(throwable);
         }
       }
@@ -134,7 +133,7 @@ public class TestLogFile {
             FlumeEvent eventOut = logFileReader.get(offset);
             Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders());
             Assert.assertTrue(Arrays.equals(eventIn.getBody(), eventOut.getBody()));
-          } catch(Throwable throwable) {
+          } catch (Throwable throwable) {
             synchronized (errors) {
               errors.add(throwable);
             }
@@ -143,26 +142,26 @@ public class TestLogFile {
       }, null);
     }
 
-    for(int i = 0; i < 1000; i++) {
+    for (int i = 0; i < 1000; i++) {
       completionService.take();
     }
     // first try and throw failures
-    for(Throwable throwable : errors) {
+    for (Throwable throwable : errors) {
       Throwables.propagateIfInstanceOf(throwable, AssertionError.class);
     }
     // then throw errors
-    for(Throwable throwable : errors) {
+    for (Throwable throwable : errors) {
       Throwables.propagate(throwable);
     }
   }
+
   @Test
   public void testReader() throws InterruptedException, IOException,
-    CorruptEventException {
+      CorruptEventException {
     Map<Integer, Put> puts = Maps.newHashMap();
     for (int i = 0; i < 1000; i++) {
       FlumeEvent eventIn = TestUtils.newPersistableEvent();
-      Put put = new Put(++transactionID, WriteOrderOracle.next(),
-          eventIn);
+      Put put = new Put(++transactionID, WriteOrderOracle.next(), eventIn);
       ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
       FlumeEventPointer ptr = logFileWriter.put(bytes);
       puts.put(ptr.getOffset(), put);
@@ -170,14 +169,14 @@ public class TestLogFile {
     LogFile.SequentialReader reader =
         LogFileFactory.getSequentialReader(dataFile, null, true);
     LogRecord entry;
-    while((entry = reader.next()) != null) {
+    while ((entry = reader.next()) != null) {
       Integer offset = entry.getOffset();
       TransactionEventRecord record = entry.getEvent();
       Put put = puts.get(offset);
       FlumeEvent eventIn = put.getEvent();
       Assert.assertEquals(put.getTransactionID(), record.getTransactionID());
       Assert.assertTrue(record instanceof Put);
-      FlumeEvent eventOut = ((Put)record).getEvent();
+      FlumeEvent eventOut = ((Put) record).getEvent();
       Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders());
       Assert.assertTrue(Arrays.equals(eventIn.getBody(), eventOut.getBody()));
     }
@@ -185,12 +184,12 @@ public class TestLogFile {
 
   @Test
   public void testReaderOldMetaFile() throws InterruptedException,
-    IOException, CorruptEventException {
+      IOException, CorruptEventException {
     Map<Integer, Put> puts = Maps.newHashMap();
     for (int i = 0; i < 1000; i++) {
       FlumeEvent eventIn = TestUtils.newPersistableEvent();
       Put put = new Put(++transactionID, WriteOrderOracle.next(),
-              eventIn);
+          eventIn);
       ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
       FlumeEventPointer ptr = logFileWriter.put(bytes);
       puts.put(ptr.getOffset(), put);
@@ -202,7 +201,7 @@ public class TestLogFile {
       Assert.fail("Renaming to meta.old failed");
     }
     LogFile.SequentialReader reader =
-            LogFileFactory.getSequentialReader(dataFile, null, true);
+        LogFileFactory.getSequentialReader(dataFile, null, true);
     Assert.assertTrue(metadataFile.exists());
     Assert.assertFalse(oldMetadataFile.exists());
     LogRecord entry;
@@ -219,14 +218,14 @@ public class TestLogFile {
     }
   }
 
-    @Test
-  public void testReaderTempMetaFile() throws InterruptedException,
-      IOException, CorruptEventException {
+  @Test
+  public void testReaderTempMetaFile()
+      throws InterruptedException, IOException, CorruptEventException {
     Map<Integer, Put> puts = Maps.newHashMap();
     for (int i = 0; i < 1000; i++) {
       FlumeEvent eventIn = TestUtils.newPersistableEvent();
       Put put = new Put(++transactionID, WriteOrderOracle.next(),
-              eventIn);
+          eventIn);
       ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
       FlumeEventPointer ptr = logFileWriter.put(bytes);
       puts.put(ptr.getOffset(), put);
@@ -240,7 +239,7 @@ public class TestLogFile {
       Assert.fail("Renaming to meta.temp failed");
     }
     LogFile.SequentialReader reader =
-            LogFileFactory.getSequentialReader(dataFile, null, true);
+        LogFileFactory.getSequentialReader(dataFile, null, true);
     Assert.assertTrue(metadataFile.exists());
     Assert.assertFalse(tempMetadataFile.exists());
     Assert.assertFalse(oldMetadataFile.exists());
@@ -257,9 +256,10 @@ public class TestLogFile {
       Assert.assertTrue(Arrays.equals(eventIn.getBody(), eventOut.getBody()));
     }
   }
+
   @Test
   public void testWriteDelimitedTo() throws IOException {
-    if(dataFile.isFile()) {
+    if (dataFile.isFile()) {
       Assert.assertTrue(dataFile.delete());
     }
     Assert.assertTrue(dataFile.createNewFile());
@@ -270,25 +270,24 @@ public class TestLogFile {
     metaDataBuilder.setCheckpointPosition(3);
     metaDataBuilder.setCheckpointWriteOrderID(4);
     LogFileV3.writeDelimitedTo(metaDataBuilder.build(), dataFile);
-    ProtosFactory.LogFileMetaData metaData = ProtosFactory.LogFileMetaData.
-        parseDelimitedFrom(new FileInputStream(dataFile));
+    ProtosFactory.LogFileMetaData metaData =
+        ProtosFactory.LogFileMetaData.parseDelimitedFrom(new FileInputStream(dataFile));
     Assert.assertEquals(1, metaData.getVersion());
     Assert.assertEquals(2, metaData.getLogFileID());
     Assert.assertEquals(3, metaData.getCheckpointPosition());
     Assert.assertEquals(4, metaData.getCheckpointWriteOrderID());
   }
 
-  @Test (expected = CorruptEventException.class)
+  @Test(expected = CorruptEventException.class)
   public void testPutGetCorruptEvent() throws Exception {
     final LogFile.RandomReader logFileReader =
-      LogFileFactory.getRandomReader(dataFile, null, true);
+        LogFileFactory.getRandomReader(dataFile, null, true);
     final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500);
-    final Put put = new Put(++transactionID, WriteOrderOracle.next(),
-      eventIn);
+    final Put put = new Put(++transactionID, WriteOrderOracle.next(), eventIn);
     ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
     FlumeEventPointer ptr = logFileWriter.put(bytes);
-    logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit
-      (transactionID, WriteOrderOracle.next())));
+    logFileWriter.commit(TransactionEventRecord.toByteBuffer(
+        new Commit(transactionID, WriteOrderOracle.next())));
     logFileWriter.sync();
     final int offset = ptr.getOffset();
     RandomAccessFile writer = new RandomAccessFile(dataFile, "rw");
@@ -300,24 +299,22 @@ public class TestLogFile {
 
     // Should have thrown an exception by now.
     Assert.fail();
-
   }
 
-  @Test (expected = NoopRecordException.class)
+  @Test(expected = NoopRecordException.class)
   public void testPutGetNoopEvent() throws Exception {
     final LogFile.RandomReader logFileReader =
-      LogFileFactory.getRandomReader(dataFile, null, true);
+        LogFileFactory.getRandomReader(dataFile, null, true);
     final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500);
-    final Put put = new Put(++transactionID, WriteOrderOracle.next(),
-      eventIn);
+    final Put put = new Put(++transactionID, WriteOrderOracle.next(), eventIn);
     ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
     FlumeEventPointer ptr = logFileWriter.put(bytes);
-    logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit
-      (transactionID, WriteOrderOracle.next())));
+    logFileWriter.commit(TransactionEventRecord.toByteBuffer(
+        new Commit(transactionID, WriteOrderOracle.next())));
     logFileWriter.sync();
     final int offset = ptr.getOffset();
-    LogFile.OperationRecordUpdater updater = new LogFile
-      .OperationRecordUpdater(dataFile);
+    LogFile.OperationRecordUpdater updater =
+        new LogFile.OperationRecordUpdater(dataFile);
     updater.markRecordAsNoop(offset);
     logFileReader.get(offset);
 
@@ -330,40 +327,38 @@ public class TestLogFile {
     File tempDir = Files.createTempDir();
     File temp = new File(tempDir, "temp");
     final RandomAccessFile tempFile = new RandomAccessFile(temp, "rw");
-    for(int i = 0; i < 5000; i++) {
+    for (int i = 0; i < 5000; i++) {
       tempFile.write(LogFile.OP_RECORD);
     }
     tempFile.seek(0);
     LogFile.OperationRecordUpdater recordUpdater = new LogFile
-      .OperationRecordUpdater(temp);
+        .OperationRecordUpdater(temp);
     //Convert every 10th byte into a noop byte
-    for(int i = 0; i < 5000; i+=10) {
+    for (int i = 0; i < 5000; i += 10) {
       recordUpdater.markRecordAsNoop(i);
     }
     recordUpdater.close();
 
     tempFile.seek(0);
     // Verify every 10th byte is actually a NOOP
-    for(int i = 0; i < 5000; i+=10) {
+    for (int i = 0; i < 5000; i += 10) {
       tempFile.seek(i);
       Assert.assertEquals(LogFile.OP_NOOP, tempFile.readByte());
     }
-
   }
 
   @Test
-  public void testOpRecordUpdaterWithFlumeEvents() throws Exception{
+  public void testOpRecordUpdaterWithFlumeEvents() throws Exception {
     final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500);
-    final Put put = new Put(++transactionID, WriteOrderOracle.next(),
-      eventIn);
+    final Put put = new Put(++transactionID, WriteOrderOracle.next(), eventIn);
     ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
     FlumeEventPointer ptr = logFileWriter.put(bytes);
-    logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit
-      (transactionID, WriteOrderOracle.next())));
+    logFileWriter.commit(TransactionEventRecord.toByteBuffer(
+        new Commit(transactionID, WriteOrderOracle.next())));
     logFileWriter.sync();
     final int offset = ptr.getOffset();
-    LogFile.OperationRecordUpdater updater = new LogFile
-      .OperationRecordUpdater(dataFile);
+    LogFile.OperationRecordUpdater updater =
+        new LogFile.OperationRecordUpdater(dataFile);
     updater.markRecordAsNoop(offset);
     RandomAccessFile fileReader = new RandomAccessFile(dataFile, "rw");
     Assert.assertEquals(LogFile.OP_NOOP, fileReader.readByte());
@@ -375,7 +370,7 @@ public class TestLogFile {
     final CyclicBarrier barrier = new CyclicBarrier(20);
     ExecutorService executorService = Executors.newFixedThreadPool(20);
     ExecutorCompletionService<Void> completionService = new
-      ExecutorCompletionService<Void>(executorService);
+        ExecutorCompletionService<Void>(executorService);
     final LogFile.Writer writer = logFileWriter;
     final AtomicLong txnId = new AtomicLong(++transactionID);
     for (int i = 0; i < 20; i++) {
@@ -384,11 +379,11 @@ public class TestLogFile {
         public Void call() {
           try {
             Put put = new Put(txnId.incrementAndGet(),
-              WriteOrderOracle.next(), eventIn);
+                WriteOrderOracle.next(), eventIn);
             ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
             writer.put(bytes);
             writer.commit(TransactionEventRecord.toByteBuffer(
-              new Commit(txnId.get(), WriteOrderOracle.next())));
+                new Commit(txnId.get(), WriteOrderOracle.next())));
             barrier.await();
             writer.sync();
           } catch (Exception ex) {
@@ -399,17 +394,15 @@ public class TestLogFile {
       });
     }
 
-    for(int i = 0; i < 20; i++) {
+    for (int i = 0; i < 20; i++) {
       completionService.take().get();
     }
 
-    //At least 250*20, but can be higher due to serialization overhead
+    // At least 250*20, but can be higher due to serialization overhead
     Assert.assertTrue(logFileWriter.position() >= 5000);
     Assert.assertEquals(1, writer.getSyncCount());
-    Assert.assertTrue(logFileWriter.getLastCommitPosition() ==
-      logFileWriter.getLastSyncPosition());
+    Assert.assertTrue(logFileWriter.getLastCommitPosition() == logFileWriter.getLastSyncPosition());
 
     executorService.shutdown();
-
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java
index 2356d90..1f07e1f 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java
@@ -18,7 +18,8 @@
  */
 package org.apache.flume.channel.file;
 
-import static org.mockito.Mockito.*;
+import junit.framework.Assert;
+import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -30,9 +31,8 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
 
-import junit.framework.Assert;
-
-import org.junit.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 @SuppressWarnings("deprecation")
 public class TestTransactionEventRecordV2 {
@@ -127,7 +127,7 @@ public class TestTransactionEventRecordV2 {
     try {
       TransactionEventRecord.fromDataInputV2(toDataInput(in));
       Assert.fail();
-    } catch(NullPointerException e) {
+    } catch (NullPointerException e) {
       Assert.assertEquals("Unknown action ffff8000", e.getMessage());
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
index eb0ce04..512d290 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
@@ -18,7 +18,8 @@
  */
 package org.apache.flume.channel.file;
 
-import static org.mockito.Mockito.*;
+import junit.framework.Assert;
+import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -26,9 +27,8 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
-import junit.framework.Assert;
-
-import org.junit.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestTransactionEventRecordV3 {
 
@@ -52,6 +52,7 @@ public class TestTransactionEventRecordV3 {
     Assert.assertEquals(TransactionEventRecord.Type.COMMIT.get(),
         commit.getRecordType());
   }
+
   @Test
   public void testPutSerialization() throws IOException, CorruptEventException {
     Map<String, String> headers = new HashMap<String, String>();
@@ -69,9 +70,9 @@ public class TestTransactionEventRecordV3 {
     Assert.assertEquals(headers, out.getEvent().getHeaders());
     Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), out.getEvent().getBody()));
   }
+
   @Test
-  public void testPutSerializationNullHeader() throws IOException,
-    CorruptEventException {
+  public void testPutSerializationNullHeader() throws IOException, CorruptEventException {
     Put in = new Put(System.currentTimeMillis(),
         WriteOrderOracle.next(),
         new FlumeEvent(null, new byte[0]));
@@ -84,11 +85,10 @@ public class TestTransactionEventRecordV3 {
     Assert.assertNotNull(out.getEvent().getHeaders());
     Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), out.getEvent().getBody()));
   }
+
   @Test
-  public void testTakeSerialization() throws IOException,
-    CorruptEventException {
-    Take in = new Take(System.currentTimeMillis(),
-        WriteOrderOracle.next(), 10, 20);
+  public void testTakeSerialization() throws IOException, CorruptEventException {
+    Take in = new Take(System.currentTimeMillis(), WriteOrderOracle.next(), 10, 20);
     Take out = (Take)TransactionEventRecord.fromByteArray(toByteArray(in));
     Assert.assertEquals(in.getClass(), out.getClass());
     Assert.assertEquals(in.getRecordType(), out.getRecordType());
@@ -99,10 +99,8 @@ public class TestTransactionEventRecordV3 {
   }
 
   @Test
-  public void testRollbackSerialization() throws IOException,
-    CorruptEventException {
-    Rollback in = new Rollback(System.currentTimeMillis(),
-        WriteOrderOracle.next());
+  public void testRollbackSerialization() throws IOException, CorruptEventException {
+    Rollback in = new Rollback(System.currentTimeMillis(), WriteOrderOracle.next());
     Rollback out = (Rollback)TransactionEventRecord.fromByteArray(toByteArray(in));
     Assert.assertEquals(in.getClass(), out.getClass());
     Assert.assertEquals(in.getRecordType(), out.getRecordType());
@@ -111,10 +109,8 @@ public class TestTransactionEventRecordV3 {
   }
 
   @Test
-  public void testCommitSerialization() throws IOException,
-    CorruptEventException {
-    Commit in = new Commit(System.currentTimeMillis(),
-        WriteOrderOracle.next());
+  public void testCommitSerialization() throws IOException, CorruptEventException {
+    Commit in = new Commit(System.currentTimeMillis(), WriteOrderOracle.next());
     Commit out = (Commit)TransactionEventRecord.fromByteArray(toByteArray(in));
     Assert.assertEquals(in.getClass(), out.getClass());
     Assert.assertEquals(in.getRecordType(), out.getRecordType());
@@ -129,7 +125,7 @@ public class TestTransactionEventRecordV3 {
     try {
       TransactionEventRecord.fromByteArray(toByteArray(in));
       Assert.fail();
-    } catch(NullPointerException e) {
+    } catch (NullPointerException e) {
       Assert.assertEquals("Unknown action ffff8000", e.getMessage());
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
index 61f38d2..0ec1831 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
@@ -18,7 +18,21 @@
  */
 package org.apache.flume.channel.file;
 
-import static org.fest.reflect.core.Reflection.*;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.google.common.io.Resources;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Assert;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -36,22 +50,8 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.zip.GZIPInputStream;
 
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelException;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.junit.Assert;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-import com.google.common.io.Resources;
+import static org.fest.reflect.core.Reflection.field;
+import static org.fest.reflect.core.Reflection.method;
 
 public class TestUtils {
 
@@ -119,7 +119,7 @@ public class TestUtils {
 
   public static List<File> getAllLogs(File[] dataDirs) {
     List<File> result = Lists.newArrayList();
-    for(File dataDir : dataDirs) {
+    for (File dataDir : dataDirs) {
       result.addAll(LogUtils.getLogs(dataDir));
     }
     return result;
@@ -139,24 +139,22 @@ public class TestUtils {
             .invoke(true));
   }
 
-  public static Set<String> takeEvents(Channel channel, int batchSize)
-    throws Exception {
+  public static Set<String> takeEvents(Channel channel, int batchSize) throws Exception {
     return takeEvents(channel, batchSize, false);
   }
 
-  public static Set<String> takeEvents(Channel channel,
-          int batchSize, boolean checkForCorruption) throws Exception {
+  public static Set<String> takeEvents(Channel channel, int batchSize, boolean checkForCorruption)
+      throws Exception {
     return takeEvents(channel, batchSize, Integer.MAX_VALUE, checkForCorruption);
   }
 
-  public static Set<String> takeEvents(Channel channel,
-    int batchSize, int numEvents) throws Exception {
+  public static Set<String> takeEvents(Channel channel, int batchSize, int numEvents)
+      throws Exception {
     return takeEvents(channel, batchSize, numEvents, false);
   }
 
-  public static Set<String> takeEvents(Channel channel,
-          int batchSize, int numEvents, boolean checkForCorruption) throws
-    Exception {
+  public static Set<String> takeEvents(Channel channel, int batchSize, int numEvents,
+                                       boolean checkForCorruption) throws Exception {
     Set<String> result = Sets.newHashSet();
     for (int i = 0; i < numEvents; i += batchSize) {
       Transaction transaction = channel.getTransaction();
@@ -169,16 +167,15 @@ public class TestUtils {
           } catch (ChannelException ex) {
             Throwable th = ex;
             String msg;
-            if(checkForCorruption) {
+            if (checkForCorruption) {
               msg = "Corrupt event found. Please run File Channel";
               th = ex.getCause();
             } else {
               msg = "Take list for FileBackedTransaction, capacity";
             }
-            Assert.assertTrue(th.getMessage().startsWith(
-                msg));
-            if(checkForCorruption) {
-              throw (Exception) th;
+            Assert.assertTrue(th.getMessage().startsWith(msg));
+            if (checkForCorruption) {
+              throw (Exception)th;
             }
             transaction.commit();
             return result;
@@ -204,16 +201,16 @@ public class TestUtils {
   public static Set<String> consumeChannel(Channel channel) throws Exception {
     return consumeChannel(channel, false);
   }
-  public static Set<String> consumeChannel(Channel channel,
-    boolean checkForCorruption) throws Exception {
+  public static Set<String> consumeChannel(Channel channel, boolean checkForCorruption)
+      throws Exception {
     Set<String> result = Sets.newHashSet();
     int[] batchSizes = new int[] {
         1000, 100, 10, 1
     };
     for (int i = 0; i < batchSizes.length; i++) {
-      while(true) {
+      while (true) {
         Set<String> batch = takeEvents(channel, batchSizes[i], checkForCorruption);
-        if(batch.isEmpty()) {
+        if (batch.isEmpty()) {
           break;
         }
         result.addAll(batch);
@@ -221,18 +218,16 @@ public class TestUtils {
     }
     return result;
   }
-  public static Set<String> fillChannel(Channel channel, String prefix)
-      throws Exception {
+  public static Set<String> fillChannel(Channel channel, String prefix) throws Exception {
     Set<String> result = Sets.newHashSet();
     int[] batchSizes = new int[] {
         1000, 100, 10, 1
     };
     for (int i = 0; i < batchSizes.length; i++) {
       try {
-        while(true) {
-          Set<String> batch = putEvents(channel, prefix, batchSizes[i],
-              Integer.MAX_VALUE, true);
-          if(batch.isEmpty()) {
+        while (true) {
+          Set<String> batch = putEvents(channel, prefix, batchSizes[i], Integer.MAX_VALUE, true);
+          if (batch.isEmpty()) {
             break;
           }
           result.addAll(batch);
@@ -243,19 +238,17 @@ public class TestUtils {
             + "size, a downstream system running slower than normal, or that "
             + "the channel capacity is just too low. [channel="
             + channel.getName() + "]").equals(e.getMessage())
-            || e.getMessage().startsWith("Put queue for FileBackedTransaction " +
-            "of capacity "));
+            || e.getMessage().startsWith("Put queue for FileBackedTransaction of capacity "));
       }
     }
     return result;
   }
-  public static Set<String> putEvents(Channel channel, String prefix,
-      int batchSize, int numEvents) throws Exception {
+  public static Set<String> putEvents(Channel channel, String prefix, int batchSize, int numEvents)
+      throws Exception {
     return putEvents(channel, prefix, batchSize, numEvents, false);
   }
-  public static Set<String> putEvents(Channel channel, String prefix,
-          int batchSize, int numEvents, boolean untilCapacityIsReached)
-              throws Exception {
+  public static Set<String> putEvents(Channel channel, String prefix, int batchSize, int numEvents,
+                                      boolean untilCapacityIsReached) throws Exception {
     Set<String> result = Sets.newHashSet();
     for (int i = 0; i < numEvents; i += batchSize) {
       Transaction transaction = channel.getTransaction();
@@ -272,13 +265,12 @@ public class TestUtils {
         result.addAll(batch);
       } catch (Exception ex) {
         transaction.rollback();
-        if(untilCapacityIsReached && ex instanceof ChannelException &&
+        if (untilCapacityIsReached && ex instanceof ChannelException &&
             ("The channel has reached it's capacity. "
                 + "This might be the result of a sink on the channel having too "
                 + "low of batch size, a downstream system running slower than "
                 + "normal, or that the channel capacity is just too low. "
-                + "[channel=" +channel.getName() + "]").
-              equals(ex.getMessage())) {
+                + "[channel=" + channel.getName() + "]").equals(ex.getMessage())) {
           break;
         }
         throw ex;
@@ -288,6 +280,7 @@ public class TestUtils {
     }
     return result;
   }
+
   public static void copyDecompressed(String resource, File output)
       throws IOException {
     URL input =  Resources.getResource(resource);
@@ -298,12 +291,11 @@ public class TestUtils {
     gzis.close();
   }
 
-  public static Context createFileChannelContext(String checkpointDir,
-      String dataDir, String backupDir, Map<String, String> overrides) {
+  public static Context createFileChannelContext(String checkpointDir, String dataDir,
+                                                 String backupDir, Map<String, String> overrides) {
     Context context = new Context();
-    context.put(FileChannelConfiguration.CHECKPOINT_DIR,
-            checkpointDir);
-    if(backupDir != null) {
+    context.put(FileChannelConfiguration.CHECKPOINT_DIR, checkpointDir);
+    if (backupDir != null) {
       context.put(FileChannelConfiguration.BACKUP_CHECKPOINT_DIR, backupDir);
     }
     context.put(FileChannelConfiguration.DATA_DIRS, dataDir);
@@ -312,22 +304,22 @@ public class TestUtils {
     context.putAll(overrides);
     return context;
   }
-  public static FileChannel createFileChannel(String checkpointDir,
-    String dataDir, Map<String, String> overrides) {
+
+  public static FileChannel createFileChannel(String checkpointDir, String dataDir,
+                                              Map<String, String> overrides) {
     return createFileChannel(checkpointDir, dataDir, null, overrides);
   }
 
-  public static FileChannel createFileChannel(String checkpointDir,
-      String dataDir, String backupDir, Map<String, String> overrides) {
+  public static FileChannel createFileChannel(String checkpointDir, String dataDir,
+                                              String backupDir, Map<String, String> overrides) {
     FileChannel channel = new FileChannel();
     channel.setName("FileChannel-" + UUID.randomUUID());
-    Context context = createFileChannelContext(checkpointDir, dataDir,
-      backupDir, overrides);
+    Context context = createFileChannelContext(checkpointDir, dataDir, backupDir, overrides);
     Configurables.configure(channel, context);
     return channel;
   }
-  public static File writeStringToFile(File baseDir, String name,
-      String text) throws IOException {
+
+  public static File writeStringToFile(File baseDir, String name, String text) throws IOException {
     File passwordFile = new File(baseDir, name);
     Files.write(text, passwordFile, Charsets.UTF_8);
     return passwordFile;

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/CipherProviderTestSuite.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/CipherProviderTestSuite.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/CipherProviderTestSuite.java
index 530ccf6..22848d2 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/CipherProviderTestSuite.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/CipherProviderTestSuite.java
@@ -32,24 +32,28 @@ public class CipherProviderTestSuite {
     this.encryptor = encryptor;
     this.decryptor = decryptor;
   }
+
   public void test() throws Exception {
     testBasic();
     testEmpty();
     testNullPlainText();
     testNullCipherText();
   }
+
   public void testBasic() throws Exception {
     String expected = "mn state fair is the place to be";
     byte[] cipherText = encryptor.encrypt(expected.getBytes(Charsets.UTF_8));
     byte[] clearText = decryptor.decrypt(cipherText);
     Assert.assertEquals(expected, new String(clearText, Charsets.UTF_8));
   }
+
   public void testEmpty() throws Exception {
     String expected = "";
     byte[] cipherText = encryptor.encrypt(new byte[]{});
     byte[] clearText = decryptor.decrypt(cipherText);
     Assert.assertEquals(expected, new String(clearText));
   }
+
   public void testNullPlainText() throws Exception {
     try {
       encryptor.encrypt(null);
@@ -58,6 +62,7 @@ public class CipherProviderTestSuite {
       // expected
     }
   }
+
   public void testNullCipherText() throws Exception {
     try {
       decryptor.decrypt(null);


Mime
View raw message