eagle-commits mailing list archives

From h..@apache.org
Subject [14/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle problems on eagle-alert module and enable failOnViolation
Date Thu, 08 Sep 2016 07:14:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
index 67dedeb..3cf5721 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
@@ -16,328 +16,319 @@
  */
 package org.apache.eagle.alert.coordinator.provider;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.*;
 import org.apache.eagle.alert.utils.TimePeriodUtils;
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.StringUtils;
 import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.typesafe.config.Config;
+import java.util.*;
 
 public class NodataMetadataGenerator {
 
-	private static final Logger LOG = LoggerFactory.getLogger(NodataMetadataGenerator.class);
-	
-	private static final String NODATA_ALERT_AGGR_STREAM = "nodata_alert_aggregation_stream";
-	private static final String NODATA_ALERT_AGGR_OUTPUT_STREAM = "nodata_alert_aggregation_output_stream";
-	private static final String NODATA_ALERT_AGGR_DATASOURCE_NAME = "nodata_alert_aggregation_ds";
-	private static final String NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME = "nodata_alert_aggregation_output_ds";
-	private static final String NODATA_ALERT_AGGR_TOPIC_NAME = "nodata_alert_aggregation";
-	private static final String NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME = "nodata_alert";
-	
-	private static final String DATASOURCE_TYPE = "KAFKA";
-	private static final String DATASOURCE_SCHEME_CLS = "org.apache.eagle.alert.engine.scheme.JsonScheme";
-	
-	private static final String NODATA_ALERT_AGGR_POLICY_TYPE = "nodataalert";
-	private static final String NODATA_ALERT_AGGR_OUTPUT_POLICY_TYPE = "siddhi";
-	
-	private static final String JSON_STRING_STREAM_NAME_SELECTOR_CLS = "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector";
-	private static final String STREAM_TIMESTAMP_COLUMN_NAME = "timestamp";
-	private static final String STREAM_TIMESTAMP_FORMAT = "";
-	
-	private static final String KAFKA_PUBLISHMENT_TYPE = "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher";
-	private static final String EMAIL_PUBLISHMENT_TYPE = "org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher";
-	
-	private static final String PUBLISHMENT_DEDUP_DURATION = "PT0M";
-	private static final String PUBLISHMENT_SERIALIZER = "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer";
-	
-	public NodataMetadataGenerator() {}
-	
-	public void execute(Config config, Map<String, StreamDefinition> streamDefinitionsMap, 
-			Map<String, Kafka2TupleMetadata> kafkaSources, 
-			Map<String, PolicyDefinition> policies, Map<String, Publishment> publishments) {
-		Collection<StreamDefinition> streamDefinitions = streamDefinitionsMap.values();
-		for (StreamDefinition streamDefinition : streamDefinitions) {
-    		StreamColumn columnWithNodataExpression = null;
-    		for (StreamColumn column : streamDefinition.getColumns()) {
-    			if (StringUtils.isNotBlank(column.getNodataExpression())) {
-    				// this column has a nodata alert setting, so generate the nodata alert policy
-    				if (columnWithNodataExpression != null) {
-    					columnWithNodataExpression = null;
-    					LOG.warn("Only one column per stream is allowed to configure a nodata alert");
-    					break;
-    				}
-    				columnWithNodataExpression = column;
-    			}
-    		}
-    		if (columnWithNodataExpression != null) {
-    			String streamName = streamDefinition.getStreamId();
-    			
-    			// create nodata alert aggr stream
-    			if (streamDefinitionsMap.containsKey(NODATA_ALERT_AGGR_STREAM)) {
-    				LOG.info("Nodata alert aggregation stream: {} already exists", NODATA_ALERT_AGGR_STREAM);
-    			} else {
-    				streamDefinitionsMap.put(NODATA_ALERT_AGGR_STREAM, buildAggregationStream());
-    				LOG.info("Created nodata alert aggregation stream: {}", NODATA_ALERT_AGGR_STREAM);
-    			}
-    			
-    			// create nodata alert aggr output stream
-    			if (streamDefinitionsMap.containsKey(NODATA_ALERT_AGGR_OUTPUT_STREAM)) {
-    				LOG.info("Nodata alert aggregation output stream: {} already exists", NODATA_ALERT_AGGR_OUTPUT_STREAM);
-    			} else {
-    				streamDefinitionsMap.put(NODATA_ALERT_AGGR_OUTPUT_STREAM, buildAggregationOutputStream());
-    				LOG.info("Created nodata alert aggregation output stream: {}", NODATA_ALERT_AGGR_OUTPUT_STREAM);
-    			}
-    			
-    			// create nodata alert data source
-    			if (kafkaSources.containsKey(NODATA_ALERT_AGGR_DATASOURCE_NAME)) {
-    				LOG.info("Stream: {} nodata alert aggregation datasource: {} already exists", 
-    						NODATA_ALERT_AGGR_STREAM, NODATA_ALERT_AGGR_DATASOURCE_NAME);
-    			} else {
-	    			kafkaSources.put(NODATA_ALERT_AGGR_DATASOURCE_NAME, buildAggregationDatasource());
-	    			LOG.info("Created nodata alert aggregation datasource {} for stream {}", 
-	    					NODATA_ALERT_AGGR_DATASOURCE_NAME, NODATA_ALERT_AGGR_STREAM);
-    			}
-    			
-    			// create nodata alert aggregation output datasource
-    			if (kafkaSources.containsKey(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME)) {
-    				LOG.info("Stream: {} nodata alert aggregation output datasource: {} already exists", 
-    						NODATA_ALERT_AGGR_OUTPUT_STREAM, NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME);
-    			} else {
-	    			kafkaSources.put(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME, buildAggregationOutputDatasource());
-	    			LOG.info("Created nodata alert aggregation output datasource {} for stream {}", 
-	    					NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME, NODATA_ALERT_AGGR_OUTPUT_STREAM);
-    			}
-    			
-    			// create nodata alert policy
-    			String policyName = streamName + "_nodata_alert";
-    			String nodataExpression = columnWithNodataExpression.getNodataExpression();
-    			String[] segments = nodataExpression.split(",");
-    			long windowPeriodInSeconds = TimePeriodUtils.getSecondsOfPeriod(Period.parse(segments[0]));
-    			if (policies.containsKey(policyName)) {
-    				LOG.info("Stream: {} nodata alert policy: {} already exists", streamName, policyName);
-    			} else {
-    				policies.put(policyName, buildDynamicNodataPolicy(
-    						streamName,
-    						policyName, 
-    						columnWithNodataExpression.getName(),
-    						nodataExpression,
-    						Arrays.asList(streamName)));
-    				LOG.info("Created nodata alert policy {} with expression {} for stream {}", 
-    						policyName, nodataExpression, streamName);
-    			}
-    			
-    			// create nodata alert aggregation
-    			String aggrPolicyName = NODATA_ALERT_AGGR_STREAM + "_policy";
-    			if (policies.containsKey(aggrPolicyName)) {
-    				LOG.info("Stream: {} nodata alert aggregation policy: {} already exists", 
-    						NODATA_ALERT_AGGR_OUTPUT_STREAM, aggrPolicyName);
-    			} else {
-    				policies.put(aggrPolicyName, buildAggregationPolicy(
-    						aggrPolicyName, 
-    						columnWithNodataExpression.getName(),
-    						windowPeriodInSeconds));
-    				LOG.info("Created nodata alert aggregation policy {} for stream {}", 
-    						aggrPolicyName, NODATA_ALERT_AGGR_OUTPUT_STREAM);
-    			}
-    			
-    			// create nodata alert publish
-    			String publishmentName = policyName + "_publish";
-    			if (publishments.containsKey(publishmentName)) {
-    				LOG.info("Stream: {} nodata alert publishment: {} already exists", streamName, publishmentName);
-    			} else {
-	    			String kafkaBroker = config.getString("kafkaProducer.bootstrapServers");
-	    			publishments.put(publishmentName, buildKafkaAlertPublishment(
-	    					publishmentName, policyName, kafkaBroker, NODATA_ALERT_AGGR_TOPIC_NAME));
-	    			publishments.put(publishmentName + "_email", buildEmailAlertPublishment(config, 
-	    					publishmentName + "_email", policyName, kafkaBroker, NODATA_ALERT_AGGR_TOPIC_NAME));
-	    			LOG.info("Created nodata alert publishment {} for stream {}", policyName + "_publish", streamName);
-    			}
-    			
-    			// create nodata alert aggregation publish
-    			String aggrPublishName = aggrPolicyName + "_publish";
-    			if (publishments.containsKey(aggrPublishName)) {
-    				LOG.info("Stream: {} publishment: {} already exists", NODATA_ALERT_AGGR_STREAM, aggrPublishName);
-    			} else {
-	    			String kafkaBroker = config.getString("kafkaProducer.bootstrapServers");
-	    			publishments.put(aggrPublishName, buildKafkaAlertPublishment(
-	    					aggrPublishName, aggrPolicyName, kafkaBroker, NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME));
-	    			publishments.put(aggrPublishName + "_email", buildEmailAlertPublishment(config, 
-	    					aggrPublishName + "_email", aggrPolicyName, kafkaBroker, NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME));
-	    			LOG.info("Created nodata alert publishment {} for stream {}", aggrPublishName, NODATA_ALERT_AGGR_STREAM);
-    			}
-    		}
-    	}
-	}
-	
-	private Kafka2TupleMetadata buildAggregationDatasource() {
-		Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
-		datasource.setName(NODATA_ALERT_AGGR_DATASOURCE_NAME);
-		datasource.setType(DATASOURCE_TYPE);
-		datasource.setSchemeCls(DATASOURCE_SCHEME_CLS);
-		datasource.setTopic(NODATA_ALERT_AGGR_TOPIC_NAME);
-		Tuple2StreamMetadata codec = new Tuple2StreamMetadata();
-		codec.setStreamNameSelectorCls(JSON_STRING_STREAM_NAME_SELECTOR_CLS);
-		codec.setTimestampColumn(STREAM_TIMESTAMP_COLUMN_NAME);
-		codec.setTimestampFormat(STREAM_TIMESTAMP_FORMAT);
-		Properties codecProperties = new Properties();
-		codecProperties.put("userProvidedStreamName", NODATA_ALERT_AGGR_STREAM);
-		codecProperties.put("streamNameFormat", "%s");
-		codec.setStreamNameSelectorProp(codecProperties);
-		datasource.setCodec(codec);
-		return datasource;
-	}
-	
-	private Kafka2TupleMetadata buildAggregationOutputDatasource() {
-		Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
-		datasource.setName(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME);
-		datasource.setType(DATASOURCE_TYPE);
-		datasource.setSchemeCls(DATASOURCE_SCHEME_CLS);
-		datasource.setTopic(NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME);
-		Tuple2StreamMetadata codec = new Tuple2StreamMetadata();
-		codec.setStreamNameSelectorCls(JSON_STRING_STREAM_NAME_SELECTOR_CLS);
-		codec.setTimestampColumn(STREAM_TIMESTAMP_COLUMN_NAME);
-		codec.setTimestampFormat(STREAM_TIMESTAMP_FORMAT);
-		Properties codecProperties = new Properties();
-		codecProperties.put("userProvidedStreamName", NODATA_ALERT_AGGR_OUTPUT_STREAM);
-		codecProperties.put("streamNameFormat", "%s");
-		codec.setStreamNameSelectorProp(codecProperties);
-		datasource.setCodec(codec);
-		return datasource;
-	}
-	
-	private PolicyDefinition buildDynamicNodataPolicy(String streamName, String policyName, 
-    		String columnName, String expression, List<String> inputStream) {
-		PolicyDefinition pd = new PolicyDefinition();
-		PolicyDefinition.Definition def = new PolicyDefinition.Definition();
-		//expression, something like "PT5S,dynamic,1,host"
-		def.setValue(expression);
-		def.setType(NODATA_ALERT_AGGR_POLICY_TYPE);
-		pd.setDefinition(def);
-		pd.setInputStreams(inputStream);
-		pd.setOutputStreams(Arrays.asList(NODATA_ALERT_AGGR_STREAM));
-		pd.setName(policyName);
-		pd.setDescription(String.format("Nodata alert policy for stream %s", streamName));
-		
-		StreamPartition sp = new StreamPartition();
-		sp.setStreamId(streamName);
-		sp.setColumns(Arrays.asList(columnName));
-		sp.setType(StreamPartition.Type.GROUPBY);
-		pd.addPartition(sp);
-		return pd;
-	}
-    
-    private PolicyDefinition buildAggregationPolicy(String policyName, String columnName, 
-    		long windowPeriodInSeconds) {
-    	PolicyDefinition pd = new PolicyDefinition();
-		PolicyDefinition.Definition def = new PolicyDefinition.Definition();
-		String SiddhiQL = String.format(
-				"from %s#window.timeBatch(%s sec) select eagle:collectWithDistinct(%s) as hosts, "
-				+ "originalStreamName as streamName group by originalStreamName insert into %s", 
-				NODATA_ALERT_AGGR_STREAM, windowPeriodInSeconds * 2, 
-				columnName, NODATA_ALERT_AGGR_OUTPUT_STREAM);
-		LOG.info("Generated SiddhiQL {} for stream: {}", SiddhiQL, NODATA_ALERT_AGGR_STREAM);
-		def.setValue(SiddhiQL);
-		def.setType(NODATA_ALERT_AGGR_OUTPUT_POLICY_TYPE);
-		pd.setDefinition(def);
-		pd.setInputStreams(Arrays.asList(NODATA_ALERT_AGGR_STREAM));
-		pd.setOutputStreams(Arrays.asList(NODATA_ALERT_AGGR_OUTPUT_STREAM));
-		pd.setName(policyName);
-		pd.setDescription("Nodata alert aggregation policy, used to merge alerts from multiple bolts");
-		
-		StreamPartition sp = new StreamPartition();
-		sp.setStreamId(NODATA_ALERT_AGGR_STREAM);
-		sp.setColumns(Arrays.asList(columnName));
-		sp.setType(StreamPartition.Type.GROUPBY);
-		pd.addPartition(sp);
-		pd.setParallelismHint(0);
-    	return pd;
+    private static final Logger LOG = LoggerFactory.getLogger(NodataMetadataGenerator.class);
+
+    private static final String NODATA_ALERT_AGGR_STREAM = "nodata_alert_aggregation_stream";
+    private static final String NODATA_ALERT_AGGR_OUTPUT_STREAM = "nodata_alert_aggregation_output_stream";
+    private static final String NODATA_ALERT_AGGR_DATASOURCE_NAME = "nodata_alert_aggregation_ds";
+    private static final String NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME = "nodata_alert_aggregation_output_ds";
+    private static final String NODATA_ALERT_AGGR_TOPIC_NAME = "nodata_alert_aggregation";
+    private static final String NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME = "nodata_alert";
+
+    private static final String DATASOURCE_TYPE = "KAFKA";
+    private static final String DATASOURCE_SCHEME_CLS = "org.apache.eagle.alert.engine.scheme.JsonScheme";
+
+    private static final String NODATA_ALERT_AGGR_POLICY_TYPE = "nodataalert";
+    private static final String NODATA_ALERT_AGGR_OUTPUT_POLICY_TYPE = "siddhi";
+
+    private static final String JSON_STRING_STREAM_NAME_SELECTOR_CLS = "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector";
+    private static final String STREAM_TIMESTAMP_COLUMN_NAME = "timestamp";
+    private static final String STREAM_TIMESTAMP_FORMAT = "";
+
+    private static final String KAFKA_PUBLISHMENT_TYPE = "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher";
+    private static final String EMAIL_PUBLISHMENT_TYPE = "org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher";
+
+    private static final String PUBLISHMENT_DEDUP_DURATION = "PT0M";
+    private static final String PUBLISHMENT_SERIALIZER = "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer";
+
+    public NodataMetadataGenerator() {
+    }
+
+    public void execute(Config config, Map<String, StreamDefinition> streamDefinitionsMap,
+                        Map<String, Kafka2TupleMetadata> kafkaSources,
+                        Map<String, PolicyDefinition> policies, Map<String, Publishment> publishments) {
+        Collection<StreamDefinition> streamDefinitions = streamDefinitionsMap.values();
+        for (StreamDefinition streamDefinition : streamDefinitions) {
+            StreamColumn columnWithNodataExpression = null;
+            for (StreamColumn column : streamDefinition.getColumns()) {
+                if (StringUtils.isNotBlank(column.getNodataExpression())) {
+                    // this column has a nodata alert setting, so generate the nodata alert policy
+                    if (columnWithNodataExpression != null) {
+                        columnWithNodataExpression = null;
+                        LOG.warn("Only one column per stream is allowed to configure a nodata alert");
+                        break;
+                    }
+                    columnWithNodataExpression = column;
+                }
+            }
+            if (columnWithNodataExpression != null) {
+                final String streamName = streamDefinition.getStreamId();
+
+                // create nodata alert aggr stream
+                if (streamDefinitionsMap.containsKey(NODATA_ALERT_AGGR_STREAM)) {
+                    LOG.info("Nodata alert aggregation stream: {} already exists", NODATA_ALERT_AGGR_STREAM);
+                } else {
+                    streamDefinitionsMap.put(NODATA_ALERT_AGGR_STREAM, buildAggregationStream());
+                    LOG.info("Created nodata alert aggregation stream: {}", NODATA_ALERT_AGGR_STREAM);
+                }
+
+                // create nodata alert aggr output stream
+                if (streamDefinitionsMap.containsKey(NODATA_ALERT_AGGR_OUTPUT_STREAM)) {
+                    LOG.info("Nodata alert aggregation output stream: {} already exists", NODATA_ALERT_AGGR_OUTPUT_STREAM);
+                } else {
+                    streamDefinitionsMap.put(NODATA_ALERT_AGGR_OUTPUT_STREAM, buildAggregationOutputStream());
+                    LOG.info("Created nodata alert aggregation output stream: {}", NODATA_ALERT_AGGR_OUTPUT_STREAM);
+                }
+
+                // create nodata alert data source
+                if (kafkaSources.containsKey(NODATA_ALERT_AGGR_DATASOURCE_NAME)) {
+                    LOG.info("Stream: {} nodata alert aggregation datasource: {} already exists",
+                        NODATA_ALERT_AGGR_STREAM, NODATA_ALERT_AGGR_DATASOURCE_NAME);
+                } else {
+                    kafkaSources.put(NODATA_ALERT_AGGR_DATASOURCE_NAME, buildAggregationDatasource());
+                    LOG.info("Created nodata alert aggregation datasource {} for stream {}",
+                        NODATA_ALERT_AGGR_DATASOURCE_NAME, NODATA_ALERT_AGGR_STREAM);
+                }
+
+                // create nodata alert aggregation output datasource
+                if (kafkaSources.containsKey(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME)) {
+                    LOG.info("Stream: {} nodata alert aggregation output datasource: {} already exists",
+                        NODATA_ALERT_AGGR_OUTPUT_STREAM, NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME);
+                } else {
+                    kafkaSources.put(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME, buildAggregationOutputDatasource());
+                    LOG.info("Created nodata alert aggregation output datasource {} for stream {}",
+                        NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME, NODATA_ALERT_AGGR_OUTPUT_STREAM);
+                }
+
+                // create nodata alert policy
+                String policyName = streamName + "_nodata_alert";
+                String nodataExpression = columnWithNodataExpression.getNodataExpression();
+                String[] segments = nodataExpression.split(",");
+                long windowPeriodInSeconds = TimePeriodUtils.getSecondsOfPeriod(Period.parse(segments[0]));
+                if (policies.containsKey(policyName)) {
+                    LOG.info("Stream: {} nodata alert policy: {} already exists", streamName, policyName);
+                } else {
+                    policies.put(policyName, buildDynamicNodataPolicy(
+                        streamName,
+                        policyName,
+                        columnWithNodataExpression.getName(),
+                        nodataExpression,
+                        Arrays.asList(streamName)));
+                    LOG.info("Created nodata alert policy {} with expression {} for stream {}",
+                        policyName, nodataExpression, streamName);
+                }
+
+                // create nodata alert aggregation
+                String aggrPolicyName = NODATA_ALERT_AGGR_STREAM + "_policy";
+                if (policies.containsKey(aggrPolicyName)) {
+                    LOG.info("Stream: {} nodata alert aggregation policy: {} already exists",
+                        NODATA_ALERT_AGGR_OUTPUT_STREAM, aggrPolicyName);
+                } else {
+                    policies.put(aggrPolicyName, buildAggregationPolicy(
+                        aggrPolicyName,
+                        columnWithNodataExpression.getName(),
+                        windowPeriodInSeconds));
+                    LOG.info("Created nodata alert aggregation policy {} for stream {}",
+                        aggrPolicyName, NODATA_ALERT_AGGR_OUTPUT_STREAM);
+                }
+
+                // create nodata alert publish
+                String publishmentName = policyName + "_publish";
+                if (publishments.containsKey(publishmentName)) {
+                    LOG.info("Stream: {} nodata alert publishment: {} already exists", streamName, publishmentName);
+                } else {
+                    String kafkaBroker = config.getString("kafkaProducer.bootstrapServers");
+                    publishments.put(publishmentName, buildKafkaAlertPublishment(
+                        publishmentName, policyName, kafkaBroker, NODATA_ALERT_AGGR_TOPIC_NAME));
+                    publishments.put(publishmentName + "_email", buildEmailAlertPublishment(config,
+                        publishmentName + "_email", policyName, kafkaBroker, NODATA_ALERT_AGGR_TOPIC_NAME));
+                    LOG.info("Created nodata alert publishment {} for stream {}", policyName + "_publish", streamName);
+                }
+
+                // create nodata alert aggregation publish
+                String aggrPublishName = aggrPolicyName + "_publish";
+                if (publishments.containsKey(aggrPublishName)) {
+                    LOG.info("Stream: {} publishment: {} already exists", NODATA_ALERT_AGGR_STREAM, aggrPublishName);
+                } else {
+                    String kafkaBroker = config.getString("kafkaProducer.bootstrapServers");
+                    publishments.put(aggrPublishName, buildKafkaAlertPublishment(
+                        aggrPublishName, aggrPolicyName, kafkaBroker, NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME));
+                    publishments.put(aggrPublishName + "_email", buildEmailAlertPublishment(config,
+                        aggrPublishName + "_email", aggrPolicyName, kafkaBroker, NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME));
+                    LOG.info("Created nodata alert publishment {} for stream {}", aggrPublishName, NODATA_ALERT_AGGR_STREAM);
+                }
+            }
+        }
+    }
+
+    private Kafka2TupleMetadata buildAggregationDatasource() {
+        Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
+        datasource.setName(NODATA_ALERT_AGGR_DATASOURCE_NAME);
+        datasource.setType(DATASOURCE_TYPE);
+        datasource.setSchemeCls(DATASOURCE_SCHEME_CLS);
+        datasource.setTopic(NODATA_ALERT_AGGR_TOPIC_NAME);
+        Tuple2StreamMetadata codec = new Tuple2StreamMetadata();
+        codec.setStreamNameSelectorCls(JSON_STRING_STREAM_NAME_SELECTOR_CLS);
+        codec.setTimestampColumn(STREAM_TIMESTAMP_COLUMN_NAME);
+        codec.setTimestampFormat(STREAM_TIMESTAMP_FORMAT);
+        Properties codecProperties = new Properties();
+        codecProperties.put("userProvidedStreamName", NODATA_ALERT_AGGR_STREAM);
+        codecProperties.put("streamNameFormat", "%s");
+        codec.setStreamNameSelectorProp(codecProperties);
+        datasource.setCodec(codec);
+        return datasource;
+    }
+
+    private Kafka2TupleMetadata buildAggregationOutputDatasource() {
+        Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
+        datasource.setName(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME);
+        datasource.setType(DATASOURCE_TYPE);
+        datasource.setSchemeCls(DATASOURCE_SCHEME_CLS);
+        datasource.setTopic(NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME);
+        Tuple2StreamMetadata codec = new Tuple2StreamMetadata();
+        codec.setStreamNameSelectorCls(JSON_STRING_STREAM_NAME_SELECTOR_CLS);
+        codec.setTimestampColumn(STREAM_TIMESTAMP_COLUMN_NAME);
+        codec.setTimestampFormat(STREAM_TIMESTAMP_FORMAT);
+        Properties codecProperties = new Properties();
+        codecProperties.put("userProvidedStreamName", NODATA_ALERT_AGGR_OUTPUT_STREAM);
+        codecProperties.put("streamNameFormat", "%s");
+        codec.setStreamNameSelectorProp(codecProperties);
+        datasource.setCodec(codec);
+        return datasource;
+    }
+
+    private PolicyDefinition buildDynamicNodataPolicy(String streamName, String policyName,
+                                                      String columnName, String expression, List<String> inputStream) {
+        PolicyDefinition pd = new PolicyDefinition();
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        //expression, something like "PT5S,dynamic,1,host"
+        def.setValue(expression);
+        def.setType(NODATA_ALERT_AGGR_POLICY_TYPE);
+        pd.setDefinition(def);
+        pd.setInputStreams(inputStream);
+        pd.setOutputStreams(Arrays.asList(NODATA_ALERT_AGGR_STREAM));
+        pd.setName(policyName);
+        pd.setDescription(String.format("Nodata alert policy for stream %s", streamName));
+
+        StreamPartition sp = new StreamPartition();
+        sp.setStreamId(streamName);
+        sp.setColumns(Arrays.asList(columnName));
+        sp.setType(StreamPartition.Type.GROUPBY);
+        pd.addPartition(sp);
+        return pd;
     }
-	
-	private Publishment buildKafkaAlertPublishment(String publishmentName, String policyName, String kafkaBroker, String topic) {
-		Publishment publishment = new Publishment();
-		publishment.setName(publishmentName);
-		publishment.setType(KAFKA_PUBLISHMENT_TYPE);
-		publishment.setPolicyIds(Arrays.asList(policyName));
-		publishment.setDedupIntervalMin(PUBLISHMENT_DEDUP_DURATION);
-		Map<String, String> publishmentProperties = new HashMap<String, String>();
-		publishmentProperties.put("kafka_broker", kafkaBroker);
-		publishmentProperties.put("topic", topic);
-		publishment.setProperties(publishmentProperties);
-		publishment.setSerializer(PUBLISHMENT_SERIALIZER);
-		return publishment;
-	}
-	
-	private Publishment buildEmailAlertPublishment(Config config, 
-			String publishmentName, String policyName, String kafkaBroker, String topic) {
-		Publishment publishment = new Publishment();
-		publishment.setName(publishmentName);
-		publishment.setType(EMAIL_PUBLISHMENT_TYPE);
-		publishment.setPolicyIds(Arrays.asList(policyName));
-		publishment.setDedupIntervalMin(PUBLISHMENT_DEDUP_DURATION);
-		Map<String, String> publishmentProperties = new HashMap<String, String>();
-		publishmentProperties.put("subject", String.format("Eagle Alert - %s", topic));
-		publishmentProperties.put("template", "");
-		publishmentProperties.put("sender", config.getString("email.sender"));
-		publishmentProperties.put("recipients", config.getString("email.recipients"));
-		publishmentProperties.put("mail.smtp.host", config.getString("email.mailSmtpHost"));
-		publishmentProperties.put("mail.smtp.port", config.getString("email.mailSmtpPort"));
-		publishmentProperties.put("connection", "plaintext");
-		publishment.setProperties(publishmentProperties);
-		publishment.setSerializer(PUBLISHMENT_SERIALIZER);
-		return publishment;
-	}
-	
-	private StreamDefinition buildAggregationStream() {
-		StreamDefinition sd = new StreamDefinition();
-		StreamColumn tsColumn = new StreamColumn();
-		tsColumn.setName("timestamp");
-		tsColumn.setType(StreamColumn.Type.LONG);
-		
-		StreamColumn hostColumn = new StreamColumn();
-		hostColumn.setName("host");
-		hostColumn.setType(StreamColumn.Type.STRING);
-
-		StreamColumn originalStreamNameColumn = new StreamColumn();
-		originalStreamNameColumn.setName("originalStreamName");
-		originalStreamNameColumn.setType(StreamColumn.Type.STRING);
-		
-		sd.setColumns(Arrays.asList(tsColumn, hostColumn, originalStreamNameColumn));
-		sd.setDataSource(NODATA_ALERT_AGGR_DATASOURCE_NAME);
-		sd.setStreamId(NODATA_ALERT_AGGR_STREAM);
-		sd.setDescription("Nodata alert aggregation stream");
-		return sd;
-	}
-	
-	private StreamDefinition buildAggregationOutputStream() {
-		StreamDefinition sd = new StreamDefinition();
-		StreamColumn hostColumn = new StreamColumn();
-		hostColumn.setName("hosts");
-		hostColumn.setType(StreamColumn.Type.STRING);
-		
-		StreamColumn osnColumn = new StreamColumn();
-		osnColumn.setName("streamName");
-		osnColumn.setType(StreamColumn.Type.STRING);
-		
-		sd.setColumns(Arrays.asList(hostColumn, osnColumn));
-		sd.setDataSource(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME);
-		sd.setStreamId(NODATA_ALERT_AGGR_OUTPUT_STREAM);
-		sd.setDescription("Nodata alert aggregation output stream");
-		return sd;
-	}
-    
+
+    private PolicyDefinition buildAggregationPolicy(String policyName, String columnName,
+                                                    long windowPeriodInSeconds) {
+        final PolicyDefinition pd = new PolicyDefinition();
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        String siddhiQL = String.format(
+            "from %s#window.timeBatch(%s sec) select eagle:collectWithDistinct(%s) as hosts, "
+                + "originalStreamName as streamName group by originalStreamName insert into %s",
+            NODATA_ALERT_AGGR_STREAM, windowPeriodInSeconds * 2,
+            columnName, NODATA_ALERT_AGGR_OUTPUT_STREAM);
+        LOG.info("Generated SiddhiQL {} for stream: {}", siddhiQL, NODATA_ALERT_AGGR_STREAM);
+        def.setValue(siddhiQL);
+        def.setType(NODATA_ALERT_AGGR_OUTPUT_POLICY_TYPE);
+        pd.setDefinition(def);
+        pd.setInputStreams(Arrays.asList(NODATA_ALERT_AGGR_STREAM));
+        pd.setOutputStreams(Arrays.asList(NODATA_ALERT_AGGR_OUTPUT_STREAM));
+        pd.setName(policyName);
+        pd.setDescription("Nodata alert aggregation policy, used to merge alerts from multiple bolts");
+
+        StreamPartition sp = new StreamPartition();
+        sp.setStreamId(NODATA_ALERT_AGGR_STREAM);
+        sp.setColumns(Arrays.asList(columnName));
+        sp.setType(StreamPartition.Type.GROUPBY);
+        pd.addPartition(sp);
+        pd.setParallelismHint(0);
+        return pd;
+    }
+
+    private Publishment buildKafkaAlertPublishment(String publishmentName, String policyName, String kafkaBroker, String topic) {
+        Publishment publishment = new Publishment();
+        publishment.setName(publishmentName);
+        publishment.setType(KAFKA_PUBLISHMENT_TYPE);
+        publishment.setPolicyIds(Arrays.asList(policyName));
+        publishment.setDedupIntervalMin(PUBLISHMENT_DEDUP_DURATION);
+        Map<String, String> publishmentProperties = new HashMap<String, String>();
+        publishmentProperties.put("kafka_broker", kafkaBroker);
+        publishmentProperties.put("topic", topic);
+        publishment.setProperties(publishmentProperties);
+        publishment.setSerializer(PUBLISHMENT_SERIALIZER);
+        return publishment;
+    }
+
+    private Publishment buildEmailAlertPublishment(Config config,
+                                                   String publishmentName, String policyName, String kafkaBroker, String topic) {
+        Publishment publishment = new Publishment();
+        publishment.setName(publishmentName);
+        publishment.setType(EMAIL_PUBLISHMENT_TYPE);
+        publishment.setPolicyIds(Arrays.asList(policyName));
+        publishment.setDedupIntervalMin(PUBLISHMENT_DEDUP_DURATION);
+        Map<String, String> publishmentProperties = new HashMap<String, String>();
+        publishmentProperties.put("subject", String.format("Eagle Alert - %s", topic));
+        publishmentProperties.put("template", "");
+        publishmentProperties.put("sender", config.getString("email.sender"));
+        publishmentProperties.put("recipients", config.getString("email.recipients"));
+        publishmentProperties.put("mail.smtp.host", config.getString("email.mailSmtpHost"));
+        publishmentProperties.put("mail.smtp.port", config.getString("email.mailSmtpPort"));
+        publishmentProperties.put("connection", "plaintext");
+        publishment.setProperties(publishmentProperties);
+        publishment.setSerializer(PUBLISHMENT_SERIALIZER);
+        return publishment;
+    }
+
+    private StreamDefinition buildAggregationStream() {
+        final StreamDefinition sd = new StreamDefinition();
+        StreamColumn tsColumn = new StreamColumn();
+        tsColumn.setName("timestamp");
+        tsColumn.setType(StreamColumn.Type.LONG);
+
+        StreamColumn hostColumn = new StreamColumn();
+        hostColumn.setName("host");
+        hostColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn originalStreamNameColumn = new StreamColumn();
+        originalStreamNameColumn.setName("originalStreamName");
+        originalStreamNameColumn.setType(StreamColumn.Type.STRING);
+
+        sd.setColumns(Arrays.asList(tsColumn, hostColumn, originalStreamNameColumn));
+        sd.setDataSource(NODATA_ALERT_AGGR_DATASOURCE_NAME);
+        sd.setStreamId(NODATA_ALERT_AGGR_STREAM);
+        sd.setDescription("Nodata alert aggregation stream");
+        return sd;
+    }
+
+    private StreamDefinition buildAggregationOutputStream() {
+        final StreamDefinition sd = new StreamDefinition();
+        StreamColumn hostColumn = new StreamColumn();
+        hostColumn.setName("hosts");
+        hostColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn osnColumn = new StreamColumn();
+        osnColumn.setName("streamName");
+        osnColumn.setType(StreamColumn.Type.STRING);
+
+        sd.setColumns(Arrays.asList(hostColumn, osnColumn));
+        sd.setDataSource(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME);
+        sd.setStreamId(NODATA_ALERT_AGGR_OUTPUT_STREAM);
+        sd.setDescription("Nodata alert aggregation output stream");
+        return sd;
+    }
+
 }

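A minimal, hypothetical driver for the generator above (not part of this commit). It assumes a setNodataExpression(..) setter on StreamColumn mirroring the getNodataExpression() read in execute(), and supplies the config keys the generator reads. Note that execute() adds the aggregation streams to the very map it is iterating, hence the ConcurrentHashMap below:

    import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
    import org.apache.eagle.alert.coordinator.provider.NodataMetadataGenerator;
    import org.apache.eagle.alert.engine.coordinator.*;
    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;

    public class NodataGeneratorSketch {
        public static void main(String[] args) {
            // the config keys execute() reads when building the kafka/email publishments
            Config config = ConfigFactory.parseString(
                "kafkaProducer.bootstrapServers = \"localhost:9092\"\n"
                    + "email.sender = \"eagle@localhost\"\n"
                    + "email.recipients = \"ops@localhost\"\n"
                    + "email.mailSmtpHost = \"localhost\"\n"
                    + "email.mailSmtpPort = \"25\"");

            // one stream whose "host" column carries a nodata expression
            StreamColumn host = new StreamColumn();
            host.setName("host");
            host.setType(StreamColumn.Type.STRING);
            host.setNodataExpression("PT5S,dynamic,1,host"); // setter name assumed

            StreamDefinition stream = new StreamDefinition();
            stream.setStreamId("sample_stream");
            stream.setColumns(Arrays.asList(host));

            // ConcurrentHashMap: execute() puts the aggregation streams into this map
            // while iterating its values; a plain HashMap would throw
            // ConcurrentModificationException here
            Map<String, StreamDefinition> streams = new ConcurrentHashMap<>();
            streams.put(stream.getStreamId(), stream);
            Map<String, Kafka2TupleMetadata> kafkaSources = new HashMap<>();
            Map<String, PolicyDefinition> policies = new HashMap<>();
            Map<String, Publishment> publishments = new HashMap<>();

            new NodataMetadataGenerator().execute(config, streams, kafkaSources, policies, publishments);

            // expect "sample_stream_nodata_alert" plus the aggregation policy,
            // the two kafka datasources, and the kafka/email publishments
            System.out.println(policies.keySet());
        }
    }
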
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
index dd38395..193b98f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
@@ -16,24 +16,10 @@
  */
 package org.apache.eagle.alert.coordinator.provider;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordination.model.internal.*;
 import org.apache.eagle.alert.coordinator.IScheduleContext;
 import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
 import org.apache.eagle.alert.coordinator.model.GroupBoltUsage;
@@ -44,17 +30,18 @@ import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.service.IMetadataServiceClient;
 import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.typesafe.config.Config;
+import java.util.*;
+import java.util.stream.Collectors;
 
 /**
 * FIXME: this class focuses on correctness, not efficiency, for now. There might
 * be problems when the metadata size grows too big.
- * 
- * @since May 3, 2016
  *
+ * @since May 3, 2016
  */
 public class ScheduleContextBuilder {
 
@@ -74,18 +61,18 @@ public class ScheduleContextBuilder {
     private Map<String, TopologyUsage> usages;
 
     public ScheduleContextBuilder(Config config) {
-    	this.config = config;
+        this.config = config;
         client = new MetadataServiceClientImpl(config);
     }
 
     public ScheduleContextBuilder(Config config, IMetadataServiceClient client) {
-    	this.config = config;
+        this.config = config;
         this.client = client;
     }
 
     /**
      * Build a schedule context from the metadata service client.
-     * 
+     *
      * @return
      */
     public IScheduleContext buildContext() {
@@ -93,13 +80,13 @@ public class ScheduleContextBuilder {
         kafkaSources = listToMap(client.listDataSources());
         // filter out disabled policies
         List<PolicyDefinition> enabledPolicies = client.listPolicies().stream().filter(
-        		(t) -> t.getPolicyStatus() != PolicyStatus.DISABLED).collect(Collectors.toList());
+            (t) -> t.getPolicyStatus() != PolicyStatus.DISABLED).collect(Collectors.toList());
         policies = listToMap(enabledPolicies);
         publishments = listToMap(client.listPublishment());
         streamDefinitions = listToMap(client.listStreams());
         // generate data sources, policies, publishments for nodata alert
         new NodataMetadataGenerator().execute(config, streamDefinitions, kafkaSources, policies, publishments);
-        
+
         // TODO: See ScheduleState comments on how to improve the storage
         ScheduleState state = client.getVersionedSpec();
 
@@ -118,9 +105,9 @@ public class ScheduleContextBuilder {
 
         // copy to schedule context now
         return new InMemScheduleConext(topologies, assignments, kafkaSources, policies, publishments,
-                streamDefinitions, monitoredStreamMap, usages);
+            streamDefinitions, monitoredStreamMap, usages);
     }
-    
+
     /**
      * 1.
      * <pre>
@@ -133,12 +120,13 @@ public class ScheduleContextBuilder {
      * <pre>
      * if a monitored stream's queue is on a non-existing topology, remove it.
      * </pre>
+     *
      * @param monitoredStreams
      * @return
      */
     private List<MonitoredStream> detectMonitoredStreams(List<MonitoredStream> monitoredStreams) {
         List<MonitoredStream> result = new ArrayList<MonitoredStream>(monitoredStreams);
-        
+
         // clear deprecated streams
         clearMonitoredStreams(monitoredStreams);
 
@@ -159,8 +147,8 @@ public class ScheduleContextBuilder {
             StreamGroup group = queue2StreamGroup.get(assignment.getQueueId());
             if (group == null || !Objects.equals(group.getStreamPartitions(), def.getPartitionSpec())) {
                 LOG.warn(" policy assgiment {} 's policy group {} is different to the monitored stream's partition group {}, "
-                                + "this indicates a policy stream partition spec change, the assignment would be removed! ",
-                        assignment, def.getPartitionSpec(), group == null ? "'not found'" :group.getStreamPartitions());
+                        + "this indicates a policy stream partition spec change, the assignment would be removed! ",
+                    assignment, def.getPartitionSpec(), group == null ? "'not found'" : group.getStreamPartitions());
                 toRemove.add(assignment.getPolicyName());
             } else {
                 usedGroups.add(group);
@@ -174,9 +162,9 @@ public class ScheduleContextBuilder {
             boolean used = usedGroups.contains(t.getStreamGroup());
             if (!used) {
                 LOG.warn("monitor stream with stream group {} is not referenced, "
-                        + "this monitored stream and its worker queu will be removed", t.getStreamGroup());
+                    + "this monitored stream and its worker queu will be removed", t.getStreamGroup());
             }
-            return !used; 
+            return !used;
         });
 
         return result;
@@ -194,7 +182,7 @@ public class ScheduleContextBuilder {
                 for (WorkSlot ws : queue.getWorkingSlots()) {
                     // check if topology available or bolt available
                     if (!topologies.containsKey(ws.topologyName)
-                            || !topologies.get(ws.topologyName).getAlertBoltIds().contains(ws.boltId)) {
+                        || !topologies.get(ws.topologyName).getAlertBoltIds().contains(ws.boltId)) {
                         deprecated = true;
                         break;
                     }
@@ -230,7 +218,7 @@ public class ScheduleContextBuilder {
             } else {
                 StreamWorkSlotQueue queue = queueMap.get(assignment.getQueueId());
                 if (queue == null
-                        || policies.get(assignment.getPolicyName()).getParallelismHint() > queue.getQueueSize()) {
+                    || policies.get(assignment.getPolicyName()).getParallelismHint() > queue.getQueueSize()) {
                     // queue not found, or the policy's parallelism hint is bigger than the queue (possibly a policy update)
                     LOG.info("Policy assignment {} 's policy doesn't match queue: {}!", assignment, queue);
                     paIt.remove();
@@ -286,7 +274,7 @@ public class ScheduleContextBuilder {
         Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>();
 
         preBuildQueue2TopoMap(topo2MonitorStream, topo2Policies, bolt2Policies, bolt2Partition, bolt2QueueIds, queueMap);
-        
+
         for (Topology t : topologies.values()) {
             TopologyUsage u = new TopologyUsage(t.getName());
             // add group/bolt usages
@@ -323,8 +311,8 @@ public class ScheduleContextBuilder {
     }
 
     private void addBoltUsageInfo(Map<String, Set<String>> bolt2Policies,
-            Map<String, Set<StreamGroup>> bolt2Partition, Map<String, Set<String>> bolt2QueueIds, String uniqueAlertBolt,
-            AlertBoltUsage alertUsage, Map<String, StreamWorkSlotQueue> queueMap) {
+                                  Map<String, Set<StreamGroup>> bolt2Partition, Map<String, Set<String>> bolt2QueueIds, String uniqueAlertBolt,
+                                  AlertBoltUsage alertUsage, Map<String, StreamWorkSlotQueue> queueMap) {
         //
         if (bolt2Policies.containsKey(uniqueAlertBolt)) {
             alertUsage.getPolicies().addAll(bolt2Policies.get(uniqueAlertBolt));
@@ -371,15 +359,15 @@ public class ScheduleContextBuilder {
     }
 
     private void preBuildQueue2TopoMap(
-            Map<String, Set<MonitoredStream>> topo2MonitorStream,
-            Map<String, Set<String>> topo2Policies, 
-            Map<String, Set<String>> bolt2Policies, 
-            Map<String, Set<StreamGroup>> bolt2Partition, 
-            Map<String, Set<String>> bolt2QueueIds,
-            Map<String, StreamWorkSlotQueue> queueMap) {
+        Map<String, Set<MonitoredStream>> topo2MonitorStream,
+        Map<String, Set<String>> topo2Policies,
+        Map<String, Set<String>> bolt2Policies,
+        Map<String, Set<StreamGroup>> bolt2Partition,
+        Map<String, Set<String>> bolt2QueueIds,
+        Map<String, StreamWorkSlotQueue> queueMap) {
         // pre-build structure
         // why not reuse queue.getPolicies?
-        Map<String, Set<String>> queue2Policies= new HashMap<String, Set<String>>();
+        Map<String, Set<String>> queue2Policies = new HashMap<String, Set<String>>();
         for (PolicyAssignment pa : assignments.values()) {
             if (!queue2Policies.containsKey(pa.getQueueId())) {
                 queue2Policies.put(pa.getQueueId(), new HashSet<String>());
@@ -391,32 +379,32 @@ public class ScheduleContextBuilder {
             for (StreamWorkSlotQueue q : stream.getQueues()) {
                 queueMap.put(q.getQueueId(), q);
                 Set<String> policiesOnQ = queue2Policies.containsKey(q.getQueueId()) ? queue2Policies.get(q.getQueueId()) : new HashSet<String>();
-                
+
                 for (WorkSlot slot : q.getWorkingSlots()) {
                     // topo2monitoredstream
                     if (!topo2MonitorStream.containsKey(slot.getTopologyName())) {
                         topo2MonitorStream.put(slot.getTopologyName(), new HashSet<MonitoredStream>());
                     }
                     topo2MonitorStream.get(slot.getTopologyName()).add(stream);
-                    
+
                     // topo2policy
                     if (!topo2Policies.containsKey(slot.getTopologyName())) {
                         topo2Policies.put(slot.getTopologyName(), new HashSet<String>());
                     }
                     topo2Policies.get(slot.getTopologyName()).addAll(policiesOnQ);
-                    
+
                     // bolt2Policy
                     if (!bolt2Policies.containsKey(getUniqueBoltId(slot))) {
                         bolt2Policies.put(getUniqueBoltId(slot), new HashSet<String>());
                     }
                     bolt2Policies.get(getUniqueBoltId(slot)).addAll(policiesOnQ);
-                    
+
                     // bolt2Queue
                     if (!bolt2QueueIds.containsKey(getUniqueBoltId(slot))) {
                         bolt2QueueIds.put(getUniqueBoltId(slot), new HashSet<String>());
                     }
                     bolt2QueueIds.get(getUniqueBoltId(slot)).add(q.getQueueId());
-                    
+
                     // bolt2Partition
                     if (!bolt2Partition.containsKey(getUniqueBoltId(slot))) {
                         bolt2Partition.put(getUniqueBoltId(slot), new HashSet<StreamGroup>());

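For orientation, a minimal sketch of how the builder above is typically driven, assuming a running metadata service configured under the "coordinator" config block (the same block CoordinatorTest loads below). buildContext() fetches topologies, kafka sources, enabled policies, publishments and streams, runs NodataMetadataGenerator over them, then reconciles assignments and monitored streams:

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import org.apache.eagle.alert.coordinator.IScheduleContext;
    import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;

    public class ScheduleContextSketch {
        public static void main(String[] args) {
            // requires a reachable metadata service; config block name as in CoordinatorTest
            Config config = ConfigFactory.load().getConfig("coordinator");
            IScheduleContext context = new ScheduleContextBuilder(config).buildContext();
            System.out.println(context);
        }
    }
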
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
index c715bf8..273ab33 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
@@ -16,24 +16,24 @@
  */
 package org.apache.eagle.alert.coordinator.resource;
 
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-
 import org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordinator.Coordinator;
 import org.apache.eagle.alert.coordinator.ScheduleOption;
 import org.apache.eagle.alert.utils.JsonUtils;
 
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
 /**
 * This provides API access even when ZK is not available as an intermediary.
 * FIXME : more elegant status codes
- * 
+ *
  * @since Mar 24, 2016 <br/>
  */
 @Path("/coordinator")
-@Produces({ "application/json" })
+@Produces( {"application/json"})
 public class CoordinatorResource {
 
     // spring config here?
@@ -65,7 +65,7 @@ public class CoordinatorResource {
     public void disablePeriodicallyBuild() {
         alertCoordinator.disablePeriodicallyBuild();
     }
-    
+
     @SuppressWarnings("static-access")
     @GET
     @Path("/periodicForceBuildState")
@@ -74,9 +74,7 @@ public class CoordinatorResource {
     }
 
     /**
-     * Manually update the topology usages, for administration
-     * 
-     * @return
+     * Manually update the topology usages, for administration.
      */
     @POST
     @Path("/refreshUsages")

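The resource above is a JAX-RS endpoint. A sketch of exercising it over HTTP with the JDK 11 client; the base URL is an assumption for wherever the coordinator web app is deployed:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class CoordinatorApiSketch {
        public static void main(String[] args) throws Exception {
            String base = "http://localhost:9090/rest"; // hypothetical deployment URL
            HttpClient http = HttpClient.newHttpClient();
            // read the periodic-build flag
            HttpRequest get = HttpRequest.newBuilder(
                URI.create(base + "/coordinator/periodicForceBuildState")).GET().build();
            System.out.println(http.send(get, HttpResponse.BodyHandlers.ofString()).body());
            // ask the coordinator to refresh topology usages
            HttpRequest post = HttpRequest.newBuilder(
                URI.create(base + "/coordinator/refreshUsages"))
                .POST(HttpRequest.BodyPublishers.noBody()).build();
            http.send(post, HttpResponse.BodyHandlers.discarding());
        }
    }
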
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
index 4e17179..ce3c02b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
@@ -16,28 +16,21 @@
  */
 package org.apache.eagle.alert.coordinator.trigger;
 
-import java.util.concurrent.TimeUnit;
-
 import org.apache.eagle.alert.config.ConfigBusProducer;
 import org.apache.eagle.alert.config.ZKConfigBuilder;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordinator.Coordinator;
-import org.apache.eagle.alert.coordinator.IPolicyScheduler;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.PolicySchedulerFactory;
-import org.apache.eagle.alert.coordinator.ScheduleOption;
-import org.apache.eagle.alert.coordinator.TopologyMgmtService;
+import org.apache.eagle.alert.coordinator.*;
 import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
 import org.apache.eagle.alert.service.IMetadataServiceClient;
+import com.google.common.base.Stopwatch;
+import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Stopwatch;
-import com.typesafe.config.Config;
+import java.util.concurrent.TimeUnit;
 
 /**
- * @since Jun 27, 2016
- *
+ * @since Jun 27, 2016.
  */
 public class CoordinatorTrigger implements Runnable {
     // TODO : make this configurable in the coordinator
@@ -77,7 +70,7 @@ public class CoordinatorTrigger implements Runnable {
 
                 watch.stop();
                 LOG.info("CoordinatorTrigger ended, used time {} sm.", watch.elapsed(TimeUnit.MILLISECONDS));
-            }  else {
+            } else {
                 LOG.info("CoordinatorTrigger found isPeriodicallyForceBuildEnable = false, skipped build");
             }
         } catch (Exception e) {

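The run() method above brackets the schedule build with Guava's Stopwatch; a standalone sketch of that timing idiom, with the sleep standing in for the actual build:

    import com.google.common.base.Stopwatch;
    import java.util.concurrent.TimeUnit;

    public class StopwatchSketch {
        public static void main(String[] args) throws InterruptedException {
            Stopwatch watch = Stopwatch.createStarted();
            Thread.sleep(100); // stand-in for building and posting the schedule state
            watch.stop();
            System.out.printf("CoordinatorTrigger ended, used time %d ms.%n",
                watch.elapsed(TimeUnit.MILLISECONDS));
        }
    }
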
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
index df7556d..a60c959 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
@@ -16,25 +16,20 @@
  */
 package org.apache.eagle.alert.coordinator.trigger;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.service.IMetadataServiceClient;
+import com.google.common.base.Stopwatch;
+import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Stopwatch;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 /**
- * Poll policy change and notify listeners
+ * Poll policy changes and notify listeners.
  */
-public class DynamicPolicyLoader implements Runnable{
+public class DynamicPolicyLoader implements Runnable {
     private static Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class);
 
     private IMetadataServiceClient client;
@@ -42,7 +37,7 @@ public class DynamicPolicyLoader implements Runnable{
     private Map<String, PolicyDefinition> cachedPolicies = new HashMap<>();
     private List<PolicyChangeListener> listeners = new ArrayList<>();
 
-    public DynamicPolicyLoader(IMetadataServiceClient client){
+    public DynamicPolicyLoader(IMetadataServiceClient client) {
         this.client = client;
     }
 
@@ -52,14 +47,14 @@ public class DynamicPolicyLoader implements Runnable{
 
     /**
      * When it runs for the first time, cachedPolicies is empty, so all existing policies are expected
-     * to be addedPolicies
+     * to be addedPolicies.
      */
     @SuppressWarnings("unchecked")
     @Override
     public void run() {
         // we should catch every exception to avoid a zombie thread
         try {
-            Stopwatch watch = Stopwatch.createStarted();
+            final Stopwatch watch = Stopwatch.createStarted();
             LOG.info("policies loader start.");
             List<PolicyDefinition> current = client.listPolicies();
             Map<String, PolicyDefinition> currPolicies = new HashMap<>();
@@ -77,9 +72,9 @@ public class DynamicPolicyLoader implements Runnable{
             }
 
             boolean policyChanged = false;
-            if (addedPolicies.size() != 0 ||
-                    removedPolicies.size() != 0 ||
-                    reallyModifiedPolicies.size() != 0) {
+            if (addedPolicies.size() != 0
+                || removedPolicies.size() != 0
+                || reallyModifiedPolicies.size() != 0) {
                 policyChanged = true;
             }
 

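DynamicPolicyLoader is a Runnable that performs one poll per run(), so it is naturally driven from a scheduled executor. A sketch under stated assumptions: the listener registration call and the poll interval are hypothetical, as this hunk only shows the listeners field:

    import org.apache.eagle.alert.coordinator.trigger.DynamicPolicyLoader;
    import org.apache.eagle.alert.service.IMetadataServiceClient;
    import org.apache.eagle.alert.service.MetadataServiceClientImpl;
    import com.typesafe.config.ConfigFactory;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class PolicyLoaderSketch {
        public static void main(String[] args) {
            IMetadataServiceClient client = new MetadataServiceClientImpl(ConfigFactory.load());
            DynamicPolicyLoader loader = new DynamicPolicyLoader(client);
            // loader.addPolicyChangeListener(listener); // registration method name assumed
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleAtFixedRate(loader, 0, 60, TimeUnit.SECONDS); // interval assumed
        }
    }
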
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
index be8c68a..d36765f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
@@ -16,11 +16,10 @@
  */
 package org.apache.eagle.alert.coordinator.trigger;
 
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-
 public interface PolicyChangeListener {
     void onPolicyChange(List<PolicyDefinition> allPolicies, Collection<String> addedPolicies, Collection<String> removedPolicies, Collection<String> modifiedPolicies);
 }

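PolicyChangeListener is the callback DynamicPolicyLoader invokes with those collections. A hypothetical implementation (the class name here is invented for illustration) needs only the one method:

    import java.util.Collection;
    import java.util.List;
    import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingPolicyListener implements PolicyChangeListener {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingPolicyListener.class);

        @Override
        public void onPolicyChange(List<PolicyDefinition> allPolicies,
                                   Collection<String> addedPolicies,
                                   Collection<String> removedPolicies,
                                   Collection<String> modifiedPolicies) {
            // Only reports counts; a real listener would rebuild schedule state.
            LOG.info("policy change: {} added, {} removed, {} modified",
                addedPolicies.size(), removedPolicies.size(), modifiedPolicies.size());
        }
    }

Registration goes through the loader's listeners list; the registering method itself sits in an untouched part of DynamicPolicyLoader and is not shown in this diff.
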
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
index b618dfc..79056d4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
@@ -44,7 +44,6 @@ import com.typesafe.config.ConfigFactory;
 
 /**
  * @since May 5, 2016
- *
  */
 public class CoordinatorTest {
 
@@ -63,7 +62,7 @@ public class CoordinatorTest {
         zkEmbed.shutdown();
     }
 
-    @SuppressWarnings({ "resource", "unused" })
+    @SuppressWarnings( {"resource", "unused"})
     @Ignore
     @Test
     public void test() throws Exception {
@@ -92,7 +91,7 @@ public class CoordinatorTest {
         Assert.assertTrue(validated.get());
     }
 
-    @SuppressWarnings({ "resource", "unused" })
+    @SuppressWarnings( {"resource", "unused"})
     @Test
     public void test_01() throws Exception {
         before();
@@ -138,7 +137,7 @@ public class CoordinatorTest {
         ConfigFactory.invalidateCaches();
         ConfigFactory.load().getConfig("coordinator");
     }
-    
+
     @Test
     public void test_Schedule() {
         Coordinator.startSchedule();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
index fe62b3b..a86dd04 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
@@ -24,4 +24,4 @@ package org.apache.alert.coordinator;
  */
 @org.junit.Ignore
 public class DynamicPolicyLoaderTest {
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java
index e2ea031..d71dd88 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java
@@ -30,7 +30,6 @@ import com.typesafe.config.ConfigFactory;
 
 /**
  * @since May 9, 2016
- *
  */
 public class MetadataServiceClientImplTest {
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java
index 43dc9c1..9d2b9c7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java
@@ -37,67 +37,67 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 public class NodataMetadataGeneratorTest {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(NodataMetadataGeneratorTest.class);
-
-	Config config = ConfigFactory.load().getConfig("coordinator");
-	private NodataMetadataGenerator generator;
-	
-	@Before
-	public void setup() {
-		generator = new NodataMetadataGenerator();
-	}
-	
-	@Test
-	public void testNormal() throws Exception {
-		StreamDefinition sd = createStreamDefinitionWithNodataAlert();
-		Map<String, StreamDefinition> streamDefinitionsMap = new HashMap<String, StreamDefinition>();
-		streamDefinitionsMap.put(sd.getStreamId(), sd);
-		
-		Map<String, Kafka2TupleMetadata> kafkaSources = new HashMap<String, Kafka2TupleMetadata>();
-		Map<String, PolicyDefinition> policies = new HashMap<String, PolicyDefinition>();
-		Map<String, Publishment> publishments = new HashMap<String, Publishment>();
-		
-		generator.execute(config, streamDefinitionsMap, kafkaSources, policies, publishments);
-		
-		Assert.assertEquals(2, kafkaSources.size());
-		
-		kafkaSources.forEach((key, value) -> {
-			LOG.info("KafkaSources > {}: {}", key, ToStringBuilder.reflectionToString(value));
-		});
-		
-		Assert.assertEquals(2, policies.size());
-		
-		policies.forEach((key, value) -> {
-			LOG.info("Policies > {}: {}", key, ToStringBuilder.reflectionToString(value));
-		});
-		
-		Assert.assertEquals(4, publishments.size());
-		
-		publishments.forEach((key, value) -> {
-			LOG.info("Publishments > {}: {}", key, ToStringBuilder.reflectionToString(value));
-		});
-	}
-	
-	private StreamDefinition createStreamDefinitionWithNodataAlert() {
-		StreamDefinition sd = new StreamDefinition();
-		StreamColumn tsColumn = new StreamColumn();
-		tsColumn.setName("timestamp");
-		tsColumn.setType(StreamColumn.Type.LONG);
-
-		StreamColumn hostColumn = new StreamColumn();
-		hostColumn.setName("host");
-		hostColumn.setType(StreamColumn.Type.STRING);
-		hostColumn.setNodataExpression("PT1M,dynamic,1,host");
-
-		StreamColumn valueColumn = new StreamColumn();
-		valueColumn.setName("value");
-		valueColumn.setType(StreamColumn.Type.DOUBLE);
-
-		sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
-		sd.setDataSource("testDataSource");
-		sd.setStreamId("testStreamId");
-		return sd;
-	}
-	
+
+    private static final Logger LOG = LoggerFactory.getLogger(NodataMetadataGeneratorTest.class);
+
+    Config config = ConfigFactory.load().getConfig("coordinator");
+    private NodataMetadataGenerator generator;
+
+    @Before
+    public void setup() {
+        generator = new NodataMetadataGenerator();
+    }
+
+    @Test
+    public void testNormal() throws Exception {
+        StreamDefinition sd = createStreamDefinitionWithNodataAlert();
+        Map<String, StreamDefinition> streamDefinitionsMap = new HashMap<String, StreamDefinition>();
+        streamDefinitionsMap.put(sd.getStreamId(), sd);
+
+        Map<String, Kafka2TupleMetadata> kafkaSources = new HashMap<String, Kafka2TupleMetadata>();
+        Map<String, PolicyDefinition> policies = new HashMap<String, PolicyDefinition>();
+        Map<String, Publishment> publishments = new HashMap<String, Publishment>();
+
+        generator.execute(config, streamDefinitionsMap, kafkaSources, policies, publishments);
+
+        Assert.assertEquals(2, kafkaSources.size());
+
+        kafkaSources.forEach((key, value) -> {
+            LOG.info("KafkaSources > {}: {}", key, ToStringBuilder.reflectionToString(value));
+        });
+
+        Assert.assertEquals(2, policies.size());
+
+        policies.forEach((key, value) -> {
+            LOG.info("Policies > {}: {}", key, ToStringBuilder.reflectionToString(value));
+        });
+
+        Assert.assertEquals(4, publishments.size());
+
+        publishments.forEach((key, value) -> {
+            LOG.info("Publishments > {}: {}", key, ToStringBuilder.reflectionToString(value));
+        });
+    }
+
+    private StreamDefinition createStreamDefinitionWithNodataAlert() {
+        StreamDefinition sd = new StreamDefinition();
+        StreamColumn tsColumn = new StreamColumn();
+        tsColumn.setName("timestamp");
+        tsColumn.setType(StreamColumn.Type.LONG);
+
+        StreamColumn hostColumn = new StreamColumn();
+        hostColumn.setName("host");
+        hostColumn.setType(StreamColumn.Type.STRING);
+        hostColumn.setNodataExpression("PT1M,dynamic,1,host");
+
+        StreamColumn valueColumn = new StreamColumn();
+        valueColumn.setName("value");
+        valueColumn.setType(StreamColumn.Type.DOUBLE);
+
+        sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
+        sd.setDataSource("testDataSource");
+        sd.setStreamId("testStreamId");
+        return sd;
+    }
+
 }

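The fixture's nodata expression "PT1M,dynamic,1,host" reads as a comma-separated tuple: an ISO-8601 period for the detection window, a mode, a threshold, and the partition column. That reading is inferred from this fixture and from the generator's joda-time/TimePeriodUtils imports, not from documentation. Parsing the window portion with plain Joda-Time would look like:

    String expression = "PT1M,dynamic,1,host";   // as set on the host column above
    String[] parts = expression.split(",");
    int windowSeconds = org.joda.time.Period.parse(parts[0]).toStandardSeconds().getSeconds();  // 60

The assertions then pin down the generator's fan-out for one nodata-annotated stream: two Kafka sources, two policies (presumably the nodata policy plus its aggregation policy, matching the suffixes asserted in ScheduleContextBuilderTest below), and four publishments.
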
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
index 84153f6..ac83c73 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
@@ -54,12 +54,11 @@ import com.typesafe.config.ConfigFactory;
 
 /**
  * @since May 5, 2016
- *
  */
 public class ScheduleContextBuilderTest {
 
-	Config config = ConfigFactory.load().getConfig("coordinator");
-	
+    Config config = ConfigFactory.load().getConfig("coordinator");
+
     @Test
     public void test() {
         InMemMetadataServiceClient client = getSampleMetadataService();
@@ -78,7 +77,7 @@ public class ScheduleContextBuilderTest {
         String alertBolt2 = TOPO1 + "-alert-" + "2";
         for (AlertBoltUsage u : usages.get(TOPO1).getAlertUsages().values()) {
             if (u.getBoltId().equals(alertBolt0) || u.getBoltId().equals(alertBolt1)
-                    || u.getBoltId().equals(alertBolt2)) {
+                || u.getBoltId().equals(alertBolt2)) {
                 Assert.assertEquals(1, u.getPolicies().size());
                 Assert.assertTrue(u.getPolicies().contains(TEST_POLICY_1));
                 Assert.assertEquals(1, u.getPartitions().size());
@@ -195,51 +194,51 @@ public class ScheduleContextBuilderTest {
         // just to make sure queueNew is present
         Assert.assertEquals(queue.getQueueId(), queueNew.getQueueId());
     }
-    
+
     @Test
     public void test_stream_noalert_policies_generation() throws Exception {
-    	InMemMetadataServiceClient client = getSampleMetadataServiceWithNodataAlert();
-    	
+        InMemMetadataServiceClient client = getSampleMetadataServiceWithNodataAlert();
+
         ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
         IScheduleContext context = builder.buildContext();
-        
+
         PolicyDefinition policyDefinition = null;
         PolicyDefinition aggrPolicyDefinition = null;
         for (Entry<String, PolicyDefinition> entry : context.getPolicies().entrySet()) {
-        	if (entry.getKey().endsWith("_nodata_alert")) {
-        		policyDefinition = entry.getValue();
-        		continue;
-        	}
-        	if (entry.getKey().endsWith("_aggregation_stream_policy")) {
-        		aggrPolicyDefinition = entry.getValue();
-        		continue;
-        	}
+            if (entry.getKey().endsWith("_nodata_alert")) {
+                policyDefinition = entry.getValue();
+                continue;
+            }
+            if (entry.getKey().endsWith("_aggregation_stream_policy")) {
+                aggrPolicyDefinition = entry.getValue();
+                continue;
+            }
         }
         Assert.assertEquals(3, context.getPolicies().size());
-        
+
         Assert.assertNotNull(policyDefinition);
         Assert.assertEquals("nodataalert", policyDefinition.getDefinition().getType());
         Assert.assertEquals("PT5S,dynamic,1," + COL1, policyDefinition.getDefinition().getValue());
-        
+
         Assert.assertNotNull(aggrPolicyDefinition);
         Assert.assertEquals("siddhi", aggrPolicyDefinition.getDefinition().getType());
-        
+
         Kafka2TupleMetadata datasource = null;
         for (Entry<String, Kafka2TupleMetadata> entry : context.getDataSourceMetadata().entrySet()) {
-        	if ("nodata_alert_aggregation_ds".equals(entry.getKey())) {
-        		datasource = entry.getValue();
-        		break;
-        	}
+            if ("nodata_alert_aggregation_ds".equals(entry.getKey())) {
+                datasource = entry.getValue();
+                break;
+            }
         }
         Assert.assertNotNull(datasource);
-        
+
         String publishmentName = policyDefinition.getName() + "_publish";
         Publishment publishment = null;
         for (Entry<String, Publishment> entry : context.getPublishments().entrySet()) {
-        	if (publishmentName.equals(entry.getKey())) {
-        		publishment = entry.getValue();
-        		break;
-        	}
+            if (publishmentName.equals(entry.getKey())) {
+                publishment = entry.getValue();
+                break;
+            }
         }
         Assert.assertNotNull(publishment);
     }
@@ -280,7 +279,7 @@ public class ScheduleContextBuilderTest {
         client.addScheduleState(createScheduleState());
         return client;
     }
-    
+
     public static InMemMetadataServiceClient getSampleMetadataServiceWithNodataAlert() {
         InMemMetadataServiceClient client = new InMemMetadataServiceClient();
         client.addTopology(createSampleTopology());
@@ -291,7 +290,7 @@ public class ScheduleContextBuilderTest {
         client.addScheduleState(createScheduleState());
         return client;
     }
-    
+
     private static StreamDefinition createStreamDefinitionWithNodataAlert() {
         StreamDefinition def = new StreamDefinition();
         def.setStreamId(TEST_STREAM_DEF_1);

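test_stream_noalert_policies_generation finds the generated entities purely by naming convention: the _nodata_alert and _aggregation_stream_policy suffixes for policies, the fixed nodata_alert_aggregation_ds datasource id, and <policyName>_publish for the publishment. The suffix-scanning loops could equally be written with streams; a behaviorally equivalent sketch of the first lookup:

    PolicyDefinition policyDefinition = context.getPolicies().entrySet().stream()
        .filter(e -> e.getKey().endsWith("_nodata_alert"))
        .map(Map.Entry::getValue)
        .findFirst()
        .orElse(null);

Either form relies on the generator keeping those suffixes stable, which is why the test asserts on names rather than ids.
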
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
index 53de19a..d0b7a09 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
@@ -70,9 +70,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 /**
- * 
  * @since Apr 22, 2016
- *
  */
 public class SchedulerTest {
 
@@ -375,7 +373,7 @@ public class SchedulerTest {
                 StreamWorkSlotQueue queue1 = getQueue(context, pa1.getQueueId()).getRight();
                 StreamWorkSlotQueue queue3 = getQueue(context, pa3.getQueueId()).getRight();
                 Assert.assertNotEquals(queue1.getWorkingSlots().get(0).getTopologyName(),
-                        queue3.getWorkingSlots().get(0).getTopologyName());
+                    queue3.getWorkingSlots().get(0).getTopologyName());
             }
         }
         // group spec
@@ -722,7 +720,7 @@ public class SchedulerTest {
         List<Topology> topos = loadEntities(base + "topologies.json", Topology.class);
         for (Topology t : topos) {
             context.addTopology(t);
-            
+
             TopologyUsage u = new TopologyUsage(t.getName());
             for (String gnid : t.getGroupNodeIds()) {
                 u.getGroupUsages().put(gnid, new GroupBoltUsage(gnid));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java
index 04b760a..7e170a8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java
@@ -36,82 +36,82 @@ import com.google.common.base.Joiner;
 @Ignore
 public class TestExclusiveExecutor {
 
-	ZookeeperEmbedded zkEmbed;
+    ZookeeperEmbedded zkEmbed;
 
-	@Before
-	public void setUp() throws Exception {
-		zkEmbed = new ZookeeperEmbedded(2181);
-		zkEmbed.start();
+    @Before
+    public void setUp() throws Exception {
+        zkEmbed = new ZookeeperEmbedded(2181);
+        zkEmbed.start();
 
-		Thread.sleep(2000);
-	}
+        Thread.sleep(2000);
+    }
 
-	@After
-	public void tearDown() throws Exception {
-		zkEmbed.shutdown();
-	}
+    @After
+    public void tearDown() throws Exception {
+        zkEmbed.shutdown();
+    }
 
-	@Test
-	public void testConcurrency() throws Exception {
-		ByteArrayOutputStream newStreamOutput = new ByteArrayOutputStream();
-		PrintStream newStream = new PrintStream(newStreamOutput);
-		PrintStream oldStream = System.out;
+    @Test
+    public void testConcurrency() throws Exception {
+        ByteArrayOutputStream newStreamOutput = new ByteArrayOutputStream();
+        PrintStream newStream = new PrintStream(newStreamOutput);
+        PrintStream oldStream = System.out;
 
-		System.setOut(newStream);
+        System.setOut(newStream);
 
-		ExclusiveExecutor.Runnable runnableOne = new ExclusiveExecutor.Runnable() {
+        ExclusiveExecutor.Runnable runnableOne = new ExclusiveExecutor.Runnable() {
 
-			@Override
-			public void run() throws Exception {
-				System.out.println("this is thread one");
-			}
+            @Override
+            public void run() throws Exception {
+                System.out.println("this is thread one");
+            }
 
-		};
+        };
 
-		new Thread(new Runnable() {
+        new Thread(new Runnable() {
 
-			@Override
-			public void run() {
-				ExclusiveExecutor.execute("/alert/test/leader", runnableOne);
-			}
+            @Override
+            public void run() {
+                ExclusiveExecutor.execute("/alert/test/leader", runnableOne);
+            }
 
-		}).start();
+        }).start();
 
-		ExclusiveExecutor.Runnable runnableTwo = new ExclusiveExecutor.Runnable() {
+        ExclusiveExecutor.Runnable runnableTwo = new ExclusiveExecutor.Runnable() {
 
-			@Override
-			public void run() throws Exception {
-				System.out.println("this is thread two");
-			}
+            @Override
+            public void run() throws Exception {
+                System.out.println("this is thread two");
+            }
 
-		};
-		new Thread(new Runnable() {
+        };
+        new Thread(new Runnable() {
 
-			@Override
-			public void run() {
-				ExclusiveExecutor.execute("/alert/test/leader", runnableTwo);
-			}
+            @Override
+            public void run() {
+                ExclusiveExecutor.execute("/alert/test/leader", runnableTwo);
+            }
 
-		}).start();
+        }).start();
 
-		Thread.sleep(2000);
+        Thread.sleep(2000);
 
-		System.out.flush();
-		BufferedReader br = new BufferedReader(new StringReader(newStreamOutput.toString()));
-		List<String> logs = new ArrayList<String>();
-		String line = null;
-		while ((line = br.readLine()) != null) {
-			logs.add(line);
-		}
+        System.out.flush();
+        BufferedReader br = new BufferedReader(new StringReader(newStreamOutput.toString()));
+        List<String> logs = new ArrayList<String>();
+        String line = null;
+        while ((line = br.readLine()) != null) {
+            logs.add(line);
+        }
 
-		System.setOut(oldStream);
-		System.out.println("Cached logs: " + Joiner.on("\n").join(logs));
+        System.setOut(oldStream);
+        System.out.println("Cached logs: " + Joiner.on("\n").join(logs));
 
-		Assert.assertTrue(logs.stream().anyMatch((log) -> log.contains("this is thread one")));
-		Assert.assertTrue(logs.stream().anyMatch((log) -> log.contains("this is thread two")));
+        Assert.assertTrue(logs.stream().anyMatch((log) -> log.contains("this is thread one")));
+        Assert.assertTrue(logs.stream().anyMatch((log) -> log.contains("this is thread two")));
 
-		Assert.assertTrue(runnableOne.isCompleted());
-		Assert.assertTrue(runnableTwo.isCompleted());
-	}
+        Assert.assertTrue(runnableOne.isCompleted());
+        Assert.assertTrue(runnableTwo.isCompleted());
+    }
 
 }

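What this test pins down is that two contenders on the same ZooKeeper path both eventually run: ExclusiveExecutor serializes them instead of dropping the loser. The usage shape, as exercised above (semantics inferred from the assertions, not from the executor's own documentation):

    ExclusiveExecutor.execute("/alert/test/leader", new ExclusiveExecutor.Runnable() {
        @Override
        public void run() throws Exception {
            // runs once this contender holds the path; isCompleted()
            // reports true after the body has executed
        }
    });

Capturing System.out is simply the test's way of observing both bodies without sharing mutable state between the two anonymous classes.
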
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java
index a86b13a..91df334 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java
@@ -23,109 +23,113 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class TestGreedyScheduleCoordinator {
-	
-	public static class ScheduleZkState {
-    	volatile boolean scheduleAcquired = false;
+
+    public static class ScheduleZkState {
+        volatile boolean scheduleAcquired = false;
         volatile boolean scheduleCompleted = false;
     }
 
-	public static class GreedyScheduleCoordinator {
-		
-	    public int schedule(int input) {
-	    	ScheduleZkState scheduleZkState = new ScheduleZkState();
-	    	ExclusiveExecutor.Runnable exclusiveRunnable = new ExclusiveExecutor.Runnable() {
-				@Override
-				public void run() throws Exception {
-					scheduleZkState.scheduleAcquired = true;
-					
-					while (!scheduleZkState.scheduleCompleted) {
-						Thread.sleep(2000);
-					}
-				}
-	    	};
-	    	ExclusiveExecutor.execute("/alert/test", exclusiveRunnable);
-	    	int waitMaxTimes = 0;
-	    	while (waitMaxTimes < 90) { //about 3 minutes waiting
-	    		if (!scheduleZkState.scheduleAcquired) {
-	    			waitMaxTimes ++;
-	    			try {
-						Thread.sleep(2000);
-					} catch (InterruptedException e) {}
-	    			continue;
-	    		}
-	    		try {
-	    			return input;
-	    		} finally {
-	    			//schedule completed
-	    			scheduleZkState.scheduleCompleted = true;
-	    		}
-	    	}
-	    	throw new RuntimeException("Acquire greedy scheduler lock failed, please retry later");
-	    }
-		
-	}
-	
-	ZookeeperEmbedded zkEmbed;
-
-	@Before
-	public void setUp() throws Exception {
-		zkEmbed = new ZookeeperEmbedded(2181);
-		zkEmbed.start();
-
-		Thread.sleep(2000);
-	}
-
-	@After
-	public void tearDown() throws Exception {
-		zkEmbed.shutdown();
-	}
-	
-	@Test
-	public void testMain() throws Exception {
-		final GreedyScheduleCoordinator coordinator = new GreedyScheduleCoordinator();
-		
-		
-		new Thread(new Runnable() {
-
-			@Override
-			public void run() {
-				System.out.println("output: " + coordinator.schedule(1));
-				
-				try {
-					Thread.sleep(1000);
-				} catch (InterruptedException e) {}
-			}
-			
-		}).start();
-		
-		new Thread(new Runnable() {
-
-			@Override
-			public void run() {
-				System.out.println("output: " + coordinator.schedule(2));
-				
-				try {
-					Thread.sleep(1000);
-				} catch (InterruptedException e) {}
-			}
-			
-		}).start();
-		
-		new Thread(new Runnable() {
-
-			@Override
-			public void run() {
-				System.out.println("output: " + coordinator.schedule(3));
-				
-				try {
-					Thread.sleep(1000);
-				} catch (InterruptedException e) {}
-			}
-			
-		}).start();
-		
-		Thread.sleep(15000);
-	}
-	
-	
+    public static class GreedyScheduleCoordinator {
+
+        public int schedule(int input) {
+            ScheduleZkState scheduleZkState = new ScheduleZkState();
+            ExclusiveExecutor.Runnable exclusiveRunnable = new ExclusiveExecutor.Runnable() {
+                @Override
+                public void run() throws Exception {
+                    scheduleZkState.scheduleAcquired = true;
+
+                    while (!scheduleZkState.scheduleCompleted) {
+                        Thread.sleep(2000);
+                    }
+                }
+            };
+            ExclusiveExecutor.execute("/alert/test", exclusiveRunnable);
+            int waitMaxTimes = 0;
+            while (waitMaxTimes < 90) { //about 3 minutes waiting
+                if (!scheduleZkState.scheduleAcquired) {
+                    waitMaxTimes++;
+                    try {
+                        Thread.sleep(2000);
+                    } catch (InterruptedException e) {
+                    }
+                    continue;
+                }
+                try {
+                    return input;
+                } finally {
+                    //schedule completed
+                    scheduleZkState.scheduleCompleted = true;
+                }
+            }
+            throw new RuntimeException("Acquire greedy scheduler lock failed, please retry later");
+        }
+
+    }
+
+    ZookeeperEmbedded zkEmbed;
+
+    @Before
+    public void setUp() throws Exception {
+        zkEmbed = new ZookeeperEmbedded(2181);
+        zkEmbed.start();
+
+        Thread.sleep(2000);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        zkEmbed.shutdown();
+    }
+
+    @Test
+    public void testMain() throws Exception {
+        final GreedyScheduleCoordinator coordinator = new GreedyScheduleCoordinator();
+
+
+        new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                System.out.println("output: " + coordinator.schedule(1));
+
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                }
+            }
+
+        }).start();
+
+        new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                System.out.println("output: " + coordinator.schedule(2));
+
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                }
+            }
+
+        }).start();
+
+        new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                System.out.println("output: " + coordinator.schedule(3));
+
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                }
+            }
+
+        }).start();
+
+        Thread.sleep(15000);
+    }
+
+
 }

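The coordinator under test uses a two-flag handshake with its exclusive runnable: scheduleAcquired signals that the lock on /alert/test is held, and the caller's finally block flips scheduleCompleted so the runnable's hold loop can exit. The wait bound in schedule() is plain arithmetic:

    // 90 polls x 2000 ms sleep = 180,000 ms, i.e. the "about 3 minutes
    // waiting" noted in the loop, before the RuntimeException is thrown.
    long maxWaitMs = 90L * 2000L;

Because the three threads must acquire and release the same path in turn, the final Thread.sleep(15000) leaves time for the serialized schedules to finish before the test exits.
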
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
index 52ea022..077e619 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
@@ -38,9 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * 
  * @since Apr 27, 2016
- *
  */
 public class WorkSlotStrategyTest {
 
@@ -54,7 +52,7 @@ public class WorkSlotStrategyTest {
         partition.setType(StreamPartition.Type.GLOBAL);
         partition.setStreamId("s1");
         partition.setColumns(Arrays.asList("f1", "f2"));
-        
+
         StreamGroup group = new StreamGroup();
         group.addStreamPartition(partition);
 
@@ -148,7 +146,7 @@ public class WorkSlotStrategyTest {
             Assert.assertEquals(5, queue.getWorkingSlots().size());
             Assert.assertEquals(2, context.getTopologies().size());
             Assert.assertEquals(2, context.getTopologyUsages().size());
-            
+
             String topo2 = queue.getWorkingSlots().get(0).getTopologyName();
             String bolt2 = queue.getWorkingSlots().get(0).getBoltId();
             for (WorkSlot ws : queue.getWorkingSlots()) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
index f19533a..5024429 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
@@ -38,9 +38,8 @@ import org.apache.eagle.alert.service.IMetadataServiceClient;
 /**
  * According to the metadata service client semantics, changes to values returned by the interface should not
  * directly change the internal states.
- * 
- * @since May 5, 2016
  *
+ * @since May 5, 2016
  */
 @SuppressWarnings("serial")
 public class InMemMetadataServiceClient implements IMetadataServiceClient {
@@ -73,7 +72,7 @@ public class InMemMetadataServiceClient implements IMetadataServiceClient {
     public List<PolicyDefinition> listPolicies() {
         return Collections.unmodifiableList(policies);
     }
-    
+
     public void removePolicy(int idx) {
         policies.remove(idx);
     }


