flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [9/9] flume git commit: FLUME-2941. Integrate checkstyle for test classes
Date Fri, 08 Jul 2016 22:51:01 GMT
FLUME-2941. Integrate checkstyle for test classes

Also make test code conform to style guidelines.

Additionally, this patch makes style violations fatal to the build.

This patch is whitespace-only from a code perspective. After stripping
line numbers, the generated test bytecode before and after these changes
is identical.

Code review: https://reviews.apache.org/r/49830/

Reviewed by Hari.


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/cfbf1156
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/cfbf1156
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/cfbf1156

Branch: refs/heads/trunk
Commit: cfbf1156858af9ae26975fefc94594d91c8cd3f4
Parents: c8c0f9b
Author: Mike Percy <mpercy@apache.org>
Authored: Wed Jun 29 21:18:20 2016 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Fri Jul 8 15:48:26 2016 -0700

----------------------------------------------------------------------
 flume-checkstyle/pom.xml                        |   8 -
 .../resources/flume/checkstyle-suppressions.xml |  18 +-
 .../src/main/resources/flume/checkstyle.xml     |   2 +-
 .../flume/auth/TestFlumeAuthenticator.java      |  16 +-
 .../flume/channel/file/CountingSinkRunner.java  |  22 +-
 .../channel/file/CountingSourceRunner.java      |  21 +-
 .../flume/channel/file/TestCheckpoint.java      |   4 +-
 .../file/TestEventQueueBackingStoreFactory.java | 140 ++++---
 .../flume/channel/file/TestEventUtils.java      |   4 +-
 .../flume/channel/file/TestFileChannel.java     | 139 ++++---
 .../file/TestFileChannelFormatRegression.java   |  21 +-
 .../channel/file/TestFileChannelRestart.java    | 208 +++++-----
 .../channel/file/TestFileChannelRollback.java   |  16 +-
 .../flume/channel/file/TestFlumeEventQueue.java | 200 +++++-----
 .../flume/channel/file/TestIntegration.java     |  33 +-
 .../org/apache/flume/channel/file/TestLog.java  | 308 +++++++++------
 .../apache/flume/channel/file/TestLogFile.java  | 153 ++++----
 .../file/TestTransactionEventRecordV2.java      |  10 +-
 .../file/TestTransactionEventRecordV3.java      |  34 +-
 .../apache/flume/channel/file/TestUtils.java    | 122 +++---
 .../encryption/CipherProviderTestSuite.java     |   5 +
 .../file/encryption/EncryptionTestUtils.java    |  95 +++--
 .../encryption/TestAESCTRNoPaddingProvider.java |  10 +-
 .../encryption/TestFileChannelEncryption.java   |  98 +++--
 .../file/encryption/TestJCEFileKeyProvider.java |  69 ++--
 .../jdbc/BaseJdbcChannelProviderTest.java       |  28 +-
 .../apache/flume/channel/jdbc/MockEvent.java    |   3 +-
 .../flume/channel/jdbc/MockEventUtils.java      |  22 +-
 .../jdbc/TestDerbySchemaHandlerQueries.java     |   2 -
 .../flume/channel/kafka/TestKafkaChannel.java   |  67 ++--
 .../channel/TestSpillableMemoryChannel.java     | 386 +++++++++----------
 .../TestLoadBalancingLog4jAppender.java         |  51 ++-
 .../log4jappender/TestLog4jAppender.java        |  32 +-
 .../TestLog4jAppenderWithAvro.java              |   5 +-
 .../AbstractBasicChannelSemanticsTest.java      |   7 +-
 .../flume/channel/TestChannelProcessor.java     |  32 +-
 .../apache/flume/channel/TestMemoryChannel.java |  40 +-
 .../channel/TestMemoryChannelConcurrency.java   | 100 ++---
 .../channel/TestMemoryChannelTransaction.java   |  12 +-
 .../TestReliableSpoolingFileEventReader.java    | 201 +++++-----
 .../flume/formatter/output/TestBucketPath.java  |  52 +--
 .../TestMonitoredCounterGroup.java              |  50 +--
 .../http/TestHTTPMetricsServer.java             |  24 +-
 .../kafka/KafkaSourceCounterTest.java           |  78 ++--
 ...gexExtractorInterceptorMillisSerializer.java |  10 +-
 ...tractorInterceptorPassThroughSerializer.java |   3 +-
 .../TestSearchAndReplaceInterceptor.java        |   2 +-
 .../SyslogAvroEventSerializer.java              |  45 ++-
 .../TestAvroEventDeserializer.java              |   1 +
 .../TestDurablePositionTracker.java             |   3 +-
 .../TestFlumeEventAvroEventSerializer.java      |  24 +-
 .../TestResettableFileInputStream.java          |  26 +-
 .../TestSyslogAvroEventSerializer.java          |   2 +-
 .../org/apache/flume/sink/TestAvroSink.java     |  87 ++---
 .../flume/sink/TestDefaultSinkFactory.java      |   3 +-
 .../flume/sink/TestFailoverSinkProcessor.java   |  10 +-
 .../sink/TestLoadBalancingSinkProcessor.java    |  60 ++-
 .../apache/flume/sink/TestRollingFileSink.java  |  30 +-
 .../org/apache/flume/sink/TestThriftSink.java   |  11 +-
 .../source/TestAbstractPollableSource.java      |  24 +-
 .../org/apache/flume/source/TestAvroSource.java |  34 +-
 .../org/apache/flume/source/TestExecSource.java | 336 ++++++++--------
 .../source/TestMultiportSyslogTCPSource.java    |   1 +
 .../apache/flume/source/TestNetcatSource.java   |  79 ++--
 .../source/TestSequenceGeneratorSource.java     |   8 +-
 .../flume/source/TestSpoolDirectorySource.java  |  42 +-
 .../apache/flume/source/TestStressSource.java   |   2 +-
 .../apache/flume/source/TestSyslogParser.java   |  12 +-
 .../flume/source/TestSyslogTcpSource.java       |  12 +-
 .../flume/source/TestSyslogUdpSource.java       |  19 +-
 .../apache/flume/source/TestSyslogUtils.java    | 162 ++++----
 .../apache/flume/source/TestThriftSource.java   |  29 +-
 .../http/FlumeHttpServletRequestWrapper.java    |   4 +-
 .../flume/source/http/TestHTTPSource.java       | 153 ++++----
 .../flume/tools/TestTimestampRoundDownUtil.java |  10 +-
 .../org/apache/flume/tools/TestVersionInfo.java |   4 +-
 .../flume/agent/embedded/TestEmbeddedAgent.java |  24 +-
 .../TestEmbeddedAgentConfiguration.java         |  18 +-
 .../TestEmbeddedAgentEmbeddedSource.java        |  15 +-
 .../agent/embedded/TestEmbeddedAgentState.java  |  30 +-
 .../source/avroLegacy/TestLegacyAvroSource.java |  27 +-
 .../thriftLegacy/TestThriftLegacySource.java    |  30 +-
 .../node/TestAbstractConfigurationProvider.java |  71 ++--
 ...tAbstractZooKeeperConfigurationProvider.java |  25 +-
 .../org/apache/flume/node/TestApplication.java  |   2 +-
 ...lingPropertiesFileConfigurationProvider.java |   1 -
 ...TestPropertiesFileConfigurationProvider.java |  24 +-
 .../apache/flume/source/TestNetcatSource.java   |  38 +-
 .../java/org/apache/flume/api/RpcTestUtils.java |  77 ++--
 .../apache/flume/api/TestFailoverRpcClient.java |   4 +-
 .../flume/api/TestLoadBalancingRpcClient.java   | 107 ++---
 .../flume/api/TestNettyAvroRpcClient.java       |  26 +-
 .../apache/flume/api/TestThriftRpcClient.java   |  41 +-
 .../apache/flume/api/ThriftTestingSource.java   |  27 +-
 .../apache/flume/sink/kite/TestDatasetSink.java |  79 ++--
 .../flume/sink/hdfs/HDFSTestSeqWriter.java      |  16 +-
 .../apache/flume/sink/hdfs/MockDataStream.java  |   4 +-
 .../apache/flume/sink/hdfs/MockFileSystem.java  |  23 +-
 .../flume/sink/hdfs/MockFsDataOutputStream.java |  12 +-
 .../apache/flume/sink/hdfs/MockHDFSWriter.java  |   3 +-
 .../flume/sink/hdfs/TestBucketWriter.java       | 274 ++++++-------
 .../flume/sink/hdfs/TestHDFSEventSink.java      | 127 +++---
 .../hdfs/TestSequenceFileSerializerFactory.java |   3 +-
 .../apache/flume/sink/hive/TestHiveSink.java    |  21 +-
 .../apache/flume/sink/hive/TestHiveWriter.java  |  50 ++-
 .../org/apache/flume/sink/hive/TestUtil.java    |  33 +-
 .../org/apache/flume/sink/irc/TestIRCSink.java  |  11 +-
 .../AbstractElasticSearchSinkTest.java          |  25 +-
 ...ElasticSearchIndexRequestBuilderFactory.java |  30 +-
 ...estElasticSearchLogStashEventSerializer.java |  82 ++--
 .../elasticsearch/TestElasticSearchSink.java    |  52 +--
 .../TestElasticSearchSinkCreation.java          |   2 +-
 .../client/RoundRobinListTest.java              |   3 +-
 .../client/TestElasticSearchClientFactory.java  |  10 +-
 .../client/TestElasticSearchRestClient.java     |  35 +-
 .../hbase/IncrementAsyncHBaseSerializer.java    |   6 +-
 .../flume/sink/hbase/TestAsyncHBaseSink.java    | 114 +++---
 .../apache/flume/sink/hbase/TestHBaseSink.java  | 129 ++++---
 .../hbase/TestRegexHbaseEventSerializer.java    |  37 +-
 .../apache/flume/sink/kafka/TestKafkaSink.java  |  89 ++---
 .../flume/sink/kafka/util/KafkaLocal.java       |  57 ++-
 .../flume/sink/kafka/util/ZooKeeperLocal.java   |  75 ++--
 .../solr/morphline/TestBlobDeserializer.java    |  10 +-
 .../morphline/TestMorphlineInterceptor.java     |  57 +--
 .../solr/morphline/TestMorphlineSolrSink.java   |  19 +-
 .../source/jms/JMSMessageConsumerTestBase.java  |  13 +-
 .../jms/TestDefaultJMSMessageConverter.java     |   2 +-
 .../source/jms/TestIntegrationActiveMQ.java     |  23 +-
 .../source/jms/TestJMSMessageConsumer.java      |   3 +-
 .../apache/flume/source/jms/TestJMSSource.java  |  53 ++-
 .../source/kafka/KafkaSourceEmbeddedKafka.java  |  11 +-
 .../kafka/KafkaSourceEmbeddedZookeeper.java     |   1 -
 .../flume/source/kafka/TestKafkaSource.java     | 152 ++++----
 .../source/taildir/TestTaildirEventReader.java  |  36 +-
 .../source/taildir/TestTaildirMatcher.java      |  57 ++-
 .../flume/source/taildir/TestTaildirSource.java |  38 +-
 .../flume/test/agent/TestFileChannel.java       | 178 ++++-----
 .../apache/flume/test/util/StagedInstall.java   |  39 +-
 .../org/apache/flume/test/util/SyslogAgent.java |   9 +-
 .../tools/TestFileChannelIntegrityTool.java     |  54 ++-
 pom.xml                                         |   6 +-
 141 files changed, 3523 insertions(+), 3423 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-checkstyle/pom.xml
