flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [05/11] flume git commit: FLUME-2937. Integrate checkstyle for non-test classes
Date Thu, 30 Jun 2016 02:21:31 GMT
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
index 957ec7f..ef1d51c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
@@ -18,7 +18,6 @@
  */
 package org.apache.flume.source;
 
-
 public class ExecSourceConfigurationConstants {
 
   /**
@@ -50,7 +49,7 @@ public class ExecSourceConfigurationConstants {
    * to data is pushed downstream: : default 3000 ms
    */
   public static final String CONFIG_BATCH_TIME_OUT = "batchTimeout";
-  public static final long DEFAULT_BATCH_TIME_OUT = 3000l;
+  public static final long DEFAULT_BATCH_TIME_OUT = 3000L;
 
   /**
    * Charset for reading input

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
index 87f0db1..b9f2438 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
@@ -113,7 +113,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
     portCharsets.clear();
     {
       ImmutableMap<String, String> portCharsetCfg = context.getSubProperties(
-        SyslogSourceConfigurationConstants.CONFIG_PORT_CHARSET_PREFIX);
+          SyslogSourceConfigurationConstants.CONFIG_PORT_CHARSET_PREFIX);
       for (Map.Entry<String, String> entry : portCharsetCfg.entrySet()) {
         String portStr = entry.getKey();
         String charsetStr = entry.getValue();
@@ -386,7 +386,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
    */
   static class LineSplitter {
 
-    private final static byte NEWLINE = '\n';
+    private static final byte NEWLINE = '\n';
     private final int maxLineLength;
 
     public LineSplitter(int maxLineLength) {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
index 1720d5f..f3efddb 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
@@ -42,9 +42,9 @@ public class NetcatSourceConfigurationConstants {
   public static final String CONFIG_MAX_LINE_LENGTH = "max-line-length";
   public static final int DEFAULT_MAX_LINE_LENGTH = 512;
 
- /**
-  * Encoding for the netcat source
-  */
+  /**
+   * Encoding for the netcat source
+   */
   public static final String CONFIG_SOURCE_ENCODING = "encoding";
   public static final String DEFAULT_ENCODING = "utf-8";
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java b/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
index ea37703..7357793 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java
@@ -48,8 +48,7 @@ import org.slf4j.LoggerFactory;
  */
 public class PollableSourceRunner extends SourceRunner {
 
-  private static final Logger logger = LoggerFactory
-      .getLogger(PollableSourceRunner.class);
+  private static final Logger logger = LoggerFactory.getLogger(PollableSourceRunner.class);
 
   private AtomicBoolean shouldStop;
 
@@ -94,10 +93,7 @@ public class PollableSourceRunner extends SourceRunner {
       runnerThread.interrupt();
       runnerThread.join();
     } catch (InterruptedException e) {
-      logger
-      .warn(
-          "Interrupted while waiting for polling runner to stop. Please report this.",
-          e);
+      logger.warn("Interrupted while waiting for polling runner to stop. Please report this.", e);
       Thread.currentThread().interrupt();
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
index 1214635..9f831bd 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
@@ -71,7 +71,7 @@ public class SequenceGeneratorSource extends AbstractPollableSource implements
     int i = 0;
     try {
       if (batchSize <= 1) {
-        if(eventsSent < totalEvents) {
+        if (eventsSent < totalEvents) {
           getChannelProcessor().processEvent(
                   EventBuilder.withBody(String.valueOf(sequence++).getBytes()));
           sourceCounter.incrementEventAcceptedCount();
@@ -82,7 +82,7 @@ public class SequenceGeneratorSource extends AbstractPollableSource implements
       } else {
         batchArrayList.clear();
         for (i = 0; i < batchSize; i++) {
-          if(eventsSent < totalEvents){
+          if (eventsSent < totalEvents) {
             batchArrayList.add(i, EventBuilder.withBody(String
                     .valueOf(sequence++).getBytes()));
             eventsSent++;
@@ -90,7 +90,7 @@ public class SequenceGeneratorSource extends AbstractPollableSource implements
             status = Status.BACKOFF;
           }
         }
-        if(!batchArrayList.isEmpty()) {
+        if (!batchArrayList.isEmpty()) {
           getChannelProcessor().processEventBatch(batchArrayList);
           sourceCounter.incrementAppendBatchAcceptedCount();
           sourceCounter.addToEventAcceptedCount(batchArrayList.size());

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 3af3e53..d88cc1d 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -20,7 +20,11 @@ package org.apache.flume.source;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-import org.apache.flume.*;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.FlumeException;
 import org.apache.flume.client.avro.ReliableSpoolingFileEventReader;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SourceCounter;
@@ -39,11 +43,10 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.*;
 
-public class SpoolDirectorySource extends AbstractSource implements
-Configurable, EventDrivenSource {
+public class SpoolDirectorySource extends AbstractSource
+                                  implements Configurable, EventDrivenSource {
 
-  private static final Logger logger = LoggerFactory
-      .getLogger(SpoolDirectorySource.class);
+  private static final Logger logger = LoggerFactory.getLogger(SpoolDirectorySource.class);
 
   /* Config options */
   private String completedSuffix;
@@ -124,8 +127,7 @@ Configurable, EventDrivenSource {
 
     super.stop();
     sourceCounter.stop();
-    logger.info("SpoolDir source {} stopped. Metrics: {}", getName(),
-      sourceCounter);
+    logger.info("SpoolDir source {} stopped. Metrics: {}", getName(), sourceCounter);
   }
 
   @Override
@@ -247,8 +249,8 @@ Configurable, EventDrivenSource {
             reader.commit();
           } catch (ChannelException ex) {
             logger.warn("The channel is full, and cannot write data now. The " +
-              "source will try again after " + String.valueOf(backoffInterval) +
-              " milliseconds");
+                "source will try again after " + String.valueOf(backoffInterval) +
+                " milliseconds");
             hitChannelException = true;
             if (backoff) {
               TimeUnit.MILLISECONDS.sleep(backoffInterval);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index 32b7837..5859aa2 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -91,6 +91,7 @@ public class SpoolDirectorySourceConfigurationConstants {
   public enum ConsumeOrder {
     OLDEST, YOUNGEST, RANDOM
   }
+
   public static final String CONSUME_ORDER = "consumeOrder";
   public static final ConsumeOrder DEFAULT_CONSUME_ORDER = ConsumeOrder.OLDEST;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
index 9aa1477..aa95294 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
@@ -53,11 +53,9 @@ import org.slf4j.LoggerFactory;
  *
  * See {@link StressSource#configure(Context)} for configuration options.
  */
-public class StressSource extends AbstractPollableSource implements
-  Configurable {
+public class StressSource extends AbstractPollableSource implements Configurable {
 
-  private static final Logger logger = LoggerFactory
-      .getLogger(StressSource.class);
+  private static final Logger logger = LoggerFactory.getLogger(StressSource.class);
 
   private CounterGroup counterGroup;
   private byte[] buffer;
@@ -102,8 +100,7 @@ public class StressSource extends AbstractPollableSource implements
       //Create event objects in case of batch test
       eventBatchList = new ArrayList<Event>();
 
-      for (int i = 0; i < batchSize; i++)
-      {
+      for (int i = 0; i < batchSize; i++) {
         eventBatchList.add(EventBuilder.withBody(buffer));
       }
     } else {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
index b57ffac..de727f6 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
@@ -115,7 +115,7 @@ public class SyslogParser {
     // remember version string
     String version = null;
     if (msgLen > curPos + 2 && "1 ".equals(msg.substring(curPos, curPos + 2))) {
-      version = msg.substring(curPos, curPos+1);
+      version = msg.substring(curPos, curPos + 1);
       headers.put(SyslogUtils.SYSLOG_VERSION, version);
       curPos += 2;
     }
@@ -313,18 +313,19 @@ public class SyslogParser {
     try {
       date = rfc3164Format.parseDateTime(ts);
     } catch (IllegalArgumentException e) {
-      logger.debug("rfc3164 date parse failed on ("+ts+"): invalid format", e);
+      logger.debug("rfc3164 date parse failed on (" + ts + "): invalid format", e);
       return 0;
     }
 
     // rfc3164 dates are really dumb.
     /*
-     * Some code to try and add some smarts to the year insertion as without a year in the message we
-     * need to make some educated guessing.
+     * Some code to try and add some smarts to the year insertion as without a year in the message
+     * we need to make some educated guessing.
      * First set the "fixed" to be the timestamp with the current year.
      * If the "fixed" time is more than one month in the future then roll it back a year.
      * If the "fixed" time is more than eleven months in the past then roll it forward a year.
-     * This gives us a 12 month rolling window (11 months in the past, 1 month in the future) of timestamps.
+     * This gives us a 12 month rolling window (11 months in the past, 1 month in the future) of
+     * timestamps.
      */
 
     if (date != null) {
@@ -332,7 +333,7 @@ public class SyslogParser {
 
       // flume clock is ahead or there is some latency, and the year rolled
       if (fixed.isAfter(now) && fixed.minusMonths(1).isAfter(now)) {
-         fixed = date.minusYears(1);
+        fixed = date.minusYears(1);
       // flume clock is behind and the year rolled
       } else if (fixed.isBefore(now) && fixed.plusMonths(1).isBefore(now)) {
         fixed = date.plusYears(1);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
index bd87151..185c00c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
@@ -48,11 +48,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SyslogTcpSource extends AbstractSource
-implements EventDrivenSource, Configurable {
+                             implements EventDrivenSource, Configurable {
+  private static final Logger logger = LoggerFactory.getLogger(SyslogTcpSource.class);
 
-
-  private static final Logger logger = LoggerFactory
-      .getLogger(SyslogTcpSource.class);
   private int port;
   private String host = null;
   private Channel nettyChannel;
@@ -65,7 +63,7 @@ implements EventDrivenSource, Configurable {
 
     private SyslogUtils syslogUtils = new SyslogUtils();
 
-    public void setEventSize(int eventSize){
+    public void setEventSize(int eventSize) {
       syslogUtils.setEventSize(eventSize);
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
index 47993dd..175bebb 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
@@ -35,14 +35,21 @@ import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.Configurables;
 import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
 import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SyslogUDPSource extends AbstractSource
-      implements EventDrivenSource, Configurable {
+                             implements EventDrivenSource, Configurable {
 
   private int port;
   private int maxsize = 1 << 16; // 64k is max allowable in RFC 5426
@@ -51,8 +58,7 @@ public class SyslogUDPSource extends AbstractSource
   private Map<String, String> formaterProp;
   private Set<String> keepFields;
 
-  private static final Logger logger = LoggerFactory
-      .getLogger(SyslogUDPSource.class);
+  private static final Logger logger = LoggerFactory.getLogger(SyslogUDPSource.class);
 
   private CounterGroup counterGroup = new CounterGroup();
 
@@ -96,20 +102,20 @@ public class SyslogUDPSource extends AbstractSource
   @Override
   public void start() {
     // setup Netty server
-    ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap
-        (new OioDatagramChannelFactory(Executors.newCachedThreadPool()));
+    ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap(
+        new OioDatagramChannelFactory(Executors.newCachedThreadPool()));
     final syslogHandler handler = new syslogHandler();
     handler.setFormater(formaterProp);
     handler.setKeepFields(keepFields);
     serverBootstrap.setOption("receiveBufferSizePredictorFactory",
-      new AdaptiveReceiveBufferSizePredictorFactory(DEFAULT_MIN_SIZE,
-        DEFAULT_INITIAL_SIZE, maxsize));
+        new AdaptiveReceiveBufferSizePredictorFactory(DEFAULT_MIN_SIZE,
+            DEFAULT_INITIAL_SIZE, maxsize));
     serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
       @Override
       public ChannelPipeline getPipeline() {
-       return Channels.pipeline(handler);
+        return Channels.pipeline(handler);
       }
-     });
+    });
 
     if (host == null) {
       nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
index 4866183..43a10e1 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
@@ -36,7 +36,6 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
@@ -47,38 +46,38 @@ import java.util.regex.Pattern;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class SyslogUtils {
-  final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_2 = "yyyy-MM-dd'T'HH:mm:ss.SZ";
-  final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_1 = "yyyy-MM-dd'T'HH:mm:ss.S";
-  final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_3 = "yyyy-MM-dd'T'HH:mm:ssZ";
-  final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_4 = "yyyy-MM-dd'T'HH:mm:ss";
-  final public static String SYSLOG_TIMESTAMP_FORMAT_RFC3164_1 = "yyyyMMM d HH:mm:ss";
+  public static final String SYSLOG_TIMESTAMP_FORMAT_RFC5424_2 = "yyyy-MM-dd'T'HH:mm:ss.SZ";
+  public static final String SYSLOG_TIMESTAMP_FORMAT_RFC5424_1 = "yyyy-MM-dd'T'HH:mm:ss.S";
+  public static final String SYSLOG_TIMESTAMP_FORMAT_RFC5424_3 = "yyyy-MM-dd'T'HH:mm:ssZ";
+  public static final String SYSLOG_TIMESTAMP_FORMAT_RFC5424_4 = "yyyy-MM-dd'T'HH:mm:ss";
+  public static final String SYSLOG_TIMESTAMP_FORMAT_RFC3164_1 = "yyyyMMM d HH:mm:ss";
 
-  final public static String SYSLOG_MSG_RFC5424_0 =
+  public static final String SYSLOG_MSG_RFC5424_0 =
       "(?:\\<(\\d{1,3})\\>)" + // priority
-      "(?:(\\d?)\\s?)" + // version
-      /* yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp) */
-      "(?:" +
-        "(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}" +
-        "(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp
-      "\\s" + // separator
-      "(?:([\\w][\\w\\d\\.@\\-]*)|-)" + // host name or - (null)
-      "\\s" + // separator
-      "(.*)$"; // body
-
-  final public static String SYSLOG_MSG_RFC3164_0 =
+          "(?:(\\d?)\\s?)" + // version
+          /* yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp) */
+          "(?:" +
+          "(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}" +
+          "(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp
+          "\\s" + // separator
+          "(?:([\\w][\\w\\d\\.@\\-]*)|-)" + // host name or - (null)
+          "\\s" + // separator
+          "(.*)$"; // body
+
+  public static final String SYSLOG_MSG_RFC3164_0 =
       "(?:\\<(\\d{1,3})\\>)" +
-      "(?:(\\d)?\\s?)" + // version
-      // stamp MMM d HH:mm:ss, single digit date has two spaces
-      "([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" +
-      "\\s" + // separator
-      "([\\w][\\w\\d\\.@-]*)" + // host
-      "\\s(.*)$";  // body
-
-  final public static int SYSLOG_PRIORITY_POS = 1;
-  final public static int SYSLOG_VERSION_POS = 2;
-  final public static int SYSLOG_TIMESTAMP_POS = 3;
-  final public static int SYSLOG_HOSTNAME_POS = 4;
-  final public static int SYSLOG_BODY_POS = 5;
+          "(?:(\\d)?\\s?)" + // version
+          // stamp MMM d HH:mm:ss, single digit date has two spaces
+          "([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" +
+          "\\s" + // separator
+          "([\\w][\\w\\d\\.@-]*)" + // host
+          "\\s(.*)$";  // body
+
+  public static final int SYSLOG_PRIORITY_POS = 1;
+  public static final int SYSLOG_VERSION_POS = 2;
+  public static final int SYSLOG_TIMESTAMP_POS = 3;
+  public static final int SYSLOG_HOSTNAME_POS = 4;
+  public static final int SYSLOG_BODY_POS = 5;
 
   private Mode m = Mode.START;
   private StringBuilder prio = new StringBuilder();
@@ -86,13 +85,13 @@ public class SyslogUtils {
   private static final Logger logger = LoggerFactory
       .getLogger(SyslogUtils.class);
 
-  final public static String SYSLOG_FACILITY = "Facility";
-  final public static String SYSLOG_SEVERITY = "Severity";
-  final public static String SYSLOG_PRIORITY = "Priority";
-  final public static String SYSLOG_VERSION = "Version";
-  final public static String EVENT_STATUS = "flume.syslog.status";
-  final public static Integer MIN_SIZE = 10;
-  final public static Integer DEFAULT_SIZE = 2500;
+  public static final String SYSLOG_FACILITY = "Facility";
+  public static final String SYSLOG_SEVERITY = "Severity";
+  public static final String SYSLOG_PRIORITY = "Priority";
+  public static final String SYSLOG_VERSION = "Version";
+  public static final String EVENT_STATUS = "flume.syslog.status";
+  public static final Integer MIN_SIZE = 10;
+  public static final Integer DEFAULT_SIZE = 2500;
   private final boolean isUdp;
   private boolean isBadEvent;
   private boolean isIncompleteEvent;
@@ -106,6 +105,7 @@ public class SyslogUtils {
     public ArrayList<SimpleDateFormat> dateFormat = new ArrayList<SimpleDateFormat>();
     public boolean addYear;
   }
+
   private ArrayList<SyslogFormatter> formats = new ArrayList<SyslogFormatter>();
 
   private String priority = null;
@@ -115,10 +115,10 @@ public class SyslogUtils {
   private String msgBody = null;
 
   private static final String[] DEFAULT_FIELDS_TO_KEEP = {
-    SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_PRIORITY,
-    SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_VERSION,
-    SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_TIMESTAMP,
-    SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_HOSTNAME
+      SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_PRIORITY,
+      SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_VERSION,
+      SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_TIMESTAMP,
+      SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_HOSTNAME
   };
   public static final String KEEP_FIELDS_ALL = "--all--";
 
@@ -211,22 +211,19 @@ public class SyslogUtils {
       return;
     }
     SyslogFormatter fmt1 = new SyslogFormatter();
-    fmt1.regexPattern = Pattern.compile( formatProp.get(
-        SyslogSourceConfigurationConstants.CONFIG_REGEX) );
-    if (formatProp.containsKey(
-        SyslogSourceConfigurationConstants.CONFIG_SEARCH)) {
-      fmt1.searchPattern.add(formatProp.get(
-          SyslogSourceConfigurationConstants.CONFIG_SEARCH));
+    fmt1.regexPattern = Pattern.compile(
+        formatProp.get(SyslogSourceConfigurationConstants.CONFIG_REGEX));
+    if (formatProp.containsKey(SyslogSourceConfigurationConstants.CONFIG_SEARCH)) {
+      fmt1.searchPattern.add(
+          formatProp.get(SyslogSourceConfigurationConstants.CONFIG_SEARCH));
     }
-    if (formatProp.containsKey(
-        SyslogSourceConfigurationConstants.CONFIG_REPLACE)) {
-      fmt1.replacePattern.add(formatProp.get(
-          SyslogSourceConfigurationConstants.CONFIG_REPLACE));
+    if (formatProp.containsKey(SyslogSourceConfigurationConstants.CONFIG_REPLACE)) {
+      fmt1.replacePattern.add(
+          formatProp.get(SyslogSourceConfigurationConstants.CONFIG_REPLACE));
     }
-    if (formatProp.containsKey(
-        SyslogSourceConfigurationConstants.CONFIG_DATEFORMAT)) {
-        fmt1.dateFormat.add(new SimpleDateFormat(formatProp.get(
-            SyslogSourceConfigurationConstants.CONFIG_DATEFORMAT)));
+    if (formatProp.containsKey(SyslogSourceConfigurationConstants.CONFIG_DATEFORMAT)) {
+      fmt1.dateFormat.add(new SimpleDateFormat(
+          formatProp.get(SyslogSourceConfigurationConstants.CONFIG_DATEFORMAT)));
     }
     formats.add(0, fmt1);
   }
@@ -266,20 +263,22 @@ public class SyslogUtils {
 
   enum Mode {
     START, PRIO, DATA
-  };
+  }
 
-  public enum SyslogStatus{
+  ;
+
+  public enum SyslogStatus {
     OTHER("Unknown"),
     INVALID("Invalid"),
     INCOMPLETE("Incomplete");
 
     private final String syslogStatus;
 
-    private SyslogStatus(String status){
+    private SyslogStatus(String status) {
       syslogStatus = status;
     }
 
-    public String getSyslogStatus(){
+    public String getSyslogStatus() {
       return this.syslogStatus;
     }
   }
@@ -292,14 +291,14 @@ public class SyslogUtils {
       int sev = 0;
       int facility = 0;
 
-      if(!isBadEvent){
+      if (!isBadEvent) {
         pri = Integer.parseInt(prio.toString());
         sev = pri % 8;
         facility = pri / 8;
         formatHeaders();
       }
 
-      Map <String, String> headers = new HashMap<String, String>();
+      Map<String, String> headers = new HashMap<String, String>();
       headers.put(SYSLOG_FACILITY, String.valueOf(facility));
       headers.put(SYSLOG_SEVERITY, String.valueOf(sev));
       if ((priority != null) && (priority.length() > 0)) {
@@ -314,10 +313,10 @@ public class SyslogUtils {
       if ((hostName != null) && (hostName.length() > 0)) {
         headers.put("host", hostName);
       }
-      if(isBadEvent){
+      if (isBadEvent) {
         logger.warn("Event created from Invalid Syslog data.");
         headers.put(EVENT_STATUS, SyslogStatus.INVALID.getSyslogStatus());
-      } else if(isIncompleteEvent){
+      } else if (isIncompleteEvent) {
         logger.warn("Event size larger than specified event size: {}. You should " +
             "consider increasing your event size.", maxSize);
         headers.put(EVENT_STATUS, SyslogStatus.INCOMPLETE.getSyslogStatus());
@@ -345,22 +344,22 @@ public class SyslogUtils {
     String eventStr = baos.toString();
     String timeStampString = null;
 
-    for(int p=0; p < formats.size(); p++) {
+    for (int p = 0; p < formats.size(); p++) {
       SyslogFormatter fmt = formats.get(p);
       Pattern pattern = fmt.regexPattern;
       Matcher matcher = pattern.matcher(eventStr);
-      if (! matcher.matches()) {
+      if (!matcher.matches()) {
         continue;
       }
       MatchResult res = matcher.toMatchResult();
-      for (int grp=1; grp <= res.groupCount(); grp++) {
+      for (int grp = 1; grp <= res.groupCount(); grp++) {
         String value = res.group(grp);
         if (grp == SYSLOG_TIMESTAMP_POS) {
           timeStampString = value;
 
           // apply available format replacements to timestamp
           if (value != null) {
-            for (int sp=0; sp < fmt.searchPattern.size(); sp++) {
+            for (int sp = 0; sp < fmt.searchPattern.size(); sp++) {
               value = value.replaceAll(fmt.searchPattern.get(sp), fmt.replacePattern.get(sp));
             }
             // Add year to timestamp if needed
@@ -373,14 +372,16 @@ public class SyslogUtils {
                 Date parsedDate = fmt.dateFormat.get(dt).parse(value);
                 /*
                  * Some code to try and add some smarts to the year insertion.
-                 * Original code just added the current year which was okay-ish, but around January 1st becomes
-                 * pretty naïve.
-                 * The current year is added above. This code, if the year has been added does the following:
+                 * Original code just added the current year which was okay-ish, but around
+                 * January 1st becomes pretty naïve.
+                 * The current year is added above. This code, if the year has been added does
+                 * the following:
                  * 1. Compute what the computed time, but one month in the past would be.
                  * 2. Compute what the computed time, but eleven months in the future would be.
-                 * If the computed time is more than one month in the future then roll it back a year.
-                 * If the computed time is more than eleven months in the past then roll it forward a year.
-                 * This gives us a 12 month rolling window (11 months in the past, 1 month in the future) of timestamps.
+                 * If the computed time is more than one month in the future then roll it back a
+                 * year. If the computed time is more than eleven months in the past then roll it
+                 * forward a year. This gives us a 12 month rolling window (11 months in the past,
+                 * 1 month in the future) of timestamps.
                  */
                 if (fmt.addYear) {
                   Calendar cal = Calendar.getInstance();
@@ -393,13 +394,15 @@ public class SyslogUtils {
                   calPlusElevenMonths.setTime(parsedDate);
                   calPlusElevenMonths.add(Calendar.MONTH, +11);
 
-                  if (cal.getTimeInMillis() > System.currentTimeMillis() && calMinusOneMonth.getTimeInMillis() > System.currentTimeMillis()) {
+                  if (cal.getTimeInMillis() > System.currentTimeMillis() &&
+                      calMinusOneMonth.getTimeInMillis() > System.currentTimeMillis()) {
                     //Need to roll back a year
                     Calendar c1 = Calendar.getInstance();
                     c1.setTime(parsedDate);
                     c1.add(Calendar.YEAR, -1);
                     parsedDate = c1.getTime();
-                  } else if (cal.getTimeInMillis() < System.currentTimeMillis() && calPlusElevenMonths.getTimeInMillis() < System.currentTimeMillis() ) {
+                  } else if (cal.getTimeInMillis() < System.currentTimeMillis() &&
+                             calPlusElevenMonths.getTimeInMillis() < System.currentTimeMillis()) {
                     //Need to roll forward a year
                     Calendar c1 = Calendar.getInstance();
                     c1.setTime(parsedDate);
@@ -422,14 +425,15 @@ public class SyslogUtils {
         } else if (grp == SYSLOG_VERSION_POS) {
           version = value;
         } else if (grp == SYSLOG_BODY_POS) {
-          msgBody = addFieldsToBody(keepFields, value, priority, version, timeStampString, hostName);
+          msgBody = addFieldsToBody(keepFields, value, priority, version,
+                                    timeStampString, hostName);
         }
       }
       break; // we successfully parsed the message using this pattern
     }
   }
 
-  private void reset(){
+  private void reset() {
     baos.reset();
     m = Mode.START;
     prio.delete(0, prio.length());
@@ -441,7 +445,7 @@ public class SyslogUtils {
   }
 
   // extract relevant syslog data needed for building Flume event
-  public Event extractEvent(ChannelBuffer in){
+  public Event extractEvent(ChannelBuffer in) {
 
     /* for protocol debugging
     ByteBuffer bb = in.toByteBuffer();
@@ -459,61 +463,61 @@ public class SyslogUtils {
       while (!doneReading && in.readable()) {
         b = in.readByte();
         switch (m) {
-        case START:
-          if (b == '<') {
-            baos.write(b);
-            m = Mode.PRIO;
-          } else if(b == '\n'){
-            //If the character is \n, it was because the last event was exactly
-            //as long  as the maximum size allowed and
-            //the only remaining character was the delimiter - '\n', or
-            //multiple delimiters were sent in a row.
-            //Just ignore it, and move forward, don't change the mode.
-            //This is a no-op, just ignore it.
-            logger.debug("Delimiter found while in START mode, ignoring..");
-
-          } else {
-            isBadEvent = true;
-            baos.write(b);
-            //Bad event, just dump everything as if it is data.
-            m = Mode.DATA;
-          }
-          break;
-        case PRIO:
-          baos.write(b);
-          if (b == '>') {
-            if (prio.length() == 0) {
-              isBadEvent = true;
-            }
-            m = Mode.DATA;
-          } else {
-            char ch = (char) b;
-            prio.append(ch);
-            // Priority is max 3 digits per both RFC 3164 and 5424
-            // With this check there is basically no danger of
-            // boas.size() exceeding this.maxSize before getting to the
-            // DATA state where this is actually checked
-            if (!Character.isDigit(ch) || prio.length() > 3) {
+          case START:
+            if (b == '<') {
+              baos.write(b);
+              m = Mode.PRIO;
+            } else if (b == '\n') {
+              //If the character is \n, it was because the last event was exactly
+              //as long as the maximum size allowed and
+              //the only remaining character was the delimiter - '\n', or
+              //multiple delimiters were sent in a row.
+              //Just ignore it, and move forward, don't change the mode.
+              //This is a no-op, just ignore it.
+              logger.debug("Delimiter found while in START mode, ignoring..");
+
+            } else {
               isBadEvent = true;
-              //If we hit a bad priority, just write as if everything is data.
+              baos.write(b);
+              //Bad event, just dump everything as if it is data.
               m = Mode.DATA;
             }
-          }
-          break;
-        case DATA:
-          // TCP syslog entries are separated by '\n'
-          if (b == '\n') {
-            e = buildEvent();
-            doneReading = true;
-          } else {
+            break;
+          case PRIO:
             baos.write(b);
-          }
-          if(baos.size() == this.maxSize && !doneReading) {
-            isIncompleteEvent = true;
-            e = buildEvent();
-            doneReading = true;
-          }
-          break;
+            if (b == '>') {
+              if (prio.length() == 0) {
+                isBadEvent = true;
+              }
+              m = Mode.DATA;
+            } else {
+              char ch = (char) b;
+              prio.append(ch);
+              // Priority is max 3 digits per both RFC 3164 and 5424
+              // With this check there is basically no danger of
+              // baos.size() exceeding this.maxSize before getting to the
+              // DATA state where this is actually checked
+              if (!Character.isDigit(ch) || prio.length() > 3) {
+                isBadEvent = true;
+                //If we hit a bad priority, just write as if everything is data.
+                m = Mode.DATA;
+              }
+            }
+            break;
+          case DATA:
+            // TCP syslog entries are separated by '\n'
+            if (b == '\n') {
+              e = buildEvent();
+              doneReading = true;
+            } else {
+              baos.write(b);
+            }
+            if (baos.size() == this.maxSize && !doneReading) {
+              isIncompleteEvent = true;
+              e = buildEvent();
+              doneReading = true;
+            }
+            break;
         }
 
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
index 7df5ddb..6a25e64 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
@@ -75,11 +75,10 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.security.PrivilegedAction;
 
-public class ThriftSource extends AbstractSource implements Configurable,
-  EventDrivenSource {
+public class ThriftSource extends AbstractSource implements Configurable, EventDrivenSource {
+
+  public static final Logger logger = LoggerFactory.getLogger(ThriftSource.class);
 
-  public static final Logger logger = LoggerFactory.getLogger(ThriftSource
-    .class);
   /**
    * Config param for the maximum number of threads this source should use to
    * handle incoming data.
@@ -131,17 +130,17 @@ public class ThriftSource extends AbstractSource implements Configurable,
     logger.info("Configuring thrift source.");
     port = context.getInteger(CONFIG_PORT);
     Preconditions.checkNotNull(port, "Port must be specified for Thrift " +
-      "Source.");
+        "Source.");
     bindAddress = context.getString(CONFIG_BIND);
     Preconditions.checkNotNull(bindAddress, "Bind address must be specified " +
-      "for Thrift Source.");
+        "for Thrift Source.");
 
     try {
       maxThreads = context.getInteger(CONFIG_THREADS, 0);
       maxThreads = (maxThreads <= 0) ? Integer.MAX_VALUE : maxThreads;
     } catch (NumberFormatException e) {
       logger.warn("Thrift source\'s \"threads\" property must specify an " +
-        "integer value: " + context.getString(CONFIG_THREADS));
+                  "integer value: " + context.getString(CONFIG_THREADS));
     }
 
     if (sourceCounter == null) {
@@ -190,8 +189,8 @@ public class ThriftSource extends AbstractSource implements Configurable,
     String keytab = context.getString(AGENT_KEYTAB);
     enableKerberos = context.getBoolean(KERBEROS_KEY, false);
     this.flumeAuth = FlumeAuthenticationUtil.getAuthenticator(principal, keytab);
-    if(enableKerberos) {
-      if(!flumeAuth.isAuthenticated()) {
+    if (enableKerberos) {
+      if (!flumeAuth.isAuthenticated()) {
         throw new FlumeException("Authentication failed in Kerberos mode for " +
                 "principal " + principal + " keytab " + keytab);
       }
@@ -221,29 +220,27 @@ public class ThriftSource extends AbstractSource implements Configurable,
     servingExecutor.submit(new Runnable() {
       @Override
       public void run() {
-        flumeAuth.execute(
-          new PrivilegedAction<Object>() {
-            @Override
-            public Object run() {
-              server.serve();
-              return null;
-            }
+        flumeAuth.execute(new PrivilegedAction<Object>() {
+          @Override
+          public Object run() {
+            server.serve();
+            return null;
           }
-        );
+        });
       }
     });
 
     long timeAfterStart = System.currentTimeMillis();
-    while(!server.isServing()) {
+    while (!server.isServing()) {
       try {
-        if(System.currentTimeMillis() - timeAfterStart >=10000) {
+        if (System.currentTimeMillis() - timeAfterStart >= 10000) {
           throw new FlumeException("Thrift server failed to start!");
         }
         TimeUnit.MILLISECONDS.sleep(1000);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new FlumeException("Interrupted while waiting for Thrift server" +
-          " to start.", e);
+            " to start.", e);
       }
     }
     sourceCounter.start();
@@ -287,8 +284,7 @@ public class ThriftSource extends AbstractSource implements Configurable,
 
   private TServerTransport getTServerTransport() {
     try {
-      return new TServerSocket(new InetSocketAddress
-              (bindAddress, port));
+      return new TServerSocket(new InetSocketAddress(bindAddress, port));
     } catch (Throwable throwable) {
       throw new FlumeException("Cannot start Thrift source.", throwable);
     }
@@ -305,7 +301,7 @@ public class ThriftSource extends AbstractSource implements Configurable,
   }
 
   private TServer getTThreadedSelectorServer() {
-    if(enableSsl || enableKerberos) {
+    if (enableSsl || enableKerberos) {
       return null;
     }
     Class<?> serverClass;
@@ -345,7 +341,7 @@ public class ThriftSource extends AbstractSource implements Configurable,
        *
        */
       server = (TServer) serverClass.getConstructor(argsClass).newInstance(args);
-    } catch(ClassNotFoundException e) {
+    } catch (ClassNotFoundException e) {
       return null;
     } catch (Throwable ex) {
       throw new FlumeException("Cannot start Thrift Source.", ex);
@@ -371,7 +367,7 @@ public class ThriftSource extends AbstractSource implements Configurable,
     args.protocolFactory(getProtocolFactory());
 
     //populate the transportFactory
-    if(enableKerberos) {
+    if (enableKerberos) {
       args.transportFactory(getSASLTransportFactory());
     } else {
       args.transportFactory(new TFastFramedTransport.Factory());
@@ -402,7 +398,7 @@ public class ThriftSource extends AbstractSource implements Configurable,
 
   @Override
   public void stop() {
-    if(server != null && server.isServing()) {
+    if (server != null && server.isServing()) {
       server.stop();
     }
     if (servingExecutor != null) {
@@ -424,8 +420,7 @@ public class ThriftSource extends AbstractSource implements Configurable,
 
     @Override
     public Status append(ThriftFlumeEvent event) throws TException {
-      Event flumeEvent = EventBuilder.withBody(event.getBody(),
-        event.getHeaders());
+      Event flumeEvent = EventBuilder.withBody(event.getBody(), event.getHeaders());
 
       sourceCounter.incrementAppendReceivedCount();
       sourceCounter.incrementEventReceivedCount();
@@ -434,7 +429,7 @@ public class ThriftSource extends AbstractSource implements Configurable,
         getChannelProcessor().processEvent(flumeEvent);
       } catch (ChannelException ex) {
         logger.warn("Thrift source " + getName() + " could not append events " +
-          "to the channel.", ex);
+                    "to the channel.", ex);
         return Status.FAILED;
       }
       sourceCounter.incrementAppendAcceptedCount();
@@ -448,16 +443,14 @@ public class ThriftSource extends AbstractSource implements Configurable,
       sourceCounter.addToEventReceivedCount(events.size());
 
       List<Event> flumeEvents = Lists.newArrayList();
-      for(ThriftFlumeEvent event : events) {
-        flumeEvents.add(EventBuilder.withBody(event.getBody(),
-          event.getHeaders()));
+      for (ThriftFlumeEvent event : events) {
+        flumeEvents.add(EventBuilder.withBody(event.getBody(), event.getHeaders()));
       }
 
       try {
         getChannelProcessor().processEventBatch(flumeEvents);
       } catch (ChannelException ex) {
-        logger.warn("Thrift source %s could not append events to the " +
-          "channel.", getName());
+        logger.warn("Thrift source %s could not append events to the channel.", getName());
         return Status.FAILED;
       }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java
index a816363..e24d4c6 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java
@@ -77,7 +77,7 @@ public class BLOBHandler implements HTTPSourceHandler {
     }
 
     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-    try{
+    try {
       IOUtils.copy(inputStream, outputStream);
       LOG.debug("Building an Event with stream of size -- {}", outputStream.size());
       Event event = EventBuilder.withBody(outputStream.toByteArray(), headers);
@@ -85,8 +85,7 @@ public class BLOBHandler implements HTTPSourceHandler {
       List<Event> eventList = new ArrayList<Event>();
       eventList.add(event);
       return eventList;
-    }
-    finally {
+    } finally {
       outputStream.close();
       inputStream.close();
     }
@@ -94,7 +93,8 @@ public class BLOBHandler implements HTTPSourceHandler {
 
   @Override
   public void configure(Context context) {
-    this.commaSeparatedHeaders = context.getString(MANDATORY_PARAMETERS, DEFAULT_MANDATORY_PARAMETERS);
+    this.commaSeparatedHeaders = context.getString(MANDATORY_PARAMETERS,
+                                                   DEFAULT_MANDATORY_PARAMETERS);
     this.mandatoryHeaders = commaSeparatedHeaders.split(PARAMETER_SEPARATOR);
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
index b520b03..38bdfda 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
@@ -41,7 +41,12 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
 import java.net.ServerSocket;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * A source which accepts Flume Events by HTTP POST and GET. GET should be used
@@ -104,27 +109,28 @@ public class HTTPSource extends AbstractSource implements
 
       port = context.getInteger(HTTPSourceConfigurationConstants.CONFIG_PORT);
       host = context.getString(HTTPSourceConfigurationConstants.CONFIG_BIND,
-        HTTPSourceConfigurationConstants.DEFAULT_BIND);
+          HTTPSourceConfigurationConstants.DEFAULT_BIND);
 
       Preconditions.checkState(host != null && !host.isEmpty(),
                 "HTTPSource hostname specified is empty");
       Preconditions.checkNotNull(port, "HTTPSource requires a port number to be"
-        + " specified");
+          + " specified");
 
       String handlerClassName = context.getString(
               HTTPSourceConfigurationConstants.CONFIG_HANDLER,
               HTTPSourceConfigurationConstants.DEFAULT_HANDLER).trim();
 
-      if(sslEnabled) {
+      if (sslEnabled) {
         LOG.debug("SSL configuration enabled");
         keyStorePath = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE);
         Preconditions.checkArgument(keyStorePath != null && !keyStorePath.isEmpty(),
-                                        "Keystore is required for SSL Conifguration" );
-        keyStorePassword = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD);
+                                    "Keystore is required for SSL Conifguration" );
+        keyStorePassword =
+            context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD);
         Preconditions.checkArgument(keyStorePassword != null,
-          "Keystore password is required for SSL Configuration");
-        String excludeProtocolsStr = context.getString(HTTPSourceConfigurationConstants
-          .EXCLUDE_PROTOCOLS);
+            "Keystore password is required for SSL Configuration");
+        String excludeProtocolsStr =
+            context.getString(HTTPSourceConfigurationConstants.EXCLUDE_PROTOCOLS);
         if (excludeProtocolsStr == null) {
           excludedProtocols.add("SSLv3");
         } else {
@@ -166,9 +172,9 @@ public class HTTPSource extends AbstractSource implements
 
   private void checkHostAndPort() {
     Preconditions.checkState(host != null && !host.isEmpty(),
-      "HTTPSource hostname specified is empty");
+        "HTTPSource hostname specified is empty");
     Preconditions.checkNotNull(port, "HTTPSource requires a port number to be"
-      + " specified");
+        + " specified");
   }
 
   @Override
@@ -199,8 +205,7 @@ public class HTTPSource extends AbstractSource implements
     connectors[0].setPort(port);
     srv.setConnectors(connectors);
     try {
-      org.mortbay.jetty.servlet.Context root =
-        new org.mortbay.jetty.servlet.Context(
+      org.mortbay.jetty.servlet.Context root = new org.mortbay.jetty.servlet.Context(
           srv, "/", org.mortbay.jetty.servlet.Context.SESSIONS);
       root.addServlet(new ServletHolder(new FlumeHTTPServlet()), "/");
       HTTPServerConstraintUtil.enforceConstraints(root);
@@ -285,26 +290,23 @@ public class HTTPSource extends AbstractSource implements
   }
 
   private static class HTTPSourceSocketConnector extends SslSocketConnector {
-
     private final List<String> excludedProtocols;
+
     HTTPSourceSocketConnector(List<String> excludedProtocols) {
       this.excludedProtocols = excludedProtocols;
     }
 
     @Override
-    public ServerSocket newServerSocket(String host, int port,
-      int backlog) throws IOException {
-      SSLServerSocket socket = (SSLServerSocket)super.newServerSocket(host,
-        port, backlog);
+    public ServerSocket newServerSocket(String host, int port, int backlog) throws IOException {
+      SSLServerSocket socket = (SSLServerSocket)super.newServerSocket(host, port, backlog);
       String[] protocols = socket.getEnabledProtocols();
       List<String> newProtocols = new ArrayList<String>(protocols.length);
-      for(String protocol: protocols) {
+      for (String protocol: protocols) {
         if (!excludedProtocols.contains(protocol)) {
           newProtocols.add(protocol);
         }
       }
-      socket.setEnabledProtocols(
-        newProtocols.toArray(new String[newProtocols.size()]));
+      socket.setEnabledProtocols(newProtocols.toArray(new String[newProtocols.size()]));
       return socket;
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java
index 197f66a..c99eb18 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java
@@ -35,13 +35,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- *
  * JSONHandler for HTTPSource that accepts an array of events.
  *
  * This handler throws exception if the deserialization fails because of bad
  * format or any other reason.
  *
- *
  * Each event must be encoded as a map with two key-value pairs. <p> 1. headers
  * - the key for this key-value pair is "headers". The value for this key is
  * another map, which represent the event headers. These headers are inserted
@@ -69,17 +67,15 @@ import org.slf4j.LoggerFactory;
  * {@linkplain Gson#toJson(java.lang.Object, java.lang.reflect.Type) }
  * method. The type token to pass as the 2nd argument of this method
  * for list of events can be created by: <p>
- *
- * Type type = new TypeToken<List<JSONEvent>>() {}.getType(); <p>
- *
+ * {@code
+ * Type type = new TypeToken<List<JSONEvent>>() {}.getType();
+ * }
  */
 
 public class JSONHandler implements HTTPSourceHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(JSONHandler.class);
-  private final Type listType =
-          new TypeToken<List<JSONEvent>>() {
-          }.getType();
+  private final Type listType = new TypeToken<List<JSONEvent>>() {}.getType();
   private final Gson gson;
 
   public JSONHandler() {
@@ -131,7 +127,7 @@ public class JSONHandler implements HTTPSourceHandler {
 
   private List<Event> getSimpleEvents(List<Event> events) {
     List<Event> newEvents = new ArrayList<Event>(events.size());
-    for(Event e:events) {
+    for (Event e:events) {
       newEvents.add(EventBuilder.withBody(e.getBody(), e.getHeaders()));
     }
     return newEvents;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/tools/DirectMemoryUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/tools/DirectMemoryUtils.java b/flume-ng-core/src/main/java/org/apache/flume/tools/DirectMemoryUtils.java
index 8c2db2c..dfa2229 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/tools/DirectMemoryUtils.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/tools/DirectMemoryUtils.java
@@ -54,7 +54,7 @@ public class DirectMemoryUtils {
       ByteBuffer result = ByteBuffer.allocateDirect(size);
       allocated.addAndGet(size);
       return result;
-    } catch(OutOfMemoryError error) {
+    } catch (OutOfMemoryError error) {
       LOG.error("Error allocating " + size + ", you likely want" +
           " to increase " + MAX_DIRECT_MEMORY_PARAM, error);
       throw error;
@@ -88,11 +88,9 @@ public class DirectMemoryUtils {
 
         if (memSize.contains("k")) {
           multiplier = 1024;
-        }
-        else if (memSize.contains("m")) {
+        } else if (memSize.contains("m")) {
           multiplier = 1048576;
-        }
-        else if (memSize.contains("g")) {
+        } else if (memSize.contains("g")) {
           multiplier = 1073741824;
         }
         memSize = memSize.replaceAll("[^\\d]", "");
@@ -107,7 +105,7 @@ public class DirectMemoryUtils {
       Class<?> VM = Class.forName("sun.misc.VM");
       Method maxDirectMemory = VM.getDeclaredMethod("maxDirectMemory", (Class<?>)null);
       Object result = maxDirectMemory.invoke(null, (Object[])null);
-      if(result != null && result instanceof Long) {
+      if (result != null && result instanceof Long) {
         return (Long)result;
       }
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/tools/GetJavaProperty.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/tools/GetJavaProperty.java b/flume-ng-core/src/main/java/org/apache/flume/tools/GetJavaProperty.java
index 5d0ea74..bc073bb 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/tools/GetJavaProperty.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/tools/GetJavaProperty.java
@@ -21,7 +21,7 @@ package org.apache.flume.tools;
  * A generic way for querying Java properties.
  */
 public class GetJavaProperty {
-  public static void main(String args[]) {
+  public static void main(String[] args) {
     if (args.length == 0) {
       for (Object prop : System.getProperties().keySet()) {
         System.out.println(prop + "=" + System.getProperty((String)prop, ""));

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java b/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java
index 3a59953..daa9606 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java
@@ -35,7 +35,7 @@ public class TimestampRoundDownUtil {
    */
   public static long roundDownTimeStampSeconds(long timestamp,
       int roundDownSec) throws IllegalStateException {
-    Preconditions.checkArgument(roundDownSec > 0 && roundDownSec <=60,
+    Preconditions.checkArgument(roundDownSec > 0 && roundDownSec <= 60,
         "RoundDownSec must be > 0 and <=60");
     Calendar cal = roundDownField(timestamp, Calendar.SECOND, roundDownSec);
     cal.set(Calendar.MILLISECOND, 0);
@@ -53,7 +53,7 @@ public class TimestampRoundDownUtil {
    */
   public static long roundDownTimeStampMinutes(long timestamp,
       int roundDownMins) throws IllegalStateException {
-    Preconditions.checkArgument(roundDownMins > 0 && roundDownMins <=60,
+    Preconditions.checkArgument(roundDownMins > 0 && roundDownMins <= 60,
         "RoundDown must be > 0 and <=60");
     Calendar cal = roundDownField(timestamp, Calendar.MINUTE, roundDownMins);
     cal.set(Calendar.SECOND, 0);
@@ -73,7 +73,7 @@ public class TimestampRoundDownUtil {
    */
   public static long roundDownTimeStampHours(long timestamp,
       int roundDownHours) throws IllegalStateException {
-    Preconditions.checkArgument(roundDownHours > 0 && roundDownHours <=24,
+    Preconditions.checkArgument(roundDownHours > 0 && roundDownHours <= 24,
         "RoundDown must be > 0 and <=24");
     Calendar cal = roundDownField(timestamp,
         Calendar.HOUR_OF_DAY, roundDownHours);
@@ -83,8 +83,7 @@ public class TimestampRoundDownUtil {
     return cal.getTimeInMillis();
   }
 
-  private static Calendar roundDownField(
-      long timestamp, int field, int roundDown){
+  private static Calendar roundDownField(long timestamp, int field, int roundDown) {
     Preconditions.checkArgument(timestamp > 0, "Timestamp must be positive");
     Calendar cal = Calendar.getInstance();
     cal.setTimeInMillis(timestamp);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/tools/VersionInfo.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/tools/VersionInfo.java b/flume-ng-core/src/main/java/org/apache/flume/tools/VersionInfo.java
index c12cf8d..95aa29d 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/tools/VersionInfo.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/tools/VersionInfo.java
@@ -54,9 +54,9 @@ public class VersionInfo {
    * @return the revision number, eg. "100755"
    */
   public static String getRevision() {
-    if(version != null
-            && version.revision() != null
-            && !version.revision().isEmpty()){
+    if (version != null
+        && version.revision() != null
+        && !version.revision().isEmpty()) {
       return version.revision();
     }
     return "Unknown";
@@ -105,12 +105,12 @@ public class VersionInfo {
    * Returns the build version info which includes version,
    * revision, user, date and source checksum
    */
-  public static String getBuildVersion(){
+  public static String getBuildVersion() {
     return VersionInfo.getVersion() +
-    " from " + VersionInfo.getRevision() +
-    " by " + VersionInfo.getUser() +
-    " on " + VersionInfo.getDate() +
-    " source checksum " + VersionInfo.getSrcChecksum();
+        " from " + VersionInfo.getRevision() +
+        " by " + VersionInfo.getUser() +
+        " on " + VersionInfo.getDate() +
+        " source checksum " + VersionInfo.getSrcChecksum();
   }
 
   public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
index 32c9f18..ad3e138 100644
--- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
+++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
@@ -76,9 +76,11 @@ public class EmbeddedAgent {
     supervisor = new LifecycleSupervisor();
 
   }
+
   public EmbeddedAgent(String name) {
     this(new MaterializedConfigurationProvider(), name);
   }
+
   /**
    * Configures the embedded agent. Can only be called after the object
    * is created or after the stop() method is called.
@@ -89,12 +91,13 @@ public class EmbeddedAgent {
    */
   public void configure(Map<String, String> properties)
       throws FlumeException {
-    if(state == State.STARTED) {
+    if (state == State.STARTED) {
       throw new IllegalStateException("Cannot be configured while started");
     }
     doConfigure(properties);
     state = State.STOPPED;
   }
+
   /**
    * Started the agent. Can only be called after a successful call to
    * configure().
@@ -105,9 +108,9 @@ public class EmbeddedAgent {
    */
   public void start()
       throws FlumeException {
-    if(state == State.STARTED) {
+    if (state == State.STARTED) {
       throw new IllegalStateException("Cannot be started while started");
-    } else if(state == State.NEW) {
+    } else if (state == State.NEW) {
       throw new IllegalStateException("Cannot be started before being " +
           "configured");
     }
@@ -115,15 +118,15 @@ public class EmbeddedAgent {
     // as doStart() accesses sourceRunner.getSource()
     Source source = Preconditions.checkNotNull(sourceRunner.getSource(),
         "Source runner returned null source");
-    if(source instanceof EmbeddedSource) {
+    if (source instanceof EmbeddedSource) {
       embeddedSource = (EmbeddedSource)source;
     } else {
-      throw new IllegalStateException("Unknown source type: " + source.
-          getClass().getName());
+      throw new IllegalStateException("Unknown source type: " + source.getClass().getName());
     }
     doStart();
     state = State.STARTED;
   }
+
   /**
    * Stops the agent. Can only be called after a successful call to start().
    * After a call to stop(), the agent can be re-configured with the
@@ -134,7 +137,7 @@ public class EmbeddedAgent {
    */
   public void stop()
       throws FlumeException {
-    if(state != State.STARTED) {
+    if (state != State.STARTED) {
       throw new IllegalStateException("Cannot be stopped unless started");
     }
     supervisor.stop();
@@ -146,9 +149,9 @@ public class EmbeddedAgent {
 
     properties = EmbeddedAgentConfiguration.configure(name, properties);
 
-    if(LOGGER.isDebugEnabled()) {
+    if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Agent configuration values");
-      for(String key : new TreeSet<String>(properties.keySet())) {
+      for (String key : new TreeSet<String>(properties.keySet())) {
         LOGGER.debug(key + " = " + properties.get(key));
       }
     }
@@ -156,17 +159,17 @@ public class EmbeddedAgent {
     MaterializedConfiguration conf = configurationProvider.get(name,
         properties);
     Map<String, SourceRunner> sources = conf.getSourceRunners();
-    if(sources.size() != 1) {
+    if (sources.size() != 1) {
       throw new FlumeException("Expected one source and got "  +
           sources.size());
     }
     Map<String, Channel> channels = conf.getChannels();
-    if(channels.size() != 1) {
+    if (channels.size() != 1) {
       throw new FlumeException("Expected one channel and got "  +
           channels.size());
     }
     Map<String, SinkRunner> sinks = conf.getSinkRunners();
-    if(sinks.size() != 1) {
+    if (sinks.size() != 1) {
       throw new FlumeException("Expected one sink group and got "  +
           sinks.size());
     }
@@ -174,6 +177,7 @@ public class EmbeddedAgent {
     this.channel = channels.values().iterator().next();
     this.sinkRunner = sinks.values().iterator().next();
   }
+
   /**
    * Adds event to the channel owned by the agent. Note however, that the
    * event is not copied and as such, the byte array and headers cannot
@@ -182,7 +186,7 @@ public class EmbeddedAgent {
    * @throws EventDeliveryException if unable to add event to channel
    */
   public void put(Event event) throws EventDeliveryException {
-    if(state != State.STARTED) {
+    if (state != State.STARTED) {
       throw new IllegalStateException("Cannot put events unless started");
     }
     try {
@@ -192,6 +196,7 @@ public class EmbeddedAgent {
           ": Unable to process event: " + ex.getMessage(), ex);
     }
   }
+
   /**
    * Adds events to the channel owned by the agent. Note however, that the
    * event is not copied and as such, the byte array and headers cannot
@@ -200,7 +205,7 @@ public class EmbeddedAgent {
    * @throws EventDeliveryException if unable to add event to channel
    */
   public void putAll(List<Event> events) throws EventDeliveryException {
-    if(state != State.STARTED) {
+    if (state != State.STARTED) {
       throw new IllegalStateException("Cannot put events unless started");
     }
     try {
@@ -226,7 +231,7 @@ public class EmbeddedAgent {
           new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
       error = false;
     } finally {
-      if(error) {
+      if (error) {
         stopLogError(sourceRunner);
         stopLogError(channel);
         stopLogError(sinkRunner);
@@ -234,9 +239,10 @@ public class EmbeddedAgent {
       }
     }
   }
+
   private void stopLogError(LifecycleAware lifeCycleAware) {
     try {
-      if(LifecycleState.START.equals(lifeCycleAware.getLifecycleState())) {
+      if (LifecycleState.START.equals(lifeCycleAware.getLifecycleState())) {
         lifeCycleAware.stop();
       }
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java
index 4a49fa0..05d2d04 100644
--- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java
+++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java
@@ -136,8 +136,8 @@ public class EmbeddedAgentConfiguration {
    * Load balancing sink processor. See Flume User Guide for configuration
    * information.
    */
-  public static final String SINK_PROCESSOR_TYPE_LOAD_BALANCE = SinkProcessorType.LOAD_BALANCE.name();
-
+  public static final String SINK_PROCESSOR_TYPE_LOAD_BALANCE =
+      SinkProcessorType.LOAD_BALANCE.name();
 
   private static final String[] ALLOWED_SOURCES = {
     SOURCE_TYPE_EMBEDDED_ALIAS,
@@ -165,22 +165,21 @@ public class EmbeddedAgentConfiguration {
   private static void validate(String name,
       Map<String, String> properties) throws FlumeException {
 
-    if(properties.containsKey(SOURCE_TYPE)) {
+    if (properties.containsKey(SOURCE_TYPE)) {
       checkAllowed(ALLOWED_SOURCES, properties.get(SOURCE_TYPE));
     }
     checkRequired(properties, CHANNEL_TYPE);
     checkAllowed(ALLOWED_CHANNELS, properties.get(CHANNEL_TYPE));
     checkRequired(properties, SINKS);
     String sinkNames = properties.get(SINKS);
-    for(String sink : sinkNames.split("\\s+")) {
-      if(DISALLOWED_SINK_NAMES.contains(sink.toLowerCase(Locale.ENGLISH))) {
+    for (String sink : sinkNames.split("\\s+")) {
+      if (DISALLOWED_SINK_NAMES.contains(sink.toLowerCase(Locale.ENGLISH))) {
         throw new FlumeException("Sink name " + sink + " is one of the" +
             " disallowed sink names: " + DISALLOWED_SINK_NAMES);
       }
       String key = join(sink, TYPE);
       checkRequired(properties, key);
       checkAllowed(ALLOWED_SINKS, properties.get(key));
-
     }
     checkRequired(properties, SINK_PROCESSOR_TYPE);
     checkAllowed(ALLOWED_SINK_PROCESSORS, properties.get(SINK_PROCESSOR_TYPE));
@@ -201,8 +200,8 @@ public class EmbeddedAgentConfiguration {
     // we are going to modify the properties as we parse the config
     properties = new HashMap<String, String>(properties);
 
-    if(!properties.containsKey(SOURCE_TYPE) || SOURCE_TYPE_EMBEDDED_ALIAS.
-        equalsIgnoreCase(properties.get(SOURCE_TYPE))) {
+    if (!properties.containsKey(SOURCE_TYPE) ||
+        SOURCE_TYPE_EMBEDDED_ALIAS.equalsIgnoreCase(properties.get(SOURCE_TYPE))) {
       properties.put(SOURCE_TYPE, SOURCE_TYPE_EMBEDDED);
     }
     String sinkNames = properties.remove(SINKS);
@@ -220,9 +219,6 @@ public class EmbeddedAgentConfiguration {
     // user supplied config -> agent configuration
     Map<String, String> result = Maps.newHashMap();
 
-    // properties will be modified during iteration so we need a
-    // copy of the keys
-    Set<String> userProvidedKeys;
     /*
      * First we are going to setup all the root level pointers. I.E
      * point the agent at the components, sink group at sinks, and
@@ -247,15 +243,19 @@ public class EmbeddedAgentConfiguration {
     result.put(join(name,
         BasicConfigurationConstants.CONFIG_SOURCES, sourceName,
         BasicConfigurationConstants.CONFIG_CHANNELS), channelName);
+
+    // Properties will be modified during iteration so we need a
+    // copy of the keys.
+    Set<String> userProvidedKeys = new HashSet<String>(properties.keySet());
+
     /*
      * Second process the sink configuration and point the sinks
      * at the channel.
      */
-    userProvidedKeys = new HashSet<String>(properties.keySet());
-    for(String sink :  sinkNames.split("\\s+")) {
-      for(String key : userProvidedKeys) {
+    for (String sink :  sinkNames.split("\\s+")) {
+      for (String key : userProvidedKeys) {
         String value = properties.get(key);
-        if(key.startsWith(sink + SEPERATOR)) {
+        if (key.startsWith(sink + SEPERATOR)) {
           properties.remove(key);
           result.put(join(name,
               BasicConfigurationConstants.CONFIG_SINKS, key), value);
@@ -271,19 +271,19 @@ public class EmbeddedAgentConfiguration {
      * correctly and then passing them on to the agent.
      */
     userProvidedKeys = new HashSet<String>(properties.keySet());
-    for(String key : userProvidedKeys) {
+    for (String key : userProvidedKeys) {
       String value = properties.get(key);
-      if(key.startsWith(SOURCE_PREFIX)) {
+      if (key.startsWith(SOURCE_PREFIX)) {
         // users use `source' but agent needs the actual source name
         key = key.replaceFirst(SOURCE, sourceName);
         result.put(join(name,
             BasicConfigurationConstants.CONFIG_SOURCES, key), value);
-      } else if(key.startsWith(CHANNEL_PREFIX)) {
+      } else if (key.startsWith(CHANNEL_PREFIX)) {
         // users use `channel' but agent needs the actual channel name
         key = key.replaceFirst(CHANNEL, channelName);
         result.put(join(name,
             BasicConfigurationConstants.CONFIG_CHANNELS, key), value);
-      } else if(key.startsWith(SINK_PROCESSOR_PREFIX)) {
+      } else if (key.startsWith(SINK_PROCESSOR_PREFIX)) {
         // agent.sinkgroups.sinkgroup.processor.*
         result.put(join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS,
                 sinkGroupName, key), value);
@@ -297,20 +297,19 @@ public class EmbeddedAgentConfiguration {
   private static void checkAllowed(String[] allowedTypes, String type) {
     boolean isAllowed = false;
     type = type.trim();
-    for(String allowedType : allowedTypes) {
-      if(allowedType.equalsIgnoreCase(type)) {
+    for (String allowedType : allowedTypes) {
+      if (allowedType.equalsIgnoreCase(type)) {
         isAllowed = true;
         break;
       }
     }
-    if(!isAllowed) {
+    if (!isAllowed) {
       throw new FlumeException("Component type of " + type + " is not in " +
           "allowed types of " + Arrays.toString(allowedTypes));
     }
   }
-  private static void checkRequired(Map<String, String> properties,
-      String name) {
-    if(!properties.containsKey(name)) {
+  private static void checkRequired(Map<String, String> properties, String name) {
+    if (!properties.containsKey(name)) {
       throw new FlumeException("Required parameter not found " + name);
     }
   }
@@ -319,7 +318,5 @@ public class EmbeddedAgentConfiguration {
     return JOINER.join(parts);
   }
 
-  private EmbeddedAgentConfiguration() {
-
-  }
+  private EmbeddedAgentConfiguration() {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedSource.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedSource.java
index 53389d2..71a88ec 100644
--- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedSource.java
+++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedSource.java
@@ -35,16 +35,16 @@ import org.apache.flume.source.AbstractSource;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class EmbeddedSource extends AbstractSource
-  implements EventDrivenSource, Configurable {
+public class EmbeddedSource extends AbstractSource implements EventDrivenSource, Configurable {
 
   @Override
   public void configure(Context context) {
-
   }
+
   public void put(Event event) throws ChannelException {
     getChannelProcessor().processEvent(event);
   }
+
   public void putAll(List<Event> events) throws ChannelException {
     getChannelProcessor().processEventBatch(events);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java
index 47913dc..7140f07 100644
--- a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java
+++ b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java
@@ -27,6 +27,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import com.cloudera.flume.handlers.thrift.ThriftFlumeEvent;
+import com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
@@ -45,8 +47,6 @@ import org.apache.thrift.transport.TServerSocket;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.cloudera.flume.handlers.thrift.*;
-
 public class ThriftLegacySource  extends AbstractSource implements
     EventDrivenSource, Configurable  {
 
@@ -86,7 +86,7 @@ public class ThriftLegacySource  extends AbstractSource implements
       headers.put(NANOS, Long.toString(evt.getNanos()));
       for (Entry<String, ByteBuffer> entry: evt.getFields().entrySet()) {
         headers.put(entry.getKey().toString(),
-          UTF_8.decode(entry.getValue()).toString());
+                    UTF_8.decode(entry.getValue()).toString());
       }
       headers.put(OG_EVENT, "yes");
 
@@ -139,8 +139,8 @@ public class ThriftLegacySource  extends AbstractSource implements
       serverTransport = new TServerSocket(bindAddr);
       ThriftFlumeEventServer.Processor processor =
           new ThriftFlumeEventServer.Processor(new ThriftFlumeEventServerImpl());
-      server = new TThreadPoolServer(new TThreadPoolServer.
-          Args(serverTransport).processor(processor));
+      server = new TThreadPoolServer(
+          new TThreadPoolServer.Args(serverTransport).processor(processor));
     } catch (TTransportException e) {
       throw new FlumeException("Failed starting source", e);
     }


Mime
View raw message