flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [06/11] flume git commit: FLUME-2937. Integrate checkstyle for non-test classes
Date Thu, 30 Jun 2016 02:21:32 GMT
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
index 977ad6c..c4fe6c6 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
@@ -100,20 +100,20 @@ public class ChannelCounter extends MonitoredCounterGroup implements
     return addAndGet(COUNTER_EVENT_TAKE_SUCCESS, delta);
   }
 
-  public void setChannelCapacity(long capacity){
+  public void setChannelCapacity(long capacity) {
     set(COUNTER_CHANNEL_CAPACITY, capacity);
   }
 
   @Override
-  public long getChannelCapacity(){
+  public long getChannelCapacity() {
     return get(COUNTER_CHANNEL_CAPACITY);
   }
 
   @Override
-  public double getChannelFillPercentage(){
+  public double getChannelFillPercentage() {
     long capacity = getChannelCapacity();
-    if(capacity != 0L) {
-      return ((getChannelSize()/(double)capacity) * 100);
+    if (capacity != 0L) {
+      return (getChannelSize() / (double)capacity) * 100;
     }
     return Double.MAX_VALUE;
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
index 7d4be55..bd9cd88 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
@@ -85,12 +85,6 @@ public class GangliaServer implements MonitorService {
   public final String CONF_ISGANGLIA3 = "isGanglia3";
   private static final String GANGLIA_CONTEXT = "flume.";
 
-  /**
-   *
-   * @param hosts List of hosts to send the metrics to. All of them have to be
-   * running the version of ganglia specified by the configuration.
-   * @throws FlumeException
-   */
   public GangliaServer() throws FlumeException {
     collectorRunnable = new GangliaCollector();
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
index 44e26e4..633513a 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
@@ -59,7 +59,6 @@ public abstract class MonitoredCounterGroup {
   private AtomicLong stopTime;
   private volatile boolean registered = false;
 
-
   protected MonitoredCounterGroup(Type type, String name, String... attrs) {
     this.type = type;
     this.name = name;
@@ -154,15 +153,15 @@ public abstract class MonitoredCounterGroup {
 
     // Print out the startTime for this component
     logger.info("Shutdown Metric for type: " + type + ", "
-      + "name: " + name + ". "
-      + typePrefix + "." + COUNTER_GROUP_START_TIME
-      + " == " + startTime);
+        + "name: " + name + ". "
+        + typePrefix + "." + COUNTER_GROUP_START_TIME
+        + " == " + startTime);
 
     // Print out the stopTime for this component
     logger.info("Shutdown Metric for type: " + type + ", "
-      + "name: " + name + ". "
-      + typePrefix + "." + COUNTER_GROUP_STOP_TIME
-      + " == " + stopTime);
+        + "name: " + name + ". "
+        + typePrefix + "." + COUNTER_GROUP_STOP_TIME
+        + " == " + stopTime);
 
     // Retrieve and sort counter group map keys
     final List<String> mapKeys = new ArrayList<String>(counterMap.keySet());
@@ -176,8 +175,8 @@ public abstract class MonitoredCounterGroup {
       final long counterMapValue = get(counterMapKey);
 
       logger.info("Shutdown Metric for type: " + type + ", "
-        + "name: " + name + ". "
-        + counterMapKey + " == " + counterMapValue);
+          + "name: " + name + ". "
+          + counterMapKey + " == " + counterMapValue);
     }
   }
 
@@ -276,9 +275,9 @@ public abstract class MonitoredCounterGroup {
     INTERCEPTOR,
     SERIALIZER,
     OTHER
-  };
+  }
 
-  public String getType(){
+  public String getType() {
     return type.name();
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
index 443335c..4e1a28c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
@@ -33,7 +33,7 @@ public enum MonitoringType {
     this.monitoringClass = klass;
   }
 
-  public Class<? extends MonitorService> getMonitorClass(){
+  public Class<? extends MonitorService> getMonitorClass() {
     return this.monitoringClass;
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
index 54f4a4c..534adc8 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
@@ -53,7 +53,6 @@ public class SinkCounter extends MonitoredCounterGroup implements
     COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS
   };
 
-
   public SinkCounter(String name) {
     super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES);
   }
@@ -63,7 +62,6 @@ public class SinkCounter extends MonitoredCounterGroup implements
         (String[]) ArrayUtils.addAll(attributes,ATTRIBUTES));
   }
 
-
   @Override
   public long getConnectionCreatedCount() {
     return get(COUNTER_CONNECTION_CREATED);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
index 02ef6ed..f96694e 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
@@ -41,15 +41,12 @@ public class SourceCounter extends MonitoredCounterGroup implements
   private static final String COUNTER_OPEN_CONNECTION_COUNT =
           "src.open-connection.count";
 
-
-  private static final String[] ATTRIBUTES =
-    {
-      COUNTER_EVENTS_RECEIVED, COUNTER_EVENTS_ACCEPTED,
-      COUNTER_APPEND_RECEIVED, COUNTER_APPEND_ACCEPTED,
-      COUNTER_APPEND_BATCH_RECEIVED, COUNTER_APPEND_BATCH_ACCEPTED,
-      COUNTER_OPEN_CONNECTION_COUNT
-    };
-
+  private static final String[] ATTRIBUTES = {
+    COUNTER_EVENTS_RECEIVED, COUNTER_EVENTS_ACCEPTED,
+    COUNTER_APPEND_RECEIVED, COUNTER_APPEND_ACCEPTED,
+    COUNTER_APPEND_BATCH_RECEIVED, COUNTER_APPEND_BATCH_ACCEPTED,
+    COUNTER_OPEN_CONNECTION_COUNT
+  };
 
   public SourceCounter(String name) {
     super(MonitoredCounterGroup.Type.SOURCE, name, ATTRIBUTES);
@@ -126,7 +123,7 @@ public class SourceCounter extends MonitoredCounterGroup implements
     return get(COUNTER_OPEN_CONNECTION_COUNT);
   }
 
-  public void setOpenConnectionCount(long openConnectionCount){
+  public void setOpenConnectionCount(long openConnectionCount) {
     set(COUNTER_OPEN_CONNECTION_COUNT, openConnectionCount);
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
index 7c0afb0..921a1f7 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java
@@ -94,9 +94,7 @@ public class HTTPMetricsServer implements MonitorService {
 
   private class HTTPMetricsHandler extends AbstractHandler {
 
-    Type mapType =
-            new TypeToken<Map<String, Map<String, String>>>() {
-            }.getType();
+    Type mapType = new TypeToken<Map<String, Map<String, String>>>() {}.getType();
     Gson gson = new Gson();
 
     @Override
@@ -108,8 +106,8 @@ public class HTTPMetricsServer implements MonitorService {
       //If we want to use any other url for something else, we should make sure
       //that for metrics only /metrics is used to prevent backward
       //compatibility issues.
-      if(request.getMethod().equalsIgnoreCase("TRACE") || request.getMethod()
-        .equalsIgnoreCase("OPTIONS")) {
+      if (request.getMethod().equalsIgnoreCase("TRACE") ||
+          request.getMethod().equalsIgnoreCase("OPTIONS")) {
         response.sendError(HttpServletResponse.SC_FORBIDDEN);
         response.flushBuffer();
         ((Request) request).setHandled(true);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java
index 6e142cf..28d3c8c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java
@@ -23,7 +23,6 @@ import org.apache.flume.instrumentation.ChannelCounter;
 public class KafkaChannelCounter extends ChannelCounter
     implements KafkaChannelCounterMBean {
 
-
   private static final String TIMER_KAFKA_EVENT_GET =
       "channel.kafka.event.get.time";
 
@@ -36,13 +35,10 @@ public class KafkaChannelCounter extends ChannelCounter
   private static final String COUNT_ROLLBACK =
       "channel.rollback.count";
 
-
   private static String[] ATTRIBUTES = {
-      TIMER_KAFKA_COMMIT,TIMER_KAFKA_EVENT_SEND,TIMER_KAFKA_EVENT_GET,
-      COUNT_ROLLBACK
+    TIMER_KAFKA_COMMIT,TIMER_KAFKA_EVENT_SEND,TIMER_KAFKA_EVENT_GET, COUNT_ROLLBACK
   };
 
-
   public KafkaChannelCounter(String name) {
     super(name,ATTRIBUTES);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java
index cbd6c35..a779496 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java
@@ -37,8 +37,7 @@ import org.slf4j.LoggerFactory;
 public class JMXPollUtil {
 
   private static Logger LOG = LoggerFactory.getLogger(JMXPollUtil.class);
-  private static MBeanServer mbeanServer = ManagementFactory.
-          getPlatformMBeanServer();
+  private static MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
 
   public static Map<String, Map<String, String>> getAllMBeans() {
     Map<String, Map<String, String>> mbeanMap = Maps.newHashMap();
@@ -54,23 +53,20 @@ public class JMXPollUtil {
         if (!obj.getObjectName().toString().startsWith("org.apache.flume")) {
           continue;
         }
-        MBeanAttributeInfo[] attrs = mbeanServer.
-                getMBeanInfo(obj.getObjectName()).getAttributes();
-        String strAtts[] = new String[attrs.length];
+        MBeanAttributeInfo[] attrs = mbeanServer.getMBeanInfo(obj.getObjectName()).getAttributes();
+        String[] strAtts = new String[attrs.length];
         for (int i = 0; i < strAtts.length; i++) {
           strAtts[i] = attrs[i].getName();
         }
-        AttributeList attrList = mbeanServer.getAttributes(
-                obj.getObjectName(), strAtts);
+        AttributeList attrList = mbeanServer.getAttributes(obj.getObjectName(), strAtts);
         String component = obj.getObjectName().toString().substring(
-                obj.getObjectName().toString().indexOf('=') + 1);
+            obj.getObjectName().toString().indexOf('=') + 1);
         Map<String, String> attrMap = Maps.newHashMap();
 
-
         for (Object attr : attrList) {
           Attribute localAttr = (Attribute) attr;
-          if(localAttr.getName().equalsIgnoreCase("type")){
-            component = localAttr.getValue()+ "." + component;
+          if (localAttr.getName().equalsIgnoreCase("type")) {
+            component = localAttr.getValue() + "." + component;
           }
           attrMap.put(localAttr.getName(), localAttr.getValue().toString());
         }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/interceptor/HostInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/HostInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/HostInterceptor.java
index 2693123..a2ad018 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/HostInterceptor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/HostInterceptor.java
@@ -88,7 +88,6 @@ public class HostInterceptor implements Interceptor {
       logger.warn("Could not get local host address. Exception follows.", e);
     }
 
-
   }
 
   @Override
@@ -106,7 +105,7 @@ public class HostInterceptor implements Interceptor {
     if (preserveExisting && headers.containsKey(header)) {
       return event;
     }
-    if(host != null) {
+    if (host != null) {
       headers.put(header, host);
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java
index 67cfc43..7fda90d 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java
@@ -61,10 +61,10 @@ import com.google.common.collect.Lists;
  * agent.sources.r1.interceptors.i1.serializers = s1 s2
  * agent.sources.r1.interceptors.i1.serializers.s1.type = com.blah.SomeSerializer
  * agent.sources.r1.interceptors.i1.serializers.s1.name = warning
- * agent.sources.r1.interceptors.i1.serializers.s2.type = org.apache.flume.interceptor.RegexExtractorInterceptorTimestampSerializer
+ * agent.sources.r1.interceptors.i1.serializers.s2.type =
+ *     org.apache.flume.interceptor.RegexExtractorInterceptorTimestampSerializer
  * agent.sources.r1.interceptors.i1.serializers.s2.name = error
  * agent.sources.r1.interceptors.i1.serializers.s2.dateFormat = yyyy-MM-dd
- * </code>
  * </p>
  * <pre>
  * Example 1:
@@ -167,7 +167,8 @@ public class RegexExtractorInterceptor implements Interceptor {
 
     private Pattern regex;
     private List<NameAndSerializer> serializerList;
-    private final RegexExtractorInterceptorSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer();
+    private final RegexExtractorInterceptorSerializer defaultSerializer =
+        new RegexExtractorInterceptorPassThroughSerializer();
 
     @Override
     public void configure(Context context) {
@@ -191,7 +192,7 @@ public class RegexExtractorInterceptor implements Interceptor {
           new Context(context.getSubProperties(SERIALIZERS + "."));
 
       serializerList = Lists.newArrayListWithCapacity(serializerNames.length);
-      for(String serializerName : serializerNames) {
+      for (String serializerName : serializerNames) {
         Context serializerContext = new Context(
             serializerContexts.getSubProperties(serializerName + "."));
         String type = serializerContext.getString("type", "DEFAULT");
@@ -199,7 +200,7 @@ public class RegexExtractorInterceptor implements Interceptor {
         Preconditions.checkArgument(!StringUtils.isEmpty(name),
             "Supplied name cannot be empty.");
 
-        if("DEFAULT".equals(type)) {
+        if ("DEFAULT".equals(type)) {
           serializerList.add(new NameAndSerializer(name, defaultSerializer));
         } else {
           serializerList.add(new NameAndSerializer(name, getCustomSerializer(

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java
index 8a3b6ce..d8327d4 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java
@@ -103,16 +103,13 @@ public class RegexFilteringInterceptor implements Interceptor {
     if (!excludeEvents) {
       if (regex.matcher(new String(event.getBody())).find()) {
         return event;
-      }
-      else {
+      } else {
         return null;
       }
-    }
-    else {
+    } else {
       if (regex.matcher(new String(event.getBody())).find()) {
         return null;
-      }
-      else {
+      } else {
         return event;
       }
     }
@@ -129,7 +126,9 @@ public class RegexFilteringInterceptor implements Interceptor {
     List<Event> out = Lists.newArrayList();
     for (Event event : events) {
       Event outEvent = intercept(event);
-      if (outEvent != null) { out.add(outEvent); }
+      if (outEvent != null) {
+        out.add(outEvent);
+      }
     }
     return out;
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java
index b8588cd..c4ec43b 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java
@@ -56,8 +56,8 @@ public class SearchAndReplaceInterceptor implements Interceptor {
   private final Charset charset;
 
   private SearchAndReplaceInterceptor(Pattern searchPattern,
-    String replaceString,
-    Charset charset) {
+                                      String replaceString,
+                                      Charset charset) {
     this.searchPattern = searchPattern;
     this.replaceString = replaceString;
     this.charset = charset;
@@ -107,7 +107,7 @@ public class SearchAndReplaceInterceptor implements Interceptor {
       replaceString = context.getString(REPLACE_STRING_KEY);
       // Empty replacement String value or if the property itself is not present
       // assign empty string as replacement
-      if(replaceString == null) {
+      if (replaceString == null) {
         replaceString = "";
       }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/interceptor/StaticInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/StaticInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/StaticInterceptor.java
index 97df467..d2eb523 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/StaticInterceptor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/StaticInterceptor.java
@@ -26,8 +26,6 @@ import org.apache.flume.Event;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.flume.interceptor.StaticInterceptor.Constants.*;
-
 /**
  * Interceptor class that appends a static, pre-configured header to all events.
  *
@@ -57,8 +55,7 @@ import static org.apache.flume.interceptor.StaticInterceptor.Constants.*;
  */
 public class StaticInterceptor implements Interceptor {
 
-  private static final Logger logger = LoggerFactory
-      .getLogger(StaticInterceptor.class);
+  private static final Logger logger = LoggerFactory.getLogger(StaticInterceptor.class);
 
   private final boolean preserveExisting;
   private final String key;
@@ -123,9 +120,9 @@ public class StaticInterceptor implements Interceptor {
 
     @Override
     public void configure(Context context) {
-      preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DEFAULT);
-      key = context.getString(KEY, KEY_DEFAULT);
-      value = context.getString(VALUE, VALUE_DEFAULT);
+      preserveExisting = context.getBoolean(Constants.PRESERVE, Constants.PRESERVE_DEFAULT);
+      key = context.getString(Constants.KEY, Constants.KEY_DEFAULT);
+      value = context.getString(Constants.VALUE, Constants.VALUE_DEFAULT);
     }
 
     @Override
@@ -136,11 +133,9 @@ public class StaticInterceptor implements Interceptor {
       return new StaticInterceptor(preserveExisting, key, value);
     }
 
-
   }
 
   public static class Constants {
-
     public static final String KEY = "key";
     public static final String KEY_DEFAULT = "key";
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java
index 9d942f6..50c3695 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+
 import static org.apache.flume.interceptor.TimestampInterceptor.Constants.*;
 
 /**

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java
index d7fe7ac..f42fd86 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java
@@ -44,44 +44,42 @@ import org.apache.flume.annotations.InterfaceStability;
  * <p>
  * Example usage
  * </p>
- * <code>
- *  public class MyService implements LifecycleAware {
+ * <pre>
+ * {@code
+ * public class MyService implements LifecycleAware {
  *
- *    private LifecycleState lifecycleState;
+ *   private LifecycleState lifecycleState;
  *
- *    public MyService() {
- *      lifecycleState = LifecycleState.IDLE;
- *    }
+ *   public MyService() {
+ *     lifecycleState = LifecycleState.IDLE;
+ *   }
  *
- *    @Override
- *    public void start(Context context) throws LifecycleException,
- *      InterruptedException {
+ *   @Override
+ *   public void start(Context context) throws LifecycleException, InterruptedException {
+ *     // ...your code does something.
+ *     lifecycleState = LifecycleState.START;
+ *   }
  *
- *      ...your code does something.
+ *   @Override
+ *   public void stop(Context context) throws LifecycleException, InterruptedException {
  *
- *      lifecycleState = LifecycleState.START;
- *    }
+ *     try {
+ *       // ...you stop services here.
+ *     } catch (SomethingException) {
+ *       lifecycleState = LifecycleState.ERROR;
+ *     }
  *
- *    @Override
- *    public void stop(Context context) throws LifecycleException,
- *      InterruptedException {
+ *     lifecycleState = LifecycleState.STOP;
+ *   }
  *
- *      try {
- *        ...you stop services here.
- *      } catch (SomethingException) {
- *        lifecycleState = LifecycleState.ERROR;
- *      }
+ *   @Override
+ *   public LifecycleState getLifecycleState() {
+ *     return lifecycleState;
+ *   }
  *
- *      lifecycleState = LifecycleState.STOP;
- *    }
- *
- *    @Override
- *    public LifecycleState getLifecycleState() {
- *      return lifecycleState;
- *    }
- *
- *  }
- * </code>
+ * }
+ * }
+ * </pre>
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
index 59d780a..773d671 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
@@ -36,8 +36,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class LifecycleSupervisor implements LifecycleAware {
 
-  private static final Logger logger = LoggerFactory
-      .getLogger(LifecycleSupervisor.class);
+  private static final Logger logger = LoggerFactory.getLogger(LifecycleSupervisor.class);
 
   private Map<LifecycleAware, Supervisoree> supervisedProcesses;
   private Map<LifecycleAware, ScheduledFuture<?>> monitorFutures;
@@ -81,15 +80,15 @@ public class LifecycleSupervisor implements LifecycleAware {
 
     if (monitorService != null) {
       monitorService.shutdown();
-      try{
+      try {
         monitorService.awaitTermination(10, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         logger.error("Interrupted while waiting for monitor service to stop");
       }
-      if(!monitorService.isTerminated()) {
+      if (!monitorService.isTerminated()) {
         monitorService.shutdownNow();
         try {
-          while(!monitorService.isTerminated()) {
+          while (!monitorService.isTerminated()) {
             monitorService.awaitTermination(10, TimeUnit.SECONDS);
           }
         } catch (InterruptedException e) {
@@ -98,8 +97,7 @@ public class LifecycleSupervisor implements LifecycleAware {
       }
     }
 
-    for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
-        .entrySet()) {
+    for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses.entrySet()) {
 
       if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {
         entry.getValue().status.desiredState = LifecycleState.STOP;
@@ -122,9 +120,9 @@ public class LifecycleSupervisor implements LifecycleAware {
 
   public synchronized void supervise(LifecycleAware lifecycleAware,
       SupervisorPolicy policy, LifecycleState desiredState) {
-    if(this.monitorService.isShutdown()
+    if (this.monitorService.isShutdown()
         || this.monitorService.isTerminated()
-        || this.monitorService.isTerminating()){
+        || this.monitorService.isTerminating()) {
       throw new FlumeException("Supervise called on " + lifecycleAware + " " +
           "after shutdown has been initiated. " + lifecycleAware + " will not" +
           " be started");
@@ -165,8 +163,8 @@ public class LifecycleSupervisor implements LifecycleAware {
     logger.debug("Unsupervising service:{}", lifecycleAware);
 
     synchronized (lifecycleAware) {
-    Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
-    supervisoree.status.discard = true;
+      Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
+      supervisoree.status.discard = true;
       this.setDesiredState(lifecycleAware, LifecycleState.STOP);
       logger.info("Stopping component: {}", lifecycleAware);
       lifecycleAware.stop();
@@ -199,7 +197,7 @@ public class LifecycleSupervisor implements LifecycleAware {
     return lifecycleState;
   }
 
-  public synchronized boolean isComponentInErrorState(LifecycleAware component){
+  public synchronized boolean isComponentInErrorState(LifecycleAware component) {
     return supervisedProcesses.get(component).status.error;
 
   }
@@ -301,18 +299,18 @@ public class LifecycleSupervisor implements LifecycleAware {
             }
           }
         }
-      } catch(Throwable t) {
+      } catch (Throwable t) {
         logger.error("Unexpected error", t);
       }
       logger.debug("Status check complete");
     }
   }
 
-  private class Purger implements Runnable{
+  private class Purger implements Runnable {
 
     @Override
     public void run() {
-      if(needToPurge){
+      if (needToPurge) {
         monitorService.purge();
         needToPurge = false;
       }
@@ -338,7 +336,7 @@ public class LifecycleSupervisor implements LifecycleAware {
 
   }
 
-  public static abstract class SupervisorPolicy {
+  public abstract static class SupervisorPolicy {
 
     abstract boolean isValid(LifecycleAware object, Status status);
 
@@ -372,5 +370,4 @@ public class LifecycleSupervisor implements LifecycleAware {
 
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java
index 5faf449..c648958 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java
@@ -52,8 +52,7 @@ import java.util.Locale;
  */
 public class AvroEventDeserializer implements EventDeserializer {
 
-  private static final Logger logger = LoggerFactory.getLogger
-      (AvroEventDeserializer.class);
+  private static final Logger logger = LoggerFactory.getLogger(AvroEventDeserializer.class);
 
   private final AvroSchemaType schemaType;
   private final ResettableInputStream ris;
@@ -180,8 +179,8 @@ public class AvroEventDeserializer implements EventDeserializer {
   }
 
   private static class SeekableResettableInputBridge implements SeekableInput {
-
     ResettableInputStream ris;
+
     public SeekableResettableInputBridge(ResettableInputStream ris) {
       this.ris = ris;
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java
index d09f02d..4d7a525 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java
@@ -31,8 +31,7 @@ import org.slf4j.LoggerFactory;
  */
 public class BodyTextEventSerializer implements EventSerializer {
 
-  private final static Logger logger =
-      LoggerFactory.getLogger(BodyTextEventSerializer.class);
+  private static final Logger logger = LoggerFactory.getLogger(BodyTextEventSerializer.class);
 
   // for legacy reasons, by default, append a newline to each event written out
   private final String APPEND_NEWLINE = "appendNewline";

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/serialization/HeaderAndBodyTextEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/HeaderAndBodyTextEventSerializer.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/HeaderAndBodyTextEventSerializer.java
index 9c6003c..8e3621d 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/serialization/HeaderAndBodyTextEventSerializer.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/HeaderAndBodyTextEventSerializer.java
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
  */
 public class HeaderAndBodyTextEventSerializer implements EventSerializer {
 
-  private final static Logger logger =
+  private static final Logger logger =
       LoggerFactory.getLogger(HeaderAndBodyTextEventSerializer.class);
 
   // for legacy reasons, by default, append a newline to each event written out

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/serialization/LineDeserializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/LineDeserializer.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/LineDeserializer.java
index 7c87235..8f23685 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/serialization/LineDeserializer.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/LineDeserializer.java
@@ -38,8 +38,7 @@ import java.util.List;
 @InterfaceStability.Evolving
 public class LineDeserializer implements EventDeserializer {
 
-  private static final Logger logger = LoggerFactory.getLogger
-      (LineDeserializer.class);
+  private static final Logger logger = LoggerFactory.getLogger(LineDeserializer.class);
 
   private final ResettableInputStream in;
   private final Charset outputCharset;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
index 618913e..7d6d95c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
@@ -64,12 +64,13 @@ import java.nio.charset.CodingErrorAction;
  * <p>Thus the behaviour of mark and reset is as follows:</p>
  *
  * <ol>
- *   <li>If {@link #mark()} is called after a high surrogate pair has been returned by {@link #readChar()},
- *   the marked position will be that of the character <em>following</em> the low surrogate,
- *   <em>not</em> that of the low surrogate itself.</li>
- *   <li>If {@link #reset()} is called after a high surrogate pair has been returned by {@link #readChar()},
- *   the low surrogate is always returned by the next call to {@link #readChar()},
- *   <em>before</em> the stream is actually reset to the last marked position.</li>
+ *   <li>If {@link #mark()} is called after a high surrogate pair has been returned by
+ *   {@link #readChar()}, the marked position will be that of the character <em>following</em>
+ *   the low surrogate, <em>not</em> that of the low surrogate itself.</li>
+ *   <li>If {@link #reset()} is called after a high surrogate pair has been returned by
+ *   {@link #readChar()}, the low surrogate is always returned by the next call to
+ *   {@link #readChar()}, <em>before</em> the stream is actually reset to the last marked
+ *   position.</li>
  * </ol>
  *
  * <p>This ensures that no dangling high surrogate could ever be read as long as
@@ -181,13 +182,13 @@ public class ResettableFileInputStream extends ResettableInputStream
     this.decoder = charset.newDecoder();
     this.position = 0;
     this.syncPosition = 0;
-    if(charset.name().startsWith("UTF-8")) {
+    if (charset.name().startsWith("UTF-8")) {
       // some JDKs wrongly report 3 bytes max
       this.maxCharWidth = 4;
-    } else if(charset.name().startsWith("UTF-16")) {
+    } else if (charset.name().startsWith("UTF-16")) {
       // UTF_16BE and UTF_16LE wrongly report 2 bytes max
       this.maxCharWidth = 4;
-    } else if(charset.name().startsWith("UTF-32")) {
+    } else if (charset.name().startsWith("UTF-32")) {
       // UTF_32BE and UTF_32LE wrongly report 4 bytes max
       this.maxCharWidth = 8;
     } else {
@@ -254,7 +255,7 @@ public class ResettableFileInputStream extends ResettableInputStream
 
     // Check whether we are in the middle of a surrogate pair,
     // in which case, return the last (low surrogate) char of the pair.
-    if(hasLowSurrogate) {
+    if (hasLowSurrogate) {
       hasLowSurrogate = false;
       return lowSurrogate;
     }
@@ -296,7 +297,7 @@ public class ResettableFileInputStream extends ResettableInputStream
     // Found nothing, but the byte buffer has not been entirely consumed.
     // This situation denotes the presence of a surrogate pair
     // that can only be decoded if we have a 2-char buffer.
-    if(buf.hasRemaining()) {
+    if (buf.hasRemaining()) {
       charBuf.clear();
       // increase the limit to 2
       charBuf.limit(2);
@@ -312,9 +313,10 @@ public class ResettableFileInputStream extends ResettableInputStream
         // save second (low surrogate) char for later consumption
         lowSurrogate = charBuf.get();
         // Check if we really have a surrogate pair
-        if( ! Character.isHighSurrogate(highSurrogate) || ! Character.isLowSurrogate(lowSurrogate)) {
+        if (!Character.isHighSurrogate(highSurrogate) || !Character.isLowSurrogate(lowSurrogate)) {
           // This should only happen in case of bad sequences (dangling surrogate, etc.)
-          logger.warn("Decoded a pair of chars, but it does not seem to be a surrogate pair: {} {}", (int)highSurrogate, (int)lowSurrogate);
+          logger.warn("Decoded a pair of chars, but it does not seem to be a surrogate pair: {} {}",
+                      (int)highSurrogate, (int)lowSurrogate);
         }
         hasLowSurrogate = true;
         // consider the pair as a single unit and increment position normally

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
index 5146834..f6024aa 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
@@ -115,14 +115,16 @@ import java.util.concurrent.locks.ReentrantLock;
  * </tr>
  * <tr>
  * <td><tt>compression-type</tt></td>
- * <td>Select compression type.  Default is "none" and the only compression type available is "deflate"</td>
+ * <td>Select compression type. Default is "none" and the only compression type available
+ *     is "deflate"</td>
  * <td>compression type</td>
  * <td>none</td>
  * </tr>
  * <tr>
  * <td><tt>compression-level</tt></td>
- * <td>In the case compression type is "deflate" this value can be between 0-9.  0 being no compression and
- * 1-9 is compression.  The higher the number the better the compression.  6 is the default.</td>
+ * <td>In the case compression type is "deflate" this value can be between 0-9.
+ *     0 being no compression and 1-9 is compression.
+ *     The higher the number the better the compression. 6 is the default.</td>
  * <td>compression level</td>
  * <td>6</td>
  * </tr>
@@ -139,11 +141,9 @@ import java.util.concurrent.locks.ReentrantLock;
  * This method will be called whenever this sink needs to create a new
  * connection to the source.
  */
-public abstract class AbstractRpcSink extends AbstractSink
-  implements Configurable {
+public abstract class AbstractRpcSink extends AbstractSink implements Configurable {
 
-  private static final Logger logger = LoggerFactory.getLogger
-    (AbstractRpcSink.class);
+  private static final Logger logger = LoggerFactory.getLogger(AbstractRpcSink.class);
   private String hostname;
   private Integer port;
   private RpcClient client;
@@ -152,9 +152,9 @@ public abstract class AbstractRpcSink extends AbstractSink
   private int cxnResetInterval;
   private AtomicBoolean resetConnectionFlag;
   private final int DEFAULT_CXN_RESET_INTERVAL = 0;
-  private final ScheduledExecutorService cxnResetExecutor = Executors
-    .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
-      .setNameFormat("Rpc Sink Reset Thread").build());
+  private final ScheduledExecutorService cxnResetExecutor =
+      Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder().setNameFormat("Rpc Sink Reset Thread").build());
 
   @Override
   public void configure(Context context) {
@@ -179,10 +179,9 @@ public abstract class AbstractRpcSink extends AbstractSink
     }
     cxnResetInterval = context.getInteger("reset-connection-interval",
       DEFAULT_CXN_RESET_INTERVAL);
-    if(cxnResetInterval == DEFAULT_CXN_RESET_INTERVAL) {
-      logger.info("Connection reset is set to " + String.valueOf
-        (DEFAULT_CXN_RESET_INTERVAL) +". Will not reset connection to next " +
-        "hop");
+    if (cxnResetInterval == DEFAULT_CXN_RESET_INTERVAL) {
+      logger.info("Connection reset is set to " + String.valueOf(DEFAULT_CXN_RESET_INTERVAL) +
+                  ". Will not reset connection to next hop");
     }
   }
 
@@ -210,7 +209,7 @@ public abstract class AbstractRpcSink extends AbstractSink
         resetConnectionFlag = new AtomicBoolean(false);
         client = initializeRpcClient(clientProps);
         Preconditions.checkNotNull(client, "Rpc Client could not be " +
-          "initialized. " + getName() + " could not be started");
+            "initialized. " + getName() + " could not be started");
         sinkCounter.incrementConnectionCreatedCount();
         if (cxnResetInterval > 0) {
           cxnResetExecutor.schedule(new Runnable() {
@@ -228,20 +227,19 @@ public abstract class AbstractRpcSink extends AbstractSink
           throw new FlumeException(ex);
         }
       }
-       logger.debug("Rpc sink {}: Created RpcClient: {}", getName(), client);
+      logger.debug("Rpc sink {}: Created RpcClient: {}", getName(), client);
     }
 
   }
 
   private void resetConnection() {
-      try {
-        destroyConnection();
-        createConnection();
-      } catch (Throwable throwable) {
-        //Don't rethrow, else this runnable won't get scheduled again.
-        logger.error("Error while trying to expire connection",
-          throwable);
-      }
+    try {
+      destroyConnection();
+      createConnection();
+    } catch (Throwable throwable) {
+      // Don't rethrow, else this runnable won't get scheduled again.
+      logger.error("Error while trying to expire connection", throwable);
+    }
   }
 
   private void destroyConnection() {
@@ -314,8 +312,7 @@ public abstract class AbstractRpcSink extends AbstractSink
         cxnResetExecutor.shutdownNow();
       }
     } catch (Exception ex) {
-      logger.error("Interrupted while waiting for connection reset executor " +
-        "to shut down");
+      logger.error("Interrupted while waiting for connection reset executor to shut down");
     }
     sinkCounter.stop();
     super.stop();
@@ -335,7 +332,7 @@ public abstract class AbstractRpcSink extends AbstractSink
     Channel channel = getChannel();
     Transaction transaction = channel.getTransaction();
 
-    if(resetConnectionFlag.get()) {
+    if (resetConnectionFlag.get()) {
       resetConnection();
       // if the time to reset is long and the timeout is short
       // this may cancel the next reset request

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java
index 1112643..585044b 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java
@@ -30,7 +30,7 @@ import com.google.common.base.Preconditions;
 
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-abstract public class AbstractSink implements Sink, LifecycleAware {
+public abstract class AbstractSink implements Sink, LifecycleAware {
 
   private Channel channel;
   private String name;
@@ -78,7 +78,8 @@ abstract public class AbstractSink implements Sink, LifecycleAware {
     return name;
   }
 
+  @Override
   public String toString() {
-	  return this.getClass().getName() + "{name:" + name + ", channel:" + channel.getName() + "}";
+    return this.getClass().getName() + "{name:" + name + ", channel:" + channel.getName() + "}";
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java
index 1c30592..3de653a 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java
@@ -37,7 +37,7 @@ public abstract class AbstractSinkProcessor implements SinkProcessor {
 
   @Override
   public void start() {
-    for(Sink s : sinkList) {
+    for (Sink s : sinkList) {
       s.start();
     }
 
@@ -46,7 +46,7 @@ public abstract class AbstractSinkProcessor implements SinkProcessor {
 
   @Override
   public void stop() {
-    for(Sink s : sinkList) {
+    for (Sink s : sinkList) {
       s.stop();
     }
     state = LifecycleState.STOP;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java
index 9ddeef4..f236a8a 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java
@@ -38,7 +38,7 @@ public abstract class AbstractSinkSelector implements SinkSelector {
   @Override
   public void configure(Context context) {
     Long timeOut = context.getLong("maxTimeOut");
-    if(timeOut != null){
+    if (timeOut != null) {
       maxTimeOut = timeOut;
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java b/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java
index 6a5be92..6f405fb 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java
@@ -53,8 +53,7 @@ public class DefaultSinkFactory implements SinkFactory {
 
   @SuppressWarnings("unchecked")
   @Override
-  public Class<? extends Sink> getClass(String type)
-  throws FlumeException {
+  public Class<? extends Sink> getClass(String type) throws FlumeException {
     String sinkClassName = type;
     SinkType sinkType = SinkType.OTHER;
     try {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java
index 00a362b..2da9264 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java
@@ -35,8 +35,7 @@ import com.google.common.base.Preconditions;
  * results without any additional handling. Suitable for all sinks that aren't
  * assigned to a group.
  */
-public class DefaultSinkProcessor implements SinkProcessor,
-ConfigurableComponent {
+public class DefaultSinkProcessor implements SinkProcessor, ConfigurableComponent {
   private Sink sink;
   private LifecycleState lifecycleState;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java
index 3bd52f2..69541e6 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java
@@ -70,12 +70,14 @@ public class FailoverSinkProcessor extends AbstractSinkProcessor {
     private Integer priority;
     private Sink sink;
     private Integer sequentialFailures;
+
     public FailedSink(Integer priority, Sink sink, int seqFailures) {
       this.sink = sink;
       this.priority = priority;
       this.sequentialFailures = seqFailures;
       adjustRefresh();
     }
+
     @Override
     public int compareTo(FailedSink arg0) {
       return refresh.compareTo(arg0.refresh);
@@ -88,24 +90,25 @@ public class FailoverSinkProcessor extends AbstractSinkProcessor {
     public Sink getSink() {
       return sink;
     }
+
     public Integer getPriority() {
       return priority;
     }
+
     public void incFails() {
       sequentialFailures++;
       adjustRefresh();
-      logger.debug("Sink {} failed again, new refresh is at {}, " +
-            "current time {}", new Object[] {
-              sink.getName(), refresh, System.currentTimeMillis()});
+      logger.debug("Sink {} failed again, new refresh is at {}, current time {}",
+                   new Object[] { sink.getName(), refresh, System.currentTimeMillis() });
     }
+
     private void adjustRefresh() {
       refresh = System.currentTimeMillis()
-              + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY);
+          + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY);
     }
   }
 
-  private static final Logger logger = LoggerFactory
-      .getLogger(FailoverSinkProcessor.class);
+  private static final Logger logger = LoggerFactory.getLogger(FailoverSinkProcessor.class);
 
   private static final String PRIORITY_PREFIX = "priority.";
   private static final String MAX_PENALTY_PREFIX = "maxpenalty";
@@ -121,15 +124,15 @@ public class FailoverSinkProcessor extends AbstractSinkProcessor {
     failedSinks = new PriorityQueue<FailedSink>();
     Integer nextPrio = 0;
     String maxPenaltyStr = context.getString(MAX_PENALTY_PREFIX);
-    if(maxPenaltyStr == null) {
+    if (maxPenaltyStr == null) {
       maxPenalty = DEFAULT_MAX_PENALTY;
     } else {
       try {
         maxPenalty = Integer.parseInt(maxPenaltyStr);
       } catch (NumberFormatException e) {
         logger.warn("{} is not a valid value for {}",
-                new Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX });
-        maxPenalty  = DEFAULT_MAX_PENALTY;
+                    new Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX });
+        maxPenalty = DEFAULT_MAX_PENALTY;
       }
     }
     for (Entry<String, Sink> entry : sinks.entrySet()) {
@@ -140,7 +143,7 @@ public class FailoverSinkProcessor extends AbstractSinkProcessor {
       } catch (Exception e) {
         priority = --nextPrio;
       }
-      if(!liveSinks.containsKey(priority)) {
+      if (!liveSinks.containsKey(priority)) {
         liveSinks.put(priority, sinks.get(entry.getKey()));
       } else {
         logger.warn("Sink {} not added to FailverSinkProcessor as priority" +
@@ -155,7 +158,7 @@ public class FailoverSinkProcessor extends AbstractSinkProcessor {
   public Status process() throws EventDeliveryException {
     // Retry any failed sinks that have gone through their "cooldown" period
     Long now = System.currentTimeMillis();
-    while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
+    while (!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
       FailedSink cur = failedSinks.poll();
       Status s;
       try {
@@ -177,7 +180,7 @@ public class FailoverSinkProcessor extends AbstractSinkProcessor {
     }
 
     Status ret = null;
-    while(activeSink != null) {
+    while (activeSink != null) {
       try {
         ret = activeSink.process();
         return ret;
@@ -196,8 +199,8 @@ public class FailoverSinkProcessor extends AbstractSinkProcessor {
     Integer key = liveSinks.lastKey();
     failedSinks.add(new FailedSink(key, activeSink, 1));
     liveSinks.remove(key);
-    if(liveSinks.isEmpty()) return null;
-    if(liveSinks.lastKey() != null) {
+    if (liveSinks.isEmpty()) return null;
+    if (liveSinks.lastKey() != null) {
       return liveSinks.get(liveSinks.lastKey());
     } else {
       return null;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
index 2d85756..ac0781e 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
@@ -56,12 +56,14 @@ import org.apache.flume.util.RoundRobinOrderSelector;
  *
  * <p>
  * Sample configuration:
- *  <pre>
- *  host1.sinkgroups.group1.sinks = sink1 sink2
- *  host1.sinkgroups.group1.processor.type = load_balance
- *  host1.sinkgroups.group1.processor.selector = <selector type>
- *  host1.sinkgroups.group1.processor.selector.selector_property = <value>
- *  </pre>
+ * <pre>
+ * {@code
+ * host1.sinkgroups.group1.sinks = sink1 sink2
+ * host1.sinkgroups.group1.processor.type = load_balance
+ * host1.sinkgroups.group1.processor.selector = <selector type>
+ * host1.sinkgroups.group1.processor.selector.selector_property = <value>
+ * }
+ * </pre>
  *
  * The value of processor.selector could be either <tt>round_robin</tt> for
  * round-robin scheme of load-balancing or <tt>random</tt> for random
@@ -200,26 +202,28 @@ public class LoadBalancingSinkProcessor extends AbstractSinkProcessor {
   }
 
   /**
-   * A sink selector that implements the round-robin sink selection policy.
-   * This implementation is not MT safe.
+   * <p>A sink selector that implements the round-robin sink selection policy.
+   * This implementation is not MT safe.</p>
+   *
+   * <p>Unfortunately both implementations need to override the base implementation
+   * in AbstractSinkSelector class, because any custom sink selectors
+   * will break if this stuff is moved to that class.</p>
    */
-  //Unfortunately both implementations need to override the base implementation
-  //in AbstractSinkSelector class, because any custom sink selectors
-  //will break if this stuff is moved to that class.
   private static class RoundRobinSinkSelector extends AbstractSinkSelector {
-
     private OrderSelector<Sink> selector;
-    RoundRobinSinkSelector(boolean backoff){
+
+    RoundRobinSinkSelector(boolean backoff) {
       selector = new RoundRobinOrderSelector<Sink>(backoff);
     }
 
     @Override
-    public void configure(Context context){
+    public void configure(Context context) {
       super.configure(context);
       if (maxTimeOut != 0) {
         selector.setMaxTimeOut(maxTimeOut);
       }
     }
+
     @Override
     public Iterator<Sink> createSinkIterator() {
       return selector.createIterator();
@@ -245,7 +249,7 @@ public class LoadBalancingSinkProcessor extends AbstractSinkProcessor {
 
     private OrderSelector<Sink> selector;
 
-    RandomOrderSinkSelector(boolean backoff){
+    RandomOrderSinkSelector(boolean backoff) {
       selector = new RandomOrderSelector<Sink>(backoff);
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
index 9cf9bc2..d219be7 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
@@ -71,8 +71,9 @@ public class LoggerSink extends AbstractSink implements Configurable {
       try {
         maxBytesToLog = Integer.parseInt(strMaxBytes);
       } catch (NumberFormatException e) {
-        logger.warn(String.format("Unable to convert %s to integer, using default value(%d) for maxByteToDump",
-                strMaxBytes, DEFAULT_MAX_BYTE_DUMP));
+        logger.warn(String.format(
+            "Unable to convert %s to integer, using default value(%d) for maxByteToDump",
+            strMaxBytes, DEFAULT_MAX_BYTE_DUMP));
         maxBytesToLog = DEFAULT_MAX_BYTE_DUMP;
       }
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
index cada6ec..eb00e15 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
@@ -92,7 +92,7 @@ public class NullSink extends AbstractSink implements Configurable {
         if (++eventCounter % logEveryNEvents == 0) {
           logger.info("Null sink {} successful processed {} events.", getName(), eventCounter);
         }
-        if(event == null) {
+        if (event == null) {
           status = Status.BACKOFF;
           break;
         }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
index b97d404..ee4b947 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
@@ -112,7 +112,7 @@ public class RollingFileSink extends AbstractSink implements Configurable {
     super.start();
 
     pathController.setBaseDirectory(directory);
-    if(rollInterval > 0){
+    if (rollInterval > 0) {
 
       rollService = Executors.newScheduledThreadPool(
           1,
@@ -136,7 +136,7 @@ public class RollingFileSink extends AbstractSink implements Configurable {
         }
 
       }, rollInterval, rollInterval, TimeUnit.SECONDS);
-    } else{
+    } else {
       logger.info("RollInterval is not valid, file rolling will not happen.");
     }
     logger.info("RollingFileSink {} started.", getName());
@@ -251,17 +251,15 @@ public class RollingFileSink extends AbstractSink implements Configurable {
         serializer = null;
       }
     }
-    if(rollInterval > 0){
+    if (rollInterval > 0) {
       rollService.shutdown();
 
       while (!rollService.isTerminated()) {
         try {
           rollService.awaitTermination(1, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
-          logger
-          .debug(
-              "Interrupted while waiting for roll service to stop. " +
-              "Please report this.", e);
+          logger.debug("Interrupted while waiting for roll service to stop. " +
+                       "Please report this.", e);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java b/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java
index dcdcad2..084072f 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java
@@ -55,8 +55,7 @@ public class SinkProcessorFactory {
    * processor configuration
    */
   @SuppressWarnings("unchecked")
-  public static SinkProcessor getProcessor(Context context,
- List<Sink> sinks) {
+  public static SinkProcessor getProcessor(Context context, List<Sink> sinks) {
     Preconditions.checkNotNull(context);
     Preconditions.checkNotNull(sinks);
     Preconditions.checkArgument(!sinks.isEmpty());
@@ -71,7 +70,7 @@ public class SinkProcessorFactory {
       logger.warn("Sink Processor type {} is a custom type", typeStr);
     }
 
-    if(!type.equals(SinkProcessorType.OTHER)) {
+    if (!type.equals(SinkProcessorType.OTHER)) {
       processorClassName = type.getSinkProcessorClassName();
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
index 32021d3..bcab731 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
@@ -106,15 +106,15 @@ public class ThriftSink extends AbstractRpcSink {
   protected RpcClient initializeRpcClient(Properties props) {
     // Only one thread is enough, since only one sink thread processes
     // transactions at any given time. Each sink owns its own Rpc client.
-    props.setProperty(RpcClientConfigurationConstants
-      .CONFIG_CONNECTION_POOL_SIZE, String.valueOf(1));
-    boolean enableKerberos =  Boolean.parseBoolean(props.getProperty(
-      RpcClientConfigurationConstants.KERBEROS_KEY, "false"));
-    if(enableKerberos) {
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_CONNECTION_POOL_SIZE,
+                      String.valueOf(1));
+    boolean enableKerberos = Boolean.parseBoolean(
+        props.getProperty(RpcClientConfigurationConstants.KERBEROS_KEY, "false"));
+    if (enableKerberos) {
       return SecureRpcClientFactory.getThriftInstance(props);
     } else {
       props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
-              RpcClientFactory.ClientType.THRIFT.name());
+                        RpcClientFactory.ClientType.THRIFT.name());
       return RpcClientFactory.getInstance(props);
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java
index 89bd357..08f9b84 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java
@@ -29,9 +29,9 @@ import org.apache.flume.annotations.InterfaceStability;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public abstract class AbstractEventDrivenSource extends BasicSourceSemantics
-  implements EventDrivenSource {
-
+public abstract class AbstractEventDrivenSource
+    extends BasicSourceSemantics
+    implements EventDrivenSource {
   public AbstractEventDrivenSource() {
     super();
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java
index 33e1acc..97f6c99 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java
@@ -37,8 +37,8 @@ import org.apache.flume.annotations.InterfaceStability;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public abstract class AbstractPollableSource extends BasicSourceSemantics
-  implements PollableSource {
+public abstract class AbstractPollableSource
+    extends BasicSourceSemantics implements PollableSource {
 
   long backoffSleepIncrement = PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT;
   long maxBackoffSleep = PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP;
@@ -46,14 +46,16 @@ public abstract class AbstractPollableSource extends BasicSourceSemantics
   public AbstractPollableSource() {
     super();
   }
+
   public Status process() throws EventDeliveryException {
     Exception exception = getStartException();
     if (exception != null) {
       throw new FlumeException("Source had error configuring or starting",
           exception);
     }
-    if(!isStarted()) {
-      throw new EventDeliveryException("Source is not started.  It is in '" + getLifecycleState() + "' state");
+    if (!isStarted()) {
+      throw new EventDeliveryException("Source is not started.  It is in '" +
+                                       getLifecycleState() + "' state");
     }
     return doProcess();
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java
index 0855de3..88ef665 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java
@@ -29,7 +29,7 @@ import com.google.common.base.Preconditions;
 
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-abstract public class AbstractSource implements Source {
+public abstract class AbstractSource implements Source {
 
   private ChannelProcessor channelProcessor;
   private String name;
@@ -79,6 +79,6 @@ abstract public class AbstractSource implements Source {
   }
 
   public String toString() {
-	  return this.getClass().getName() + "{name:" + name + ",state:" + lifecycleState +"}";
+    return this.getClass().getName() + "{name:" + name + ",state:" + lifecycleState + "}";
   }  
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index 6eb6a0a..8b9b956 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -21,18 +21,6 @@ package org.apache.flume.source;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-import java.io.FileInputStream;
-import java.net.InetSocketAddress;
-import java.security.KeyStore;
-import java.security.Security;
-import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.avro.ipc.NettyServer;
 import org.apache.avro.ipc.NettyTransceiver;
@@ -53,10 +41,10 @@ import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.AvroSourceProtocol;
 import org.apache.flume.source.avro.Status;
-import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.handler.codec.compression.ZlibDecoder;
 import org.jboss.netty.handler.codec.compression.ZlibEncoder;
 import org.jboss.netty.handler.ipfilter.IpFilterRule;
@@ -66,6 +54,23 @@ import org.jboss.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import java.io.FileInputStream;
+import java.net.InetSocketAddress;
+import java.security.KeyStore;
+import java.security.Security;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 /**
  * <p>
  * A {@link Source} implementation that receives Avro events from clients that
@@ -205,7 +210,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     if (enableIpFilter) {
       patternRuleConfigDefinition = context.getString(IP_FILTER_RULES_KEY);
       if (patternRuleConfigDefinition == null ||
-        patternRuleConfigDefinition.trim().isEmpty()) {
+          patternRuleConfigDefinition.trim().isEmpty()) {
         throw new FlumeException(
           "ipFilter is configured with true but ipFilterRules is not defined:" +
             " ");
@@ -241,7 +246,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     sourceCounter.start();
     super.start();
     final NettyServer srv = (NettyServer)server;
-    connectionCountUpdater.scheduleWithFixedDelay(new Runnable(){
+    connectionCountUpdater.scheduleWithFixedDelay(new Runnable() {
 
       @Override
       public void run() {
@@ -256,22 +261,17 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
   private NioServerSocketChannelFactory initSocketChannelFactory() {
     NioServerSocketChannelFactory socketChannelFactory;
     if (maxThreads <= 0) {
-      socketChannelFactory = new NioServerSocketChannelFactory
-        (Executors.newCachedThreadPool(new ThreadFactoryBuilder().
-          setNameFormat("Avro " + NettyTransceiver.class.getSimpleName()
-            + " Boss-%d").build()),
-          Executors.newCachedThreadPool(new ThreadFactoryBuilder().
-            setNameFormat("Avro " + NettyTransceiver.class.getSimpleName()
-              + "  I/O Worker-%d").build()));
+      socketChannelFactory = new NioServerSocketChannelFactory(
+          Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(
+              "Avro " + NettyTransceiver.class.getSimpleName() + " Boss-%d").build()),
+          Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(
+              "Avro " + NettyTransceiver.class.getSimpleName() + "  I/O Worker-%d").build()));
     } else {
       socketChannelFactory = new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(new ThreadFactoryBuilder().
-          setNameFormat(
-            "Avro " + NettyTransceiver.class.getSimpleName()
-              + " Boss-%d").build()),
-        Executors.newFixedThreadPool(maxThreads, new ThreadFactoryBuilder().
-          setNameFormat("Avro " + NettyTransceiver.class.getSimpleName() +
-            "  I/O Worker-%d").build()));
+        Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(
+            "Avro " + NettyTransceiver.class.getSimpleName() + " Boss-%d").build()),
+        Executors.newFixedThreadPool(maxThreads, new ThreadFactoryBuilder().setNameFormat(
+            "Avro " + NettyTransceiver.class.getSimpleName() + "  I/O Worker-%d").build()));
     }
     return socketChannelFactory;
   }
@@ -309,7 +309,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     }
     sourceCounter.stop();
     connectionCountUpdater.shutdown();
-    while(!connectionCountUpdater.isTerminated()){
+    while (!connectionCountUpdater.isTerminated()) {
       try {
         Thread.sleep(100);
       } catch (InterruptedException ex) {
@@ -399,44 +399,40 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     return Status.OK;
   }
 
-  private PatternRule generateRule(
-    String patternRuleDefinition) throws FlumeException {
+  private PatternRule generateRule(String patternRuleDefinition) throws FlumeException {
     patternRuleDefinition = patternRuleDefinition.trim();
     //first validate the format
     int firstColonIndex = patternRuleDefinition.indexOf(":");
     if (firstColonIndex == -1) {
       throw new FlumeException(
-        "Invalid ipFilter patternRule '" + patternRuleDefinition +
+          "Invalid ipFilter patternRule '" + patternRuleDefinition +
           "' should look like <'allow'  or 'deny'>:<'ip' or " +
           "'name'>:<pattern>");
     } else {
-      String ruleAccessFlag = patternRuleDefinition.substring(0,
-        firstColonIndex);
-      int secondColonIndex = patternRuleDefinition.indexOf(":",
-        firstColonIndex + 1);
-      if ((!ruleAccessFlag.equals("allow") &&
-        !ruleAccessFlag.equals("deny")) || secondColonIndex == -1) {
+      String ruleAccessFlag = patternRuleDefinition.substring(0, firstColonIndex);
+      int secondColonIndex = patternRuleDefinition.indexOf(":", firstColonIndex + 1);
+      if ((!ruleAccessFlag.equals("allow") && !ruleAccessFlag.equals("deny")) ||
+          secondColonIndex == -1) {
         throw new FlumeException(
-          "Invalid ipFilter patternRule '" + patternRuleDefinition +
+            "Invalid ipFilter patternRule '" + patternRuleDefinition +
             "' should look like <'allow'  or 'deny'>:<'ip' or " +
             "'name'>:<pattern>");
       }
 
       String patternTypeFlag = patternRuleDefinition.substring(
-        firstColonIndex + 1, secondColonIndex);
-      if ((!patternTypeFlag.equals("ip") &&
-        !patternTypeFlag.equals("name"))) {
+          firstColonIndex + 1, secondColonIndex);
+      if ((!patternTypeFlag.equals("ip") && !patternTypeFlag.equals("name"))) {
         throw new FlumeException(
-          "Invalid ipFilter patternRule '" + patternRuleDefinition +
+            "Invalid ipFilter patternRule '" + patternRuleDefinition +
             "' should look like <'allow'  or 'deny'>:<'ip' or " +
             "'name'>:<pattern>");
       }
 
       boolean isAllow = ruleAccessFlag.equals("allow");
       String patternRuleString = (patternTypeFlag.equals("ip") ? "i" : "n")
-        + ":" + patternRuleDefinition.substring(secondColonIndex + 1);
+          + ":" + patternRuleDefinition.substring(secondColonIndex + 1);
       logger.info("Adding ipFilter PatternRule: "
-        + (isAllow ? "Allow" : "deny") + " " + patternRuleString);
+          + (isAllow ? "Allow" : "deny") + " " + patternRuleString);
       return new PatternRule(isAllow, patternRuleString);
     }
   }
@@ -458,9 +454,9 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     private String patternRuleConfigDefinition;
 
     public AdvancedChannelPipelineFactory(boolean enableCompression,
-      boolean enableSsl, String keystore, String keystorePassword,
-      String keystoreType, boolean enableIpFilter,
-      String patternRuleConfigDefinition) {
+        boolean enableSsl, String keystore, String keystorePassword,
+        String keystoreType, boolean enableIpFilter,
+        String patternRuleConfigDefinition) {
       this.enableCompression = enableCompression;
       this.enableSsl = enableSsl;
       this.keystore = keystore;
@@ -505,7 +501,6 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
         pipeline.addFirst("inflater", new ZlibDecoder());
       }
 
-
       if (enableSsl) {
         SSLEngine sslEngine = createServerSSLContext().createSSLEngine();
         sslEngine.setUseClientMode(false);
@@ -527,11 +522,10 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
       if (enableIpFilter) {
 
         logger.info("Setting up ipFilter with the following rule definition: " +
-          patternRuleConfigDefinition);
+                    patternRuleConfigDefinition);
         IpFilterRuleHandler ipFilterHandler = new IpFilterRuleHandler();
         ipFilterHandler.addAll(rules);
-        logger.info(
-          "Adding ipFilter with " + ipFilterHandler.size() + " rules");
+        logger.info("Adding ipFilter with " + ipFilterHandler.size() + " rules");
 
         pipeline.addFirst("ipFilter", ipFilterHandler);
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java b/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java
index d2672b5..931e1e4 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java
@@ -44,8 +44,7 @@ import com.google.common.base.Throwables;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public abstract class BasicSourceSemantics implements Source, Configurable {
-  private static final Logger logger = LoggerFactory
-      .getLogger(BasicSourceSemantics.class);
+  private static final Logger logger = LoggerFactory.getLogger(BasicSourceSemantics.class);
   private Exception exception;
   private ChannelProcessor channelProcessor;
   private String name;
@@ -54,9 +53,10 @@ public abstract class BasicSourceSemantics implements Source, Configurable {
   public BasicSourceSemantics() {
     lifecycleState = LifecycleState.IDLE;
   }
+
   @Override
   public synchronized void configure(Context context) {
-    if(isStarted()) {
+    if (isStarted()) {
       throw new IllegalStateException("Configure called when started");
     } else {
       try {
@@ -126,8 +126,7 @@ public abstract class BasicSourceSemantics implements Source, Configurable {
   }
 
   public String toString() {
-    return this.getClass().getName() + "{name:" + name + ",state:"
-        + lifecycleState +"}";
+    return this.getClass().getName() + "{name:" + name + ",state:" + lifecycleState + "}";
   }
 
   protected boolean isStarted() {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java b/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java
index bb9d3f1..f2da332 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java
@@ -46,13 +46,12 @@ public class DefaultSourceFactory implements SourceFactory {
       return source;
     } catch (Exception ex) {
       throw new FlumeException("Unable to create source: " + name
-          +", type: " + type + ", class: " + sourceClass.getName(), ex);
+          + ", type: " + type + ", class: " + sourceClass.getName(), ex);
     }
   }
   @SuppressWarnings("unchecked")
   @Override
-  public Class<? extends Source> getClass(String type)
-  throws FlumeException {
+  public Class<? extends Source> getClass(String type) throws FlumeException {
     String sourceClassName = type;
     SourceType srcType = SourceType.OTHER;
     try {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
index 18e662c..52ea808 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
@@ -133,7 +133,8 @@ import java.nio.charset.Charset;
  * </tr>
  * <tr>
  * <td><tt>batchTimeout</tt></td>
- * <td>Amount of time (in milliseconds) to wait, if the buffer size was not reached, before data is pushed downstream.</td>
+ * <td>Amount of time (in milliseconds) to wait, if the buffer size was not reached,
+ *     before data is pushed downstream.</td>
  * <td>long</td>
  * <td>3000</td>
  * </tr>
@@ -145,11 +146,9 @@ import java.nio.charset.Charset;
  * TODO
  * </p>
  */
-public class ExecSource extends AbstractSource implements EventDrivenSource,
-Configurable {
+public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable {
 
-  private static final Logger logger = LoggerFactory
-      .getLogger(ExecSource.class);
+  private static final Logger logger = LoggerFactory.getLogger(ExecSource.class);
 
   private String shell;
   private String command;
@@ -190,7 +189,7 @@ Configurable {
   @Override
   public void stop() {
     logger.info("Stopping exec source with command:{}", command);
-    if(runner != null) {
+    if (runner != null) {
       runner.setRestart(false);
       runner.kill();
     }
@@ -298,7 +297,7 @@ Configurable {
                 "timedFlushExecService" +
                 Thread.currentThread().getId() + "-%d").build());
         try {
-          if(shell != null) {
+          if (shell != null) {
             String[] commandArgs = formulateShellCommand(shell, command);
             process = Runtime.getRuntime().exec(commandArgs);
           }  else {
@@ -320,14 +319,14 @@ Configurable {
               public void run() {
                 try {
                   synchronized (eventList) {
-                    if(!eventList.isEmpty() && timeout()) {
+                    if (!eventList.isEmpty() && timeout()) {
                       flushEventBatch(eventList);
                     }
                   }
                 } catch (Exception e) {
                   logger.error("Exception occured when processing event batch", e);
-                  if(e instanceof InterruptedException) {
-                      Thread.currentThread().interrupt();
+                  if (e instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
                   }
                 }
               }
@@ -338,20 +337,20 @@ Configurable {
             synchronized (eventList) {
               sourceCounter.incrementEventReceivedCount();
               eventList.add(EventBuilder.withBody(line.getBytes(charset)));
-              if(eventList.size() >= bufferCount || timeout()) {
+              if (eventList.size() >= bufferCount || timeout()) {
                 flushEventBatch(eventList);
               }
             }
           }
 
           synchronized (eventList) {
-              if(!eventList.isEmpty()) {
-                flushEventBatch(eventList);
-              }
+            if (!eventList.isEmpty()) {
+              flushEventBatch(eventList);
+            }
           }
         } catch (Exception e) {
           logger.error("Failed while running command: " + command, e);
-          if(e instanceof InterruptedException) {
+          if (e instanceof InterruptedException) {
             Thread.currentThread().interrupt();
           }
         } finally {
@@ -364,7 +363,7 @@ Configurable {
           }
           exitCode = String.valueOf(kill());
         }
-        if(restart) {
+        if (restart) {
           logger.info("Restarting in {}ms, exit code {}", restartThrottle,
               exitCode);
           try {
@@ -375,17 +374,17 @@ Configurable {
         } else {
           logger.info("Command [" + command + "] exited with " + exitCode);
         }
-      } while(restart);
+      } while (restart);
     }
 
-    private void flushEventBatch(List<Event> eventList){
+    private void flushEventBatch(List<Event> eventList) {
       channelProcessor.processEventBatch(eventList);
       sourceCounter.addToEventAcceptedCount(eventList.size());
       eventList.clear();
       lastPushToChannel = systemClock.currentTimeMillis();
     }
 
-    private boolean timeout(){
+    private boolean timeout() {
       return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout;
     }
 
@@ -398,7 +397,7 @@ Configurable {
     }
 
     public int kill() {
-      if(process != null) {
+      if (process != null) {
         synchronized (process) {
           process.destroy();
 
@@ -407,7 +406,7 @@ Configurable {
 
             // Stop the Thread that flushes periodically
             if (future != null) {
-                future.cancel(true);
+              future.cancel(true);
             }
 
             if (timedFlushService != null) {
@@ -417,7 +416,7 @@ Configurable {
                   timedFlushService.awaitTermination(500, TimeUnit.MILLISECONDS);
                 } catch (InterruptedException e) {
                   logger.debug("Interrupted while waiting for exec executor service "
-                    + "to stop. Just exiting.");
+                      + "to stop. Just exiting.");
                   Thread.currentThread().interrupt();
                 }
               }
@@ -435,6 +434,7 @@ Configurable {
       this.restart = restart;
     }
   }
+
   private static class StderrReader extends Thread {
     private BufferedReader input;
     private boolean logStderr;
@@ -449,8 +449,8 @@ Configurable {
       try {
         int i = 0;
         String line = null;
-        while((line = input.readLine()) != null) {
-          if(logStderr) {
+        while ((line = input.readLine()) != null) {
+          if (logStderr) {
             // There is no need to read 'line' with a charset
             // as we do not to propagate it.
             // It is in UTF-16 and would be printed in UTF-8 format.
@@ -461,7 +461,7 @@ Configurable {
         logger.info("StderrLogger exiting", e);
       } finally {
         try {
-          if(input != null) {
+          if (input != null) {
             input.close();
           }
         } catch (IOException ex) {


Mime
View raw message