----------------------------------------------------------------------
diff --git a/flume-checkstyle/pom.xml b/flume-checkstyle/pom.xml
index 31db3c0..74ebf6b 100644
--- a/flume-checkstyle/pom.xml
+++ b/flume-checkstyle/pom.xml
@@ -21,14 +21,6 @@
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
 
-  <!--
-  <parent>
-    <artifactId>flume-parent</artifactId>
-    <groupId>org.apache.flume</groupId>
-    <version>1.7.0-SNAPSHOT</version>
-  </parent>
-  -->
-
   <groupId>org.apache.flume</groupId>
   <artifactId>flume-checkstyle</artifactId>
   <name>Flume checkstyle project</name>

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml
----------------------------------------------------------------------
diff --git a/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml b/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml
index 49c8834..2642baa 100644
--- a/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml
+++ b/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml
@@ -12,21 +12,29 @@
     <suppress checks="PackageName"
               files="org/apache/flume/source/avroLegacy|org/apache/flume/source/thriftLegacy"/>
 
+    <!-- Allow unicode escapes in tests -->
+    <suppress checks="AvoidEscapedUnicodeCharacters"
+              files="Test.*\.java"/>
+
     <!-- TODO: Rearrange methods in below classes to keep overloaded methods adjacent -->
     <suppress checks="OverloadMethodsDeclarationOrder"
-              files="channel/file|RpcClientFactory\.java|BucketPath\.java|SinkGroup\.java|DefaultSinkProcessor\.java|RegexExtractorInterceptorMillisSerializer\.java|SimpleAsyncHbaseEventSerializer\.java|hdfs/BucketWriter\.java"/>
+              files="channel/file|RpcClientFactory\.java|BucketPath\.java|SinkGroup\.java|DefaultSinkProcessor\.java|RegexExtractorInterceptorMillisSerializer\.java|SimpleAsyncHbaseEventSerializer\.java|hdfs/BucketWriter\.java|AbstractBasicChannelSemanticsTest\.java"/>
 
     <!-- TODO: Fix inner class names to follow standard convention -->
     <suppress checks="TypeName"
               files="SyslogUDPSource\.java|SyslogTcpSource\.java|TaildirSource\.java"/>
 
+    <!-- TODO: Method names must follow standard Java naming conventions -->
+    <suppress checks="MethodNameCheck"
+              files="TestBucketWriter\.java|TestSyslogUtils\.java"/>
+
     <!-- TODO: Add default cases to switch statements -->
     <suppress checks="MissingSwitchDefault"
-              files="SyslogUtils\.java|ReliableTaildirEventReader\.java"/>
+              files="SyslogUtils\.java|ReliableTaildirEventReader\.java|AbstractBasicChannelSemanticsTest\.java"/>
 
     <!-- TODO: Avoid empty catch blocks -->
     <suppress checks="EmptyCatchBlock"
-              files="channel/file/LogFile\.java"/>
+              files="channel/file/LogFile\.java|TestDatasetSink\.java|CountingSourceRunner\.java|CountingSinkRunner\.java|TestKafkaChannel\.java|TestTaildirSource\.java|TestChannelProcessor\.java|TestHiveSink\.java|AbstractBasicChannelSemanticsTest\.java|TestJMSSource\.java|TestEmbeddedAgent\.java|TestAsyncHBaseSink\.java"/>
 
     <!-- TODO: Avoid empty if blocks -->
     <suppress checks="EmptyBlockCheck"
@@ -34,10 +42,10 @@
 
     <!-- TODO: Fix line length issues -->
     <suppress checks="LineLengthCheck"
-              files="channel/MemoryChannel\.java|ReliableSpoolingFileEventReader\.java"/>
+              files="channel/MemoryChannel\.java|ReliableSpoolingFileEventReader\.java|TestAvroSink\.java"/>
 
     <!-- TODO: Move helper classes to their own files -->
     <suppress checks="OneTopLevelClass"
-              files="KafkaSource\.java|KafkaChannel\.java|KafkaSink\.java"/>
+              files="KafkaSource\.java|KafkaChannel\.java|KafkaSink\.java|TestElasticSearchSink\.java"/>
 
 </suppressions>

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-checkstyle/src/main/resources/flume/checkstyle.xml
----------------------------------------------------------------------
diff --git a/flume-checkstyle/src/main/resources/flume/checkstyle.xml b/flume-checkstyle/src/main/resources/flume/checkstyle.xml
index e8913f0..fdbcb5d 100644
--- a/flume-checkstyle/src/main/resources/flume/checkstyle.xml
+++ b/flume-checkstyle/src/main/resources/flume/checkstyle.xml
@@ -18,7 +18,7 @@
 <module name = "Checker">
     <property name="charset" value="UTF-8"/>
 
-    <property name="severity" value="warning"/>
+    <property name="severity" value="error"/>
 
     <property name="fileExtensions" value="java, properties, xml"/>
     <!-- Checks for whitespace                               -->

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java
index 5a8860d..0dc8872 100644
--- a/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java
+++ b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java
@@ -17,15 +17,19 @@
  */
 package org.apache.flume.auth;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Properties;
