eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [10/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle problems on eagle-alert module and enable failOnViolation
Date Thu, 08 Sep 2016 07:14:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
index da7fb7e..a9efc97 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
@@ -1,12 +1,4 @@
-package org.apache.eagle.alert.engine.router;
-
-import java.io.Serializable;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -22,9 +14,20 @@ import org.apache.eagle.alert.engine.model.PartitionedEvent;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.engine.router;
+
+import org.apache.eagle.alert.engine.PartitionedEventCollector;
+import org.apache.eagle.alert.engine.StreamContext;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
+import java.io.Serializable;
+
 public interface StreamRouter extends StreamSortSpecListener, Serializable {
     void prepare(StreamContext context, PartitionedEventCollector outputCollector);
+
     void nextEvent(PartitionedEvent event);
+
     String getName();
+
     void close();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
index cd4bfd3..0016fc0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
@@ -19,14 +19,14 @@
 
 package org.apache.eagle.alert.engine.router;
 
-import java.util.Map;
-
 import org.apache.eagle.alert.coordination.model.RouterSpec;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 
+import java.util.Map;
+
 /**
- * Since 5/1/16.
- * Listen to change on StreamRouterBoltSpec
+ * Listens for changes to the StreamRouterBoltSpec.
+ * @since 5/1/16.
  */
 public interface StreamRouterBoltSpecListener {
     void onStreamRouteBoltSpecChange(RouterSpec spec, Map<String, StreamDefinition> sds);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
index 2229099..613ab7f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
@@ -22,21 +22,13 @@ import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.sorter.StreamTimeClockListener;
 
 public interface StreamSortHandler extends StreamTimeClockListener {
-    /**
-     *
-     * @param streamId
-     * @param streamSortSpecSpec
-     * @param outputCollector
-     */
+
     void prepare(String streamId, StreamSortSpec streamSortSpecSpec, PartitionedEventCollector outputCollector);
 
     /**
-     * @param event StreamEvent
+     * @param event StreamEvent.
      */
     void nextEvent(PartitionedEvent event);
 
-    /**
-     *
-     */
     void close();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
index affa979..087a46f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
@@ -1,11 +1,4 @@
-package org.apache.eagle.alert.engine.router;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -21,6 +14,13 @@ import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.engine.router;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+
+import java.util.Map;
+
 public interface StreamSortSpecListener {
     void onStreamSortSpecChange(Map<StreamPartition, StreamSortSpec> added,
                                 Map<StreamPartition, StreamSortSpec> removed,

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java
index afb9a6f..9337bde 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java
@@ -16,17 +16,17 @@
  */
 package org.apache.eagle.alert.engine.router.impl;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.apache.eagle.alert.engine.router.StreamRoute;
 import org.apache.eagle.alert.engine.router.StreamRoutePartitioner;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
 public class BasicStreamRoutePartitioner implements StreamRoutePartitioner {
     private final List<String> outputComponentIds;
@@ -41,7 +41,7 @@ public class BasicStreamRoutePartitioner implements StreamRoutePartitioner {
 
     @Override
     public List<StreamRoute> partition(StreamEvent event) {
-        switch (this.streamPartition.getType()){
+        switch (this.streamPartition.getType()) {
             case GLOBAL:
                 return routeToAll(event);
             case GROUPBY:
@@ -52,19 +52,19 @@ public class BasicStreamRoutePartitioner implements StreamRoutePartitioner {
     }
 
     protected List<StreamRoute> routeByGroupByKey(StreamEvent event) {
-        int partitionKey = new HashCodeBuilder().append(event.getData(streamDefinition,this.streamPartition.getColumns())).build();
+        int partitionKey = new HashCodeBuilder().append(event.getData(streamDefinition, this.streamPartition.getColumns())).build();
         String selectedOutputStream = outputComponentIds.get(Math.abs(partitionKey) % this.outputComponentIds.size());
         return Collections.singletonList(new StreamRoute(selectedOutputStream, partitionKey, StreamPartition.Type.GROUPBY));
     }
 
     protected List<StreamRoute> routeByShuffle(StreamEvent event) {
         long random = System.currentTimeMillis();
-        int hash = Math.abs((int)random);
-        return Arrays.asList(new StreamRoute(outputComponentIds.get(hash % outputComponentIds.size()),-1,StreamPartition.Type.SHUFFLE));
+        int hash = Math.abs((int) random);
+        return Arrays.asList(new StreamRoute(outputComponentIds.get(hash % outputComponentIds.size()), -1, StreamPartition.Type.SHUFFLE));
     }
 
     protected List<StreamRoute> routeToAll(StreamEvent event) {
-        if(_globalRoutingKeys!=null) {
+        if (_globalRoutingKeys != null) {
             _globalRoutingKeys = new ArrayList<>();
             for (String targetId : outputComponentIds) {
                 _globalRoutingKeys.add(new StreamRoute(targetId, -1, StreamPartition.Type.GLOBAL));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java
index d0bf012..5c10675 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java
@@ -16,55 +16,50 @@
  */
 package org.apache.eagle.alert.engine.router.impl;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import storm.trident.partition.GlobalGrouping;
 import backtype.storm.generated.GlobalStreamId;
 import backtype.storm.grouping.CustomStreamGrouping;
 import backtype.storm.task.WorkerTopologyContext;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.partition.GlobalGrouping;
+
+import java.util.*;
 
 public class RoutePhysicalGrouping implements CustomStreamGrouping {
     private static final long serialVersionUID = -511915083994148362L;
-    private final static Logger LOG = LoggerFactory.getLogger(RoutePhysicalGrouping.class);
+    private static final Logger LOG = LoggerFactory.getLogger(RoutePhysicalGrouping.class);
     private List<Integer> outdegreeTasks;
     private ShuffleGrouping shuffleGroupingDelegate;
     private GlobalGrouping globalGroupingDelegate;
-    private Map<String,Integer> connectedTargetIds;
+    private Map<String, Integer> connectedTargetIds;
 
     @Override
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
         this.outdegreeTasks = new ArrayList<>(targetTasks);
         shuffleGroupingDelegate = new ShuffleGrouping();
-        shuffleGroupingDelegate.prepare(context,stream,targetTasks);
+        shuffleGroupingDelegate.prepare(context, stream, targetTasks);
         globalGroupingDelegate = new GlobalGrouping();
-        globalGroupingDelegate.prepare(context,stream,targetTasks);
+        globalGroupingDelegate.prepare(context, stream, targetTasks);
         connectedTargetIds = new HashMap<>();
-        for(Integer targetId:targetTasks){
+        for (Integer targetId : targetTasks) {
             String targetComponentId = context.getComponentId(targetId);
-            connectedTargetIds.put(targetComponentId,targetId);
+            connectedTargetIds.put(targetComponentId, targetId);
         }
-        LOG.info("OutDegree components: [{}]", StringUtils.join(connectedTargetIds.values(),","));
+        LOG.info("OutDegree components: [{}]", StringUtils.join(connectedTargetIds.values(), ","));
     }
 
     @Override
     public List<Integer> chooseTasks(int taskId, List<Object> values) {
         Object routingKeyObj = values.get(0);
-        if(routingKeyObj!=null){
+        if (routingKeyObj != null) {
             PartitionedEvent partitionedEvent = (PartitionedEvent) routingKeyObj;
             if (partitionedEvent.getPartition().getType() == StreamPartition.Type.GLOBAL) {
-                return globalGroupingDelegate.chooseTasks(taskId,values);
+                return globalGroupingDelegate.chooseTasks(taskId, values);
             } else if (partitionedEvent.getPartition().getType() == StreamPartition.Type.GROUPBY) {
-                return Collections.singletonList(outdegreeTasks.get((int)(partitionedEvent.getPartitionKey() % this.outdegreeTasks.size())));
+                return Collections.singletonList(outdegreeTasks.get((int) (partitionedEvent.getPartitionKey() % this.outdegreeTasks.size())));
             }
             // Shuffle by defaults
             return shuffleGroupingDelegate.chooseTasks(taskId, values);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java
index c73854a..752c742 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,23 +17,18 @@
  */
 package org.apache.eagle.alert.engine.router.impl;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import backtype.storm.generated.GlobalStreamId;
 import backtype.storm.grouping.CustomStreamGrouping;
 import backtype.storm.task.WorkerTopologyContext;
 
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * NOTE: This is copy from storm 1.0.0 code. DON'T modify it.
- * 
- * @since May 4, 2016
  *
+ * @since May 4, 2016
  */
 public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
     private static final long serialVersionUID = 5035497345182141765L;
@@ -45,7 +40,7 @@ public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
         random = new Random();
         choices = new ArrayList<List<Integer>>(targetTasks.size());
-        for (Integer i: targetTasks) {
+        for (Integer i : targetTasks) {
             choices.add(Arrays.asList(i));
         }
         Collections.shuffle(choices, random);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
index 28d2d22..ae678ea 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
@@ -18,13 +18,6 @@
  */
 package org.apache.eagle.alert.engine.router.impl;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
 import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
 import org.apache.eagle.alert.coordination.model.WorkSlot;
@@ -40,36 +33,35 @@ import org.apache.eagle.alert.engine.router.StreamRoutePartitioner;
 import org.apache.eagle.alert.engine.router.StreamRouteSpecListener;
 import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
 import org.apache.eagle.alert.utils.StreamIdConversion;
+import backtype.storm.task.OutputCollector;
+import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.task.OutputCollector;
-
-import com.google.common.collect.Lists;
+import java.util.*;
 
 /**
  * After sorting, one stream's message will be routed based on its StreamPartition
- * One stream may have multiple StreamPartitions based on how this stream is grouped by
- *
+ * One stream may have multiple StreamPartitions, depending on how the stream is grouped.
  * TODO: Add metric statistics
  */
 public class StreamRouterBoltOutputCollector implements PartitionedEventCollector, StreamRouteSpecListener {
-    private final static Logger LOG = LoggerFactory.getLogger(StreamRouterBoltOutputCollector.class);
+    private static final Logger LOG = LoggerFactory.getLogger(StreamRouterBoltOutputCollector.class);
     private final OutputCollector outputCollector;
     private final Object outputLock = new Object();
-//    private final List<String> outputStreamIds;
+    //    private final List<String> outputStreamIds;
     private final StreamContext streamContext;
     private final PartitionedEventSerializer serializer;
-    private volatile Map<StreamPartition,StreamRouterSpec> routeSpecMap;
+    private volatile Map<StreamPartition, StreamRouterSpec> routeSpecMap;
     private volatile Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap;
     private final String sourceId;
 
-    public StreamRouterBoltOutputCollector(String sourceId, OutputCollector outputCollector, List<String> outputStreamIds, StreamContext streamContext, PartitionedEventSerializer serializer){
+    public StreamRouterBoltOutputCollector(String sourceId, OutputCollector outputCollector, List<String> outputStreamIds, StreamContext streamContext, PartitionedEventSerializer serializer) {
         this.sourceId = sourceId;
         this.outputCollector = outputCollector;
         this.routeSpecMap = new HashMap<>();
         this.routePartitionerMap = new HashMap<>();
-//        this.outputStreamIds = outputStreamIds;
+        // this.outputStreamIds = outputStreamIds;
         this.streamContext = streamContext;
         this.serializer = serializer;
     }
@@ -114,7 +106,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug("Emitted to stream {} with message {}", targetStreamId, emittedEvent);
                             }
-                            if(this.serializer == null) {
+                            if (this.serializer == null) {
                                 outputCollector.emit(targetStreamId, event.getAnchor(), Collections.singletonList(emittedEvent));
                             } else {
                                 outputCollector.emit(targetStreamId, event.getAnchor(), Collections.singletonList(serializer.serialize(emittedEvent)));
@@ -129,8 +121,8 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
                 }
                 outputCollector.ack(event.getAnchor());
             }
-        } catch (Exception ex){
-            LOG.error(ex.getMessage(),ex);
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage(), ex);
             synchronized (outputLock) {
                 this.streamContext.counter().scope("fail_count").incr();
                 this.outputCollector.fail(event.getAnchor());
@@ -142,33 +134,33 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
     public void onStreamRouterSpecChange(Collection<StreamRouterSpec> added,
                                          Collection<StreamRouterSpec> removed,
                                          Collection<StreamRouterSpec> modified,
-                                         Map<String, StreamDefinition> sds){
-        Map<StreamPartition,StreamRouterSpec> copyRouteSpecMap = new HashMap<>(routeSpecMap);
+                                         Map<String, StreamDefinition> sds) {
+        Map<StreamPartition, StreamRouterSpec> copyRouteSpecMap = new HashMap<>(routeSpecMap);
         Map<StreamPartition, List<StreamRoutePartitioner>> copyRoutePartitionerMap = new HashMap<>(routePartitionerMap);
 
         // added StreamRouterSpec i.e. there is a new StreamPartition
-        for(StreamRouterSpec spec : added){
-            if(copyRouteSpecMap.containsKey(spec.getPartition())){
+        for (StreamRouterSpec spec : added) {
+            if (copyRouteSpecMap.containsKey(spec.getPartition())) {
                 LOG.error("Metadata calculation error: add existing StreamRouterSpec " + spec);
-            }else{
+            } else {
                 inplaceAdd(copyRouteSpecMap, copyRoutePartitionerMap, spec, sds);
             }
         }
 
         // removed StreamRouterSpec i.e. there is a deleted StreamPartition
-        for(StreamRouterSpec spec : removed){
-            if(!copyRouteSpecMap.containsKey(spec.getPartition())){
+        for (StreamRouterSpec spec : removed) {
+            if (!copyRouteSpecMap.containsKey(spec.getPartition())) {
                 LOG.error("Metadata calculation error: remove non-existing StreamRouterSpec " + spec);
-            }else{
+            } else {
                 inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
             }
         }
 
         // modified StreamRouterSpec, i.e. there is modified StreamPartition, for example WorkSlotQueue assignment is changed
-        for(StreamRouterSpec spec : modified){
-            if(!copyRouteSpecMap.containsKey(spec.getPartition())){
+        for (StreamRouterSpec spec : modified) {
+            if (!copyRouteSpecMap.containsKey(spec.getPartition())) {
                 LOG.error("Metadata calculation error: modify nonexisting StreamRouterSpec " + spec);
-            }else{
+            } else {
                 inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
                 inplaceAdd(copyRouteSpecMap, copyRoutePartitionerMap, spec, sds);
             }
@@ -179,34 +171,34 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
         routePartitionerMap = copyRoutePartitionerMap;
     }
 
-    private void inplaceRemove(Map<StreamPartition,StreamRouterSpec> routeSpecMap,
+    private void inplaceRemove(Map<StreamPartition, StreamRouterSpec> routeSpecMap,
                                Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap,
-                               StreamRouterSpec toBeRemoved){
+                               StreamRouterSpec toBeRemoved) {
         routeSpecMap.remove(toBeRemoved.getPartition());
         routePartitionerMap.remove(toBeRemoved.getPartition());
     }
 
-    private void inplaceAdd(Map<StreamPartition,StreamRouterSpec> routeSpecMap,
+    private void inplaceAdd(Map<StreamPartition, StreamRouterSpec> routeSpecMap,
                             Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap,
-                            StreamRouterSpec toBeAdded, Map<String, StreamDefinition> sds){
+                            StreamRouterSpec toBeAdded, Map<String, StreamDefinition> sds) {
         routeSpecMap.put(toBeAdded.getPartition(), toBeAdded);
         try {
             List<StreamRoutePartitioner> routePartitioners = calculatePartitioner(toBeAdded, sds);
             routePartitionerMap.put(toBeAdded.getPartition(), routePartitioners);
-        }catch(Exception e){
-            LOG.error("ignore this failure StreamRouterSpec " + toBeAdded + ", with error" + e.getMessage(),e);
+        } catch (Exception e) {
+            LOG.error("ignore this failure StreamRouterSpec " + toBeAdded + ", with error" + e.getMessage(), e);
             routeSpecMap.remove(toBeAdded.getPartition());
             routePartitionerMap.remove(toBeAdded.getPartition());
         }
     }
 
-    private List<StreamRoutePartitioner> calculatePartitioner(StreamRouterSpec streamRouterSpec, Map<String, StreamDefinition> sds) throws Exception{
+    private List<StreamRoutePartitioner> calculatePartitioner(StreamRouterSpec streamRouterSpec, Map<String, StreamDefinition> sds) throws Exception {
         List<StreamRoutePartitioner> routePartitioners = new ArrayList<>();
         for (PolicyWorkerQueue pwq : streamRouterSpec.getTargetQueue()) {
             routePartitioners.add(StreamRoutePartitionFactory.createRoutePartitioner(
-                    Lists.transform(pwq.getWorkers(), WorkSlot::getBoltId),
-                    sds.get(streamRouterSpec.getPartition().getStreamId()),
-                    streamRouterSpec.getPartition()));
+                Lists.transform(pwq.getWorkers(), WorkSlot::getBoltId),
+                sds.get(streamRouterSpec.getPartition().getStreamId()),
+                streamRouterSpec.getPartition()));
         }
         return routePartitioners;
     }
@@ -217,8 +209,8 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
             this.streamContext.counter().scope("drop_count").incr();
             if (event.getAnchor() != null) {
                 this.outputCollector.ack(event.getAnchor());
-            }else{
-                throw new IllegalStateException(event.toString()+" was not acked as anchor is null");
+            } else {
+                throw new IllegalStateException(event.toString() + " was not acked as anchor is null");
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
index 5008dbf..7b2a1de 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
@@ -16,10 +16,6 @@
  */
 package org.apache.eagle.alert.engine.router.impl;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
 import org.apache.eagle.alert.engine.PartitionedEventCollector;
 import org.apache.eagle.alert.engine.StreamContext;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
@@ -33,23 +29,27 @@ import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockManagerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
 public class StreamRouterImpl implements StreamRouter {
     private static final long serialVersionUID = -4640125063690900014L;
-    private final static Logger LOG = LoggerFactory.getLogger(StreamRouterImpl.class);
+    private static final Logger LOG = LoggerFactory.getLogger(StreamRouterImpl.class);
     private final String name;
-    private volatile Map<StreamPartition,StreamSortHandler> streamSortHandlers;
+    private volatile Map<StreamPartition, StreamSortHandler> streamSortHandlers;
     private PartitionedEventCollector outputCollector;
     private StreamTimeClockManager streamTimeClockManager;
     private StreamContext context;
 
     /**
-     * @param name This name should be formed by topologyId + router id, which is built by topology builder
+     * @param name This name should be formed by topologyId + router id, which is built by topology builder.
      */
-    public StreamRouterImpl(String name){
+    public StreamRouterImpl(String name) {
         this.name = name;
     }
 
-    public String getName(){
+    public String getName() {
         return this.name;
     }
 
@@ -67,32 +67,34 @@ public class StreamRouterImpl implements StreamRouter {
     }
 
     /**
-     * TODO: Potential improvement: if StreamSortHandler is expensive, we can use DISRUPTOR to buffer
+     * TODO: Potential improvement: if StreamSortHandler is expensive, we can use DISRUPTOR to buffer.
      *
      * @param event StreamEvent
      */
     public void nextEvent(PartitionedEvent event) {
         this.context.counter().scope("receive_count").incr();
-        if(!dispatchToSortHandler(event)) {
+        if (!dispatchToSortHandler(event)) {
             this.context.counter().scope("direct_count").incr();
             // Pass through directly if no need to sort
             outputCollector.emit(event);
         }
         this.context.counter().scope("sort_count").incr();
         // Update stream clock time if moving forward and trigger all tick listeners
-        streamTimeClockManager.onTimeUpdate(event.getStreamId(),event.getTimestamp());
+        streamTimeClockManager.onTimeUpdate(event.getStreamId(), event.getTimestamp());
     }
 
     /**
-     * @param event input event
-     * @return whether sorted
+     * @param event input event.
+     * @return whether sorted.
      */
-    private boolean dispatchToSortHandler(PartitionedEvent event){
-        if(event.getTimestamp() <= 0) return false;
+    private boolean dispatchToSortHandler(PartitionedEvent event) {
+        if (event.getTimestamp() <= 0) {
+            return false;
+        }
 
         StreamSortHandler sortHandler = streamSortHandlers.get(event.getPartition());
-        if(sortHandler == null){
-            if(event.isSortRequired()) {
+        if (sortHandler == null) {
+            if (event.isSortRequired()) {
                 LOG.warn("Stream sort handler required has not been loaded so emmit directly: {}", event);
                 this.context.counter().scope("miss_sort_count").incr();
             }
@@ -105,8 +107,8 @@ public class StreamRouterImpl implements StreamRouter {
 
     @Override
     public void onStreamSortSpecChange(Map<StreamPartition, StreamSortSpec> added,
-            Map<StreamPartition, StreamSortSpec> removed,
-            Map<StreamPartition, StreamSortSpec> changed) {
+                                       Map<StreamPartition, StreamSortSpec> removed,
+                                       Map<StreamPartition, StreamSortSpec> changed) {
         synchronized (streamTimeClockManager) {
             Map<StreamPartition, StreamSortHandler> copy = new HashMap<>(this.streamSortHandlers);
             // add new StreamSortSpec
@@ -117,7 +119,7 @@ public class StreamRouterImpl implements StreamRouter {
                         LOG.error("Metadata calculation error: Duplicated StreamSortSpec " + spec);
                     } else {
                         StreamSortHandler handler = new StreamSortWindowHandlerImpl();
-                        handler.prepare(tmp.getStreamId(),spec.getValue(), this.outputCollector);
+                        handler.prepare(tmp.getStreamId(), spec.getValue(), this.outputCollector);
                         copy.put(tmp, handler);
                         streamTimeClockManager.registerListener(streamTimeClockManager.createStreamTimeClock(tmp.getStreamId()), handler);
                     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
index 5cb38fc..c6f6906 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
@@ -16,13 +16,6 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
 import org.apache.eagle.alert.engine.StreamContext;
 import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
@@ -32,6 +25,13 @@ import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
 import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
 import org.apache.eagle.alert.engine.serialization.Serializers;
 import org.apache.eagle.alert.utils.AlertConstants;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +40,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-@SuppressWarnings({"rawtypes", "serial"})
+@SuppressWarnings( {"rawtypes", "serial"})
 public abstract class AbstractStreamBolt extends BaseRichBolt implements SerializationMetadataProvider {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamBolt.class);
     private IMetadataChangeNotifyService changeNotifyService;
@@ -56,7 +56,7 @@ public abstract class AbstractStreamBolt extends BaseRichBolt implements Seriali
 
     private String boltId;
     protected PartitionedEventSerializer serializer;
-    protected volatile Map<String, StreamDefinition> sdf  = new HashMap<String, StreamDefinition>();
+    protected volatile Map<String, StreamDefinition> sdf = new HashMap<String, StreamDefinition>();
     protected volatile String specVersion = "Not Initialized";
     protected volatile boolean specVersionOutofdate = false;
     protected StreamContext streamContext;
@@ -67,11 +67,11 @@ public abstract class AbstractStreamBolt extends BaseRichBolt implements Seriali
         this.config = config;
     }
 
-    public void declareOutputStreams(List<String> outputStreamIds){
+    public void declareOutputStreams(List<String> outputStreamIds) {
         this.outputStreamIds = outputStreamIds;
     }
 
-    protected List<String> getOutputStreamIds(){
+    protected List<String> getOutputStreamIds() {
         return this.outputStreamIds;
     }
 
@@ -81,15 +81,15 @@ public abstract class AbstractStreamBolt extends BaseRichBolt implements Seriali
         this.stormConf = stormConf;
         this.collector = collector;
         this.serializer = Serializers.newPartitionedEventSerializer(this);
-        internalPrepare(collector,this.changeNotifyService,this.config,context);
+        internalPrepare(collector, this.changeNotifyService, this.config, context);
     }
 
 
     protected PartitionedEvent deserialize(Object object) throws IOException {
         // byte[] in higher priority
-        if(object instanceof byte[]) {
+        if (object instanceof byte[]) {
             return serializer.deserialize((byte[]) object);
-        } else if (object instanceof PartitionedEvent){
+        } else if (object instanceof PartitionedEvent) {
             return (PartitionedEvent) object;
         } else {
             throw new IllegalStateException(String.format("Unsupported event class '%s', expect byte array or PartitionedEvent!", object == null ? null : object.getClass().getCanonicalName()));
@@ -97,18 +97,19 @@ public abstract class AbstractStreamBolt extends BaseRichBolt implements Seriali
     }
 
     /**
-     * subclass should implement more initialization for example
+     * subclass should implement more initialization for example.
      * 1) register metadata change
      * 2) init stream context
+     *
      * @param collector
      * @param metadataManager
      * @param config
      * @param context
      */
     public abstract void internalPrepare(
-            OutputCollector collector,
-            IMetadataChangeNotifyService metadataManager,
-            Config config, TopologyContext context);
+        OutputCollector collector,
+        IMetadataChangeNotifyService metadataManager,
+        Config config, TopologyContext context);
 
     @Override
     public void cleanup() {
@@ -117,10 +118,10 @@ public abstract class AbstractStreamBolt extends BaseRichBolt implements Seriali
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        if(this.outputStreamIds!=null){
+        if (this.outputStreamIds != null) {
             LOG.info("declare streams: {} ", outputStreamIds);
-            for(String streamId:this.outputStreamIds){
-                declarer.declareStream(streamId,new Fields(AlertConstants.FIELD_0));
+            for (String streamId : this.outputStreamIds) {
+                declarer.declareStream(streamId, new Fields(AlertConstants.FIELD_0));
             }
         } else {
             declarer.declare(new Fields(AlertConstants.FIELD_0));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index fecb2f1..f437d43 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -16,15 +16,6 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.MetricRegistry;
-import com.typesafe.config.Config;
 import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
 import org.apache.eagle.alert.coordination.model.WorkSlot;
 import org.apache.eagle.alert.engine.AlertStreamCollector;
@@ -45,13 +36,23 @@ import org.apache.eagle.alert.metric.source.MetricSource;
 import org.apache.eagle.alert.service.IMetadataServiceClient;
 import org.apache.eagle.alert.service.MetadataServiceClientImpl;
 import org.apache.eagle.alert.utils.AlertConstants;
+
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Since 5/1/16.
@@ -59,8 +60,8 @@ import java.util.concurrent.*;
  * MonitoredStream refers to tuple of {dataSource, streamId, groupby}
  * The container is also called {@link WorkSlot}
  */
-public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListener,SerializationMetadataProvider {
-    private final static Logger LOG = LoggerFactory.getLogger(AlertBolt.class);
+public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListener, SerializationMetadataProvider {
+    private static final Logger LOG = LoggerFactory.getLogger(AlertBolt.class);
     private static final long serialVersionUID = -4132297691448945672L;
     private PolicyGroupEvaluator policyGroupEvaluator;
     private AlertStreamCollector alertOutputCollector;
@@ -71,7 +72,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
 
     private AlertBoltSpec spec;
 
-    public AlertBolt(String boltId, Config config, IMetadataChangeNotifyService changeNotifyService){
+    public AlertBolt(String boltId, Config config, IMetadataChangeNotifyService changeNotifyService) {
         super(boltId, changeNotifyService, config);
         this.boltId = boltId;
         this.policyGroupEvaluator = new PolicyGroupEvaluatorImpl(boltId + "-evaluator_stage1"); // use bolt id as evaluatorId.
@@ -81,11 +82,11 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
     private class MetaConflictMetricSource implements MetricSource {
         private MetricRegistry registry = new MetricRegistry();
 
-        public MetaConflictMetricSource(){
+        public MetaConflictMetricSource() {
             registry.register("meta.conflict", (Gauge<String>) () -> "meta conflict happening!");
         }
 
-        public MetaConflictMetricSource(String message){
+        public MetaConflictMetricSource(String message) {
             registry.register("meta.conflict", (Gauge<String>) () -> message);
         }
 
@@ -105,26 +106,25 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
         this.streamContext.counter().scope("execute_count").incr();
         try {
             PartitionedEvent pe = deserialize(input.getValueByField(AlertConstants.FIELD_0));
-            String stream_event_version = pe.getEvent().getMetaVersion();
-            if (stream_event_version != null && !stream_event_version.equals(specVersion)) {
-                if (stream_event_version == null){
+            String streamEventVersion = pe.getEvent().getMetaVersion();
+            if (streamEventVersion != null && !streamEventVersion.equals(specVersion)) {
+                if (streamEventVersion == null) {
                     // if stream event version is null, need to initialize it
                     pe.getEvent().setMetaVersion(specVersion);
-                }
-                // check if specVersion is older than stream_event_version
-                else if (specVersion != null && stream_event_version != null &&
-                        specVersion.contains("spec_version_") && stream_event_version.contains("spec_version_")){
-//                    Long timestamp_of_specVersion = Long.valueOf(specVersion.split("spec_version_")[1]);
-//                    Long timestamp_of_streamEventVersion = Long.valueOf(stream_event_version.split("spec_version_")[1]);
-                    long timestamp_of_specVersion = Long.valueOf(specVersion.substring(13));
-                    long timestamp_of_streamEventVersion = Long.valueOf(stream_event_version.substring(13));
-                    specVersionOutofdate = timestamp_of_specVersion < timestamp_of_streamEventVersion;
-                    if (!specVersionOutofdate){
+                } else if (specVersion != null && streamEventVersion != null
+                    && specVersion.contains("spec_version_") && streamEventVersion.contains("spec_version_")) {
+                    // check if specVersion is older than stream_event_version
+                    // Long timestamp_of_specVersion = Long.valueOf(specVersion.split("spec_version_")[1]);
+                    // Long timestamp_of_streamEventVersion = Long.valueOf(stream_event_version.split("spec_version_")[1]);
+                    long timestampOfSpecVersion = Long.valueOf(specVersion.substring(13));
+                    long timestampOfStreamEventVersion = Long.valueOf(streamEventVersion.substring(13));
+                    specVersionOutofdate = timestampOfSpecVersion < timestampOfStreamEventVersion;
+                    if (!specVersionOutofdate) {
                         pe.getEvent().setMetaVersion(specVersion);
                     }
                 }
 
-                String message = String.format("Spec Version [%s] of AlertBolt is %s Stream Event Version [%s]!", specVersion, specVersionOutofdate ? "older than":"newer than", stream_event_version);
+                String message = String.format("Spec Version [%s] of AlertBolt is %s Stream Event Version [%s]!", specVersion, specVersionOutofdate ? "older than" : "newer than", streamEventVersion);
                 LOG.warn(message);
 
                 // send out metrics for meta conflict
@@ -137,15 +137,15 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
                 ExecutorService executors = SingletonExecutor.getExecutorService();
                 executors.submit(() -> {
                     // if spec version is out-of-date, need to refresh it
-                    if (specVersionOutofdate){
-                        try{
+                    if (specVersionOutofdate) {
+                        try {
                             IMetadataServiceClient client = new MetadataServiceClientImpl(this.getConfig());
                             String topologyId = spec.getTopologyName();
                             AlertBoltSpec latestSpec = client.getVersionedSpec().getAlertSpecs().get(topologyId);
-                            if (latestSpec != null){
+                            if (latestSpec != null) {
                                 spec = latestSpec;
                             }
-                        } catch (Exception e){
+                        } catch (Exception e) {
                             LOG.error(e.toString());
                         }
 
@@ -174,8 +174,8 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
     public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService metadataChangeNotifyService, Config config, TopologyContext context) {
         // instantiate output lock object
         outputLock = new Object();
-        streamContext = new StreamContextImpl(config,context.registerMetric("eagle.evaluator",new MultiCountMetric(),60),context);
-        alertOutputCollector = new AlertBoltOutputCollectorWrapper(collector, outputLock,streamContext);
+        streamContext = new StreamContextImpl(config, context.registerMetric("eagle.evaluator", new MultiCountMetric(), 60), context);
+        alertOutputCollector = new AlertBoltOutputCollectorWrapper(collector, outputLock, streamContext);
         policyGroupEvaluator.init(streamContext, alertOutputCollector);
         metadataChangeNotifyService.registerListener(this);
         metadataChangeNotifyService.init(config, MetadataType.ALERT_BOLT);
@@ -197,7 +197,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
     @Override
     public synchronized void onAlertBoltSpecChange(AlertBoltSpec spec, Map<String, StreamDefinition> sds) {
         List<PolicyDefinition> newPolicies = spec.getBoltPoliciesMap().get(boltId);
-        if(newPolicies == null) {
+        if (newPolicies == null) {
             LOG.info("no new policy with AlertBoltSpec {} for this bolt {}", spec, boltId);
             return;
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index 5f58536..9755c90 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -16,13 +16,6 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import com.typesafe.config.Config;
 import org.apache.eagle.alert.coordination.model.PublishSpec;
 import org.apache.eagle.alert.engine.StreamContextImpl;
 import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
@@ -34,6 +27,14 @@ import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
 import org.apache.eagle.alert.engine.publisher.AlertPublisher;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
 import org.apache.eagle.alert.utils.AlertConstants;
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,11 +45,11 @@ import java.util.Map;
 
 @SuppressWarnings("serial")
 public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPublishSpecListener {
-    private final static Logger LOG = LoggerFactory.getLogger(AlertPublisherBolt.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AlertPublisherBolt.class);
     private final AlertPublisher alertPublisher;
     private volatile Map<String, Publishment> cachedPublishments = new HashMap<>();
 
-    public AlertPublisherBolt(String alertPublisherName, Config config, IMetadataChangeNotifyService coordinatorService){
+    public AlertPublisherBolt(String alertPublisherName, Config config, IMetadataChangeNotifyService coordinatorService) {
         super(alertPublisherName, coordinatorService, config);
         this.alertPublisher = new AlertPublisherImpl(alertPublisherName);
     }
@@ -58,7 +59,7 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
         coordinatorService.registerListener(this);
         coordinatorService.init(config, MetadataType.ALERT_PUBLISH_BOLT);
         this.alertPublisher.init(config, stormConf);
-        streamContext = new StreamContextImpl(config,context.registerMetric("eagle.publisher",new MultiCountMetric(),60),context);
+        streamContext = new StreamContextImpl(config, context.registerMetric("eagle.publisher", new MultiCountMetric(), 60), context);
     }
 
     @Override
@@ -68,9 +69,9 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
             alertPublisher.nextEvent((AlertStreamEvent) input.getValueByField(AlertConstants.FIELD_1));
             this.collector.ack(input);
             streamContext.counter().scope("ack_count");
-        } catch (Exception ex){
+        } catch (Exception ex) {
             streamContext.counter().scope("fail_count");
-            LOG.error(ex.getMessage(),ex);
+            LOG.error(ex.getMessage(), ex);
             collector.reportError(ex);
         }
     }
@@ -88,10 +89,12 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
 
     @Override
     public void onAlertPublishSpecChange(PublishSpec pubSpec, Map<String, StreamDefinition> sds) {
-        if(pubSpec == null) return;
+        if (pubSpec == null) {
+            return;
+        }
 
         List<Publishment> newPublishments = pubSpec.getPublishments();
-        if(newPublishments == null) {
+        if (newPublishments == null) {
             LOG.info("no publishments with PublishSpec {} for this topology", pubSpec);
             return;
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java
index 04595b1..5c65d91 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java
@@ -19,14 +19,10 @@
 
 package org.apache.eagle.alert.engine.runner;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.commons.collections.CollectionUtils;
 
+import java.util.*;
+
 /**
  * Since 5/2/16.
  */
@@ -36,13 +32,14 @@ public class MapComparator<K, V> {
     private List<V> added = new ArrayList<>();
     private List<V> removed = new ArrayList<>();
     private List<V> modified = new ArrayList<>();
-    public MapComparator(Map<K, V> map1, Map<K, V> map2){
+
+    public MapComparator(Map<K, V> map1, Map<K, V> map2) {
         this.map1 = map1;
         this.map2 = map2;
     }
 
     @SuppressWarnings("unchecked")
-    public void compare(){
+    public void compare() {
         Set<K> keys1 = map1.keySet();
         Set<K> keys2 = map2.keySet();
         Collection<K> addedKeys = CollectionUtils.subtract(keys1, keys2);
@@ -52,21 +49,21 @@ public class MapComparator<K, V> {
         addedKeys.forEach(k -> added.add(map1.get(k)));
         removedKeys.forEach(k -> removed.add(map2.get(k)));
         modifiedKeys.forEach(k -> {
-            if(!map1.get(k).equals(map2.get(k))){
+            if (!map1.get(k).equals(map2.get(k))) {
                 modified.add(map1.get(k));
             }
         });
     }
 
-    public List<V> getAdded(){
+    public List<V> getAdded() {
         return added;
     }
 
-    public List<V> getRemoved(){
+    public List<V> getRemoved() {
         return removed;
     }
 
-    public List<V> getModified(){
+    public List<V> getModified() {
         return modified;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
index 1a6267b..caa59b3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
@@ -16,29 +16,23 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.metric.IMetricSystem;
 import org.apache.eagle.alert.metric.MetricSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.metric.api.IMetricsConsumer;
 import backtype.storm.task.IErrorReporter;
 import backtype.storm.task.TopologyContext;
-
 import com.codahale.metrics.Gauge;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
 
 /**
- * Share same metric system
+ * Share same metric system.
  */
 public class StormMetricConsumer implements IMetricsConsumer {
     public static final Logger LOG = LoggerFactory.getLogger(StormMetricConsumer.class);
@@ -46,49 +40,51 @@ public class StormMetricConsumer implements IMetricsConsumer {
     private IMetricSystem metricSystem;
     private String topologyId;
 
-    @SuppressWarnings({ "serial", "rawtypes" })
+    @SuppressWarnings( {"serial", "rawtypes"})
     @Override
     public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
         Config config = ConfigFactory.parseString((String) registrationArgument, ConfigParseOptions.defaults());
         topologyName = config.getString("topology.name");
         topologyId = context.getStormId();
         metricSystem = MetricSystem.load(config);
-        metricSystem.tags(new HashMap<String,Object>(){{
-            put("topologyName",topologyName);
-            put("topologyId",topologyId);
-        }});
+        metricSystem.tags(new HashMap<String, Object>() {
+            {
+                put("topologyName", topologyName);
+                put("topologyId", topologyId);
+            }
+        });
         metricSystem.start();
     }
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @SuppressWarnings( {"unchecked", "rawtypes"})
     @Override
     public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
         synchronized (metricSystem) {
             List<String> metricList = new LinkedList<>();
-            for(DataPoint dataPoint:dataPoints){
-                if(dataPoint.value instanceof Map) {
-                    Map<String,Object> values = (Map<String, Object>) dataPoint.value;
-                    for(Map.Entry<String,Object> entry:values.entrySet()){
+            for (DataPoint dataPoint : dataPoints) {
+                if (dataPoint.value instanceof Map) {
+                    Map<String, Object> values = (Map<String, Object>) dataPoint.value;
+                    for (Map.Entry<String, Object> entry : values.entrySet()) {
                         String metricName = buildMetricName(taskInfo, dataPoint.name, entry.getKey());
                         metricList.add(metricName);
                         Gauge gauge = metricSystem.registry().getGauges().get(metricName);
-                        if(gauge == null) {
+                        if (gauge == null) {
                             LOG.info("Register metric {}", metricName);
                             gauge = new DataPointGauge(entry.getValue());
-                            metricSystem.registry().register(metricName,gauge);
-                        }else{
+                            metricSystem.registry().register(metricName, gauge);
+                        } else {
                             ((DataPointGauge) gauge).setValue(entry.getValue());
                         }
                     }
                 } else {
                     String metricName = buildMetricName(taskInfo, dataPoint.name);
                     metricList.add(metricName);
-                    LOG.info("Register metric {}",metricName);
+                    LOG.info("Register metric {}", metricName);
                     Gauge gauge = metricSystem.registry().getGauges().get(metricName);
-                    if(gauge == null) {
+                    if (gauge == null) {
                         LOG.info("Register metric {}", metricName);
                         gauge = new DataPointGauge(dataPoint.value);
-                        metricSystem.registry().register(metricName,gauge);
+                        metricSystem.registry().register(metricName, gauge);
                     } else {
                         ((DataPointGauge) gauge).setValue(dataPoint.value);
                     }
@@ -97,17 +93,18 @@ public class StormMetricConsumer implements IMetricsConsumer {
             metricSystem.registry().removeMatching((name, metric) -> metricList.indexOf(name) < 0);
             metricSystem.report();
             metricSystem.registry().getGauges().values().forEach((gauge -> {
-                if(gauge instanceof DataPointGauge){
-                    ((DataPointGauge)gauge).reset();
+                if (gauge instanceof DataPointGauge) {
+                    ((DataPointGauge) gauge).reset();
                 }
             }));
-            LOG.info("Reported {} metric data points from {} [{}]",dataPoints.size(),taskInfo.srcComponentId,taskInfo.srcTaskId);
+            LOG.info("Reported {} metric data points from {} [{}]", dataPoints.size(), taskInfo.srcComponentId, taskInfo.srcTaskId);
         }
     }
 
     private class DataPointGauge implements Gauge<Object> {
         private Object value;
-        public DataPointGauge(Object initialValue){
+
+        public DataPointGauge(Object initialValue) {
             this.value = initialValue;
         }
 
@@ -116,17 +113,17 @@ public class StormMetricConsumer implements IMetricsConsumer {
             return value;
         }
 
-        public void setValue(Object value){
+        public void setValue(Object value) {
             this.value = value;
         }
 
-        public void reset(){
+        public void reset() {
             this.value = 0;
         }
     }
 
-    private String buildMetricName(TaskInfo taskInfo,String ... name ){
-        return String.join(".",StringUtils.join(name,".").replace("/","."),taskInfo.srcComponentId,taskInfo.srcTaskId+"");
+    private String buildMetricName(TaskInfo taskInfo, String... name) {
+        return String.join(".", StringUtils.join(name, ".").replace("/", "."), taskInfo.srcComponentId, taskInfo.srcTaskId + "");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
index c18a44f..e060d1f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
@@ -16,34 +16,28 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.metric.IMetricSystem;
 import org.apache.eagle.alert.metric.MetricSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.metric.api.IMetricsConsumer;
 import backtype.storm.task.IErrorReporter;
 import backtype.storm.task.TopologyContext;
-
 import com.codahale.metrics.Gauge;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
 
 /**
- * Per MetricSystem instance per task
+ * Per MetricSystem instance per task.
  */
 public class StormMetricTaggedConsumer implements IMetricsConsumer {
     public static final Logger LOG = LoggerFactory.getLogger(StormMetricTaggedConsumer.class);
     private String topologyName;
-    private Map<String,MetricSystem> metricSystems;
+    private Map<String, MetricSystem> metricSystems;
     private String stormId;
     private Config config;
 
@@ -62,27 +56,29 @@ public class StormMetricTaggedConsumer implements IMetricsConsumer {
         synchronized (metricSystems) {
             String uniqueTaskKey = buildUniqueTaskKey(taskInfo);
             MetricSystem metricSystem = metricSystems.get(uniqueTaskKey);
-            if(metricSystem == null){
+            if (metricSystem == null) {
                 metricSystem = MetricSystem.load(config);
-                metricSystems.put(uniqueTaskKey,metricSystem);
-                metricSystem.tags(new HashMap<String,Object>(){{
-                    put("topology",topologyName);
-                    put("stormId",stormId);
-                    put("component",taskInfo.srcComponentId);
-                    put("task",taskInfo.srcTaskId);
-                }});
+                metricSystems.put(uniqueTaskKey, metricSystem);
+                metricSystem.tags(new HashMap<String, Object>() {
+                    {
+                        put("topology", topologyName);
+                        put("stormId", stormId);
+                        put("component", taskInfo.srcComponentId);
+                        put("task", taskInfo.srcTaskId);
+                    }
+                });
                 metricSystem.start();
-                LOG.info("Initialized metric reporter for {}",uniqueTaskKey);
+                LOG.info("Initialized metric reporter for {}", uniqueTaskKey);
             }
-            report(metricSystem,taskInfo,dataPoints);
-            if(LOG.isDebugEnabled()) {
+            report(metricSystem, taskInfo, dataPoints);
+            if (LOG.isDebugEnabled()) {
                 LOG.debug("Reported {} metric points from {}", dataPoints.size(), uniqueTaskKey);
             }
         }
     }
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    private void report(MetricSystem metricSystem,TaskInfo taskInfo,Collection<DataPoint> dataPoints){
+    @SuppressWarnings( {"unchecked", "rawtypes"})
+    private void report(MetricSystem metricSystem, TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
         List<String> metricList = new LinkedList<>();
         for (DataPoint dataPoint : dataPoints) {
             if (dataPoint.value instanceof Map) {
@@ -115,15 +111,16 @@ public class StormMetricTaggedConsumer implements IMetricsConsumer {
         metricSystem.registry().removeMatching((name, metric) -> metricList.indexOf(name) < 0);
         metricSystem.report();
         metricSystem.registry().getGauges().values().forEach((gauge -> {
-            if(gauge instanceof DataPointGauge){
-                ((DataPointGauge)gauge).reset();
+            if (gauge instanceof DataPointGauge) {
+                ((DataPointGauge) gauge).reset();
             }
         }));
     }
 
     private static class DataPointGauge implements Gauge<Object> {
         private Object value;
-        public DataPointGauge(Object initialValue){
+
+        public DataPointGauge(Object initialValue) {
             this.setValue(initialValue);
         }
 
@@ -132,21 +129,21 @@ public class StormMetricTaggedConsumer implements IMetricsConsumer {
             return value;
         }
 
-        public void setValue(Object value){
+        public void setValue(Object value) {
             this.value = value;
         }
 
-        public void reset(){
+        public void reset() {
             this.value = 0;
         }
     }
 
-    private static String buildUniqueTaskKey(TaskInfo taskInfo){
-        return String.format("%s[%s]",taskInfo.srcComponentId,taskInfo.srcTaskId);
+    private static String buildUniqueTaskKey(TaskInfo taskInfo) {
+        return String.format("%s[%s]", taskInfo.srcComponentId, taskInfo.srcTaskId);
     }
 
-    private static String buildSimpleMetricName(TaskInfo taskInfo,String ... name ){
-        return String.join(".",StringUtils.join(name,".").replace("/","."));
+    private static String buildSimpleMetricName(TaskInfo taskInfo, String... name) {
+        return String.join(".", StringUtils.join(name, ".").replace("/", "."));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
index 0c1d12c..c85d183 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
@@ -16,12 +16,6 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import com.typesafe.config.Config;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
 import org.apache.eagle.alert.coordination.model.RouterSpec;
 import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
@@ -33,13 +27,21 @@ import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector
 import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
 import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
 import org.apache.eagle.alert.utils.AlertConstants;
+
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+import org.apache.commons.collections.CollectionUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
-public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouterBoltSpecListener, SerializationMetadataProvider{
-    private final static Logger LOG = LoggerFactory.getLogger(StreamRouterBolt.class);
+public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouterBoltSpecListener, SerializationMetadataProvider {
+    private static final Logger LOG = LoggerFactory.getLogger(StreamRouterBolt.class);
     private static final long serialVersionUID = -7611470889316430372L;
     private StreamRouter router;
     private StreamRouterBoltOutputCollector routeCollector;
@@ -56,8 +58,8 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
 
     @Override
     public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService changeNotifyService, Config config, TopologyContext context) {
-        streamContext = new StreamContextImpl(config,context.registerMetric("eagle.router",new MultiCountMetric(),60),context);
-        routeCollector = new StreamRouterBoltOutputCollector(getBoltId(),collector,this.getOutputStreamIds(),streamContext,serializer);
+        streamContext = new StreamContextImpl(config, context.registerMetric("eagle.router", new MultiCountMetric(), 60), context);
+        routeCollector = new StreamRouterBoltOutputCollector(getBoltId(), collector, this.getOutputStreamIds(), streamContext, serializer);
         router.prepare(streamContext, routeCollector);
         changeNotifyService.registerListener(this);
         changeNotifyService.init(config, MetadataType.STREAM_ROUTER_BOLT);
@@ -70,7 +72,7 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
             this.router.nextEvent(deserialize(input.getValueByField(AlertConstants.FIELD_0)).withAnchor(input));
         } catch (Exception ex) {
             this.streamContext.counter().scope("fail_count").incr();
-            LOG.error(ex.getMessage(),ex);
+            LOG.error(ex.getMessage(), ex);
             this.collector.fail(input);
         }
     }
@@ -83,6 +85,7 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
 
     /**
      * Compare with metadata snapshot cache to generate diff like added, removed and modified between different versions.
+     *
      * @param spec
      */
     @SuppressWarnings("unchecked")
@@ -92,7 +95,7 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
 
         // figure out added, removed, modified StreamSortSpec
         Map<StreamPartition, StreamSortSpec> newSSS = new HashMap<>();
-        spec.getRouterSpecs().forEach(t ->  {
+        spec.getRouterSpecs().forEach(t -> {
             if (t.getPartition().getSortSpec() != null) {
                 newSSS.put(t.getPartition(), t.getPartition().getSortSpec());
             }
@@ -110,11 +113,11 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
         addedStreamIds.forEach(s -> added.put(s, newSSS.get(s)));
         removedStreamIds.forEach(s -> removed.put(s, cachedSSS.get(s)));
         modifiedStreamIds.forEach(s -> {
-            if(!newSSS.get(s).equals(cachedSSS.get(s))){ // this means StreamSortSpec is changed for one specific streamId
+            if (!newSSS.get(s).equals(cachedSSS.get(s))) { // this means StreamSortSpec is changed for one specific streamId
                 modified.put(s, newSSS.get(s));
             }
         });
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("added StreamSortSpec " + added);
             LOG.debug("removed StreamSortSpec " + removed);
             LOG.debug("modified StreamSortSpec " + modified);
@@ -140,12 +143,12 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
         addedStreamPartitions.forEach(s -> addedRouterSpecs.add(newSRS.get(s)));
         removedStreamPartitions.forEach(s -> removedRouterSpecs.add(cachedSRS.get(s)));
         modifiedStreamPartitions.forEach(s -> {
-            if(!newSRS.get(s).equals(cachedSRS.get(s))){ // this means StreamRouterSpec is changed for one specific StreamPartition
+            if (!newSRS.get(s).equals(cachedSRS.get(s))) { // this means StreamRouterSpec is changed for one specific StreamPartition
                 modifiedRouterSpecs.add(newSRS.get(s));
             }
         });
 
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("added StreamRouterSpec " + addedRouterSpecs);
             LOG.debug("removed StreamRouterSpec " + removedRouterSpecs);
             LOG.debug("modified StreamRouterSpec " + modifiedRouterSpecs);
@@ -159,19 +162,20 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
     }
 
     /**
-     * in correlation cases, multiple streams will go to the same queue for correlation policy
+     * in correlation cases, multiple streams will go to the same queue for correlation policy.
+     *
      * @param spec
      */
-    private void sanityCheck(RouterSpec spec){
+    private void sanityCheck(RouterSpec spec) {
         Set<String> totalRequestedSlots = new HashSet<>();
-        for(StreamRouterSpec s : spec.getRouterSpecs()){
-            for(PolicyWorkerQueue q : s.getTargetQueue()){
+        for (StreamRouterSpec s : spec.getRouterSpecs()) {
+            for (PolicyWorkerQueue q : s.getTargetQueue()) {
                 List<String> workers = new ArrayList<>();
                 q.getWorkers().forEach(w -> workers.add(w.getBoltId()));
                 totalRequestedSlots.addAll(workers);
             }
         }
-        if(totalRequestedSlots.size() > getOutputStreamIds().size()){
+        if (totalRequestedSlots.size() > getOutputStreamIds().size()) {
             String error = String.format("Requested slots are not consistent with provided slots, %s, %s", totalRequestedSlots, getOutputStreamIds());
             LOG.error(error);
             throw new IllegalStateException(error);


Mime
View raw message