flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [04/11] flume git commit: FLUME-2937. Integrate checkstyle for non-test classes
Date Thu, 30 Jun 2016 02:21:30 GMT
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
index 40abba2..130bc64 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
@@ -17,8 +17,15 @@
  */
 package org.apache.flume.node;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelFactory;
@@ -58,18 +65,15 @@ import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-public abstract class AbstractConfigurationProvider implements
-    ConfigurationProvider {
+public abstract class AbstractConfigurationProvider implements ConfigurationProvider {
 
-  private static final Logger LOGGER = LoggerFactory
-      .getLogger(AbstractConfigurationProvider.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConfigurationProvider.class);
 
   private final String agentName;
   private final SourceFactory sourceFactory;
   private final SinkFactory sinkFactory;
   private final ChannelFactory channelFactory;
 
-
   private final Map<Class<? extends Channel>, Map<String, Channel>> channelCache;
 
   public AbstractConfigurationProvider(String agentName) {
@@ -96,18 +100,16 @@ public abstract class AbstractConfigurationProvider implements
         loadChannels(agentConf, channelComponentMap);
         loadSources(agentConf, channelComponentMap, sourceRunnerMap);
         loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
-        Set<String> channelNames =
-            new HashSet<String>(channelComponentMap.keySet());
-        for(String channelName : channelNames) {
-          ChannelComponent channelComponent = channelComponentMap.
-              get(channelName);
-          if(channelComponent.components.isEmpty()) {
+        Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
+        for (String channelName : channelNames) {
+          ChannelComponent channelComponent = channelComponentMap.get(channelName);
+          if (channelComponent.components.isEmpty()) {
             LOGGER.warn(String.format("Channel %s has no components connected" +
                 " and has been removed.", channelName));
             channelComponentMap.remove(channelName);
-            Map<String, Channel> nameChannelMap = channelCache.
-                get(channelComponent.channel.getClass());
-            if(nameChannelMap != null) {
+            Map<String, Channel> nameChannelMap =
+                channelCache.get(channelComponent.channel.getClass());
+            if (nameChannelMap != null) {
               nameChannelMap.remove(channelName);
             }
           } else {
@@ -116,10 +118,10 @@ public abstract class AbstractConfigurationProvider implements
             conf.addChannel(channelName, channelComponent.channel);
           }
         }
-        for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
+        for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
           conf.addSourceRunner(entry.getKey(), entry.getValue());
         }
-        for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
+        for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
           conf.addSinkRunner(entry.getKey(), entry.getValue());
         }
       } catch (InstantiationException ex) {
@@ -155,21 +157,21 @@ public abstract class AbstractConfigurationProvider implements
     ListMultimap<Class<? extends Channel>, String> channelsNotReused =
         ArrayListMultimap.create();
     // assume all channels will not be re-used
-    for(Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry : channelCache.entrySet()) {
+    for (Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry :
+         channelCache.entrySet()) {
       Class<? extends Channel> channelKlass = entry.getKey();
       Set<String> channelNames = entry.getValue().keySet();
       channelsNotReused.get(channelKlass).addAll(channelNames);
     }
 
     Set<String> channelNames = agentConf.getChannelSet();
-    Map<String, ComponentConfiguration> compMap =
-        agentConf.getChannelConfigMap();
+    Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
     /*
      * Components which have a ComponentConfiguration object
      */
     for (String chName : channelNames) {
       ComponentConfiguration comp = compMap.get(chName);
-      if(comp != null) {
+      if (comp != null) {
         Channel channel = getOrCreateChannel(channelsNotReused,
             comp.getComponentName(), comp.getType());
         try {
@@ -190,17 +192,16 @@ public abstract class AbstractConfigurationProvider implements
      */
     for (String chName : channelNames) {
       Context context = agentConf.getChannelContext().get(chName);
-      if(context != null){
-        Channel channel =
-            getOrCreateChannel(channelsNotReused, chName, context.getString(
-                BasicConfigurationConstants.CONFIG_TYPE));
+      if (context != null) {
+        Channel channel = getOrCreateChannel(channelsNotReused, chName,
+            context.getString(BasicConfigurationConstants.CONFIG_TYPE));
         try {
           Configurables.configure(channel, context);
           channelComponentMap.put(chName, new ChannelComponent(channel));
           LOGGER.info("Created channel " + chName);
         } catch (Exception e) {
           String msg = String.format("Channel %s has been removed due to an " +
-                "error during configuration", chName);
+              "error during configuration", chName);
           LOGGER.error(msg, e);
         }
       }
@@ -212,7 +213,7 @@ public abstract class AbstractConfigurationProvider implements
       Map<String, Channel> channelMap = channelCache.get(channelKlass);
       if (channelMap != null) {
         for (String channelName : channelsNotReused.get(channelKlass)) {
-          if(channelMap.remove(channelName) != null) {
+          if (channelMap.remove(channelName) != null) {
             LOGGER.info("Removed {} of type {}", channelName, channelKlass);
           }
         }
@@ -228,12 +229,11 @@ public abstract class AbstractConfigurationProvider implements
       String name, String type)
       throws FlumeException {
 
-    Class<? extends Channel> channelClass = channelFactory.
-        getClass(type);
+    Class<? extends Channel> channelClass = channelFactory.getClass(type);
     /*
      * Channel has requested a new instance on each re-configuration
      */
-    if(channelClass.isAnnotationPresent(Disposable.class)) {
+    if (channelClass.isAnnotationPresent(Disposable.class)) {
       Channel channel = channelFactory.create(name, type);
       channel.setName(name);
       return channel;
@@ -244,7 +244,7 @@ public abstract class AbstractConfigurationProvider implements
       channelCache.put(channelClass, channelMap);
     }
     Channel channel = channelMap.get(name);
-    if(channel == null) {
+    if (channel == null) {
       channel = channelFactory.create(name, type);
       channel.setName(name);
       channelMap.put(name, channel);
@@ -266,7 +266,7 @@ public abstract class AbstractConfigurationProvider implements
      */
     for (String sourceName : sourceNames) {
       ComponentConfiguration comp = compMap.get(sourceName);
-      if(comp != null) {
+      if (comp != null) {
         SourceConfiguration config = (SourceConfiguration) comp;
 
         Source source = sourceFactory.create(comp.getComponentName(),
@@ -277,11 +277,11 @@ public abstract class AbstractConfigurationProvider implements
           List<Channel> sourceChannels = new ArrayList<Channel>();
           for (String chName : channelNames) {
             ChannelComponent channelComponent = channelComponentMap.get(chName);
-            if(channelComponent != null) {
+            if (channelComponent != null) {
               sourceChannels.add(channelComponent.channel);
             }
           }
-          if(sourceChannels.isEmpty()) {
+          if (sourceChannels.isEmpty()) {
             String msg = String.format("Source %s is not connected to a " +
                 "channel",  sourceName);
             throw new IllegalStateException(msg);
@@ -298,10 +298,10 @@ public abstract class AbstractConfigurationProvider implements
           source.setChannelProcessor(channelProcessor);
           sourceRunnerMap.put(comp.getComponentName(),
               SourceRunner.forSource(source));
-          for(Channel channel : sourceChannels) {
-            ChannelComponent channelComponent = Preconditions.
-                checkNotNull(channelComponentMap.get(channel.getName()),
-                    String.format("Channel %s", channel.getName()));
+          for (Channel channel : sourceChannels) {
+            ChannelComponent channelComponent =
+                Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
+                                           String.format("Channel %s", channel.getName()));
             channelComponent.components.add(sourceName);
           }
         } catch (Exception e) {
@@ -318,10 +318,10 @@ public abstract class AbstractConfigurationProvider implements
     Map<String, Context> sourceContexts = agentConf.getSourceContext();
     for (String sourceName : sourceNames) {
       Context context = sourceContexts.get(sourceName);
-      if(context != null){
+      if (context != null) {
         Source source =
             sourceFactory.create(sourceName,
-                context.getString(BasicConfigurationConstants.CONFIG_TYPE));
+                                 context.getString(BasicConfigurationConstants.CONFIG_TYPE));
         try {
           Configurables.configure(source, context);
           List<Channel> sourceChannels = new ArrayList<Channel>();
@@ -329,11 +329,11 @@ public abstract class AbstractConfigurationProvider implements
               BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
           for (String chName : channelNames) {
             ChannelComponent channelComponent = channelComponentMap.get(chName);
-            if(channelComponent != null) {
+            if (channelComponent != null) {
               sourceChannels.add(channelComponent.channel);
             }
           }
-          if(sourceChannels.isEmpty()) {
+          if (sourceChannels.isEmpty()) {
             String msg = String.format("Source %s is not connected to a " +
                 "channel",  sourceName);
             throw new IllegalStateException(msg);
@@ -349,10 +349,10 @@ public abstract class AbstractConfigurationProvider implements
           source.setChannelProcessor(channelProcessor);
           sourceRunnerMap.put(sourceName,
               SourceRunner.forSource(source));
-          for(Channel channel : sourceChannels) {
-            ChannelComponent channelComponent = Preconditions.
-                checkNotNull(channelComponentMap.get(channel.getName()),
-                    String.format("Channel %s", channel.getName()));
+          for (Channel channel : sourceChannels) {
+            ChannelComponent channelComponent =
+                Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
+                                           String.format("Channel %s", channel.getName()));
             channelComponent.components.add(sourceName);
           }
         } catch (Exception e) {
@@ -376,15 +376,13 @@ public abstract class AbstractConfigurationProvider implements
      */
     for (String sinkName : sinkNames) {
       ComponentConfiguration comp = compMap.get(sinkName);
-      if(comp != null) {
+      if (comp != null) {
         SinkConfiguration config = (SinkConfiguration) comp;
-        Sink sink = sinkFactory.create(comp.getComponentName(),
-            comp.getType());
+        Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
         try {
           Configurables.configure(sink, config);
-          ChannelComponent channelComponent = channelComponentMap.
-              get(config.getChannel());
-          if(channelComponent == null) {
+          ChannelComponent channelComponent = channelComponentMap.get(config.getChannel());
+          if (channelComponent == null) {
             String msg = String.format("Sink %s is not connected to a " +
                 "channel",  sinkName);
             throw new IllegalStateException(msg);
@@ -406,14 +404,15 @@ public abstract class AbstractConfigurationProvider implements
     Map<String, Context> sinkContexts = agentConf.getSinkContext();
     for (String sinkName : sinkNames) {
       Context context = sinkContexts.get(sinkName);
-      if(context != null) {
+      if (context != null) {
         Sink sink = sinkFactory.create(sinkName, context.getString(
             BasicConfigurationConstants.CONFIG_TYPE));
         try {
           Configurables.configure(sink, context);
-          ChannelComponent channelComponent = channelComponentMap.
-              get(context.getString(BasicConfigurationConstants.CONFIG_CHANNEL));
-          if(channelComponent == null) {
+          ChannelComponent channelComponent =
+              channelComponentMap.get(
+                  context.getString(BasicConfigurationConstants.CONFIG_CHANNEL));
+          if (channelComponent == null) {
             String msg = String.format("Sink %s is not connected to a " +
                 "channel",  sinkName);
             throw new IllegalStateException(msg);
@@ -441,7 +440,7 @@ public abstract class AbstractConfigurationProvider implements
     Map<String, String> usedSinks = new HashMap<String, String>();
     for (String groupName: sinkGroupNames) {
       ComponentConfiguration comp = compMap.get(groupName);
-      if(comp != null) {
+      if (comp != null) {
         SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
         List<Sink> groupSinks = new ArrayList<Sink>();
         for (String sink : groupConf.getSinks()) {
@@ -475,7 +474,7 @@ public abstract class AbstractConfigurationProvider implements
       }
     }
     // add any unassigned sinks to solo collectors
-    for(Entry<String, Sink> entry : sinks.entrySet()) {
+    for (Entry<String, Sink> entry : sinks.entrySet()) {
       if (!usedSinks.containsValue(entry.getKey())) {
         try {
           SinkProcessor pr = new DefaultSinkProcessor();
@@ -483,9 +482,8 @@ public abstract class AbstractConfigurationProvider implements
           sinkMap.add(entry.getValue());
           pr.setSinks(sinkMap);
           Configurables.configure(pr, new Context());
-          sinkRunnerMap.put(entry.getKey(),
-              new SinkRunner(pr));
-        } catch(Exception e) {
+          sinkRunnerMap.put(entry.getKey(), new SinkRunner(pr));
+        } catch (Exception e) {
           String msg = String.format("SinkGroup %s has been removed due to " +
               "an error during configuration", entry.getKey());
           LOGGER.error(msg, e);
@@ -496,6 +494,7 @@ public abstract class AbstractConfigurationProvider implements
   private static class ChannelComponent {
     final Channel channel;
     final List<String> components;
+
     ChannelComponent(Channel channel) {
       this.channel = channel;
       components = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
index 959fa77..d6d92f0 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
@@ -19,15 +19,10 @@
 
 package org.apache.flume.node;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
@@ -49,10 +44,14 @@ import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
 
 public class Application {
 
@@ -77,7 +76,7 @@ public class Application {
   }
 
   public synchronized void start() {
-    for(LifecycleAware component : components) {
+    for (LifecycleAware component : components) {
       supervisor.supervise(component,
           new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
     }
@@ -91,7 +90,7 @@ public class Application {
 
   public synchronized void stop() {
     supervisor.stop();
-    if(monitorServer != null) {
+    if (monitorServer != null) {
       monitorServer.stop();
     }
   }
@@ -99,37 +98,37 @@ public class Application {
   private void stopAllComponents() {
     if (this.materializedConfiguration != null) {
       logger.info("Shutting down configuration: {}", this.materializedConfiguration);
-      for (Entry<String, SourceRunner> entry : this.materializedConfiguration
-          .getSourceRunners().entrySet()) {
-        try{
+      for (Entry<String, SourceRunner> entry :
+           this.materializedConfiguration.getSourceRunners().entrySet()) {
+        try {
           logger.info("Stopping Source " + entry.getKey());
           supervisor.unsupervise(entry.getValue());
-        } catch (Exception e){
+        } catch (Exception e) {
           logger.error("Error while stopping {}", entry.getValue(), e);
         }
       }
 
       for (Entry<String, SinkRunner> entry :
-        this.materializedConfiguration.getSinkRunners().entrySet()) {
-        try{
+           this.materializedConfiguration.getSinkRunners().entrySet()) {
+        try {
           logger.info("Stopping Sink " + entry.getKey());
           supervisor.unsupervise(entry.getValue());
-        } catch (Exception e){
+        } catch (Exception e) {
           logger.error("Error while stopping {}", entry.getValue(), e);
         }
       }
 
       for (Entry<String, Channel> entry :
-        this.materializedConfiguration.getChannels().entrySet()) {
-        try{
+           this.materializedConfiguration.getChannels().entrySet()) {
+        try {
           logger.info("Stopping Channel " + entry.getKey());
           supervisor.unsupervise(entry.getValue());
-        } catch (Exception e){
+        } catch (Exception e) {
           logger.error("Error while stopping {}", entry.getValue(), e);
         }
       }
     }
-    if(monitorServer != null) {
+    if (monitorServer != null) {
       monitorServer.stop();
     }
   }
@@ -140,12 +139,12 @@ public class Application {
     this.materializedConfiguration = materializedConfiguration;
 
     for (Entry<String, Channel> entry :
-      materializedConfiguration.getChannels().entrySet()) {
-      try{
+        materializedConfiguration.getChannels().entrySet()) {
+      try {
         logger.info("Starting Channel " + entry.getKey());
         supervisor.supervise(entry.getValue(),
             new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
-      } catch (Exception e){
+      } catch (Exception e) {
         logger.error("Error while starting {}", entry.getValue(), e);
       }
     }
@@ -153,9 +152,9 @@ public class Application {
     /*
      * Wait for all channels to start.
      */
-    for(Channel ch: materializedConfiguration.getChannels().values()){
-      while(ch.getLifecycleState() != LifecycleState.START
-          && !supervisor.isComponentInErrorState(ch)){
+    for (Channel ch : materializedConfiguration.getChannels().values()) {
+      while (ch.getLifecycleState() != LifecycleState.START
+          && !supervisor.isComponentInErrorState(ch)) {
         try {
           logger.info("Waiting for channel: " + ch.getName() +
               " to start. Sleeping for 500 ms");
@@ -167,23 +166,22 @@ public class Application {
       }
     }
 
-    for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners()
-        .entrySet()) {
-      try{
+    for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
+      try {
         logger.info("Starting Sink " + entry.getKey());
         supervisor.supervise(entry.getValue(),
-          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
       } catch (Exception e) {
         logger.error("Error while starting {}", entry.getValue(), e);
       }
     }
 
-    for (Entry<String, SourceRunner> entry : materializedConfiguration
-        .getSourceRunners().entrySet()) {
-      try{
+    for (Entry<String, SourceRunner> entry :
+         materializedConfiguration.getSourceRunners().entrySet()) {
+      try {
         logger.info("Starting Source " + entry.getKey());
         supervisor.supervise(entry.getValue(),
-          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
       } catch (Exception e) {
         logger.error("Error while starting {}", entry.getValue(), e);
       }
@@ -203,7 +201,7 @@ public class Application {
         try {
           //Is it a known type?
           klass = MonitoringType.valueOf(
-                  monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass();
+              monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass();
         } catch (Exception e) {
           //Not a known type, use FQCN
           klass = (Class<? extends MonitorService>) Class.forName(monitorType);
@@ -213,7 +211,7 @@ public class Application {
         for (String key : keys) {
           if (key.startsWith(CONF_MONITOR_PREFIX)) {
             context.put(key.substring(CONF_MONITOR_PREFIX.length()),
-                    systemProps.getProperty(key));
+                systemProps.getProperty(key));
           }
         }
         monitorServer.configure(context);
@@ -221,7 +219,7 @@ public class Application {
       }
     } catch (Exception e) {
       logger.warn("Error starting monitoring. "
-              + "Monitoring might not be available.", e);
+          + "Monitoring might not be available.", e);
     }
 
   }
@@ -285,18 +283,17 @@ public class Application {
           EventBus eventBus = new EventBus(agentName + "-event-bus");
           List<LifecycleAware> components = Lists.newArrayList();
           PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider =
-            new PollingZooKeeperConfigurationProvider(
-              agentName, zkConnectionStr, baseZkPath, eventBus);
+              new PollingZooKeeperConfigurationProvider(
+                  agentName, zkConnectionStr, baseZkPath, eventBus);
           components.add(zookeeperConfigurationProvider);
           application = new Application(components);
           eventBus.register(application);
         } else {
           StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider =
-            new StaticZooKeeperConfigurationProvider(
-              agentName, zkConnectionStr, baseZkPath);
+              new StaticZooKeeperConfigurationProvider(
+                  agentName, zkConnectionStr, baseZkPath);
           application = new Application();
-          application.handleConfigurationEvent(zookeeperConfigurationProvider
-            .getConfiguration());
+          application.handleConfigurationEvent(zookeeperConfigurationProvider.getConfiguration());
         }
       } else {
         File configurationFile = new File(commandLine.getOptionValue('f'));
@@ -308,16 +305,16 @@ public class Application {
         if (!configurationFile.exists()) {
           // If command line invocation, then need to fail fast
           if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) ==
-            null) {
+              null) {
             String path = configurationFile.getPath();
             try {
               path = configurationFile.getCanonicalPath();
             } catch (IOException ex) {
               logger.error("Failed to read canonical path for file: " + path,
-                ex);
+                  ex);
             }
             throw new ParseException(
-              "The specified configuration file does not exist: " + path);
+                "The specified configuration file does not exist: " + path);
           }
         }
         List<LifecycleAware> components = Lists.newArrayList();
@@ -325,18 +322,16 @@ public class Application {
         if (reload) {
           EventBus eventBus = new EventBus(agentName + "-event-bus");
           PollingPropertiesFileConfigurationProvider configurationProvider =
-            new PollingPropertiesFileConfigurationProvider(
-              agentName, configurationFile, eventBus, 30);
+              new PollingPropertiesFileConfigurationProvider(
+                  agentName, configurationFile, eventBus, 30);
           components.add(configurationProvider);
           application = new Application(components);
           eventBus.register(application);
         } else {
           PropertiesFileConfigurationProvider configurationProvider =
-            new PropertiesFileConfigurationProvider(
-              agentName, configurationFile);
+              new PropertiesFileConfigurationProvider(agentName, configurationFile);
           application = new Application();
-          application.handleConfigurationEvent(configurationProvider
-            .getConfiguration());
+          application.handleConfigurationEvent(configurationProvider.getConfiguration());
         }
       }
       application.start();
@@ -350,8 +345,7 @@ public class Application {
       });
 
     } catch (Exception e) {
-      logger.error("A fatal error occurred while running. Exception follows.",
-          e);
+      logger.error("A fatal error occurred while running. Exception follows.", e);
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationProvider.java
index 6a27898..9528cb7 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationProvider.java
@@ -19,11 +19,6 @@
 
 package org.apache.flume.node;
 
-
 public interface ConfigurationProvider {
-
-
-  public MaterializedConfiguration getConfiguration();
-
-
+  MaterializedConfiguration getConfiguration();
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
index 857c8a5..91a09f0 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
@@ -32,11 +32,12 @@ import com.google.common.base.Preconditions;
 import com.google.common.eventbus.EventBus;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-public class PollingPropertiesFileConfigurationProvider extends
- PropertiesFileConfigurationProvider implements LifecycleAware {
+public class PollingPropertiesFileConfigurationProvider
+    extends PropertiesFileConfigurationProvider
+    implements LifecycleAware {
 
-  private static final Logger LOGGER = LoggerFactory
-      .getLogger(PollingPropertiesFileConfigurationProvider.class);
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PollingPropertiesFileConfigurationProvider.class);
 
   private final EventBus eventBus;
   private final File file;
@@ -83,8 +84,8 @@ public class PollingPropertiesFileConfigurationProvider extends
     LOGGER.info("Configuration provider stopping");
 
     executorService.shutdown();
-    try{
-      while(!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
+    try {
+      while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
         LOGGER.debug("Waiting for file watcher to terminate");
       }
     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java b/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java
index 536dcc4..a652390 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java
@@ -56,7 +56,7 @@ public class SimpleMaterializedConfiguration implements MaterializedConfiguratio
   }
 
   @Override
-  public void addChannel(String name, Channel channel){
+  public void addChannel(String name, Channel channel) {
     channels.put(name, channel);
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java
index 5cc292a..f20462b 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java
@@ -35,7 +35,7 @@ public abstract class AbstractRpcClient implements RpcClient {
       RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;
 
   @Override
-  public int getBatchSize(){
+  public int getBatchSize() {
     return batchSize;
   }
   @Override

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java
index db6905a..9d82acb 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java
@@ -70,7 +70,7 @@ public class FailoverRpcClient extends AbstractRpcClient implements RpcClient {
   //since shared data structures are created here.
   private synchronized void configureHosts(Properties properties)
       throws FlumeException {
-    if(isActive){
+    if (isActive) {
       logger.error("This client was already configured, " +
           "cannot reconfigure.");
       throw new FlumeException("This client was already configured, " +
@@ -79,7 +79,7 @@ public class FailoverRpcClient extends AbstractRpcClient implements RpcClient {
     hosts = HostInfo.getHostInfoList(properties);
     String tries = properties.getProperty(
         RpcClientConfigurationConstants.CONFIG_MAX_ATTEMPTS);
-    if (tries == null || tries.isEmpty()){
+    if (tries == null || tries.isEmpty()) {
       maxTries = hosts.size();
     } else {
       try {
@@ -269,7 +269,7 @@ public class FailoverRpcClient extends AbstractRpcClient implements RpcClient {
         continue;
       }
     }
-    for(int count = 0; count <= lastCheckedhost; count++) {
+    for (int count = 0; count <= lastCheckedhost; count++) {
       HostInfo hostInfo = hosts.get(count);
       try {
         setDefaultProperties(hostInfo, props);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java
index 8a81208..53d99a2 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java
@@ -76,7 +76,7 @@ public class HostInfo {
         // Ignore that host if value is not there
         if (hostAndPortStr != null) {
           String[] hostAndPort = hostAndPortStr.split(":");
-          if (hostAndPort.length != 2){
+          if (hostAndPort.length != 2) {
             LOGGER.error("Invalid host address" + hostAndPortStr);
             throw new FlumeException("Invalid host address" + hostAndPortStr);
           }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
index e5fcc36..d3ccf74 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
@@ -161,7 +161,7 @@ public class LoadBalancingRpcClient extends AbstractRpcClient {
         RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF);
 
     long maxBackoff = 0;
-    if(maxBackoffStr != null) {
+    if (maxBackoffStr != null) {
       maxBackoff = Long.parseLong(maxBackoffStr);
     }
 
@@ -240,12 +240,13 @@ public class LoadBalancingRpcClient extends AbstractRpcClient {
 
     private OrderSelector<HostInfo> selector;
 
-    RoundRobinHostSelector(boolean backoff, long maxBackoff){
+    RoundRobinHostSelector(boolean backoff, long maxBackoff) {
       selector = new RoundRobinOrderSelector<HostInfo>(backoff);
-      if(maxBackoff != 0){
+      if (maxBackoff != 0) {
         selector.setMaxTimeOut(maxBackoff);
       }
     }
+
     @Override
     public synchronized Iterator<HostInfo> createHostIterator() {
       return selector.createIterator();
@@ -256,7 +257,7 @@ public class LoadBalancingRpcClient extends AbstractRpcClient {
       selector.setObjects(hosts);
     }
 
-    public synchronized void informFailure(HostInfo failedHost){
+    public synchronized void informFailure(HostInfo failedHost) {
       selector.informFailure(failedHost);
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index 3661672..21a9553 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -79,8 +79,7 @@ import org.slf4j.LoggerFactory;
  * The connections are intended to be opened before clients are given access so
  * that the object cannot ever be in an inconsistent when exposed to users.
  */
-public class NettyAvroRpcClient extends AbstractRpcClient
-implements RpcClient {
+public class NettyAvroRpcClient extends AbstractRpcClient implements RpcClient {
 
   private ExecutorService callTimeoutPool;
   private final ReentrantLock stateLock = new ReentrantLock();
@@ -135,11 +134,11 @@ implements RpcClient {
     try {
 
       ExecutorService bossExecutor =
-        Executors.newCachedThreadPool(new TransceiverThreadFactory(
-          "Avro " + NettyTransceiver.class.getSimpleName() + " Boss"));
+          Executors.newCachedThreadPool(new TransceiverThreadFactory(
+              "Avro " + NettyTransceiver.class.getSimpleName() + " Boss"));
       ExecutorService workerExecutor =
-        Executors.newCachedThreadPool(new TransceiverThreadFactory(
-          "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"));
+          Executors.newCachedThreadPool(new TransceiverThreadFactory(
+              "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"));
 
       if (enableDeflateCompression || enableSsl) {
         if (maxIoWorkers >= 1) {
@@ -468,7 +467,7 @@ implements RpcClient {
   }
 
 
-    /**
+  /**
    * <p>
    * Configure the actual client using the properties.
    * <tt>properties</tt> should have at least 2 params:
@@ -479,13 +478,13 @@ implements RpcClient {
    * <tt>batch-size</tt> = <i>batchSize</i>
    * @param properties The properties to instantiate the client with.
    * @return
-     */
+   */
   @Override
   public synchronized void configure(Properties properties)
       throws FlumeException {
     stateLock.lock();
-    try{
-      if(connState == ConnState.READY || connState == ConnState.DEAD){
+    try {
+      if (connState == ConnState.READY || connState == ConnState.DEAD) {
         throw new FlumeException("This client was already configured, " +
             "cannot reconfigure.");
       }
@@ -529,12 +528,12 @@ implements RpcClient {
     }
 
     String host = properties.getProperty(
-        RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX+hosts[0]);
+        RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + hosts[0]);
     if (host == null || host.isEmpty()) {
       throw new FlumeException("Host not found: " + hosts[0]);
     }
     String[] hostAndPort = host.split(":");
-    if (hostAndPort.length != 2){
+    if (hostAndPort.length != 2) {
       throw new FlumeException("Invalid hostname: " + hosts[0]);
     }
     Integer port = null;
@@ -583,10 +582,12 @@ implements RpcClient {
       }
     }
 
-    String enableCompressionStr = properties.getProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE);
+    String enableCompressionStr =
+        properties.getProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE);
     if (enableCompressionStr != null && enableCompressionStr.equalsIgnoreCase("deflate")) {
       this.enableDeflateCompression = true;
-      String compressionLvlStr = properties.getProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_LEVEL);
+      String compressionLvlStr =
+          properties.getProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_LEVEL);
       compressionLevel = RpcClientConfigurationConstants.DEFAULT_COMPRESSION_LEVEL;
       if (compressionLvlStr != null) {
         try {
@@ -608,7 +609,7 @@ implements RpcClient {
     truststoreType = properties.getProperty(
         RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, "JKS");
     String excludeProtocolsStr = properties.getProperty(
-      RpcClientConfigurationConstants.CONFIG_EXCLUDE_PROTOCOLS);
+        RpcClientConfigurationConstants.CONFIG_EXCLUDE_PROTOCOLS);
     if (excludeProtocolsStr == null) {
       excludeProtocols.add("SSLv3");
     } else {
@@ -618,14 +619,13 @@ implements RpcClient {
       }
     }
 
-    String maxIoWorkersStr = properties.getProperty(
-      RpcClientConfigurationConstants.MAX_IO_WORKERS);
+    String maxIoWorkersStr = properties.getProperty(RpcClientConfigurationConstants.MAX_IO_WORKERS);
     if (!StringUtils.isEmpty(maxIoWorkersStr)) {
       try {
         maxIoWorkers = Integer.parseInt(maxIoWorkersStr);
       } catch (NumberFormatException ex) {
-        logger.warn ("Invalid maxIOWorkers:" + maxIoWorkersStr + " Using " +
-          "default maxIOWorkers.");
+        logger.warn("Invalid maxIOWorkers:" + maxIoWorkersStr + " Using " +
+            "default maxIOWorkers.");
         maxIoWorkers = -1;
       }
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
index 343e07b..d83cf19 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
@@ -66,18 +66,18 @@ public final class RpcClientConfigurationConstants {
   /**
    * Default batch size.
    */
-  public final static Integer DEFAULT_BATCH_SIZE = 100;
+  public static final Integer DEFAULT_BATCH_SIZE = 100;
 
   /**
    * Default connection, handshake, and initial request timeout in milliseconds.
    */
-  public final static long DEFAULT_CONNECT_TIMEOUT_MILLIS =
+  public static final long DEFAULT_CONNECT_TIMEOUT_MILLIS =
       TimeUnit.MILLISECONDS.convert(20, TimeUnit.SECONDS);
 
   /**
    * Default request timeout in milliseconds.
    */
-  public final static long DEFAULT_REQUEST_TIMEOUT_MILLIS =
+  public static final long DEFAULT_REQUEST_TIMEOUT_MILLIS =
       TimeUnit.MILLISECONDS.convert(20, TimeUnit.SECONDS);
 
   /**

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
index 11bc94c..5cb3332 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
@@ -18,6 +18,8 @@
  */
 package org.apache.flume.api;
 
+import org.apache.flume.FlumeException;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
@@ -25,7 +27,6 @@ import java.io.IOException;
 import java.io.Reader;
 import java.util.Locale;
 import java.util.Properties;
-import org.apache.flume.FlumeException;
 
 /**
  * Factory class to construct Flume {@link RPCClient} implementations.
@@ -63,12 +64,12 @@ public class RpcClientFactory {
     try {
       String clientClassType = type;
       ClientType clientType = null;
-      try{
+      try {
         clientType = ClientType.valueOf(type.toUpperCase(Locale.ENGLISH));
-      } catch (IllegalArgumentException e){
+      } catch (IllegalArgumentException e) {
         clientType = ClientType.OTHER;
       }
-      if (!clientType.equals(ClientType.OTHER)){
+      if (!clientType.equals(ClientType.OTHER)) {
         clientClassType = clientType.getClientClassName();
       }
       clazz =
@@ -181,8 +182,7 @@ public class RpcClientFactory {
    * @return an {@linkplain RpcClient} which uses thrift configured with the
    * given parameters.
    */
-  public static RpcClient getThriftInstance(String hostname, Integer port,
-    Integer batchSize) {
+  public static RpcClient getThriftInstance(String hostname, Integer port, Integer batchSize) {
     if (hostname == null) {
       throw new NullPointerException("hostname must not be null");
     }
@@ -196,7 +196,7 @@ public class RpcClientFactory {
     Properties props = new Properties();
     props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
     props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1",
-      hostname + ":" + port.intValue());
+        hostname + ":" + port.intValue());
     props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, batchSize.toString());
     ThriftRpcClient client = new ThriftRpcClient();
     client.configure(props);
@@ -227,7 +227,7 @@ public class RpcClientFactory {
    */
   public static RpcClient getThriftInstance(Properties props) {
     props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
-      ClientType.THRIFT.clientClassName);
+                      ClientType.THRIFT.clientClassName);
     return getInstance(props);
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
index 857948f..1d21d5f 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
@@ -32,16 +32,15 @@ import org.apache.thrift.transport.TTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.TrustManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLSocket;
 import javax.net.ssl.SSLSocketFactory;
-
+import javax.net.ssl.TrustManagerFactory;
 import java.io.FileInputStream;
 import java.nio.ByteBuffer;
 import java.security.KeyStore;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -50,7 +49,6 @@ import java.util.Properties;
 import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
-import java.util.Arrays;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -65,8 +63,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 public class ThriftRpcClient extends AbstractRpcClient {
-  private static final Logger LOGGER =
-    LoggerFactory.getLogger(ThriftRpcClient.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(ThriftRpcClient.class);
 
   /**
    * Config param for the thrift protocol to use.
@@ -104,8 +101,7 @@ public class ThriftRpcClient extends AbstractRpcClient {
       @Override
       public Thread newThread(Runnable r) {
         Thread t = new Thread(r);
-        t.setName("Flume Thrift RPC thread - " + String.valueOf(
-          threadCounter.incrementAndGet()));
+        t.setName("Flume Thrift RPC thread - " + String.valueOf(threadCounter.incrementAndGet()));
         return t;
       }
     });
@@ -126,11 +122,11 @@ public class ThriftRpcClient extends AbstractRpcClient {
     try {
       if (!isActive()) {
         throw new EventDeliveryException("Client was closed due to error. " +
-          "Please create a new client");
+            "Please create a new client");
       }
       client = connectionManager.checkout();
       final ThriftFlumeEvent thriftEvent = new ThriftFlumeEvent(event
-        .getHeaders(), ByteBuffer.wrap(event.getBody()));
+          .getHeaders(), ByteBuffer.wrap(event.getBody()));
       doAppend(client, thriftEvent).get(requestTimeout, TimeUnit.MILLISECONDS);
     } catch (Throwable e) {
       if (e instanceof ExecutionException) {
@@ -169,22 +165,22 @@ public class ThriftRpcClient extends AbstractRpcClient {
     try {
       if (!isActive()) {
         throw new EventDeliveryException("Client was closed " +
-          "due to error or is not yet configured.");
+            "due to error or is not yet configured.");
       }
       client = connectionManager.checkout();
       final List<ThriftFlumeEvent> thriftFlumeEvents = new ArrayList
-        <ThriftFlumeEvent>();
+          <ThriftFlumeEvent>();
       Iterator<Event> eventsIter = events.iterator();
       while (eventsIter.hasNext()) {
         thriftFlumeEvents.clear();
         for (int i = 0; i < batchSize && eventsIter.hasNext(); i++) {
           Event event = eventsIter.next();
           thriftFlumeEvents.add(new ThriftFlumeEvent(event.getHeaders(),
-            ByteBuffer.wrap(event.getBody())));
+              ByteBuffer.wrap(event.getBody())));
         }
         if (!thriftFlumeEvents.isEmpty()) {
           doAppendBatch(client, thriftFlumeEvents).get(requestTimeout,
-            TimeUnit.MILLISECONDS);
+              TimeUnit.MILLISECONDS);
         }
       }
     } catch (Throwable e) {
@@ -216,7 +212,7 @@ public class ThriftRpcClient extends AbstractRpcClient {
   }
 
   private Future<Void> doAppend(final ClientWrapper client,
-    final ThriftFlumeEvent e) throws Exception {
+                                final ThriftFlumeEvent e) throws Exception {
 
     return callTimeoutPool.submit(new Callable<Void>() {
       @Override
@@ -224,7 +220,7 @@ public class ThriftRpcClient extends AbstractRpcClient {
         Status status = client.client.append(e);
         if (status != Status.OK) {
           throw new EventDeliveryException("Failed to deliver events. Server " +
-            "returned status : " + status.name());
+              "returned status : " + status.name());
         }
         return null;
       }
@@ -232,7 +228,7 @@ public class ThriftRpcClient extends AbstractRpcClient {
   }
 
   private Future<Void> doAppendBatch(final ClientWrapper client,
-    final List<ThriftFlumeEvent> e) throws Exception {
+                                     final List<ThriftFlumeEvent> e) throws Exception {
 
     return callTimeoutPool.submit(new Callable<Void>() {
       @Override
@@ -240,7 +236,7 @@ public class ThriftRpcClient extends AbstractRpcClient {
         Status status = client.client.appendBatch(e);
         if (status != Status.OK) {
           throw new EventDeliveryException("Failed to deliver events. Server " +
-            "returned status : " + status.name());
+              "returned status : " + status.name());
         }
         return null;
       }
@@ -265,11 +261,11 @@ public class ThriftRpcClient extends AbstractRpcClient {
       connState = State.DEAD;
       connectionManager.closeAll();
       callTimeoutPool.shutdown();
-      if(!callTimeoutPool.awaitTermination(5, TimeUnit.SECONDS)) {
+      if (!callTimeoutPool.awaitTermination(5, TimeUnit.SECONDS)) {
         callTimeoutPool.shutdownNow();
       }
     } catch (Throwable ex) {
-      if(ex instanceof Error) {
+      if (ex instanceof Error) {
         throw (Error) ex;
       } else if (ex instanceof RuntimeException) {
         throw (RuntimeException) ex;
@@ -284,7 +280,7 @@ public class ThriftRpcClient extends AbstractRpcClient {
   protected void configure(Properties properties) throws FlumeException {
     if (isActive()) {
       throw new FlumeException("Attempting to re-configured an already " +
-        "configured client!");
+          "configured client!");
     }
     stateLock.lock();
     try {
@@ -304,40 +300,40 @@ public class ThriftRpcClient extends AbstractRpcClient {
         protocol = COMPACT_PROTOCOL;
       }
       batchSize = Integer.parseInt(properties.getProperty(
-        RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,
-        RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE.toString()));
+          RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,
+          RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE.toString()));
       requestTimeout = Long.parseLong(properties.getProperty(
-        RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
-        String.valueOf(
-          RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS)));
+          RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
+          String.valueOf(
+              RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS)));
       if (requestTimeout < 1000) {
         LOGGER.warn("Request timeout specified less than 1s. " +
-          "Using default value instead.");
+            "Using default value instead.");
         requestTimeout =
-          RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;
+            RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;
       }
       int connectionPoolSize = Integer.parseInt(properties.getProperty(
-        RpcClientConfigurationConstants.CONFIG_CONNECTION_POOL_SIZE,
-        String.valueOf(RpcClientConfigurationConstants
-          .DEFAULT_CONNECTION_POOL_SIZE)));
-      if(connectionPoolSize < 1) {
+          RpcClientConfigurationConstants.CONFIG_CONNECTION_POOL_SIZE,
+          String.valueOf(RpcClientConfigurationConstants
+              .DEFAULT_CONNECTION_POOL_SIZE)));
+      if (connectionPoolSize < 1) {
         LOGGER.warn("Connection Pool Size specified is less than 1. " +
-          "Using default value instead.");
+            "Using default value instead.");
         connectionPoolSize = RpcClientConfigurationConstants
-          .DEFAULT_CONNECTION_POOL_SIZE;
+            .DEFAULT_CONNECTION_POOL_SIZE;
       }
 
       enableSsl = Boolean.parseBoolean(properties.getProperty(
-              RpcClientConfigurationConstants.CONFIG_SSL));
-      if(enableSsl) {
+          RpcClientConfigurationConstants.CONFIG_SSL));
+      if (enableSsl) {
         truststore = properties.getProperty(
-                RpcClientConfigurationConstants.CONFIG_TRUSTSTORE);
+            RpcClientConfigurationConstants.CONFIG_TRUSTSTORE);
         truststorePassword = properties.getProperty(
-                RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_PASSWORD);
+            RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_PASSWORD);
         truststoreType = properties.getProperty(
-                RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, "JKS");
+            RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, "JKS");
         String excludeProtocolsStr = properties.getProperty(
-                RpcClientConfigurationConstants.CONFIG_EXCLUDE_PROTOCOLS);
+            RpcClientConfigurationConstants.CONFIG_EXCLUDE_PROTOCOLS);
         if (excludeProtocolsStr == null) {
           excludeProtocols.add("SSLv3");
         } else {
@@ -353,7 +349,7 @@ public class ThriftRpcClient extends AbstractRpcClient {
     } catch (Throwable ex) {
       //Failed to configure, kill the client.
       connState = State.DEAD;
-      if(ex instanceof Error) {
+      if (ex instanceof Error) {
         throw (Error) ex;
       } else if (ex instanceof RuntimeException) {
         throw (RuntimeException) ex;
@@ -381,40 +377,37 @@ public class ThriftRpcClient extends AbstractRpcClient {
     public final TTransport transport;
     private final int hashCode;
 
-    public ClientWrapper() throws Exception{
+    public ClientWrapper() throws Exception {
       TSocket tsocket;
-      if(enableSsl) {
+      if (enableSsl) {
         // JDK6's factory doesn't appear to pass the protocol onto the Socket
         // properly so we have to do some magic to make sure that happens.
         // Not an issue in JDK7 Lifted from thrift-0.9.1 to make the SSLContext
         SSLContext sslContext = createSSLContext(truststore, truststorePassword,
-                truststoreType);
+            truststoreType);
 
         // Create the factory from it
         SSLSocketFactory sslSockFactory = sslContext.getSocketFactory();
 
         // Create the TSocket from that
         tsocket = createSSLSocket(
-                sslSockFactory, hostname, port, 120000, excludeProtocols);
+            sslSockFactory, hostname, port, 120000, excludeProtocols);
       } else {
         tsocket = new TSocket(hostname, port);
       }
 
-
-     transport = getTransport(tsocket);
+      transport = getTransport(tsocket);
 
       // The transport is already open for SSL as part of TSSLTransportFactory.getClientSocket
-      if(!transport.isOpen()) {
+      if (!transport.isOpen()) {
         transport.open();
       }
       if (protocol.equals(BINARY_PROTOCOL)) {
         LOGGER.info("Using TBinaryProtocol");
-        client = new ThriftSourceProtocol.Client(new TBinaryProtocol
-            (transport));
+        client = new ThriftSourceProtocol.Client(new TBinaryProtocol(transport));
       } else {
         LOGGER.info("Using TCompactProtocol");
-        client = new ThriftSourceProtocol.Client(new TCompactProtocol
-            (transport));
+        client = new ThriftSourceProtocol.Client(new TCompactProtocol(transport));
       }
       // Not a great hash code, but since this class is immutable and there
       // is at most one instance of the components of this class,
@@ -423,12 +416,12 @@ public class ThriftRpcClient extends AbstractRpcClient {
     }
 
     public boolean equals(Object o) {
-      if(o == null) {
+      if (o == null) {
         return false;
       }
       // Since there is only one wrapper with any given client,
       // direct comparison is good enough.
-      if(this == o) {
+      if (this == o) {
         return true;
       }
       return false;
@@ -507,10 +500,8 @@ public class ThriftRpcClient extends AbstractRpcClient {
           c.transport.close();
           currentPoolSize--;
         }
-      /*
-       * Be cruel and close even the checked out clients. The threads writing
-       * using these will now get an exception.
-       */
+        // Be cruel and close even the checked out clients. The threads writing
+        // using these will now get an exception.
         for (ClientWrapper c : checkedOutClients) {
           c.transport.close();
           currentPoolSize--;
@@ -522,12 +513,14 @@ public class ThriftRpcClient extends AbstractRpcClient {
   }
 
   /**
-   * Lifted from ACCUMULO-3318 - Lifted from TSSLTransportFactory in Thrift-0.9.1. The method to create a client socket with an SSLContextFactory object is not visibile to us. Have to use
-   * SslConnectionParams instead of TSSLTransportParameters because no getters exist on TSSLTransportParameters.
-   *
+   * Lifted from ACCUMULO-3318 - Lifted from TSSLTransportFactory in Thrift-0.9.1.
+   * The method to create a client socket with an SSLContextFactory object is not visible to us.
+   * Have to use * SslConnectionParams instead of TSSLTransportParameters because no getters exist
+   * on TSSLTransportParameters.
    */
   private static SSLContext createSSLContext(String truststore,
-    String truststorePassword, String truststoreType) throws FlumeException {
+                                             String truststorePassword,
+                                             String truststoreType) throws FlumeException {
     SSLContext ctx;
     try {
       ctx = SSLContext.getInstance("TLS");
@@ -550,7 +543,8 @@ public class ThriftRpcClient extends AbstractRpcClient {
   }
 
   private static TSocket createSSLSocket(SSLSocketFactory factory, String host,
-          int port, int timeout, List<String> excludeProtocols) throws FlumeException {
+                                         int port, int timeout, List<String> excludeProtocols)
+      throws FlumeException {
     try {
       SSLSocket socket = (SSLSocket) factory.createSocket(host, port);
       socket.setSoTimeout(timeout);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java b/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java
index a5e01fc..c19925a 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java
@@ -37,7 +37,7 @@ public class EventBuilder {
   public static Event withBody(byte[] body, Map<String, String> headers) {
     Event event = new SimpleEvent();
 
-    if(body == null) {
+    if (body == null) {
       body = new byte[0];
     }
     event.setBody(body);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/event/JSONEvent.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/event/JSONEvent.java b/flume-ng-sdk/src/main/java/org/apache/flume/event/JSONEvent.java
index 0ec1678..9ee90ae 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/event/JSONEvent.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/event/JSONEvent.java
@@ -26,7 +26,7 @@ import org.apache.flume.FlumeException;
 /**
  *
  */
-public class JSONEvent implements Event{
+public class JSONEvent implements Event {
   private Map<String, String> headers;
   private String body;
   private transient String charset = "UTF-8";
@@ -43,7 +43,7 @@ public class JSONEvent implements Event{
 
   @Override
   public byte[] getBody() {
-    if(body != null) {
+    if (body != null) {
       try {
         return body.getBytes(charset);
       } catch (UnsupportedEncodingException ex) {
@@ -57,7 +57,7 @@ public class JSONEvent implements Event{
 
   @Override
   public void setBody(byte[] body) {
-    if(body != null) {
+    if (body != null) {
       this.body = new String(body);
     } else {
       this.body = "";

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java b/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java
index a7ac36f..61f848d 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java
@@ -51,7 +51,7 @@ public class SimpleEvent implements Event {
 
   @Override
   public void setBody(byte[] body) {
-    if(body == null){
+    if (body == null) {
       body = new byte[0];
     }
     this.body = body;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java b/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java
index fd9e81f..806a553 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java
@@ -40,9 +40,8 @@ import java.util.concurrent.TimeUnit;
 public abstract class OrderSelector<T> {
 
   private static final int EXP_BACKOFF_COUNTER_LIMIT = 16;
-  private static final long CONSIDER_SEQUENTIAL_RANGE = TimeUnit.HOURS
-    .toMillis(1);
-  private static final long MAX_TIMEOUT = 30000l;
+  private static final long CONSIDER_SEQUENTIAL_RANGE = TimeUnit.HOURS.toMillis(1);
+  private static final long MAX_TIMEOUT = 30000L;
   private final Map<T, FailureState> stateMap =
           new LinkedHashMap<T, FailureState>();
   private long maxTimeout = MAX_TIMEOUT;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
index a9f42b8..e3b57c3 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
@@ -104,12 +104,12 @@ public class DatasetSink extends AbstractSink implements Configurable {
   /**
    * The last time the writer rolled.
    */
-  private long lastRolledMillis = 0l;
+  private long lastRolledMillis = 0L;
 
   /**
    * The raw number of bytes parsed.
    */
-  private long bytesParsed = 0l;
+  private long bytesParsed = 0L;
 
   /**
    * A class for parsing Kite entities from Flume Events.
@@ -225,7 +225,7 @@ public class DatasetSink extends AbstractSink implements Configurable {
    */
   @VisibleForTesting
   void roll() {
-    this.lastRolledMillis = 0l;
+    this.lastRolledMillis = 0L;
   }
 
   @VisibleForTesting
@@ -434,7 +434,7 @@ public class DatasetSink extends AbstractSink implements Configurable {
 
       // Reset the last rolled time and the metrics
       this.lastRolledMillis = System.currentTimeMillis();
-      this.bytesParsed = 0l;
+      this.bytesParsed = 0L;
     } catch (DatasetNotFoundException ex) {
       throw new EventDeliveryException("Dataset " + datasetUri + " not found."
           + " The dataset must be created before Flume can write to it.", ex);
@@ -558,7 +558,7 @@ public class DatasetSink extends AbstractSink implements Configurable {
       try {
         // If the transaction wasn't committed before we got the exception, we
         // need to rollback.
-          transaction.rollback();
+        transaction.rollback();
       } catch (RuntimeException ex) {
         LOG.error("Transaction rollback failed: " + ex.getLocalizedMessage());
         LOG.debug("Exception follows.", ex);
@@ -567,7 +567,7 @@ public class DatasetSink extends AbstractSink implements Configurable {
         this.transaction = null;
       }
     }
-}
+  }
 
   /**
    * Get the name of the dataset from the URI

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java
index 8f6c0ae..4373429 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java
@@ -50,5 +50,4 @@ public class NonRecoverableEventException extends Exception {
     super(t);
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java
index cfb7349..3720ff3 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java
@@ -25,7 +25,6 @@ import org.apache.flume.Context;
 
 import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
 
-
 public class EntityParserFactory {
 
   public EntityParser<GenericRecord> newParser(Schema datasetSchema, Context config) {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java
index a8b2008..d3b1fe8 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java
@@ -23,7 +23,6 @@ import org.apache.flume.Context;
 
 import static org.apache.flume.sink.kite.DatasetSinkConstants.*;
 
-
 public class FailurePolicyFactory {
 
   public FailurePolicy newPolicy(Context config) {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
index e367e12..2fe309f 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
@@ -50,7 +50,7 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
   private Integer numberOfCloseRetries = null;
   private long timeBetweenCloseRetries = Long.MAX_VALUE;
 
-  final static Object [] NO_ARGS = new Object []{};
+  static final Object[] NO_ARGS = new Object[]{};
 
   @Override
   public void configure(Context context) {
@@ -63,11 +63,12 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
 
     if (numberOfCloseRetries > 1) {
       try {
-        timeBetweenCloseRetries = context.getLong("hdfs.callTimeout", 10000l);
+        timeBetweenCloseRetries = context.getLong("hdfs.callTimeout", 10000L);
       } catch (NumberFormatException e) {
-        logger.warn("hdfs.callTimeout can not be parsed to a long: " + context.getLong("hdfs.callTimeout"));
+        logger.warn("hdfs.callTimeout can not be parsed to a long: " +
+                    context.getLong("hdfs.callTimeout"));
       }
-      timeBetweenCloseRetries = Math.max(timeBetweenCloseRetries/numberOfCloseRetries, 1000);
+      timeBetweenCloseRetries = Math.max(timeBetweenCloseRetries / numberOfCloseRetries, 1000);
     }
 
   }
@@ -232,7 +233,7 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
 
   private Method reflectHflushOrSync(FSDataOutputStream os) {
     Method m = null;
-    if(os != null) {
+    if (os != null) {
       Class<?> fsDataOutputStreamClass = os.getClass();
       try {
         m = fsDataOutputStreamClass.getMethod("hflush");
@@ -242,7 +243,7 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
           m = fsDataOutputStreamClass.getMethod("sync");
         } catch (Exception ex1) {
           String msg = "Neither hflush not sync were found. That seems to be " +
-            "a problem!";
+              "a problem!";
           logger.error(msg);
           throw new FlumeException(msg, ex1);
         }
@@ -266,7 +267,7 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
       String msg = "Error while trying to hflushOrSync!";
       logger.error(msg);
       Throwable cause = e.getCause();
-      if(cause != null && cause instanceof IOException) {
+      if (cause != null && cause instanceof IOException) {
         throw (IOException)cause;
       }
       throw new FlumeException(msg, e);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java
index 1aca58f..1d8a9e4 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java
@@ -20,7 +20,7 @@ package org.apache.flume.sink.hdfs;
 
 import org.apache.flume.FlumeException;
 
-public class BucketClosedException extends FlumeException{
+public class BucketClosedException extends FlumeException {
 
   private static final long serialVersionUID = -4216667125119540357L;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 6b97de6..b096410 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -18,22 +18,8 @@
 
 package org.apache.flume.sink.hdfs;
 
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.security.PrivilegedExceptionAction;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
 import org.apache.flume.Clock;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -49,7 +35,20 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Throwables;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Internal API intended for HDFSSink use.
@@ -117,14 +116,14 @@ class BucketWriter {
   AtomicInteger renameTries = new AtomicInteger(0);
 
   BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
-    Context context, String filePath, String fileName, String inUsePrefix,
-    String inUseSuffix, String fileSuffix, CompressionCodec codeC,
-    CompressionType compType, HDFSWriter writer,
-    ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser,
-    SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback,
-    String onCloseCallbackPath, long callTimeout,
-    ExecutorService callTimeoutPool, long retryInterval,
-    int maxCloseTries) {
+      Context context, String filePath, String fileName, String inUsePrefix,
+      String inUseSuffix, String fileSuffix, CompressionCodec codeC,
+      CompressionType compType, HDFSWriter writer,
+      ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser,
+      SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback,
+      String onCloseCallbackPath, long callTimeout,
+      ExecutorService callTimeoutPool, long retryInterval,
+      int maxCloseTries) {
     this.rollInterval = rollInterval;
     this.rollSize = rollSize;
     this.rollCount = rollCount;
@@ -181,17 +180,15 @@ class BucketWriter {
         Path.class);
     } catch (Exception e) {
       LOG.warn("isFileClosed is not available in the " +
-        "version of HDFS being used. Flume will not " +
-        "attempt to close files if the close fails on " +
-        "the first attempt",e);
+          "version of HDFS being used. Flume will not " +
+          "attempt to close files if the close fails on " +
+          "the first attempt",e);
       return null;
     }
   }
 
-  private Boolean isFileClosed(FileSystem fs,
-    Path tmpFilePath) throws Exception {
-    return (Boolean)(isClosedMethod.invoke(fs,
-      tmpFilePath));
+  private Boolean isFileClosed(FileSystem fs, Path tmpFilePath) throws Exception {
+    return (Boolean)(isClosedMethod.invoke(fs, tmpFilePath));
   }
 
   /**
@@ -239,17 +236,15 @@ class BucketWriter {
               // Need to get reference to FS using above config before underlying
               // writer does in order to avoid shutdown hook &
               // IllegalStateExceptions
-              if(!mockFsInjected) {
-                fileSystem = new Path(bucketPath).getFileSystem(
-                  config);
+              if (!mockFsInjected) {
+                fileSystem = new Path(bucketPath).getFileSystem(config);
               }
               writer.open(bucketPath);
             } else {
               // need to get reference to FS before writer does to
               // avoid shutdown hook
-              if(!mockFsInjected) {
-                fileSystem = new Path(bucketPath).getFileSystem(
-                  config);
+              if (!mockFsInjected) {
+                fileSystem = new Path(bucketPath).getFileSystem(config);
               }
               writer.open(bucketPath, codeC, compType);
             }
@@ -278,7 +273,7 @@ class BucketWriter {
           try {
             // Roll the file and remove reference from sfWriters map.
             close(true);
-          } catch(Throwable t) {
+          } catch (Throwable t) {
             LOG.error("Unexpected error", t);
           }
           return null;
@@ -327,7 +322,7 @@ class BucketWriter {
       public Void call() throws Exception {
         if (renameTries >= maxRenameTries) {
           LOG.warn("Unsuccessfully attempted to rename " + path + " " +
-            maxRenameTries + " times. File may still be open.");
+              maxRenameTries + " times. File may still be open.");
           return null;
         }
         renameTries++;
@@ -335,16 +330,15 @@ class BucketWriter {
           renameBucket(path, finalPath, fs);
         } catch (Exception e) {
           LOG.warn("Renaming file: " + path + " failed. Will " +
-            "retry again in " + retryInterval + " seconds.", e);
-          timedRollerPool.schedule(this, retryInterval,
-            TimeUnit.SECONDS);
+              "retry again in " + retryInterval + " seconds.", e);
+          timedRollerPool.schedule(this, retryInterval, TimeUnit.SECONDS);
           return null;
         }
         return null;
       }
     };
-
   }
+
   /**
    * Close the file handle and rename the temp file to the permanent filename.
    * Safe to call multiple times. Logs HDFSWriter.close() exceptions.
@@ -352,7 +346,7 @@ class BucketWriter {
    * @throws InterruptedException
    */
   public synchronized void close(boolean callCloseCallback)
-    throws IOException, InterruptedException {
+      throws IOException, InterruptedException {
     checkAndThrowInterruptedException();
     try {
       flush();
@@ -367,9 +361,8 @@ class BucketWriter {
         callWithTimeout(closeCallRunner);
         sinkCounter.incrementConnectionClosedCount();
       } catch (IOException e) {
-        LOG.warn(
-          "failed to close() HDFSWriter for file (" + bucketPath +
-            "). Exception follows.", e);
+        LOG.warn("failed to close() HDFSWriter for file (" + bucketPath +
+                 "). Exception follows.", e);
         sinkCounter.incrementConnectionFailedCount();
         failedToClose = true;
       }
@@ -393,15 +386,12 @@ class BucketWriter {
       // could block or throw IOException
       try {
         renameBucket(bucketPath, targetPath, fileSystem);
-      } catch(Exception e) {
-        LOG.warn(
-          "failed to rename() file (" + bucketPath +
-          "). Exception follows.", e);
+      } catch (Exception e) {
+        LOG.warn("failed to rename() file (" + bucketPath +
+                 "). Exception follows.", e);
         sinkCounter.incrementConnectionFailedCount();
-        final Callable<Void> scheduledRename =
-                createScheduledRenameCallable();
-        timedRollerPool.schedule(scheduledRename, retryInterval,
-                TimeUnit.SECONDS);
+        final Callable<Void> scheduledRename = createScheduledRenameCallable();
+        timedRollerPool.schedule(scheduledRename, retryInterval, TimeUnit.SECONDS);
       }
     }
     if (callCloseCallback) {
@@ -420,14 +410,14 @@ class BucketWriter {
     if (!isBatchComplete()) {
       doFlush();
 
-      if(idleTimeout > 0) {
+      if (idleTimeout > 0) {
         // if the future exists and couldn't be cancelled, that would mean it has already run
         // or been cancelled
-        if(idleFuture == null || idleFuture.cancel(false)) {
+        if (idleFuture == null || idleFuture.cancel(false)) {
           Callable<Void> idleAction = new Callable<Void>() {
             public Void call() throws Exception {
               LOG.info("Closing idle bucketWriter {} at {}", bucketPath,
-                System.currentTimeMillis());
+                       System.currentTimeMillis());
               if (isOpen) {
                 close(true);
               }
@@ -443,10 +433,10 @@ class BucketWriter {
 
   private void runCloseAction() {
     try {
-      if(onCloseCallback != null) {
+      if (onCloseCallback != null) {
         onCloseCallback.run(onCloseCallbackPath);
       }
-    } catch(Throwable t) {
+    } catch (Throwable t) {
       LOG.error("Unexpected error", t);
     }
   }
@@ -483,19 +473,19 @@ class BucketWriter {
     checkAndThrowInterruptedException();
     // If idleFuture is not null, cancel it before we move forward to avoid a
     // close call in the middle of the append.
-    if(idleFuture != null) {
+    if (idleFuture != null) {
       idleFuture.cancel(false);
       // There is still a small race condition - if the idleFuture is already
       // running, interrupting it can cause HDFS close operation to throw -
       // so we cannot interrupt it while running. If the future could not be
       // cancelled, it is already running - wait for it to finish before
       // attempting to write.
-      if(!idleFuture.isDone()) {
+      if (!idleFuture.isDone()) {
         try {
           idleFuture.get(callTimeout, TimeUnit.MILLISECONDS);
         } catch (TimeoutException ex) {
           LOG.warn("Timeout while trying to cancel closing of idle file. Idle" +
-            " file close may have failed", ex);
+                   " file close may have failed", ex);
         } catch (Exception ex) {
           LOG.warn("Error while trying to cancel closing of idle file. ", ex);
         }
@@ -612,10 +602,9 @@ class BucketWriter {
   // this method can get called from the scheduled thread so the
   // file gets closed later - so an implicit reference to this
   // bucket writer would still be alive in the Callable instance.
-  private void renameBucket(String bucketPath,
-    String targetPath, final FileSystem fs) throws IOException,
-    InterruptedException {
-    if(bucketPath.equals(targetPath)) {
+  private void renameBucket(String bucketPath, String targetPath, final FileSystem fs)
+      throws IOException, InterruptedException {
+    if (bucketPath.equals(targetPath)) {
       return;
     }
 
@@ -646,7 +635,7 @@ class BucketWriter {
   }
 
   void setClock(Clock clock) {
-      this.clock = clock;
+    this.clock = clock;
   }
 
   /**
@@ -669,7 +658,7 @@ class BucketWriter {
    * cancel the callable and throw an IOException
    */
   private <T> T callWithTimeout(final CallRunner<T> callRunner)
-    throws IOException, InterruptedException {
+      throws IOException, InterruptedException {
     Future<T> future = callTimeoutPool.submit(new Callable<T>() {
       @Override
       public T call() throws Exception {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
index f128795..80b7cb4 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
@@ -78,8 +78,8 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter {
     Configuration conf = new Configuration();
     Path dstPath = new Path(filePath);
     FileSystem hdfs = dstPath.getFileSystem(conf);
-    if(useRawLocalFileSystem) {
-      if(hdfs instanceof LocalFileSystem) {
+    if (useRawLocalFileSystem) {
+      if (hdfs instanceof LocalFileSystem) {
         hdfs = ((LocalFileSystem)hdfs).getRaw();
       } else {
         logger.warn("useRawLocalFileSystem is set to true but file system " +
@@ -87,14 +87,13 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter {
       }
     }
     boolean appending = false;
-    if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
-    (dstPath)) {
+    if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile(dstPath)) {
       fsOut = hdfs.append(dstPath);
       appending = true;
     } else {
       fsOut = hdfs.create(dstPath);
     }
-    if(compressor == null) {
+    if (compressor == null) {
       compressor = CodecPool.getCompressor(codec, conf);
     }
     cmpOut = codec.createOutputStream(fsOut, compressor);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
index 7054bfc..c4ad919 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
@@ -37,8 +37,7 @@ import org.slf4j.LoggerFactory;
 
 public class HDFSDataStream extends AbstractHDFSWriter {
 
-  private static final Logger logger =
-      LoggerFactory.getLogger(HDFSDataStream.class);
+  private static final Logger logger = LoggerFactory.getLogger(HDFSDataStream.class);
 
   private FSDataOutputStream outStream;
   private String serializerType;
@@ -60,16 +59,13 @@ public class HDFSDataStream extends AbstractHDFSWriter {
   }
 
   @VisibleForTesting
-  protected FileSystem getDfs(Configuration conf,
-    Path dstPath) throws IOException{
-    return  dstPath.getFileSystem(conf);
+  protected FileSystem getDfs(Configuration conf, Path dstPath) throws IOException {
+    return dstPath.getFileSystem(conf);
   }
 
-  protected void doOpen(Configuration conf,
-    Path dstPath, FileSystem hdfs) throws
-    IOException {
-    if(useRawLocalFileSystem) {
-      if(hdfs instanceof LocalFileSystem) {
+  protected void doOpen(Configuration conf, Path dstPath, FileSystem hdfs) throws IOException {
+    if (useRawLocalFileSystem) {
+      if (hdfs instanceof LocalFileSystem) {
         hdfs = ((LocalFileSystem)hdfs).getRaw();
       } else {
         logger.warn("useRawLocalFileSystem is set to true but file system " +
@@ -78,8 +74,7 @@ public class HDFSDataStream extends AbstractHDFSWriter {
     }
 
     boolean appending = false;
-    if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
-            (dstPath)) {
+    if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile(dstPath)) {
       outStream = hdfs.append(dstPath);
       appending = true;
     } else {


Mime
View raw message