-
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class TestFlumeAuthenticator {
 
@@ -132,7 +136,7 @@ public class TestFlumeAuthenticator {
     String principal = "flume";
     File keytab = new File(workDir, "flume2.keytab");
     kdc.createPrincipal(keytab, principal);
-    String expResult = principal+"@" + kdc.getRealm();
+    String expResult = principal + "@" + kdc.getRealm();
 
     // Clear the previous statically stored logged in credentials
     FlumeAuthenticationUtil.clearCredentials();

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java
index 0733dc4..a303994 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java
@@ -18,11 +18,10 @@
  */
 package org.apache.flume.channel.file;
 
-import java.util.List;
-
+import com.google.common.collect.Lists;
 import org.apache.flume.Sink;
 
-import com.google.common.collect.Lists;
+import java.util.List;
 
 public class CountingSinkRunner extends Thread {
   private int count;
@@ -30,39 +29,46 @@ public class CountingSinkRunner extends Thread {
   private final Sink sink;
   private volatile boolean run;
   private final List<Exception> errors = Lists.newArrayList();
+
   public CountingSinkRunner(Sink sink) {
     this(sink, Integer.MAX_VALUE);
   }
+
   public CountingSinkRunner(Sink sink, int until) {
     this.sink = sink;
     this.until = until;
   }
+
   @Override
   public void run() {
     run = true;
-    while(run && count < until) {
+    while (run && count < until) {
       boolean error = true;
       try {
-        if(Sink.Status.READY.equals(sink.process())) {
+        if (Sink.Status.READY.equals(sink.process())) {
           count++;
           error = false;
         }
-      } catch(Exception ex) {
+      } catch (Exception ex) {
         errors.add(ex);
       }
-      if(error) {
+      if (error) {
         try {
           Thread.sleep(1000L);
-        } catch (InterruptedException e) {}
+        } catch (InterruptedException e) {
+        }
       }
     }
   }
+
   public void shutdown() {
     run = false;
   }
+
   public int getCount() {
     return count;
   }
+
   public List<Exception> getErrors() {
     return errors;
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java
index b6abc35..1119990 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java
@@ -33,19 +33,23 @@ public class CountingSourceRunner extends Thread {
   private final PollableSource source;
   private volatile boolean run;
   private final List<Exception> errors = Lists.newArrayList();
+
   public CountingSourceRunner(PollableSource source) {
     this(source, Integer.MAX_VALUE);
   }
+
   public CountingSourceRunner(PollableSource source, int until) {
     this(source, until, null);
   }
+
   public CountingSourceRunner(PollableSource source, Channel channel) {
     this(source, Integer.MAX_VALUE, channel);
   }
+
   public CountingSourceRunner(PollableSource source, int until, Channel channel) {
     this.source = source;
     this.until = until;
-    if(channel != null) {
+    if (channel != null) {
       ReplicatingChannelSelector selector = new ReplicatingChannelSelector();
       List<Channel> channels = Lists.newArrayList();
       channels.add(channel);
@@ -53,32 +57,37 @@ public class CountingSourceRunner extends Thread {
       this.source.setChannelProcessor(new ChannelProcessor(selector));
     }
   }
+
   @Override
   public void run() {
     run = true;
-    while(run && count < until) {
+    while (run && count < until) {
       boolean error = true;
       try {
-        if(PollableSource.Status.READY.equals(source.process())) {
+        if (PollableSource.Status.READY.equals(source.process())) {
           count++;
           error = false;
         }
-      } catch(Exception ex) {
+      } catch (Exception ex) {
         errors.add(ex);
       }
-      if(error) {
+      if (error) {
         try {
           Thread.sleep(1000L);
-        } catch (InterruptedException e) {}
+        } catch (InterruptedException e) {
+        }
       }
     }
   }
+
   public void shutdown() {
     run = false;
   }
+
   public int getCount() {
     return count;
   }
+
   public List<Exception> getErrors() {
     return errors;
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
index c1de12e..cd1dcd9 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
@@ -28,11 +28,11 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class TestCheckpoint {
-
   File file;
   File inflightPuts;
   File inflightTakes;
   File queueSet;
+
   @Before
   public void setup() throws IOException {
     file = File.createTempFile("Checkpoint", "");
@@ -42,10 +42,12 @@ public class TestCheckpoint {
     Assert.assertTrue(file.isFile());
     Assert.assertTrue(file.canWrite());
   }
+
   @After
   public void cleanup() {
     file.delete();
   }
+
   @Test
   public void testSerialization() throws Exception {
     EventQueueBackingStore backingStore =

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
index 52c706d..0939454 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
@@ -18,31 +18,29 @@
  */
 package org.apache.flume.channel.file;
 
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.protobuf.InvalidProtocolBufferException;
 import junit.framework.Assert;
-
 import org.apache.commons.io.FileUtils;
+import org.apache.flume.channel.file.proto.ProtosFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Random;
-import org.apache.flume.channel.file.proto.ProtosFactory;
 
 public class TestEventQueueBackingStoreFactory {
-  static final List<Long> pointersInTestCheckpoint = Arrays.asList(new Long[] {
+  static final List<Long> pointersInTestCheckpoint = Arrays.asList(new Long[]{
       8589936804L,
       4294969563L,
       12884904153L,
@@ -59,6 +57,7 @@ public class TestEventQueueBackingStoreFactory {
   File inflightTakes;
   File inflightPuts;
   File queueSetDir;
+
   @Before
   public void setup() throws IOException {
     baseDir = Files.createTempDir();
@@ -67,42 +66,46 @@ public class TestEventQueueBackingStoreFactory {
     inflightPuts = new File(baseDir, "puts");
     queueSetDir = new File(baseDir, "queueset");
     TestUtils.copyDecompressed("fileformat-v2-checkpoint.gz", checkpoint);
-
   }
+
   @After
   public void teardown() {
     FileUtils.deleteQuietly(baseDir);
   }
+
   @Test
   public void testWithNoFlag() throws Exception {
     verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test"),
-        Serialization.VERSION_3, pointersInTestCheckpoint);
+           Serialization.VERSION_3, pointersInTestCheckpoint);
   }
+
   @Test
   public void testWithFlag() throws Exception {
     verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", true),
-        Serialization.VERSION_3, pointersInTestCheckpoint);
+           Serialization.VERSION_3, pointersInTestCheckpoint);
   }
+
   @Test
   public void testNoUprade() throws Exception {
     verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", false),
-        Serialization.VERSION_2, pointersInTestCheckpoint);
+           Serialization.VERSION_2, pointersInTestCheckpoint);
   }
-  @Test (expected = BadCheckpointException.class)
+
+  @Test(expected = BadCheckpointException.class)
   public void testDecreaseCapacity() throws Exception {
     Assert.assertTrue(checkpoint.delete());
-    EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
-            get(checkpoint, 10, "test");
+    EventQueueBackingStore backingStore =
+        EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
     backingStore.close();
     EventQueueBackingStoreFactory.get(checkpoint, 9, "test");
     Assert.fail();
   }
 
-  @Test (expected = BadCheckpointException.class)
+  @Test(expected = BadCheckpointException.class)
   public void testIncreaseCapacity() throws Exception {
     Assert.assertTrue(checkpoint.delete());
-    EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
-            get(checkpoint, 10, "test");
+    EventQueueBackingStore backingStore =
+        EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
     backingStore.close();
     EventQueueBackingStoreFactory.get(checkpoint, 11, "test");
     Assert.fail();
@@ -112,22 +115,21 @@ public class TestEventQueueBackingStoreFactory {
   public void testNewCheckpoint() throws Exception {
     Assert.assertTrue(checkpoint.delete());
     verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", false),
-        Serialization.VERSION_3, Collections.<Long>emptyList());
+           Serialization.VERSION_3, Collections.<Long>emptyList());
   }
 
-  @Test (expected = BadCheckpointException.class)
+  @Test(expected = BadCheckpointException.class)
   public void testCheckpointBadVersion() throws Exception {
-     RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
+    RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
     try {
-    EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
-            get(checkpoint, 10, "test");
-    backingStore.close();
-    writer.seek(
-            EventQueueBackingStoreFile.INDEX_VERSION * Serialization.SIZE_OF_LONG);
-    writer.writeLong(94L);
-    writer.getFD().sync();
+      EventQueueBackingStore backingStore =
+          EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+      backingStore.close();
+      writer.seek(EventQueueBackingStoreFile.INDEX_VERSION * Serialization.SIZE_OF_LONG);
+      writer.writeLong(94L);
+      writer.getFD().sync();
 
-    backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
     } finally {
       writer.close();
     }
@@ -138,15 +140,13 @@ public class TestEventQueueBackingStoreFactory {
     RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
 
     try {
-    EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
-            get(checkpoint, 10, "test");
-    backingStore.close();
-    writer.seek(
-            EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER *
-            Serialization.SIZE_OF_LONG);
-    writer.writeLong(EventQueueBackingStoreFile.CHECKPOINT_INCOMPLETE);
-    writer.getFD().sync();
-    backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+      EventQueueBackingStore backingStore =
+          EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+      backingStore.close();
+      writer.seek(EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER * Serialization.SIZE_OF_LONG);
+      writer.writeLong(EventQueueBackingStoreFile.CHECKPOINT_INCOMPLETE);
+      writer.getFD().sync();
+      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
     } finally {
       writer.close();
     }
@@ -156,12 +156,10 @@ public class TestEventQueueBackingStoreFactory {
   public void testCheckpointVersionNotEqualToMeta() throws Exception {
     RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
     try {
-      EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
-              get(checkpoint, 10, "test");
+      EventQueueBackingStore backingStore =
+          EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
       backingStore.close();
-      writer.seek(
-              EventQueueBackingStoreFile.INDEX_VERSION
-              * Serialization.SIZE_OF_LONG);
+      writer.seek(EventQueueBackingStoreFile.INDEX_VERSION * Serialization.SIZE_OF_LONG);
       writer.writeLong(2L);
       writer.getFD().sync();
       backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
@@ -174,8 +172,8 @@ public class TestEventQueueBackingStoreFactory {
   public void testCheckpointVersionNotEqualToMeta2() throws Exception {
     FileOutputStream os = null;
     try {
-      EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
-              get(checkpoint, 10, "test");
+      EventQueueBackingStore backingStore =
+          EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
       backingStore.close();
       Assert.assertTrue(checkpoint.exists());
       Assert.assertTrue(Serialization.getMetaDataFile(checkpoint).length() != 0);
@@ -183,8 +181,7 @@ public class TestEventQueueBackingStoreFactory {
       ProtosFactory.Checkpoint meta = ProtosFactory.Checkpoint.parseDelimitedFrom(is);
       Assert.assertNotNull(meta);
       is.close();
-      os = new FileOutputStream(
-              Serialization.getMetaDataFile(checkpoint));
+      os = new FileOutputStream(Serialization.getMetaDataFile(checkpoint));
       meta.toBuilder().setVersion(2).build().writeDelimitedTo(os);
       os.flush();
       backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
@@ -197,12 +194,10 @@ public class TestEventQueueBackingStoreFactory {
   public void testCheckpointOrderIdNotEqualToMeta() throws Exception {
     RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
     try {
-      EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
-              get(checkpoint, 10, "test");
+      EventQueueBackingStore backingStore =
+          EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
       backingStore.close();
-      writer.seek(
-              EventQueueBackingStoreFile.INDEX_WRITE_ORDER_ID
-              * Serialization.SIZE_OF_LONG);
+      writer.seek(EventQueueBackingStoreFile.INDEX_WRITE_ORDER_ID * Serialization.SIZE_OF_LONG);
       writer.writeLong(2L);
       writer.getFD().sync();
       backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
@@ -215,8 +210,8 @@ public class TestEventQueueBackingStoreFactory {
   public void testCheckpointOrderIdNotEqualToMeta2() throws Exception {
     FileOutputStream os = null;
     try {
-      EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
-              get(checkpoint, 10, "test");
+      EventQueueBackingStore backingStore =
+          EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
       backingStore.close();
       Assert.assertTrue(checkpoint.exists());
       Assert.assertTrue(Serialization.getMetaDataFile(checkpoint).length() != 0);
@@ -225,7 +220,7 @@ public class TestEventQueueBackingStoreFactory {
       Assert.assertNotNull(meta);
       is.close();
       os = new FileOutputStream(
-              Serialization.getMetaDataFile(checkpoint));
+          Serialization.getMetaDataFile(checkpoint));
       meta.toBuilder().setWriteOrderID(1).build().writeDelimitedTo(os);
       os.flush();
       backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
@@ -234,11 +229,10 @@ public class TestEventQueueBackingStoreFactory {
     }
   }
 
-
   @Test(expected = BadCheckpointException.class)
   public void testTruncateMeta() throws Exception {
-    EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
-            get(checkpoint, 10, "test");
+    EventQueueBackingStore backingStore =
+        EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
     backingStore.close();
     Assert.assertTrue(checkpoint.exists());
     File metaFile = Serialization.getMetaDataFile(checkpoint);
@@ -250,10 +244,10 @@ public class TestEventQueueBackingStoreFactory {
     backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
   }
 
-  @Test (expected = InvalidProtocolBufferException.class)
+  @Test(expected = InvalidProtocolBufferException.class)
   public void testCorruptMeta() throws Throwable {
-    EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
-            get(checkpoint, 10, "test");
+    EventQueueBackingStore backingStore =
+        EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
     backingStore.close();
     Assert.assertTrue(checkpoint.exists());
     File metaFile = Serialization.getMetaDataFile(checkpoint);
@@ -270,17 +264,13 @@ public class TestEventQueueBackingStoreFactory {
     }
   }
 
-
-
-
   private void verify(EventQueueBackingStore backingStore, long expectedVersion,
-      List<Long> expectedPointers)
-      throws Exception {
-    FlumeEventQueue queue = new FlumeEventQueue(backingStore, inflightTakes,
-        inflightPuts, queueSetDir);
+                      List<Long> expectedPointers) throws Exception {
+    FlumeEventQueue queue =
+        new FlumeEventQueue(backingStore, inflightTakes, inflightPuts, queueSetDir);
     List<Long> actualPointers = Lists.newArrayList();
     FlumeEventPointer ptr;
-    while((ptr = queue.removeHead(0L)) != null) {
+    while ((ptr = queue.removeHead(0L)) != null) {
       actualPointers.add(ptr.toLong());
     }
     Assert.assertEquals(expectedPointers, actualPointers);

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java
index c72e3f2..26f9cae 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java
@@ -28,7 +28,7 @@ public class TestEventUtils {
   @Test
   public void testPutEvent() {
     FlumeEvent event = new FlumeEvent(null, new byte[5]);
-    Put put = new Put(1l, 1l, event);
+    Put put = new Put(1L, 1L, event);
     Event returnEvent = EventUtils.getEventFromTransactionEvent(put);
     Assert.assertNotNull(returnEvent);
     Assert.assertEquals(5, returnEvent.getBody().length);
@@ -36,7 +36,7 @@ public class TestEventUtils {
 
   @Test
   public void testInvalidEvent() {
-    Take take = new Take(1l, 1l);
+    Take take = new Take(1L, 1L);
     Event returnEvent = EventUtils.getEventFromTransactionEvent(take);
     Assert.assertNull(returnEvent);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index bb22e26..bfc2d0d 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -18,8 +18,22 @@
  */
 package org.apache.flume.channel.file;
 
-import static org.apache.flume.channel.file.TestUtils.*;
-import static org.fest.reflect.core.Reflection.*;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.file.FileChannel.FileBackedTransaction;
+import org.apache.flume.channel.file.FlumeEventQueue.InflightEventWrapper;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FilenameFilter;
@@ -32,7 +46,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -41,23 +54,15 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flume.ChannelException;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurables;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.flume.channel.file.FileChannel.FileBackedTransaction;
-import org.apache.flume.channel.file.FlumeEventQueue.InflightEventWrapper;
-import org.apache.flume.event.EventBuilder;
+import static org.apache.flume.channel.file.TestUtils.compareInputAndOut;
+import static org.apache.flume.channel.file.TestUtils.consumeChannel;
+import static org.apache.flume.channel.file.TestUtils.fillChannel;
+import static org.apache.flume.channel.file.TestUtils.forceCheckpoint;
+import static org.apache.flume.channel.file.TestUtils.putEvents;
+import static org.apache.flume.channel.file.TestUtils.putWithoutCommit;
+import static org.apache.flume.channel.file.TestUtils.takeEvents;
+import static org.apache.flume.channel.file.TestUtils.takeWithoutCommit;
+import static org.fest.reflect.core.Reflection.field;
 
 public class TestFileChannel extends TestFileChannelBase {
 
@@ -68,6 +73,7 @@ public class TestFileChannel extends TestFileChannelBase {
   public void setup() throws Exception {
     super.setup();
   }
+
   @After
   public void teardown() {
     super.teardown();
@@ -146,23 +152,22 @@ public class TestFileChannel extends TestFileChannelBase {
     //Simulate multiple sources, so separate thread - txns are thread local,
     //so a new txn wont be created here unless it is in a different thread.
     final CountDownLatch latch = new CountDownLatch(1);
-    Executors.newSingleThreadExecutor().submit(
-            new Runnable() {
-              @Override
-              public void run() {
-                Transaction tx = channel.getTransaction();
-                input.addAll(putWithoutCommit(channel, tx, "failAfterPut", 3));
-                try {
-                  latch.await();
-                  tx.commit();
-                } catch (InterruptedException e) {
-                  tx.rollback();
-                  Throwables.propagate(e);
-                } finally {
-                  tx.close();
-                }
-              }
-            });
+    Executors.newSingleThreadExecutor().submit(new Runnable() {
+      @Override
+      public void run() {
+        Transaction tx = channel.getTransaction();
+        input.addAll(putWithoutCommit(channel, tx, "failAfterPut", 3));
+        try {
+          latch.await();
+          tx.commit();
+        } catch (InterruptedException e) {
+          tx.rollback();
+          Throwables.propagate(e);
+        } finally {
+          tx.close();
+        }
+      }
+    });
     forceCheckpoint(channel);
     tx.commit();
     tx.close();
@@ -198,7 +203,7 @@ public class TestFileChannel extends TestFileChannelBase {
     Assert.assertTrue(channel.isOpen());
     Set<String> in = Sets.newHashSet();
     try {
-      while(true) {
+      while (true) {
         in.addAll(putEvents(channel, "reconfig", 1, 1));
       }
     } catch (ChannelException e) {
@@ -206,12 +211,13 @@ public class TestFileChannel extends TestFileChannelBase {
           + "This might be the result of a sink on the channel having too "
           + "low of batch size, a downstream system running slower than "
           + "normal, or that the channel capacity is just too low. [channel="
-          + channel.getName()+"]", e.getMessage());
+          + channel.getName() + "]", e.getMessage());
     }
     Configurables.configure(channel, createContext());
     Set<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
     compareInputAndOut(in, out);
   }
+
   @Test
   public void testPut() throws Exception {
     channel.start();
@@ -225,6 +231,7 @@ public class TestFileChannel extends TestFileChannelBase {
     Set<String> actual = takeEvents(channel, 1);
     compareInputAndOut(expected, actual);
   }
+
   @Test
   public void testCommitAfterNoPutTake() throws Exception {
     channel.start();
@@ -246,6 +253,7 @@ public class TestFileChannel extends TestFileChannelBase {
     transaction.commit();
     transaction.close();
   }
+
   @Test
   public void testCapacity() throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
@@ -270,6 +278,7 @@ public class TestFileChannel extends TestFileChannelBase {
     // ensure we the events back
     Assert.assertEquals(5, takeEvents(channel, 1, 5).size());
   }
+
   /**
    * This test is here to make sure we can replay a full queue
    * when we have a PUT with a lower txid than the take which
@@ -287,16 +296,14 @@ public class TestFileChannel extends TestFileChannelBase {
     // the idea here is we will fill up the channel
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.KEEP_ALIVE, String.valueOf(10L));
-    overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10));
-    overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY,
-        String.valueOf(10));
+    overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10L));
+    overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, String.valueOf(10L));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
     fillChannel(channel, "fillup");
     // then do a put which will block but it will be assigned a tx id
-    Future<String> put = Executors.newSingleThreadExecutor()
-            .submit(new Callable<String>() {
+    Future<String> put = Executors.newSingleThreadExecutor().submit(new Callable<String>() {
       @Override
       public String call() throws Exception {
         Set<String> result = putEvents(channel, "blocked-put", 1, 1);
@@ -321,6 +328,7 @@ public class TestFileChannel extends TestFileChannelBase {
     channel.start();
     Assert.assertTrue(channel.isOpen());
   }
+
   @Test
   public void testThreaded() throws IOException, InterruptedException {
     channel.start();
@@ -328,12 +336,9 @@ public class TestFileChannel extends TestFileChannelBase {
     int numThreads = 10;
     final CountDownLatch producerStopLatch = new CountDownLatch(numThreads);
     final CountDownLatch consumerStopLatch = new CountDownLatch(numThreads);
-    final List<Exception> errors = Collections
-            .synchronizedList(new ArrayList<Exception>());
-    final Set<String> expected = Collections.synchronizedSet(
-            new HashSet<String>());
-    final Set<String> actual = Collections.synchronizedSet(
-            new HashSet<String>());
+    final List<Exception> errors = Collections.synchronizedList(new ArrayList<Exception>());
+    final Set<String> expected = Collections.synchronizedSet(new HashSet<String>());
+    final Set<String> actual = Collections.synchronizedSet(new HashSet<String>());
     for (int i = 0; i < numThreads; i++) {
       final int id = i;
       Thread t = new Thread() {
@@ -363,15 +368,15 @@ public class TestFileChannel extends TestFileChannelBase {
         @Override
         public void run() {
           try {
-            while(!producerStopLatch.await(1, TimeUnit.SECONDS) ||
-                expected.size() > actual.size()) {
+            while (!producerStopLatch.await(1, TimeUnit.SECONDS) ||
+                   expected.size() > actual.size()) {
               if (id % 2 == 0) {
                 actual.addAll(takeEvents(channel, 1, Integer.MAX_VALUE));
               } else {
                 actual.addAll(takeEvents(channel, 5, Integer.MAX_VALUE));
               }
             }
-            if(actual.isEmpty()) {
+            if (actual.isEmpty()) {
               LOG.error("Found nothing!");
             } else {
               LOG.info("Completed some takes " + actual.size());
@@ -388,12 +393,13 @@ public class TestFileChannel extends TestFileChannelBase {
       t.start();
     }
     Assert.assertTrue("Timed out waiting for producers",
-            producerStopLatch.await(30, TimeUnit.SECONDS));
+                      producerStopLatch.await(30, TimeUnit.SECONDS));
     Assert.assertTrue("Timed out waiting for consumer",
-            consumerStopLatch.await(30, TimeUnit.SECONDS));
+                      consumerStopLatch.await(30, TimeUnit.SECONDS));
     Assert.assertEquals(Collections.EMPTY_LIST, errors);
     compareInputAndOut(expected, actual);
   }
+
   @Test
   public void testLocking() throws IOException {
     channel.start();
@@ -403,7 +409,6 @@ public class TestFileChannel extends TestFileChannelBase {
     Assert.assertTrue(!fileChannel.isOpen());
   }
 
-
   /**
    * Test contributed by Brock Noland during code review.
    * @throws Exception
@@ -437,11 +442,11 @@ public class TestFileChannel extends TestFileChannelBase {
   }
 
   @Test
-  public void testPutForceCheckpointCommitReplay() throws Exception{
+  public void testPutForceCheckpointCommitReplay() throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(2));
     overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY,
-        String.valueOf(2));
+                  String.valueOf(2));
     overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000");
     FileChannel channel = createFileChannel(overrides);
     channel.start();
@@ -578,28 +583,22 @@ public class TestFileChannel extends TestFileChannelBase {
     testChannelDiesOnCorruptEvent(true);
   }
 
-
   @Test
-  public void testChannelDiesOnCorruptEventNoFsync() throws
-    Exception {
+  public void testChannelDiesOnCorruptEventNoFsync() throws Exception {
     testChannelDiesOnCorruptEvent(false);
   }
 
-
-
-  private void testChannelDiesOnCorruptEvent(boolean fsyncPerTxn)
-    throws Exception {
+  private void testChannelDiesOnCorruptEvent(boolean fsyncPerTxn) throws Exception {
     Map<String, String> overrides = new HashMap<String, String>();
-    overrides.put(FileChannelConfiguration.FSYNC_PER_TXN,
-      String.valueOf(fsyncPerTxn));
+    overrides.put(FileChannelConfiguration.FSYNC_PER_TXN, String.valueOf(fsyncPerTxn));
     final FileChannel channel = createFileChannel(overrides);
     channel.start();
     putEvents(channel,"test-corrupt-event",100,100);
-    for(File dataDir : dataDirs) {
+    for (File dataDir : dataDirs) {
       File[] files = dataDir.listFiles(new FilenameFilter() {
         @Override
         public boolean accept(File dir, String name) {
-          if(!name.endsWith("meta") && !name.contains("lock")){
+          if (!name.endsWith("meta") && !name.contains("lock")) {
             return true;
           }
           return false;
@@ -624,7 +623,7 @@ public class TestFileChannel extends TestFileChannelBase {
       Assert.assertTrue(ex.getMessage().contains("Log is closed"));
       throw ex;
     }
-    if(fsyncPerTxn) {
+    if (fsyncPerTxn) {
       Assert.fail();
     } else {
       // The corrupt event must be missing, the rest should be

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java
index c95122b..f0638f9 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java
@@ -18,14 +18,7 @@
  */
 package org.apache.flume.channel.file;
 
-import static org.apache.flume.channel.file.TestUtils.*;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.collect.Maps;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -33,8 +26,14 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
+import static org.apache.flume.channel.file.TestUtils.compareInputAndOut;
+import static org.apache.flume.channel.file.TestUtils.takeEvents;
 
 public class TestFileChannelFormatRegression extends TestFileChannelBase {
   protected static final Logger LOG = LoggerFactory
@@ -60,8 +59,8 @@ public class TestFileChannelFormatRegression extends TestFileChannelBase {
             new File(checkpointDir, "checkpoint"));
     for (int i = 0; i < dataDirs.length; i++) {
       int fileIndex = i + 1;
-      TestUtils.copyDecompressed("fileformat-v2-log-"+fileIndex+".gz",
-              new File(dataDirs[i], "log-" + fileIndex));
+      TestUtils.copyDecompressed("fileformat-v2-log-" + fileIndex + ".gz",
+                                 new File(dataDirs[i], "log-" + fileIndex));
     }
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10));

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
index d5fe6fb..d21f140 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -55,8 +55,7 @@ import static org.apache.flume.channel.file.TestUtils.takeWithoutCommit;
 import static org.fest.reflect.core.Reflection.*;
 
 public class TestFileChannelRestart extends TestFileChannelBase {
-  protected static final Logger LOG = LoggerFactory
-      .getLogger(TestFileChannelRestart.class);
+  protected static final Logger LOG = LoggerFactory.getLogger(TestFileChannelRestart.class);
 
   @Before
   public void setup() throws Exception {
@@ -72,8 +71,8 @@ public class TestFileChannelRestart extends TestFileChannelBase {
   protected FileChannel createFileChannel(Map<String, String> overrides) {
     // FLUME-2482, making sure scheduled checkpoint never gets called
     overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "6000000");
-    return TestUtils.createFileChannel(checkpointDir.getAbsolutePath(),
-            dataDir, backupDir.getAbsolutePath(), overrides);
+    return TestUtils.createFileChannel(checkpointDir.getAbsolutePath(), dataDir,
+                                       backupDir.getAbsolutePath(), overrides);
   }
 
   @Test
@@ -116,14 +115,14 @@ public class TestFileChannelRestart extends TestFileChannelBase {
   }
 
   public void doTestRestart(boolean useLogReplayV1,
-          boolean forceCheckpoint, boolean deleteCheckpoint,
-          boolean useFastReplay) throws Exception {
+                            boolean forceCheckpoint, boolean deleteCheckpoint,
+                            boolean useFastReplay) throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.USE_LOG_REPLAY_V1,
-            String.valueOf(useLogReplayV1));
+                  String.valueOf(useLogReplayV1));
     overrides.put(
-            FileChannelConfiguration.USE_FAST_REPLAY,
-            String.valueOf(useFastReplay));
+        FileChannelConfiguration.USE_FAST_REPLAY,
+        String.valueOf(useFastReplay));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -132,7 +131,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
       forceCheckpoint(channel);
     }
     channel.stop();
-    if(deleteCheckpoint) {
+    if (deleteCheckpoint) {
       File checkpoint = new File(checkpointDir, "checkpoint");
       Assert.assertTrue(checkpoint.delete());
       File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
@@ -146,19 +145,17 @@ public class TestFileChannelRestart extends TestFileChannelBase {
   }
 
   @Test
-  public void testRestartWhenMetaDataExistsButCheckpointDoesNot() throws
-      Exception {
+  public void testRestartWhenMetaDataExistsButCheckpointDoesNot() throws Exception {
     doTestRestartWhenMetaDataExistsButCheckpointDoesNot(false);
   }
 
   @Test
-  public void testRestartWhenMetaDataExistsButCheckpointDoesNotWithBackup()
-      throws Exception {
+  public void testRestartWhenMetaDataExistsButCheckpointDoesNotWithBackup() throws Exception {
     doTestRestartWhenMetaDataExistsButCheckpointDoesNot(true);
   }
 
-  private void doTestRestartWhenMetaDataExistsButCheckpointDoesNot(
-      boolean backup) throws Exception {
+  private void doTestRestartWhenMetaDataExistsButCheckpointDoesNot(boolean backup)
+      throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
     channel = createFileChannel(overrides);
@@ -167,7 +164,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
-    if(backup) {
+    if (backup) {
       Thread.sleep(2000);
     }
     channel.stop();
@@ -186,19 +183,16 @@ public class TestFileChannelRestart extends TestFileChannelBase {
   }
 
   @Test
-  public void testRestartWhenCheckpointExistsButMetaDoesNot() throws Exception{
+  public void testRestartWhenCheckpointExistsButMetaDoesNot() throws Exception {
     doTestRestartWhenCheckpointExistsButMetaDoesNot(false);
   }
 
   @Test
-  public void testRestartWhenCheckpointExistsButMetaDoesNotWithBackup() throws
-      Exception{
+  public void testRestartWhenCheckpointExistsButMetaDoesNotWithBackup() throws Exception {
     doTestRestartWhenCheckpointExistsButMetaDoesNot(true);
   }
 
-
-  private void doTestRestartWhenCheckpointExistsButMetaDoesNot(boolean backup)
-      throws Exception {
+  private void doTestRestartWhenCheckpointExistsButMetaDoesNot(boolean backup) throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
     channel = createFileChannel(overrides);
@@ -207,7 +201,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
-    if(backup) {
+    if (backup) {
       Thread.sleep(2000);
     }
     channel.stop();
@@ -235,8 +229,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     doTestRestartWhenNoCheckpointExists(true);
   }
 
-  private void doTestRestartWhenNoCheckpointExists(boolean backup) throws
-      Exception {
+  private void doTestRestartWhenNoCheckpointExists(boolean backup) throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
     channel = createFileChannel(overrides);
@@ -245,7 +238,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
-    if(backup) {
+    if (backup) {
       Thread.sleep(2000);
     }
     channel.stop();
@@ -273,7 +266,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     doTestBadCheckpointVersion(true);
   }
 
-  private void doTestBadCheckpointVersion(boolean backup) throws Exception{
+  private void doTestBadCheckpointVersion(boolean backup) throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
     channel = createFileChannel(overrides);
@@ -282,14 +275,14 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
-    if(backup) {
+    if (backup) {
       Thread.sleep(2000);
     }
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
     RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
     writer.seek(EventQueueBackingStoreFile.INDEX_VERSION *
-            Serialization.SIZE_OF_LONG);
+                Serialization.SIZE_OF_LONG);
     writer.writeLong(2L);
     writer.getFD().sync();
     writer.close();
@@ -311,8 +304,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     doTestBadCheckpointMetaVersion(true);
   }
 
-  private void doTestBadCheckpointMetaVersion(boolean backup) throws
-      Exception {
+  private void doTestBadCheckpointMetaVersion(boolean backup) throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
     channel = createFileChannel(overrides);
@@ -321,7 +313,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
-    if(backup) {
+    if (backup) {
       Thread.sleep(2000);
     }
     channel.stop();
@@ -331,7 +323,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Assert.assertNotNull(meta);
     is.close();
     FileOutputStream os = new FileOutputStream(
-            Serialization.getMetaDataFile(checkpoint));
+                                                  Serialization.getMetaDataFile(checkpoint));
     meta.toBuilder().setVersion(2).build().writeDelimitedTo(os);
     os.flush();
     channel = createFileChannel(overrides);
@@ -348,13 +340,11 @@ public class TestFileChannelRestart extends TestFileChannelBase {
   }
 
   @Test
-  public void testDifferingOrderIDCheckpointAndMetaVersionWithBackup() throws
-      Exception {
+  public void testDifferingOrderIDCheckpointAndMetaVersionWithBackup() throws Exception {
     doTestDifferingOrderIDCheckpointAndMetaVersion(true);
   }
 
-  private void doTestDifferingOrderIDCheckpointAndMetaVersion(boolean backup)
-      throws Exception {
+  private void doTestDifferingOrderIDCheckpointAndMetaVersion(boolean backup) throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
     channel = createFileChannel(overrides);
@@ -363,7 +353,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
-    if(backup) {
+    if (backup) {
       Thread.sleep(2000);
     }
     channel.stop();
@@ -373,7 +363,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Assert.assertNotNull(meta);
     is.close();
     FileOutputStream os = new FileOutputStream(
-            Serialization.getMetaDataFile(checkpoint));
+                                                  Serialization.getMetaDataFile(checkpoint));
     meta.toBuilder().setWriteOrderID(12).build().writeDelimitedTo(os);
     os.flush();
     channel = createFileChannel(overrides);
@@ -385,12 +375,12 @@ public class TestFileChannelRestart extends TestFileChannelBase {
   }
 
   @Test
-  public void testIncompleteCheckpoint() throws Exception{
+  public void testIncompleteCheckpoint() throws Exception {
     doTestIncompleteCheckpoint(false);
   }
 
   @Test
-  public void testIncompleteCheckpointWithCheckpoint() throws Exception{
+  public void testIncompleteCheckpointWithCheckpoint() throws Exception {
     doTestIncompleteCheckpoint(true);
   }
 
@@ -403,14 +393,14 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
-    if(backup) {
+    if (backup) {
       Thread.sleep(2000);
     }
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
     RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
     writer.seek(EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER
-      * Serialization.SIZE_OF_LONG);
+                * Serialization.SIZE_OF_LONG);
     writer.writeLong(EventQueueBackingStoreFile.CHECKPOINT_INCOMPLETE);
     writer.getFD().sync();
     writer.close();
@@ -443,30 +433,30 @@ public class TestFileChannelRestart extends TestFileChannelBase {
   }
 
   @Test
-  public void testFastReplayWithCheckpoint() throws Exception{
+  public void testFastReplayWithCheckpoint() throws Exception {
     testFastReplay(false, true);
   }
 
   @Test
-  public void testFastReplayWithBadCheckpoint() throws Exception{
+  public void testFastReplayWithBadCheckpoint() throws Exception {
     testFastReplay(true, true);
   }
 
   @Test
-  public void testNoFastReplayWithCheckpoint() throws Exception{
+  public void testNoFastReplayWithCheckpoint() throws Exception {
     testFastReplay(false, false);
   }
 
   @Test
-  public void testNoFastReplayWithBadCheckpoint() throws Exception{
+  public void testNoFastReplayWithBadCheckpoint() throws Exception {
     testFastReplay(true, false);
   }
 
-  private void testFastReplay(boolean shouldCorruptCheckpoint,
-                             boolean useFastReplay) throws Exception{
+  private void testFastReplay(boolean shouldCorruptCheckpoint, boolean useFastReplay)
+      throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.USE_FAST_REPLAY,
-      String.valueOf(useFastReplay));
+                  String.valueOf(useFastReplay));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -477,7 +467,8 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     if (shouldCorruptCheckpoint) {
       File checkpoint = new File(checkpointDir, "checkpoint");
       RandomAccessFile writer = new RandomAccessFile(
-        Serialization.getMetaDataFile(checkpoint), "rw");
+                                                        Serialization.getMetaDataFile(checkpoint),
+                                                        "rw");
       writer.seek(10);
       writer.writeLong(new Random().nextLong());
       writer.getFD().sync();
@@ -495,14 +486,13 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     compareInputAndOut(in, out);
   }
 
-  private void doTestCorruptInflights(String name,
-    boolean backup) throws Exception {
+  private void doTestCorruptInflights(String name, boolean backup) throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    final Set<String> in1 = putEvents(channel, "restart-",10, 100);
+    final Set<String> in1 = putEvents(channel, "restart-", 10, 100);
     Assert.assertEquals(100, in1.size());
     Executors.newSingleThreadScheduledExecutor().submit(new Runnable() {
       @Override
@@ -516,7 +506,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Set<String> in2 = putWithoutCommit(channel, tx, "restart", 100);
     Assert.assertEquals(100, in2.size());
     forceCheckpoint(channel);
-    if(backup) {
+    if (backup) {
       Thread.sleep(2000);
     }
     tx.commit();
@@ -554,13 +544,12 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
-    if(backup) {
+    if (backup) {
       Thread.sleep(2000);
     }
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
-    RandomAccessFile writer = new RandomAccessFile(
-            Serialization.getMetaDataFile(checkpoint), "rw");
+    RandomAccessFile writer = new RandomAccessFile(Serialization.getMetaDataFile(checkpoint), "rw");
     writer.setLength(0);
     writer.getFD().sync();
     writer.close();
@@ -591,13 +580,12 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Set<String> in = putEvents(channel, "restart", 10, 100);
     Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
-    if(backup) {
+    if (backup) {
       Thread.sleep(2000);
     }
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
-    RandomAccessFile writer = new RandomAccessFile(
-            Serialization.getMetaDataFile(checkpoint), "rw");
+    RandomAccessFile writer = new RandomAccessFile(Serialization.getMetaDataFile(checkpoint), "rw");
     writer.seek(10);
     writer.writeLong(new Random().nextLong());
     writer.getFD().sync();
@@ -618,11 +606,10 @@ public class TestFileChannelRestart extends TestFileChannelBase {
       Assert.assertFalse(backupRestored);
     }
   }
- 
+
   //This test will fail without FLUME-1893
   @Test
-  public void testCorruptCheckpointVersionMostSignificant4Bytes()
-    throws Exception {
+  public void testCorruptCheckpointVersionMostSignificant4Bytes() throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     channel = createFileChannel(overrides);
     channel.start();
@@ -634,8 +621,8 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     File checkpoint = new File(checkpointDir, "checkpoint");
     RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
     writer.seek(EventQueueBackingStoreFile.INDEX_VERSION *
-      Serialization.SIZE_OF_LONG);
-    writer.write(new byte[]{(byte)1, (byte)5});
+                Serialization.SIZE_OF_LONG);
+    writer.write(new byte[] { (byte) 1, (byte) 5 });
     writer.getFD().sync();
     writer.close();
     channel = createFileChannel(overrides);
@@ -648,8 +635,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
 
   //This test will fail without FLUME-1893
   @Test
-  public void testCorruptCheckpointCompleteMarkerMostSignificant4Bytes()
-    throws Exception {
+  public void testCorruptCheckpointCompleteMarkerMostSignificant4Bytes() throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     channel = createFileChannel(overrides);
     channel.start();
@@ -661,8 +647,8 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     File checkpoint = new File(checkpointDir, "checkpoint");
     RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
     writer.seek(EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER *
-      Serialization.SIZE_OF_LONG);
-    writer.write(new byte[]{(byte) 1, (byte) 5});
+                Serialization.SIZE_OF_LONG);
+    writer.write(new byte[] { (byte) 1, (byte) 5 });
     writer.getFD().sync();
     writer.close();
     channel = createFileChannel(overrides);
@@ -674,8 +660,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
   }
 
   @Test
-  public void testWithExtraLogs()
-      throws Exception {
+  public void testWithExtraLogs() throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.CAPACITY, "10");
     overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "10");
@@ -702,27 +687,24 @@ public class TestFileChannelRestart extends TestFileChannelBase {
   // Make sure the entire channel was not replayed, only the events from the
   // backup.
   @Test
-  public void testBackupUsedEnsureNoFullReplayWithoutCompression() throws
-    Exception {
+  public void testBackupUsedEnsureNoFullReplayWithoutCompression() throws Exception {
     testBackupUsedEnsureNoFullReplay(false);
   }
+
   @Test
-  public void testBackupUsedEnsureNoFullReplayWithCompression() throws
-    Exception {
+  public void testBackupUsedEnsureNoFullReplayWithCompression() throws Exception {
     testBackupUsedEnsureNoFullReplay(true);
   }
 
   private void testBackupUsedEnsureNoFullReplay(boolean compressedBackup)
-    throws Exception {
+      throws Exception {
     File dataDir = Files.createTempDir();
     File tempBackup = Files.createTempDir();
     Map<String, String> overrides = Maps.newHashMap();
-    overrides.put(FileChannelConfiguration.DATA_DIRS,
-      dataDir.getAbsolutePath());
-    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS,
-      "true");
+    overrides.put(FileChannelConfiguration.DATA_DIRS, dataDir.getAbsolutePath());
+    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true");
     overrides.put(FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT,
-      String.valueOf(compressedBackup));
+                  String.valueOf(compressedBackup));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -734,8 +716,8 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     in = putEvents(channel, "restart", 10, 100);
     takeEvents(channel, 10, 100);
     Assert.assertEquals(100, in.size());
-    for(File file : backupDir.listFiles()) {
-      if(file.getName().equals(Log.FILE_LOCK)) {
+    for (File file : backupDir.listFiles()) {
+      if (file.getName().equals(Log.FILE_LOCK)) {
         continue;
       }
       Files.copy(file, new File(tempBackup, file.getName()));
@@ -749,8 +731,8 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     // tests), so throw away the backup and force the use of an older backup by
     // bringing in the copy of the last backup before the checkpoint.
     Serialization.deleteAllFiles(backupDir, Log.EXCLUDES);
-    for(File file : tempBackup.listFiles()) {
-      if(file.getName().equals(Log.FILE_LOCK)) {
+    for (File file : tempBackup.listFiles()) {
+      if (file.getName().equals(Log.FILE_LOCK)) {
         continue;
       }
       Files.copy(file, new File(backupDir, file.getName()));
@@ -782,7 +764,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Assert.assertTrue(channel.isOpen());
     putEvents(channel, prefix, 10, 100);
     Set<String> origFiles = Sets.newHashSet();
-    for(File dir : dataDirs) {
+    for (File dir : dataDirs) {
       origFiles.addAll(Lists.newArrayList(dir.list()));
     }
     forceCheckpoint(channel);
@@ -792,7 +774,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Set<String> newFiles = Sets.newHashSet();
     int olderThanCheckpoint = 0;
     int totalMetaFiles = 0;
-    for(File dir : dataDirs) {
+    for (File dir : dataDirs) {
       File[] metadataFiles = dir.listFiles(new FilenameFilter() {
         @Override
         public boolean accept(File dir, String name) {
@@ -803,8 +785,8 @@ public class TestFileChannelRestart extends TestFileChannelBase {
         }
       });
       totalMetaFiles = metadataFiles.length;
-      for(File metadataFile : metadataFiles) {
-        if(metadataFile.lastModified() < beforeSecondCheckpoint) {
+      for (File metadataFile : metadataFiles) {
+        if (metadataFile.lastModified() < beforeSecondCheckpoint) {
           olderThanCheckpoint++;
         }
       }
@@ -824,13 +806,13 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     takeEvents(channel, 10, 50);
     forceCheckpoint(channel);
     newFiles = Sets.newHashSet();
-    for(File dir : dataDirs) {
+    for (File dir : dataDirs) {
       newFiles.addAll(Lists.newArrayList(dir.list()));
     }
     Assert.assertTrue(!newFiles.containsAll(origFiles));
   }
 
-  @Test (expected = IOException.class)
+  @Test(expected = IOException.class)
   public void testSlowBackup() throws Throwable {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true");
@@ -858,10 +840,10 @@ public class TestFileChannelRestart extends TestFileChannelBase {
   public void testCompressBackup() throws Throwable {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS,
-      "true");
+                  "true");
     overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "1000");
     overrides.put(FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT,
-      "true");
+                  "true");
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -873,36 +855,34 @@ public class TestFileChannelRestart extends TestFileChannelBase {
 
     Assert.assertTrue(compressedBackupCheckpoint.exists());
 
-    Serialization.decompressFile(compressedBackupCheckpoint,
-      uncompressedBackupCheckpoint);
+    Serialization.decompressFile(compressedBackupCheckpoint, uncompressedBackupCheckpoint);
 
     File checkpoint = new File(checkpointDir, "checkpoint");
-    Assert.assertTrue(FileUtils.contentEquals(checkpoint,
-      uncompressedBackupCheckpoint));
+    Assert.assertTrue(FileUtils.contentEquals(checkpoint, uncompressedBackupCheckpoint));
 
     channel.stop();
   }
 
   @Test
   public void testToggleCheckpointCompressionFromTrueToFalse()
-    throws Exception {
+      throws Exception {
     restartToggleCompression(true);
   }
 
   @Test
   public void testToggleCheckpointCompressionFromFalseToTrue()
-    throws Exception {
+      throws Exception {
     restartToggleCompression(false);
   }
 
   public void restartToggleCompression(boolean originalCheckpointCompressed)
-    throws Exception {
+      throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS,
-      "true");
+                  "true");
     overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "1000");
     overrides.put(FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT,
-      String.valueOf(originalCheckpointCompressed));
+                  String.valueOf(originalCheckpointCompressed));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -910,17 +890,17 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     forceCheckpoint(channel);
     Thread.sleep(2000);
     Assert.assertEquals(compressedBackupCheckpoint.exists(),
-      originalCheckpointCompressed);
+                        originalCheckpointCompressed);
     Assert.assertEquals(uncompressedBackupCheckpoint.exists(),
-      !originalCheckpointCompressed);
+                        !originalCheckpointCompressed);
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
     Assert.assertTrue(checkpoint.delete());
     File checkpointMetaData = Serialization.getMetaDataFile(
-      checkpoint);
+        checkpoint);
     Assert.assertTrue(checkpointMetaData.delete());
     overrides.put(FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT,
-      String.valueOf(!originalCheckpointCompressed));
+                  String.valueOf(!originalCheckpointCompressed));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -929,21 +909,21 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     forceCheckpoint(channel);
     Thread.sleep(2000);
     Assert.assertEquals(compressedBackupCheckpoint.exists(),
-      !originalCheckpointCompressed);
+                        !originalCheckpointCompressed);
     Assert.assertEquals(uncompressedBackupCheckpoint.exists(),
-      originalCheckpointCompressed);
+                        originalCheckpointCompressed);
   }
 
   private static void slowdownBackup(FileChannel channel) {
     Log log = field("log").ofType(Log.class).in(channel).get();
 
     FlumeEventQueue queue = field("queue")
-      .ofType(FlumeEventQueue.class)
-      .in(log).get();
+                                .ofType(FlumeEventQueue.class)
+                                .in(log).get();
 
     EventQueueBackingStore backingStore = field("backingStore")
-      .ofType(EventQueueBackingStore.class)
-      .in(queue).get();
+                                              .ofType(EventQueueBackingStore.class)
+                                              .in(queue).get();
 
     field("slowdownBackup").ofType(Boolean.class).in(backingStore).set(true);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java
index 23fc64b..c06d498 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java
@@ -18,11 +18,7 @@
  */
 package org.apache.flume.channel.file;
 
-import static org.apache.flume.channel.file.TestUtils.*;
-
-import java.util.Collections;
-import java.util.Set;
-
+import com.google.common.base.Charsets;
 import org.apache.flume.Transaction;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.sink.LoggerSink;
@@ -33,8 +29,12 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
+import java.util.Collections;
+import java.util.Set;
 
+import static org.apache.flume.channel.file.TestUtils.compareInputAndOut;
+import static org.apache.flume.channel.file.TestUtils.putEvents;
+import static org.apache.flume.channel.file.TestUtils.takeEvents;
 
 public class TestFileChannelRollback extends TestFileChannelBase {
   protected static final Logger LOG = LoggerFactory
@@ -117,11 +117,11 @@ public class TestFileChannelRollback extends TestFileChannelBase {
     transaction.rollback();
     transaction.close();
 
-    while(runner.isAlive()) {
+    while (runner.isAlive()) {
       Thread.sleep(10L);
     }
     Assert.assertEquals(numEvents - 1, runner.getCount());
-    for(Exception ex : runner.getErrors()) {
+    for (Exception ex : runner.getErrors()) {
       LOG.warn("Sink had error", ex);
     }
     Assert.assertEquals(Collections.EMPTY_LIST, runner.getErrors());


Mime
View raw message