eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [07/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle problems on eagle-alert module and enable failOnViolation
Date Thu, 08 Sep 2016 07:14:08 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
index 99c1fed..9c04fa4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
@@ -18,14 +18,6 @@
  */
 package org.apache.eagle.alert.engine.spout;
 
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.SpoutSpec;
 import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
@@ -37,33 +29,30 @@ import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider
 import org.apache.eagle.alert.engine.serialization.Serializers;
 import org.apache.eagle.alert.utils.AlertConstants;
 import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import storm.kafka.BrokerHosts;
-import storm.kafka.KafkaSpoutMetric;
-import storm.kafka.KafkaSpoutWrapper;
-import storm.kafka.SpoutConfig;
-import storm.kafka.ZkHosts;
 import backtype.storm.spout.SchemeAsMultiScheme;
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
-
 import com.typesafe.config.Config;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.*;
+
+import java.text.MessageFormat;
+import java.util.*;
 
 /**
  * wrap KafkaSpout to provide parallel processing of messages for multiple Kafka topics
  *
- * 1. onNewConfig() is interface for outside to update new metadata. Upon new metadata, this class will calculate if there is any new topic, removed topic or
- *    updated topic
- *
+ * <p>1. onNewConfig() is interface for outside to update new metadata. Upon new metadata, this class will calculate if there is any new topic, removed topic or
+ * updated topic</p>
  */
-public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener,SerializationMetadataProvider {
+public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener, SerializationMetadataProvider {
     private static final long serialVersionUID = -5280723341236671580L;
-    private static final Logger LOG  = LoggerFactory.getLogger(CorrelationSpout.class);
+    private static final Logger LOG = LoggerFactory.getLogger(CorrelationSpout.class);
 
     public static final String DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT = "/consumers";
     public static final String DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH = "/eagle_consumer";
@@ -91,24 +80,25 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
     private volatile Map<String, StreamDefinition> sds;
 
     /**
-     * FIXME one single changeNotifyService may have issues as possibly multiple spout tasks will register themselves and initialize service
+     * FIXME one single changeNotifyService may have issues as possibly multiple spout tasks will register themselves and initialize service.
+     *
      * @param config
      * @param topologyId
      * @param changeNotifyService
      * @param numOfRouterBolts
      */
-    public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts){
+    public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts) {
         this(config, topologyId, changeNotifyService, numOfRouterBolts, AlertConstants.DEFAULT_SPOUT_NAME, AlertConstants.DEFAULT_ROUTERBOLT_NAME);
     }
+
     /**
-     *
      * @param config
-     * @param topologyId used for distinguishing kafka offset for different topologies
+     * @param topologyId       used for distinguishing kafka offset for different topologies
      * @param numOfRouterBolts used for generating streamId and routing
-     * @param spoutName used for generating streamId between spout and router bolt
-     * @param routerBoltName used for generating streamId between spout and router bolt
+     * @param spoutName        used for generating streamId between spout and router bolt
+     * @param routerBoltName   used for generating streamId between spout and router bolt.
      */
-    public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts, String spoutName, String routerBoltName){
+    public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts, String spoutName, String routerBoltName) {
         this.config = config;
         this.topologyId = topologyId;
         this.changeNotifyService = changeNotifyService;
@@ -117,16 +107,17 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
         this.routeBoltName = routerBoltName;
     }
 
-    public String getSpoutName(){
+    public String getSpoutName() {
         return spoutName;
     }
 
-    public String getRouteBoltName(){
+    public String getRouteBoltName() {
         return routeBoltName;
     }
 
     /**
-     * the only output field is for StreamEvent
+     * the only output field is for StreamEvent.
+     *
      * @param declarer
      */
     @Override
@@ -141,7 +132,7 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
     @SuppressWarnings("rawtypes")
     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("open method invoked");
         }
         this.conf = conf;
@@ -165,9 +156,9 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
     @Override
     public void onSpoutSpecChange(SpoutSpec spec, Map<String, StreamDefinition> sds) {
         LOG.info("new metadata is updated " + spec);
-        try{
+        try {
             onReload(spec, sds);
-        }catch(Exception ex){
+        } catch (Exception ex) {
             LOG.error("error applying new SpoutSpec", ex);
         }
     }
@@ -181,7 +172,7 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
 
     /**
      * find the correct wrapper to do ack that means msgId should be mapped to
-     * wrapper
+     * wrapper.
      *
      * @param msgId
      */
@@ -217,7 +208,7 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
             wrapper.close();
         }
     }
-    
+
     private List<String> getTopics(SpoutSpec spoutSpec) {
         List<String> meta = new ArrayList<String>();
         for (Kafka2TupleMetadata entry : spoutSpec.getKafka2TupleMetadataMap().values()) {
@@ -246,7 +237,7 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
         // copy and swap
         Map<String, KafkaSpoutWrapper> newKafkaSpoutList = new HashMap<>(this.kafkaSpoutList);
         // iterate new topics and then create KafkaSpout
-        for(String topic : newTopics){
+        for (String topic : newTopics) {
             KafkaSpoutWrapper wrapper = newKafkaSpoutList.get(topic);
             if (wrapper != null) {
                 LOG.warn(MessageFormat.format("try to create new topic {0}, but found in the active spout list, this may indicate some inconsistency", topic));
@@ -256,7 +247,7 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
             newKafkaSpoutList.put(topic, newWrapper);
         }
         // iterate remove topics and then close KafkaSpout
-        for(String topic : removeTopics){
+        for (String topic : removeTopics) {
             KafkaSpoutWrapper wrapper = newKafkaSpoutList.get(topic);
             if (wrapper == null) {
                 LOG.warn(MessageFormat.format("try to remove topic {0}, but not found in the active spout list, this may indicate some inconsistency", topic));
@@ -267,7 +258,7 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
         }
 
         // iterate update topic and then update metadata
-        for(String topic : updateTopics){
+        for (String topic : updateTopics) {
             KafkaSpoutWrapper spoutWrapper = newKafkaSpoutList.get(topic);
             if (spoutWrapper == null) {
                 LOG.warn(MessageFormat.format("try to update topic {0}, but not found in the active spout list, this may indicate some inconsistency", topic));
@@ -291,7 +282,7 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
      * Note2: put topologyId as part of zkState because one topic by design can be consumed by multiple topologies so one topology needs to know
      * processed offset for itself
      *
-     * TODO: Should avoid use Config.get in deep calling stack, should generate config bean as early as possible
+     * <p>TODO: Should avoid use Config.get in deep calling stack, should generate config bean as early as possible
      *
      * @param conf
      * @param context
@@ -302,7 +293,7 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
      */
     @SuppressWarnings("rawtypes")
     protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
-                                                 String schemeClsName, SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) throws Exception{
+                                                 String schemeClsName, SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) throws Exception {
         String kafkaBrokerZkQuorum = config.getString("spout.kafkaBrokerZkQuorum");
         BrokerHosts hosts = null;
         if (config.hasPath("spout.kafkaBrokerZkBasePath")) {
@@ -311,22 +302,22 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
             hosts = new ZkHosts(kafkaBrokerZkQuorum);
         }
         String transactionZkRoot = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT;
-        if(config.hasPath("spout.stormKafkaTransactionZkPath")) {
+        if (config.hasPath("spout.stormKafkaTransactionZkPath")) {
             transactionZkRoot = config.getString("spout.stormKafkaTransactionZkPath");
         }
         // write partition offset etc. into zkRoot+id, see PartitionManager.committedPath
         String zkStateTransactionRelPath = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH;
-        if(config.hasPath("spout.stormKafkaEagleConsumer")){
+        if (config.hasPath("spout.stormKafkaEagleConsumer")) {
             zkStateTransactionRelPath = config.getString("spout.stormKafkaEagleConsumer");
         }
         SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, transactionZkRoot, zkStateTransactionRelPath + "/" + topic + "/" + topologyId);
         // transaction zkServers
         boolean stormKafkaUseSameZkQuorumWithKafkaBroker = config.getBoolean("spout.stormKafkaUseSameZkQuorumWithKafkaBroker");
-        if(stormKafkaUseSameZkQuorumWithKafkaBroker){
+        if (stormKafkaUseSameZkQuorumWithKafkaBroker) {
             ZkServerPortUtils utils = new ZkServerPortUtils(kafkaBrokerZkQuorum);
             spoutConfig.zkServers = utils.getZkHosts();
             spoutConfig.zkPort = utils.getZkPort();
-        }else{
+        } else {
             ZkServerPortUtils utils = new ZkServerPortUtils(config.getString("spout.stormKafkaTransactionZkQuorum"));
             spoutConfig.zkServers = utils.getZkHosts();
             spoutConfig.zkPort = utils.getZkPort();
@@ -342,9 +333,9 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
 
         spoutConfig.scheme = new SchemeAsMultiScheme(SchemeBuilder.buildFromClsName(schemeClsName, topic, conf));
         KafkaSpoutWrapper wrapper = new KafkaSpoutWrapper(spoutConfig, kafkaSpoutMetric);
-        SpoutOutputCollectorWrapper collectorWrapper = new SpoutOutputCollectorWrapper(this, collector, topic, spoutSpec, numOfRouterBolts, sds,this.serializer);
+        SpoutOutputCollectorWrapper collectorWrapper = new SpoutOutputCollectorWrapper(this, collector, topic, spoutSpec, numOfRouterBolts, sds, this.serializer);
         wrapper.open(conf, context, collectorWrapper);
-        
+
         if (LOG.isInfoEnabled()) {
             LOG.info("create and open kafka wrapper: topic {}, scheme class{} ", topic, schemeClsName);
         }
@@ -359,10 +350,11 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
     /**
      * utility to get list of zkServers and zkPort.(It is assumed that zkPort is same for all zkServers as storm-kafka library requires this though it is not efficient)
      */
-    private static class ZkServerPortUtils{
+    private static class ZkServerPortUtils {
         private List<String> zkHosts = new ArrayList<>();
         private Integer zkPort;
-        public ZkServerPortUtils(String zkQuorum){
+
+        public ZkServerPortUtils(String zkQuorum) {
             String[] zkConnections = zkQuorum.split(",");
             for (String zkConnection : zkConnections) {
                 zkHosts.add(zkConnection.split(":")[0]);
@@ -370,16 +362,16 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
             zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
         }
 
-        public List<String> getZkHosts(){
+        public List<String> getZkHosts() {
             return zkHosts;
         }
 
-        public Integer getZkPort(){
+        public Integer getZkPort() {
             return zkPort;
         }
     }
 
-    protected void removeKafkaSpout(KafkaSpoutWrapper wrapper){
+    protected void removeKafkaSpout(KafkaSpoutWrapper wrapper) {
         try {
             wrapper.close();
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
index 22993b3..5b7e542 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
@@ -21,7 +21,7 @@ package org.apache.eagle.alert.engine.spout;
 import org.slf4j.Logger;
 
 /**
- * normally this is used in unit test for convenience
+ * normally this is used in unit test for convenience.
  */
 public class CreateTopicUtils {
 
@@ -31,13 +31,13 @@ public class CreateTopicUtils {
     private static final int replicationFactor = 1;
 
     public static void ensureTopicReady(String zkQuorum, String topic) {
-//        ZkConnection zkConnection = new ZkConnection(zkQuorum);
-//        ZkClient zkClient = new ZkClient(zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$);
-////        ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
-//        if (!AdminUtils.topicExists(zkClient, topic)) {
-//            LOG.info("create topic " + topic + " with partitions " + partitions + ", and replicationFactor "
-//                    + replicationFactor);
-//            AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
-//        }
+        //        ZkConnection zkConnection = new ZkConnection(zkQuorum);
+        //        ZkClient zkClient = new ZkClient(zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$);
+        ////        ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
+        //        if (!AdminUtils.topicExists(zkClient, topic)) {
+        //            LOG.info("create topic " + topic + " with partitions " + partitions + ", and replicationFactor "
+        //                    + replicationFactor);
+        //            AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
+        //        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java
index 23e94c3..3c8c99d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java
@@ -19,22 +19,23 @@
 
 package org.apache.eagle.alert.engine.spout;
 
-import java.util.Map;
-
 import org.apache.eagle.alert.coordination.model.SpoutSpec;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 
+import java.util.Map;
+
 /**
  * topic to stream metadata lifecycle method
  * one topic may spawn multiple streams, the metadata change includes
  * 1. add/remove stream
  * 2. for a specific stream, groupingstrategy is changed
- *    ex1, this stream has more alert bolts than before, then this spout would take more traffic
- *    ex2, this stream has less alert bolts than before, then this spout would take less traffic
+ * ex1, this stream has more alert bolts than before, then this spout would take more traffic
+ * ex2, this stream has less alert bolts than before, then this spout would take less traffic
  */
 public interface ISpoutSpecLCM {
     /**
      * stream metadata is used for SPOUT to filter traffic and route traffic to following groupby bolts.
+     *
      * @param metadata
      */
     void update(SpoutSpec metadata, Map<String, StreamDefinition> sds);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
index c786c01..ccd3493 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
@@ -26,13 +26,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
  */
 public class KafkaMessageIdWrapper {
     public Object id;
-    public KafkaMessageIdWrapper(Object o){
+
+    public KafkaMessageIdWrapper(Object o) {
         this.id = o;
     }
+
     public String topic;
-    private final static ObjectMapper objectMapper = new ObjectMapper();
+    private static final ObjectMapper objectMapper = new ObjectMapper();
 
-    public String toString(){
+    public String toString() {
         try {
             return String.format("KafkaMessageIdWrapper[topic=%s, id=%s]", topic, objectMapper.writeValueAsString(id));
         } catch (JsonProcessingException e) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
index bfd5da7..f1945be 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
@@ -19,21 +19,21 @@
 
 package org.apache.eagle.alert.engine.spout;
 
-import java.util.Map;
-
 import backtype.storm.spout.Scheme;
 
+import java.util.Map;
+
 
 /**
  * All Scheme implementations should have the following conditions
  * 1) implement Scheme interface
- * 2) has one constructor with topic name as parameter
+ * 2) has one constructor with topic name as parameter.
  */
 public class SchemeBuilder {
 
     @SuppressWarnings("rawtypes")
-    public static Scheme buildFromClsName(String clsName, String topic, Map conf) throws Exception{
+    public static Scheme buildFromClsName(String clsName, String topic, Map conf) throws Exception {
         Object o = Class.forName(clsName).getConstructor(String.class, Map.class).newInstance(topic, conf);
-        return (Scheme)o;
+        return (Scheme) o;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
index 2f7cc68..a8dcc0d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
@@ -18,11 +18,6 @@
  */
 package org.apache.eagle.alert.engine.spout;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.eagle.alert.coordination.model.SpoutSpec;
 import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
 import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy;
@@ -34,17 +29,21 @@ import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
 import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
 import org.apache.eagle.alert.utils.StreamIdConversion;
+import backtype.storm.spout.ISpoutOutputCollector;
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.spout.SpoutOutputCollector;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 /**
  * intercept the message sent from within KafkaSpout and select downstream bolts based on meta-data
- * This is topic based. each topic will have one SpoutOutputCollectorWrapper
+ * This is topic based. each topic will have one SpoutOutputCollectorWrapper.
  */
-public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements ISpoutSpecLCM,SerializationMetadataProvider {
+public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements ISpoutSpecLCM, SerializationMetadataProvider {
     private static final Logger LOG = LoggerFactory.getLogger(SpoutOutputCollectorWrapper.class);
 
     private final ISpoutOutputCollector delegate;
@@ -58,9 +57,9 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements
     private volatile Map<String, StreamDefinition> sds;
 
     /**
-     * @param delegate   actual SpoutOutputCollector to send data to following bolts
-     * @param topic      topic for this KafkaSpout to handle
-     * @param numGroupbyBolts bolts following this spout
+     * @param delegate        actual SpoutOutputCollector to send data to following bolts
+     * @param topic           topic for this KafkaSpout to handle
+     * @param numGroupbyBolts bolts following this spout.
      * @param serializer
      */
     public SpoutOutputCollectorWrapper(CorrelationSpout spout,
@@ -83,31 +82,31 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements
     /**
      * How to assert that numTotalGroupbyBolts >= numOfRouterBolts, otherwise
      * there is runtime issue by default, tuple includes 2 fields field 1: topic
-     * name field 2: map of key/value
+     * name field 2: map of key/value.
      */
     @SuppressWarnings("rawtypes")
     @Override
     public List<Integer> emit(List<Object> tuple, Object messageId) {
         if (!sanityCheck()) {
             LOG.error(
-                    "spout collector for topic {} see monitored metadata invalid, is this data source removed! Trigger message id {} ",
-                    topic, messageId);
+                "spout collector for topic {} see monitored metadata invalid, is this data source removed! Trigger message id {} ",
+                topic, messageId);
             return null;
         }
 
         KafkaMessageIdWrapper newMessageId = new KafkaMessageIdWrapper(messageId);
         newMessageId.topic = topic;
         /**
-            phase 1: tuple to stream converter
-            if this topic multiplexes multiple streams, then retrieve the individual streams
-        */
+         phase 1: tuple to stream converter
+         if this topic multiplexes multiple streams, then retrieve the individual streams.
+         */
         List<Object> convertedTuple = converter.convert(tuple);
-        if(convertedTuple == null) {
+        if (convertedTuple == null) {
             LOG.warn("source data {} can't be converted to a stream, ignore this message", tuple);
             spout.ack(newMessageId);
             return null;
         }
-        Map m = (Map)convertedTuple.get(3);
+        Map m = (Map) convertedTuple.get(3);
         Object streamId = convertedTuple.get(1);
 
         StreamDefinition sd = sds.get(streamId);
@@ -117,24 +116,24 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements
             return null;
         }
 
-        StreamEvent event = convertToStreamEventByStreamDefinition((Long)convertedTuple.get(2), m, sds.get(streamId));
+        StreamEvent event = convertToStreamEventByStreamDefinition((Long) convertedTuple.get(2), m, sds.get(streamId));
         /*
             phase 2: stream repartition
         */
-        for(StreamRepartitionMetadata md : streamRepartitionMetadataList) {
+        for (StreamRepartitionMetadata md : streamRepartitionMetadataList) {
             // one stream may have multiple group-by strategies, each strategy is for a specific group-by
-            for(StreamRepartitionStrategy groupingStrategy : md.groupingStrategies){
+            for (StreamRepartitionStrategy groupingStrategy : md.groupingStrategies) {
                 int hash = 0;
-                if(groupingStrategy.getPartition().getType().equals(StreamPartition.Type.GROUPBY)) {
+                if (groupingStrategy.getPartition().getType().equals(StreamPartition.Type.GROUPBY)) {
                     hash = getRoutingHashByGroupingStrategy(m, groupingStrategy);
-                }else if(groupingStrategy.getPartition().getType().equals(StreamPartition.Type.SHUFFLE)){
-                    hash = Math.abs((int)System.currentTimeMillis());
+                } else if (groupingStrategy.getPartition().getType().equals(StreamPartition.Type.SHUFFLE)) {
+                    hash = Math.abs((int) System.currentTimeMillis());
                 }
                 int mod = hash % groupingStrategy.numTotalParticipatingRouterBolts;
                 // filter out message
                 if (mod >= groupingStrategy.startSequence && mod < groupingStrategy.startSequence + numOfRouterBolts) {
                     // framework takes care of field grouping instead of using storm internal field grouping
-                    String sid = StreamIdConversion.generateStreamIdBetween(spout.getSpoutName(), spout.getRouteBoltName()+ (hash % numOfRouterBolts));
+                    String sid = StreamIdConversion.generateStreamIdBetween(spout.getSpoutName(), spout.getRouteBoltName() + (hash % numOfRouterBolts));
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Emitted tuple: {} with message Id: {}, with topic {}, to streamId {}", convertedTuple, messageId, topic, sid);
                     }
@@ -154,9 +153,9 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements
                     // ******* short-cut ack ********
                     // we should simply ack those messages which are not processed in this topology because KafkaSpout implementation requires _pending is empty
                     // before moving to next offsets.
-                    if(LOG.isDebugEnabled()){
+                    if (LOG.isDebugEnabled()) {
                         LOG.debug("Message filtered with mod {} not within range {} and {} for message {}", mod, groupingStrategy.startSequence,
-                                groupingStrategy.startSequence+ numOfRouterBolts, tuple);
+                            groupingStrategy.startSequence + numOfRouterBolts, tuple);
                     }
                     spout.ack(newMessageId);
                 }
@@ -167,11 +166,11 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements
     }
 
     @SuppressWarnings("rawtypes")
-    private int getRoutingHashByGroupingStrategy(Map data, StreamRepartitionStrategy gs){
+    private int getRoutingHashByGroupingStrategy(Map data, StreamRepartitionStrategy gs) {
         // calculate hash value for values from group-by fields
         HashCodeBuilder hashCodeBuilder = new HashCodeBuilder();
-        for(String groupingField : gs.partition.getColumns()) {
-            if(data.get(groupingField) != null){
+        for (String groupingField : gs.partition.getColumns()) {
+            if (data.get(groupingField) != null) {
                 hashCodeBuilder.append(data.get(groupingField));
             } else {
                 LOG.warn("Required GroupBy fields {} not found: {}", gs.partition.getColumns(), data);
@@ -195,13 +194,14 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements
         return isOk;
     }
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private StreamEvent convertToStreamEventByStreamDefinition(long timestamp, Map m, StreamDefinition sd){
-        return StreamEvent.Builder().timestamep(timestamp).attributes(m,sd).build();
+    @SuppressWarnings( {"rawtypes", "unchecked"})
+    private StreamEvent convertToStreamEventByStreamDefinition(long timestamp, Map m, StreamDefinition sd) {
+        return StreamEvent.builder().timestamep(timestamp).attributes(m, sd).build();
     }
 
     /**
-     * SpoutSpec may be changed, this class will respond to changes on tuple2StreamMetadataMap and streamRepartitionMetadataMap
+     * SpoutSpec may be changed, this class will respond to changes on tuple2StreamMetadataMap and streamRepartitionMetadataMap.
+     *
      * @param spoutSpec
      * @param sds
      */

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
index 0fbe7b3..075d827 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
@@ -16,6 +16,8 @@
  */
 package org.apache.eagle.alert.engine.utils;
 
+import com.google.common.io.ByteStreams;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -23,8 +25,6 @@ import java.io.OutputStream;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
-import com.google.common.io.ByteStreams;
-
 
 public class CompressionUtils {
     public static byte[] compress(byte[] source) throws IOException {
@@ -45,7 +45,7 @@ public class CompressionUtils {
         }
     }
 
-    public static byte[] decompress(byte[] compressed) throws IOException{
+    public static byte[] decompress(byte[] compressed) throws IOException {
         if (compressed == null || compressed.length == 0) {
             return compressed;
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
index 1060d32..f6da4a9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
@@ -19,14 +19,14 @@
 
 package org.apache.eagle.alert.engine.utils;
 
-import java.io.InputStream;
-
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.InputStream;
+
 /**
  * Since 5/6/16.
  */
@@ -34,64 +34,69 @@ public class MetadataSerDeser {
     private static final Logger LOG = LoggerFactory.getLogger(MetadataSerDeser.class);
 
     @SuppressWarnings("rawtypes")
-    public static <K> K deserialize(InputStream is, TypeReference typeRef){
+    public static <K> K deserialize(InputStream is, TypeReference typeRef) {
         ObjectMapper mapper = new ObjectMapper();
         try {
             K spec = mapper.readValue(is, typeRef);
             return spec;
-        }catch(Exception ex){
+        } catch (Exception ex) {
             LOG.error("error in deserializing metadata of type {} from input stream",
-                    new TypeReference<K>(){}.getType().getClass().getCanonicalName(), ex);
+                new TypeReference<K>() {
+                }.getType().getClass().getCanonicalName(), ex);
         }
         return null;
     }
 
-    public static <K> K deserialize(InputStream is, Class<K> cls){
+    public static <K> K deserialize(InputStream is, Class<K> cls) {
         ObjectMapper mapper = new ObjectMapper();
-        mapper.configure(JsonParser.Feature.ALLOW_COMMENTS,true);
+        mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
         try {
             K spec = mapper.readValue(is, cls);
             return spec;
-        }catch(Exception ex){
+        } catch (Exception ex) {
             LOG.error("Got error to deserialize metadata of type {} from input stream",
-                    new TypeReference<K>(){}.getType().getClass().getCanonicalName(), ex);
+                new TypeReference<K>() {
+                }.getType().getClass().getCanonicalName(), ex);
         }
         return null;
     }
 
     @SuppressWarnings("rawtypes")
-    public static <K> K deserialize(String json, TypeReference typeRef){
+    public static <K> K deserialize(String json, TypeReference typeRef) {
         ObjectMapper mapper = new ObjectMapper();
         try {
             K spec = mapper.readValue(json, typeRef);
             return spec;
-        }catch(Exception ex){
+        } catch (Exception ex) {
             LOG.error("error in deserializing metadata of type {} from {}",
-                    new TypeReference<K>(){}.getType().getClass().getCanonicalName(), json, ex);
+                new TypeReference<K>() {
+                }.getType().getClass().getCanonicalName(), json, ex);
         }
         return null;
     }
 
-    public static <K> K deserialize(String json, Class<K> cls){
+    public static <K> K deserialize(String json, Class<K> cls) {
         ObjectMapper mapper = new ObjectMapper();
         try {
             K spec = mapper.readValue(json, cls);
             return spec;
-        }catch(Exception ex){
+        } catch (Exception ex) {
             LOG.error("error in deserializing metadata of type {} from {}",
-                    new TypeReference<K>(){}.getType().getClass().getCanonicalName(), json, ex);
+                new TypeReference<K>() {
+                }.getType().getClass().getCanonicalName(), json, ex);
         }
         return null;
     }
 
-    public static <K> String serialize(K spec){
+    public static <K> String serialize(K spec) {
         ObjectMapper mapper = new ObjectMapper();
-        try{
+        try {
             String json = mapper.writeValueAsString(spec);
             return json;
-        }catch(Exception ex){
+        } catch (Exception ex) {
             LOG.error("error in serializing object {} with type {}", spec,
-                    new TypeReference<K>(){}.getType().getClass().getCanonicalName(), ex);
+                new TypeReference<K>() {
+                }.getType().getClass().getCanonicalName(), ex);
         }
         return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java
index 090d61c..509cbf4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java
@@ -19,12 +19,7 @@ package org.apache.eagle.alert.engine.utils;
 import org.xerial.snappy.SnappyInputStream;
 import org.xerial.snappy.SnappyOutputStream;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
+import java.io.*;
 
 
 /**
@@ -46,8 +41,8 @@ public class SerializableUtils {
             return buffer.toByteArray();
         } catch (IOException exn) {
             throw new IllegalArgumentException(
-                    "unable to serialize " + value,
-                    exn);
+                "unable to serialize " + value,
+                exn);
         }
     }
 
@@ -84,8 +79,8 @@ public class SerializableUtils {
             }
         } catch (IOException | ClassNotFoundException exn) {
             throw new IllegalArgumentException(
-                    "unable to deserialize " + description,
-                    exn);
+                "unable to deserialize " + description,
+                exn);
         }
     }
 
@@ -101,27 +96,27 @@ public class SerializableUtils {
                                                             String description) {
         try {
             try (ObjectInputStream ois = new ObjectInputStream(
-                    new SnappyInputStream(new ByteArrayInputStream(encodedValue)))) {
+                new SnappyInputStream(new ByteArrayInputStream(encodedValue)))) {
                 return ois.readObject();
             }
         } catch (IOException | ClassNotFoundException exn) {
             throw new IllegalArgumentException(
-                    "unable to deserialize " + description,
-                    exn);
+                "unable to deserialize " + description,
+                exn);
         }
     }
 
     public static <T extends Serializable> T ensureSerializable(T value) {
         @SuppressWarnings("unchecked")
         T copy = (T) deserializeFromCompressedByteArray(serializeToCompressedByteArray(value),
-                value.toString());
+            value.toString());
         return copy;
     }
 
     public static <T extends Serializable> T clone(T value) {
         @SuppressWarnings("unchecked")
         T copy = (T) deserializeFromCompressedByteArray(serializeToCompressedByteArray(value),
-                value.toString());
+            value.toString());
         return copy;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutMetric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutMetric.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutMetric.java
index dd58172..341e665 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutMetric.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutMetric.java
@@ -16,27 +16,22 @@
  */
 package storm.kafka;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
+import backtype.storm.metric.api.IMetric;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.metric.api.IMetric;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Since 5/18/16.
  * The original storm.kafka.KafkaSpout has some issues like the following
  * 1) can only support one single topic
- * 2) can only be initialized at open(), can't dynamically support another topic
+ * 2) can only be initialized at open(), can't dynamically support another topic.
  */
 public class KafkaSpoutMetric implements IMetric {
     @SuppressWarnings("unused")
-    private final static Logger LOG = LoggerFactory.getLogger(KafkaSpoutMetric.class);
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutMetric.class);
     private Map<String, KafkaSpoutMetricContext> metricContextMap = new ConcurrentHashMap<>();
     private Map<String, KafkaUtils.KafkaOffsetMetric> offsetMetricMap = new ConcurrentHashMap<>();
 
@@ -58,7 +53,7 @@ public class KafkaSpoutMetric implements IMetric {
         offsetMetricMap.remove(topic);
     }
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @SuppressWarnings( {"unchecked", "rawtypes"})
     @Override
     public Object getValueAndReset() {
         HashMap spoutMetric = new HashMap();
@@ -76,16 +71,16 @@ public class KafkaSpoutMetric implements IMetric {
                 offsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
             }
             Object o = offsetMetric.getValueAndReset();
-            if(o != null) {
+            if (o != null) {
                 ((HashMap) o).forEach(
-                        (k, v) -> spoutMetric.put(k + "_" + entry.getKey(), v)
+                    (k, v) -> spoutMetric.put(k + "_" + entry.getKey(), v)
                 );
             }
 
             // construct partition metric
             for (PartitionManager pm : pms) {
                 pm.getMetricsDataMap().forEach(
-                        (k, v) -> spoutMetric.put(k + "_" + entry.getKey(), v)
+                    (k, v) -> spoutMetric.put(k + "_" + entry.getKey(), v)
                 );
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutWrapper.java
index 1cef187..7ed9193 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutWrapper.java
@@ -18,29 +18,29 @@
  */
 package storm.kafka;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.eagle.alert.coordination.model.SpoutSpec;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.spout.ISpoutSpecLCM;
 import org.apache.eagle.alert.engine.spout.SpoutOutputCollectorWrapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.Config;
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
  * NOTE!!!!! This class copy/paste some code from storm.kafka.KafkaSpout to make sure it can support one process to hold multiple
  * KafkaSpout
  *
- * this collectorWrapper provides the following capabilities:
+ * <p>this collectorWrapper provides the following capabilities:
  * 1. inject customized collector collectorWrapper, so framework can control traffic routing
  * 2. listen to topic to stream metadata change and pass that to customized collector collectorWrapper
- * 3. return current streams for this topic
+ * 3. return current streams for this topic</p>
  */
 public class KafkaSpoutWrapper extends KafkaSpout implements ISpoutSpecLCM {
     private static final long serialVersionUID = 5507693757424351306L;
@@ -55,7 +55,7 @@ public class KafkaSpoutWrapper extends KafkaSpout implements ISpoutSpecLCM {
 
     private SpoutOutputCollectorWrapper collectorWrapper;
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @SuppressWarnings( {"unchecked", "rawtypes"})
     @Override
     public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
         String topologyInstanceId = context.getStormId();
@@ -95,16 +95,16 @@ public class KafkaSpoutWrapper extends KafkaSpout implements ISpoutSpecLCM {
         metricContext._spoutConfig = _spoutConfig;
         kafkaSpoutMetric.addTopic(_spoutConfig.topic, metricContext);
 
-        this.collectorWrapper = (SpoutOutputCollectorWrapper)collector;
+        this.collectorWrapper = (SpoutOutputCollectorWrapper) collector;
     }
 
     @Override
-    public void update(SpoutSpec metadata, Map<String, StreamDefinition> sds){
+    public void update(SpoutSpec metadata, Map<String, StreamDefinition> sds) {
         collectorWrapper.update(metadata, sds);
     }
 
     @Override
-    public void close(){
+    public void close() {
         super.close();
         kafkaSpoutMetric.removeTopic(_spoutConfig.topic);
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm
index 2da6288..965480e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm
@@ -16,252 +16,264 @@
   -->
 <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
 <html xmlns="http://www.w3.org/1999/xhtml">
-	<head>
-		<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
-		<meta name="viewport" content="width=device-width"/>
-		<style>
-			body {
-				width:100% !important;
-				min-width: 100%;
-				-webkit-text-size-adjust:100%;
-				-ms-text-size-adjust:100%;
-				margin:0;
-				padding:0;
-			}
+<head>
+    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
+    <meta name="viewport" content="width=device-width"/>
+    <style>
+        body {
+            width: 100% !important;
+            min-width: 100%;
+            -webkit-text-size-adjust: 100%;
+            -ms-text-size-adjust: 100%;
+            margin: 0;
+            padding: 0;
+        }
 
-			table {
-				border-spacing: 0;
-				border-collapse: collapse;
-			}
+        table {
+            border-spacing: 0;
+            border-collapse: collapse;
+        }
 
-			table th,
-			table td {
-				padding: 3px 0 3px 0;
-			}
+        table th,
+        table td {
+            padding: 3px 0 3px 0;
+        }
 
-			.body {
-				width: 100%;
-			}
+        .body {
+            width: 100%;
+        }
 
-			p,a,h1,h2,h3,ul,ol,li {
-				font-family: Helvetica, Arial, sans-serif;
-				font-weight: normal;
-				margin: 0;
-				padding: 0;
-			}
-			p {
-				font-size: 14px;
-				line-height: 19px;
-			}
-			a {
-				color: #3294b1;
-			}
-			h1 {
-				font-size: 36px;
-				margin: 15px 0 5px 0;
-			}
-			h2 {
-				font-size: 32px;
-			}
-			h3 {
-				font-size: 28px;
-			}
+        p, a, h1, h2, h3, ul, ol, li {
+            font-family: Helvetica, Arial, sans-serif;
+            font-weight: normal;
+            margin: 0;
+            padding: 0;
+        }
 
-			ul,ol {
-				margin: 0 0 0 25px;
-				padding: 0;
-			}
+        p {
+            font-size: 14px;
+            line-height: 19px;
+        }
 
-			.btn {
-				background: #2ba6cb !important;
-				border: 1px solid #2284a1;
-				padding: 10px 20px 10px 20px;
-				text-align: center;
-			}
-			.btn:hover {
-				background: #2795b6 !important;
-			}
-			.btn a {
-				color: #FFFFFF;
-				text-decoration: none;
-				font-weight: bold;
-				padding: 10px 20px 10px 20px;
-			}
+        a {
+            color: #3294b1;
+        }
 
-			.tableBordered {
-				border-top: 1px solid #b9e5ff;
-			}
-			.tableBordered th {
-				background: #ECF8FF;
-			}
-			.tableBordered th p {
-				font-weight: bold;
-				color: #3294b1;
-			}
-			.tableBordered th,
-			.tableBordered td {
-				color: #333333;
-				border-bottom: 1px solid #b9e5ff;
-				text-align: center;
-				padding-bottom: 5px;
-			}
+        h1 {
+            font-size: 36px;
+            margin: 15px 0 5px 0;
+        }
 
-			.panel {
-				height: 100px;
-			}
-		</style>
-	</head>
-	<body>
-		#set ( $elem = $alertList[0] )
-		#set ( $alertUrl = $elem["alertDetailUrl"] )
-		#set ( $policyUrl = $elem["policyDetailUrl"] )
-		<table class="body">
-			<tr>
-				<td align="center" valign="top" style="background: #999999; padding: 0 0 0 0;">
-					<!-- Header -->
-					<table width="580">
-						<tr>
-							<td style="padding: 0 0 0 0;" align="left" >
-								<p style="color:#FFFFFF;font-weight: bold; font-size: 22px">UMP Alerts</p>
-							</td>
-						</tr>
-					</table>
-				</td>
-			</tr>
+        h2 {
+            font-size: 32px;
+        }
 
-			<tr>
-				<td align="center" valign="top">
-					<!-- Eagle Body -->
-					<table width="580">
-						<tr>
-							<!-- Title -->
-							<td align="center">
-								<h1>$elem["streamId"] Alert Detected</h1>
-							</td>
-						</tr>
-						<tr>
-							<!-- Time -->
-							<td>
-								<table width="580">
-									<tr>
-										<td>
-											<p><b>Detected Time: $elem["alertTime"]</b></p>
-										</td>
-										#set ( $severity = $elem["severity"] )
-										#if (!$severity || ("$severity" == ""))
-											#set ( $elem["severity"] = "WARNING")
-										#end
-										<td align="right">
-											<p><b>
-												Severity:
-									            #if ($elem["severity"] == "WARNING")
-													<span>$elem["severity"]</span>												
-    											#else
-													<span style="color: #FF0000;">$elem["severity"]</span>
-    											#end
-											</b></p>
-										</td>
-									</tr>
-								</table>
-							</td>
-						</tr>
-						<tr>
-							<!-- Description -->
-							<td valign="top" style="background: #ECF8FF; border: 1px solid #b9e5ff; padding: 10px 10px 12px 10px;">
-								<p>$elem["alertMessage"]</p>
-							</td>
-						</tr>
-						<tr>
-							<!-- View Detail -->
-							<td align="center" style="padding: 10px 0 0 0;">
-								<table width="580">
-									<tr>
-										<td class="btn">
-											<a href="$alertUrl">View Alert Details on Eagle Web</a>
-										</td>
-									</tr>
-								</table>
-							</td>
-						</tr>
-						<tr>
-							<!-- Basic Information -->
-							<td style="padding: 20px 0 0 0;">
-								<p><b>Basic Information:</b></p>
-							</td>
-						</tr>
-						<tr>
-							<!-- Basic Information Content -->
-							<td>
-								<table class="tableBordered" width="580">
-									<tr>
-										<th>
-											<p>Policy Name</p>
-										</th>
-										<th>
-											<p>Data Source</p>
-										</th>
-									</tr>
-									<tr>
-										<td>
-											<p>$elem["policyId"]</p>
-										</td>
-										<td>
-											<p>$elem["streamId"]</p>
-										</td>
-									</tr>
-									<tr>
+        h3 {
+            font-size: 28px;
+        }
 
-										<th>
-											<p>Creator</p>
-										</th>
-										<th>
-											<p>Severity</p>
-										</th>
-									</tr>
-									<tr>
-										<td>
-											<p>$elem["creator"]</p>
-										</td>
-										<td>
-											<p>$elem["severity"]</p>
-										</td>
-									</tr>
-								</table>
-							</td>
-						</tr>
-						<tr>
-							<!-- View Detail -->
-							<td align="center" style="padding: 10px 0 0 0;">
-								<table width="580">
-									<tr>
-										<td class="btn">
-											<a href="$policyUrl">View Policy Details on Eagle Web</a>
-										</td>
-									</tr>
-								</table>
-							</td>
-						</tr>						
-						<tr>
-							<!-- Actions Required -->
-							<td style="padding: 20px 0 0 0;">
-								<p><b>Actions Required:</b></p>
-							</td>
-						</tr>
-						<tr>
-							<!-- Possible Root Causes Content -->
-							<td class="panel" valign="top" style="background: #F4F4F4; border: 1px solid #AAAAAA; padding: 10px 10px 12px 10px;">
-								<p> $elem["streamId"] alert found, please check.</p>
-							</td>
-						</tr>
-						<tr>
-							<!-- Copyright -->
-							<td align="center">
-								<p><a href="<Eagle-Host>/alerts/alertlist.html">UMP Alert Engine</a></p>
-							</td>
-						</tr>
-					</table>
-				</td>
-			</tr>
-		</table>
-	</body>
+        ul, ol {
+            margin: 0 0 0 25px;
+            padding: 0;
+        }
+
+        .btn {
+            background: #2ba6cb !important;
+            border: 1px solid #2284a1;
+            padding: 10px 20px 10px 20px;
+            text-align: center;
+        }
+
+        .btn:hover {
+            background: #2795b6 !important;
+        }
+
+        .btn a {
+            color: #FFFFFF;
+            text-decoration: none;
+            font-weight: bold;
+            padding: 10px 20px 10px 20px;
+        }
+
+        .tableBordered {
+            border-top: 1px solid #b9e5ff;
+        }
+
+        .tableBordered th {
+            background: #ECF8FF;
+        }
+
+        .tableBordered th p {
+            font-weight: bold;
+            color: #3294b1;
+        }
+
+        .tableBordered th,
+        .tableBordered td {
+            color: #333333;
+            border-bottom: 1px solid #b9e5ff;
+            text-align: center;
+            padding-bottom: 5px;
+        }
+
+        .panel {
+            height: 100px;
+        }
+    </style>
+</head>
+<body>
+    #set ( $elem = $alertList[0] )
+    #set ( $alertUrl = $elem["alertDetailUrl"] )
+    #set ( $policyUrl = $elem["policyDetailUrl"] )
+<table class="body">
+    <tr>
+        <td align="center" valign="top" style="background: #999999; padding: 0 0 0 0;">
+            <!-- Header -->
+            <table width="580">
+                <tr>
+                    <td style="padding: 0 0 0 0;" align="left">
+                        <p style="color:#FFFFFF;font-weight: bold; font-size: 22px">UMP Alerts</p>
+                    </td>
+                </tr>
+            </table>
+        </td>
+    </tr>
+
+    <tr>
+        <td align="center" valign="top">
+            <!-- Eagle Body -->
+            <table width="580">
+                <tr>
+                    <!-- Title -->
+                    <td align="center">
+                        <h1>$elem["streamId"] Alert Detected</h1>
+                    </td>
+                </tr>
+                <tr>
+                    <!-- Time -->
+                    <td>
+                        <table width="580">
+                            <tr>
+                                <td>
+                                    <p><b>Detected Time: $elem["alertTime"]</b></p>
+                                </td>
+                                #set ( $severity = $elem["severity"] )
+                                #if (!$severity || ("$severity" == ""))
+                                    #set ( $elem["severity"] = "WARNING")
+                                #end
+                                <td align="right">
+                                    <p><b>
+                                        Severity:
+                                        #if ($elem["severity"] == "WARNING")
+                                            <span>$elem["severity"]</span>
+                                        #else
+                                            <span style="color: #FF0000;">$elem["severity"]</span>
+                                        #end
+                                    </b></p>
+                                </td>
+                            </tr>
+                        </table>
+                    </td>
+                </tr>
+                <tr>
+                    <!-- Description -->
+                    <td valign="top"
+                        style="background: #ECF8FF; border: 1px solid #b9e5ff; padding: 10px 10px 12px 10px;">
+                        <p>$elem["alertMessage"]</p>
+                    </td>
+                </tr>
+                <tr>
+                    <!-- View Detail -->
+                    <td align="center" style="padding: 10px 0 0 0;">
+                        <table width="580">
+                            <tr>
+                                <td class="btn">
+                                    <a href="$alertUrl">View Alert Details on Eagle Web</a>
+                                </td>
+                            </tr>
+                        </table>
+                    </td>
+                </tr>
+                <tr>
+                    <!-- Basic Information -->
+                    <td style="padding: 20px 0 0 0;">
+                        <p><b>Basic Information:</b></p>
+                    </td>
+                </tr>
+                <tr>
+                    <!-- Basic Information Content -->
+                    <td>
+                        <table class="tableBordered" width="580">
+                            <tr>
+                                <th>
+                                    <p>Policy Name</p>
+                                </th>
+                                <th>
+                                    <p>Data Source</p>
+                                </th>
+                            </tr>
+                            <tr>
+                                <td>
+                                    <p>$elem["policyId"]</p>
+                                </td>
+                                <td>
+                                    <p>$elem["streamId"]</p>
+                                </td>
+                            </tr>
+                            <tr>
+
+                                <th>
+                                    <p>Creator</p>
+                                </th>
+                                <th>
+                                    <p>Severity</p>
+                                </th>
+                            </tr>
+                            <tr>
+                                <td>
+                                    <p>$elem["creator"]</p>
+                                </td>
+                                <td>
+                                    <p>$elem["severity"]</p>
+                                </td>
+                            </tr>
+                        </table>
+                    </td>
+                </tr>
+                <tr>
+                    <!-- View Detail -->
+                    <td align="center" style="padding: 10px 0 0 0;">
+                        <table width="580">
+                            <tr>
+                                <td class="btn">
+                                    <a href="$policyUrl">View Policy Details on Eagle Web</a>
+                                </td>
+                            </tr>
+                        </table>
+                    </td>
+                </tr>
+                <tr>
+                    <!-- Actions Required -->
+                    <td style="padding: 20px 0 0 0;">
+                        <p><b>Actions Required:</b></p>
+                    </td>
+                </tr>
+                <tr>
+                    <!-- Possible Root Causes Content -->
+                    <td class="panel" valign="top"
+                        style="background: #F4F4F4; border: 1px solid #AAAAAA; padding: 10px 10px 12px 10px;">
+                        <p> $elem["streamId"] alert found, please check.</p>
+                    </td>
+                </tr>
+                <tr>
+                    <!-- Copyright -->
+                    <td align="center">
+                        <p><a href="<Eagle-Host>/alerts/alertlist.html">UMP Alert Engine</a></p>
+                    </td>
+                </tr>
+            </table>
+        </td>
+    </tr>
+</table>
+</body>
 </html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
index dac2f07..754c00b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
@@ -13,17 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 {
-  "topology" : {
-    "name" : "alertUnitTopology_1",
-    "numOfTotalWorkers" : 2,
-    "numOfSpoutTasks" : 1,
-    "numOfRouterBolts" : 4,
-    "numOfAlertBolts" : 10,
-    "numOfPublishTasks" : 1,
+  "topology": {
+    "name": "alertUnitTopology_1",
+    "numOfTotalWorkers": 2,
+    "numOfSpoutTasks": 1,
+    "numOfRouterBolts": 4,
+    "numOfAlertBolts": 10,
+    "numOfPublishTasks": 1,
     "messageTimeoutSecs": 3600,
-    "localMode" : "true"
+    "localMode": "true"
   },
-  "spout" : {
+  "spout": {
     "kafkaBrokerZkQuorum": "server.eagle.apache.org:2181",
     "kafkaBrokerZkBasePath": "/kafka",
     "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
@@ -31,16 +31,16 @@
     "stormKafkaTransactionZkPath": "/consumers",
     "stormKafkaEagleConsumer": "eagle_consumer"
   },
-  "zkConfig" : {
-    "zkQuorum" : "server.eagle.apache.org:2181",
-    "zkRoot" : "/alert"
+  "zkConfig": {
+    "zkQuorum": "server.eagle.apache.org:2181",
+    "zkRoot": "/alert"
   },
   "metadataService": {
-    "context" : "/rest",
-    "host" : "localhost",
-    "port" : 9090
+    "context": "/rest",
+    "host": "localhost",
+    "port": 9090
   },
-  "metric":{
+  "metric": {
     "sink": {
       // "kafka": {
       //  "topic": "alert_metric"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
index 5e3d3b1..47d476a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
@@ -12,14 +12,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 log4j.rootLogger=INFO, stdout
-
 # standard output
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
-
 ##log4j.logger.org.apache.eagle.alert.engine.spout.CorrelationSpout=DEBUG
 log4j.logger.org.apache.eagle.alert.metric=ERROR
 log4j.logger.org.apache.eagle.alert.engine.spout.SpoutOutputCollectorWrapper=DEBUG

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
index ca5bfdf..24abae6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
@@ -16,8 +16,8 @@
  */
 package org.apache.eagle.alert.engine.absence;
 
-import org.apache.eagle.alert.engine.evaluator.absence.AbsenceDailyRule;
 import org.apache.eagle.alert.engine.evaluator.absence.AbsenceAlertDriver;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceDailyRule;
 import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindowGenerator;
 import org.junit.Test;
 
@@ -33,11 +33,11 @@ import java.util.TimeZone;
  */
 public class TestAbsenceDriver {
     @Test
-    public void testAbsence() throws Exception{
+    public void testAbsence() throws Exception {
         // from 2PM to 3PM each day
         AbsenceDailyRule rule = new AbsenceDailyRule();
-        rule.startOffset = 14*3600*1000;
-        rule.endOffset = 15*3600*1000;
+        rule.startOffset = 14 * 3600 * 1000;
+        rule.endOffset = 15 * 3600 * 1000;
         AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
         List<Object> expectAttrs = Arrays.asList("host1");
         AbsenceAlertDriver driver = new AbsenceAlertDriver(expectAttrs, generator);
@@ -52,23 +52,23 @@ public class TestAbsenceDriver {
         // first event
         driver.process(Arrays.asList("host2"), baseOccurTime);
         // event after 1 hour
-        driver.process(Arrays.asList("host2"), baseOccurTime + 3600*1000);
+        driver.process(Arrays.asList("host2"), baseOccurTime + 3600 * 1000);
         // event after 2 hour
-        driver.process(Arrays.asList("host2"), baseOccurTime + 2*3600*1000);
+        driver.process(Arrays.asList("host2"), baseOccurTime + 2 * 3600 * 1000);
         // event after 3 hour, enter this window
-        driver.process(Arrays.asList("host2"), baseOccurTime + 3*3600*1000);
+        driver.process(Arrays.asList("host2"), baseOccurTime + 3 * 3600 * 1000);
         // event after 3.5 hour, still in this window
-        driver.process(Arrays.asList("host2"), baseOccurTime + 3*3600*1000 + 1800*1000);
+        driver.process(Arrays.asList("host2"), baseOccurTime + 3 * 3600 * 1000 + 1800 * 1000);
         // event after 4 hour, exit this window
-        driver.process(Arrays.asList("host2"), baseOccurTime + 4*3600*1000);
+        driver.process(Arrays.asList("host2"), baseOccurTime + 4 * 3600 * 1000);
     }
 
     @Test
-    public void testOccurrence() throws Exception{
+    public void testOccurrence() throws Exception {
         // from 2PM to 3PM each day
         AbsenceDailyRule rule = new AbsenceDailyRule();
-        rule.startOffset = 14*3600*1000;
-        rule.endOffset = 15*3600*1000;
+        rule.startOffset = 14 * 3600 * 1000;
+        rule.endOffset = 15 * 3600 * 1000;
         AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
         List<Object> expectAttrs = Arrays.asList("host1");
         AbsenceAlertDriver driver = new AbsenceAlertDriver(expectAttrs, generator);
@@ -83,14 +83,14 @@ public class TestAbsenceDriver {
         // first event
         driver.process(Arrays.asList("host2"), baseOccurTime);
         // event after 1 hour
-        driver.process(Arrays.asList("host2"), baseOccurTime + 3600*1000);
+        driver.process(Arrays.asList("host2"), baseOccurTime + 3600 * 1000);
         // event after 2 hour
-        driver.process(Arrays.asList("host2"), baseOccurTime + 2*3600*1000);
+        driver.process(Arrays.asList("host2"), baseOccurTime + 2 * 3600 * 1000);
         // event after 3 hour, enter this window
-        driver.process(Arrays.asList("host2"), baseOccurTime + 3*3600*1000);
+        driver.process(Arrays.asList("host2"), baseOccurTime + 3 * 3600 * 1000);
         // event after 3.5 hour, still in this window
-        driver.process(Arrays.asList("host1"), baseOccurTime + 3*3600*1000 + 1800*1000);
+        driver.process(Arrays.asList("host1"), baseOccurTime + 3 * 3600 * 1000 + 1800 * 1000);
         // event after 4 hour, exit this window
-        driver.process(Arrays.asList("host2"), baseOccurTime + 4*3600*1000);
+        driver.process(Arrays.asList("host2"), baseOccurTime + 4 * 3600 * 1000);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
index 7f325c4..7bce34d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
@@ -42,11 +42,11 @@ public class TestAbsencePolicyHandler {
     private static final String outputStream = "testOutputStream";
 
     @Test
-    public void test() throws Exception{
+    public void test() throws Exception {
         test(buildPolicyDef_provided());
     }
 
-    public void test(PolicyDefinition pd) throws Exception{
+    public void test(PolicyDefinition pd) throws Exception {
         Map<String, StreamDefinition> sds = new HashMap<>();
         StreamDefinition sd = buildStreamDef();
         sds.put("testInputStream", sd);
@@ -62,14 +62,14 @@ public class TestAbsencePolicyHandler {
     private static class TestCollector implements Collector {
         @Override
         public void emit(Object o) {
-            AlertStreamEvent e = (AlertStreamEvent)o;
+            AlertStreamEvent e = (AlertStreamEvent) o;
             Object[] data = e.getData();
             Assert.assertEquals("host2", data[1]);
             LOG.info(e.toString());
         }
     }
 
-    private PolicyDefinition buildPolicyDef_provided(){
+    private PolicyDefinition buildPolicyDef_provided() {
         PolicyDefinition pd = new PolicyDefinition();
         PolicyDefinition.Definition def = new PolicyDefinition.Definition();
         def.setValue("1,jobID,job1,daily_rule,14:00:00,15:00:00");
@@ -81,7 +81,7 @@ public class TestAbsencePolicyHandler {
         return pd;
     }
 
-    private StreamDefinition buildStreamDef(){
+    private StreamDefinition buildStreamDef() {
         StreamDefinition sd = new StreamDefinition();
         StreamColumn tsColumn = new StreamColumn();
         tsColumn.setName("timestamp");
@@ -101,9 +101,9 @@ public class TestAbsencePolicyHandler {
         return sd;
     }
 
-    private StreamEvent buildStreamEvt(long ts, String jobID, String status){
+    private StreamEvent buildStreamEvt(long ts, String jobID, String status) {
         StreamEvent e = new StreamEvent();
-        e.setData(new Object[]{ts, jobID, status});
+        e.setData(new Object[] {ts, jobID, status});
         e.setStreamId(inputStream);
         e.setTimestamp(ts);
         return e;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
index e2345c9..4eda2ed 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
@@ -32,11 +32,11 @@ import java.util.TimeZone;
  */
 public class TestAbsenceWindowGenerator {
     @Test
-    public void testWindowInToday() throws Exception{
+    public void testWindowInToday() throws Exception {
         AbsenceDailyRule rule = new AbsenceDailyRule();
         // from 2PM to 3PM each day
-        rule.startOffset = 14*3600*1000;
-        rule.endOffset = 15*3600*1000;
+        rule.startOffset = 14 * 3600 * 1000;
+        rule.endOffset = 15 * 3600 * 1000;
         AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
 
         // get current time
@@ -51,15 +51,15 @@ public class TestAbsenceWindowGenerator {
         df.setTimeZone(TimeZone.getTimeZone("UTC"));
         d = df.parse(currDate);
         AbsenceWindow window = generator.nextWindow(d.getTime());
-        Assert.assertEquals(startTimeOfDay+rule.startOffset, window.startTime);
+        Assert.assertEquals(startTimeOfDay + rule.startOffset, window.startTime);
     }
 
     @Test
-    public void testWindowInTomorrow() throws Exception{
+    public void testWindowInTomorrow() throws Exception {
         AbsenceDailyRule rule = new AbsenceDailyRule();
         // from 2PM to 3PM each day
-        rule.startOffset = 14*3600*1000;
-        rule.endOffset = 15*3600*1000;
+        rule.startOffset = 14 * 3600 * 1000;
+        rule.endOffset = 15 * 3600 * 1000;
         AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
 
         // get current time
@@ -75,6 +75,6 @@ public class TestAbsenceWindowGenerator {
         d = df.parse(currDate);
         AbsenceWindow window = generator.nextWindow(d.getTime());
         // this needs adjustment for one day
-        Assert.assertEquals(startTimeOfDay+rule.startOffset + AbsenceDailyRule.DAY_MILLI_SECONDS, window.startTime);
+        Assert.assertEquals(startTimeOfDay + rule.startOffset + AbsenceDailyRule.DAY_MILLI_SECONDS, window.startTime);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
index a47c7a4..32a614b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
@@ -29,7 +29,7 @@ import java.util.List;
  */
 public class TestAbsenceWindowProcessor {
     @Test
-    public void testDataMissing(){
+    public void testDataMissing() {
         List<Object> expectedHosts = Arrays.asList("host1");
         AbsenceWindow window = new AbsenceWindow();
         window.startTime = 100L;
@@ -48,7 +48,7 @@ public class TestAbsenceWindowProcessor {
     }
 
     @Test(expected = IllegalStateException.class)
-    public void testDataExists(){
+    public void testDataExists() {
         List<Object> expectedHosts = Arrays.asList("host1");
         AbsenceWindow window = new AbsenceWindow();
         window.startTime = 100L;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/CoordinatorClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/CoordinatorClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/CoordinatorClient.java
index a85a5cd..3f5fc67 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/CoordinatorClient.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/CoordinatorClient.java
@@ -16,24 +16,22 @@
  */
 package org.apache.eagle.alert.engine.e2e;
 
-import java.io.Closeable;
-import java.io.IOException;
-
-import javax.ws.rs.core.MediaType;
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.WebResource;
 import com.sun.jersey.api.client.config.ClientConfig;
 import com.sun.jersey.api.client.config.DefaultClientConfig;
 import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
 import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MediaType;
+import java.io.Closeable;
+import java.io.IOException;
 
 /**
  * @since May 9, 2016
- *
  */
 public class CoordinatorClient implements Closeable {
 
@@ -53,7 +51,7 @@ public class CoordinatorClient implements Closeable {
 
     public CoordinatorClient(Config config) {
         this(config.getString(EAGLE_COORDINATOR_SERVICE_HOST), config.getInt(EAGLE_COORDINATOR_SERVICE_PORT), config
-                .getString(EAGLE_COORDINATOR_SERVICE_CONTEXT));
+            .getString(EAGLE_COORDINATOR_SERVICE_CONTEXT));
         basePath = buildBasePath();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
index db8e0a2..01e50a4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
@@ -44,9 +44,8 @@ import java.util.concurrent.ThreadFactory;
 
 /**
  * Case of simple
- * 
- * @since May 8, 2016
  *
+ * @since May 8, 2016
  */
 public class Integration1 {
     private static final String SIMPLE_CONFIG = "/simple/application-integration.conf";
@@ -61,7 +60,7 @@ public class Integration1 {
 
     private String[] args;
     private ExecutorService executors = Executors.newFixedThreadPool(5, new ThreadFactory() {
-        
+
         @Override
         public Thread newThread(Runnable r) {
             Thread t = new Thread(r);
@@ -98,7 +97,7 @@ public class Integration1 {
      * Create topic
      * liasu@xxx:~$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic perfmon_metrics
      * <p>
-     * 
+     *
      * @throws InterruptedException
      */
     @Test
@@ -112,12 +111,12 @@ public class Integration1 {
         System.out.println("loading metadatas done!");
 
         if (args == null) {
-            args = new String[] { "-c", "simple/application-integration.conf" };
+            args = new String[] {"-c", "simple/application-integration.conf"};
         }
 
         executors.submit(() -> SampleClient1.main(args));
 
-        executors.submit(() -> { 
+        executors.submit(() -> {
             try {
                 UnitTopologyMain.main(args);
             } catch (Exception e) {
@@ -190,7 +189,7 @@ public class Integration1 {
      * ],"pubBoltId":"xxx-pubBolt","spoutParallelism":1,"groupParallelism":1,
      * "alertParallelism":1}
      * <p>
-     * 
+     *
      * @throws Exception
      */
     @Ignore
@@ -199,7 +198,7 @@ public class Integration1 {
         {
             JavaType type = CollectionType.construct(List.class, SimpleType.construct(Topology.class));
             List<Topology> l = om.readValue(Integration1.class.getResourceAsStream("/simple/topologies.json"),
-                    type);
+                type);
             Topology t = (Topology) l.get(0);
 
             Assert.assertEquals(4, t.getGroupNodeIds().size());
@@ -213,7 +212,7 @@ public class Integration1 {
             Publishment p = l.get(0);
             Assert.assertEquals("KAFKA", p.getType());
         }
-        
+
         checkAll("/simple/");
         checkAll("/correlation/");
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
index a11cc66..5c2c404 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
@@ -16,9 +16,9 @@
  */
 package org.apache.eagle.alert.engine.e2e;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
+import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.alert.engine.UnitTopologyMain;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -29,26 +29,23 @@ import org.wso2.siddhi.core.stream.input.InputHandler;
 import org.wso2.siddhi.core.stream.output.StreamCallback;
 import org.wso2.siddhi.core.util.EventPrinter;
 
-import backtype.storm.utils.Utils;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * @since May 10, 2016
- *
  */
 public class Integration2 {
 
     private ExecutorService executors = Executors.newFixedThreadPool(5);
-    
+
     /**
      * <pre>
      * Create topic
      * liasu@xxxx:~$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic eslogs
      * liasu@xxxx:~$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic bootfailures
      * </pre>
-     * 
+     *
      * @param args
      */
     public static void main(String[] args) throws Exception {
@@ -72,7 +69,8 @@ public class Integration2 {
                 UnitTopologyMain.main(args);
             } catch (Exception e) {
                 e.printStackTrace();
-            }});
+            }
+        });
 
         executors.submit(() -> SampleClient2.main(args));
 
@@ -83,7 +81,8 @@ public class Integration2 {
         }
     }
 
-    @Test @Ignore
+    @Test
+    @Ignore
     public void test3() throws Exception {
         SiddhiManager sm = new SiddhiManager();
         String s1 = " define stream esStream(instanceUuid string, timestamp long, logLevel string, message string, reqId string, host string, component string); ";
@@ -103,14 +102,14 @@ public class Integration2 {
         InputHandler input2 = epr.getInputHandler("ifStream");
 
         epr.start();
-        
+
         long base = 1462880695837l;
-        
+
         while (true) {
             sendEvent(input1, input2, base);
-            
+
             base = base + 3000;
-            
+
             Utils.sleep(3000);
         }
 
@@ -121,25 +120,25 @@ public class Integration2 {
             Event e = new Event();
             e.setTimestamp(base);
             e.setData(new Object[] {
-                    "instance-guid-c2a1c926-b590-418e-bf57-41469d7891fa",
-                    base,
-                    "ERROR",
-                    "NullPointException",
-                    "req-id-82dab92c-9e45-4ad8-8793-96e912831f00",
-                    "nova.host",
-                    "NOVA"
+                "instance-guid-c2a1c926-b590-418e-bf57-41469d7891fa",
+                base,
+                "ERROR",
+                "NullPointException",
+                "req-id-82dab92c-9e45-4ad8-8793-96e912831f00",
+                "nova.host",
+                "NOVA"
             });
             input1.send(e);
         }
-        
+
         {
             Event e = new Event();
             e.setTimestamp(base);
             e.setData(new Object[] {"instance-guid-c2a1c926-b590-418e-bf57-41469d7891fa",
-                    base,
-                    "req-id-82dab92c-9e45-4ad8-8793-96e912831f00",
-                    "boot failure for when try start the given vm!",
-                    "boot-vm-data-center.corp.com"});
+                base,
+                "req-id-82dab92c-9e45-4ad8-8793-96e912831f00",
+                "boot failure for when try start the given vm!",
+                "boot-vm-data-center.corp.com"});
             input2.send(e);
         }
     }



Mime
View raw message