eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject [10/18] incubator-eagle git commit: EAGLE-324 Init branch-v0.5
Date Wed, 01 Jun 2016 06:00:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
new file mode 100644
index 0000000..efe29bc
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
+import org.apache.eagle.alert.engine.publisher.PublishConstants;
+import org.apache.eagle.alert.engine.publisher.email.AlertEmailGenerator;
+import org.apache.eagle.alert.engine.publisher.email.AlertEmailGeneratorBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+public class AlertEmailPublisher implements AlertPublishPlugin {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisher.class);
+    private AlertEmailGenerator emailGenerator;
+    private AlertDeduplicator deduplicator;
+    private Map<String, String> emailConfig;
+    private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
+    private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
+    private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
+    private transient ThreadPoolExecutor executorPool;
+    private PublishStatus status;
+
+    @Override
+    public void init(Config config, Publishment publishment) throws Exception {
+        executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+        LOG.info(" Creating Email Generator... ");
+        if (publishment.getProperties() != null) {
+            emailConfig = new HashMap<>(publishment.getProperties());
+            emailGenerator = createEmailGenerator(emailConfig);
+        }
+        deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin());
+    }
+
+    @Override
+    public void onAlert(AlertStreamEvent event) throws Exception {
+        if(emailGenerator == null) {
+            LOG.warn("emailGenerator is null due to the incorrect configurations");
+            return;
+        }
+        event = dedup(event);
+        if(event == null) {
+            return;
+        }
+        boolean isSuccess = emailGenerator.sendAlertEmail(event);
+        PublishStatus status = new PublishStatus();
+        if(!isSuccess) {
+            status.errorMessage = "Failed to send email";
+            status.successful = false;
+        } else {
+            status.errorMessage = "";
+            status.successful = true;
+        }
+        this.status = status;
+    }
+
+    @Override
+    public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
+        deduplicator.setDedupIntervalMin(dedupIntervalMin);
+        if (pluginProperties != null && ! emailConfig.equals(pluginProperties)) {
+            emailConfig = new HashMap<>(pluginProperties);
+            emailGenerator = createEmailGenerator(pluginProperties);
+        }
+    }
+
+    @Override
+    public void close() {
+        this.executorPool.shutdown();
+    }
+
+    @Override
+    public PublishStatus getStatus() {
+        return this.status;
+    }
+
+    @Override
+    public AlertStreamEvent dedup(AlertStreamEvent event) {
+        return deduplicator.dedup(event);
+    }
+
+    /**
+     * @param notificationConfig
+     * @return
+     */
+    private AlertEmailGenerator createEmailGenerator(Map<String, String> notificationConfig) {
+        String tplFileName = notificationConfig.get(PublishConstants.TEMPLATE);
+        if (tplFileName == null || tplFileName.equals("")) {
+            tplFileName = "ALERT_DEFAULT.vm";
+        }
+        String subject = notificationConfig.get(PublishConstants.SUBJECT);
+        if (subject == null) {
+            subject = "No subject";
+        }
+        String sender = notificationConfig.get(PublishConstants.SENDER);
+        String recipients = notificationConfig.get(PublishConstants.RECIPIENTS);
+        if(sender == null || recipients == null) {
+            LOG.warn("email sender or recipients is null");
+            return null;
+        }
+        AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder().
+                withMailProps(notificationConfig).
+                withSubject(subject).
+                withSender(sender).
+                withRecipients(recipients).
+                withTplFile(tplFileName).
+                withExecutorPool(this.executorPool).build();
+        return gen;
+    }
+
+    @Override
+    public int hashCode(){
+        return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
+    }
+
+    @Override
+    public boolean equals(Object o){
+        if(o == this)
+            return true;
+        if(!(o instanceof AlertEmailPublisher))
+            return false;
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
new file mode 100644
index 0000000..ea65298
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
+import org.apache.eagle.alert.engine.publisher.PublishConstants;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+public class AlertKafkaPublisher implements AlertPublishPlugin {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AlertKafkaPublisher.class);
+    private AlertDeduplicator deduplicator;
+    private PublishStatus status;
+    @SuppressWarnings("rawtypes")
+    private KafkaProducer producer;
+    private String brokerList;
+    private String topic;
+
+    private final static long MAX_TIMEOUT_MS =60000;
+
+    @Override
+    public void init(Config config, Publishment publishment) throws Exception {
+        deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin());
+        if (publishment.getProperties() != null) {
+            Map<String, String> kafkaConfig = new HashMap<>(publishment.getProperties());
+            brokerList = kafkaConfig.get(PublishConstants.BROKER_LIST).trim();
+            producer = KafkaProducerManager.INSTANCE.getProducer(brokerList);
+            topic = kafkaConfig.get(PublishConstants.TOPIC).trim();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void onAlert(AlertStreamEvent event) throws Exception {
+        if (producer == null) {
+            LOG.warn("KafkaProducer is null due to the incorrect configurations");
+            return;
+        }
+        event = dedup(event);
+        if(event == null) {
+            return;
+        }
+        PublishStatus status = new PublishStatus();
+        try {
+            Future<?> future = producer.send(createRecord(event, topic));
+            future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            status.successful = true;
+            status.errorMessage = "";
+            LOG.info("Successfully send message to Kafka: " + brokerList);
+        } catch (InterruptedException | ExecutionException e) {
+            status.successful = false;
+            status.errorMessage = String.format("Failed to send message to %s, due to:%s", brokerList, e);
+            LOG.error(status.errorMessage, e);
+        } catch (Exception ex ) {
+            LOG.error("fail writing alert to Kafka bus", ex);
+            status.successful = false;
+            status.errorMessage = ex.getMessage();
+        }
+        this.status = status;
+    }
+
+    @Override
+    public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
+        deduplicator.setDedupIntervalMin(dedupIntervalMin);
+        String newBrokerList = pluginProperties.get(PublishConstants.BROKER_LIST).trim();
+        String newTopic = pluginProperties.get(PublishConstants.TOPIC).trim();
+        if (!newBrokerList.equals(this.brokerList)) {
+            producer.close();
+            brokerList = newBrokerList;
+            KafkaProducer newProducer = null;
+            try {
+                newProducer = KafkaProducerManager.INSTANCE.getProducer(brokerList);
+            } catch (Exception e) {
+                LOG.error("Create KafkaProducer failed with configurations: {}", pluginProperties);
+            }
+            producer = newProducer;
+        }
+        topic = newTopic;
+    }
+
+    @Override
+    public void close() {
+        producer.close();
+    }
+
+    /**
+     * To Create  KafkaProducer Record
+     * @param event
+     * @return ProducerRecord
+     * @throws Exception
+     */
+    private ProducerRecord<String, String> createRecord(AlertStreamEvent event, String topic) throws Exception {
+        ProducerRecord<String, String>  record  = new ProducerRecord<>(topic, event.toString());
+        return record;
+    }
+
+    @Override
+    public PublishStatus getStatus() {
+        return this.status;
+    }
+
+    @Override
+    public AlertStreamEvent dedup(AlertStreamEvent event) {
+        return this.deduplicator.dedup(event);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
new file mode 100644
index 0000000..f538088
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+/**
+ * @Since on 5/11/16.
+ */
+public class AlertPublishPluginsFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AlertPublishPluginsFactory.class);
+
+    public static AlertPublishPlugin createNotificationPlugin(Publishment publishment, Config config) {
+        AlertPublishPlugin plugin = null;
+        String publisherType = publishment.getType();
+        try {
+            plugin = (AlertPublishPlugin) Class.forName(publisherType).newInstance();
+            plugin.init(config, publishment);
+        } catch (Exception ex) {
+            LOG.error("Error in loading AlertPublisherPlugin class: ", ex);
+            //throw new IllegalStateException(ex);
+        }
+        return plugin;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
new file mode 100644
index 0000000..fce22f1
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.collections.ListUtils;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
+import org.apache.eagle.alert.engine.publisher.AlertPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+public class AlertPublisherImpl implements AlertPublisher {
+    private static final long serialVersionUID = 4809983246198138865L;
+    private final static Logger LOG = LoggerFactory.getLogger(AlertPublisherImpl.class);
+    private final String name;
+
+    private volatile Map<String, List<String>> policyPublishPluginMapping = new ConcurrentHashMap<>(1);
+    private volatile Map<String, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1);
+    private Config config;
+
+    public AlertPublisherImpl(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public void init(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public void nextEvent(AlertStreamEvent event) {
+        if(LOG.isDebugEnabled())
+            LOG.debug(event.toString());
+        notifyAlert(event);
+    }
+
+    private void notifyAlert(AlertStreamEvent event) {
+        String policyId = event.getPolicyId();
+        if(policyId == null || !policyPublishPluginMapping.containsKey(policyId)) {
+            LOG.warn("Policy {} does NOT subscribe any publishments", policyId);
+            return;
+        }
+        for(String id: policyPublishPluginMapping.get(policyId)) {
+            AlertPublishPlugin plugin = publishPluginMapping.get(id);
+            try {
+                if(LOG.isDebugEnabled()) LOG.debug("Execute alert publisher " + plugin.getClass().getCanonicalName());
+                plugin.onAlert(event);
+            } catch (Exception ex) {
+                LOG.error("Fail invoking publisher's onAlert, continue ", ex);
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        publishPluginMapping.values().forEach(plugin -> plugin.close());
+    }
+
+    @Override
+    public void onPublishChange(List<Publishment> added,
+                                List<Publishment> removed,
+                                List<Publishment> afterModified,
+                                List<Publishment> beforeModified) {
+        if (added == null) added = new ArrayList<>();
+        if (removed == null) removed = new ArrayList<>();
+        if (afterModified == null) afterModified = new ArrayList<>();
+        if (beforeModified == null) beforeModified = new ArrayList<>();
+
+        if (afterModified.size() != beforeModified.size()) {
+            LOG.warn("beforeModified size != afterModified size");
+            return;
+        }
+
+        for (Publishment publishment : added) {
+            AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config);
+            if(plugin != null) {
+                publishPluginMapping.put(publishment.getName(), plugin);
+                onPolicyAdded(publishment.getPolicyIds(), publishment.getName());
+            } else {
+                LOG.error("Initialized alertPublisher {} failed due to invalid format", publishment);
+            }
+        }
+        for (Publishment publishment : removed) {
+            String pubName = publishment.getName();
+            onPolicyDeleted(publishment.getPolicyIds(), pubName);
+            publishPluginMapping.get(pubName).close();
+            publishPluginMapping.remove(publishment.getName());
+        }
+        for (int i = 0; i < afterModified.size(); i++) {
+            String pubName = afterModified.get(i).getName();
+            List<String> newPolicies = afterModified.get(i).getPolicyIds();
+            List<String> oldPolicies = beforeModified.get(i).getPolicyIds();
+
+            if (! newPolicies.equals(oldPolicies)) {
+                List<String> deletedPolicies = ListUtils.subtract(oldPolicies, newPolicies);
+                onPolicyDeleted(deletedPolicies, pubName);
+                List<String> addedPolicies = ListUtils.subtract(newPolicies, oldPolicies);
+                onPolicyAdded(addedPolicies, pubName);
+            }
+            Publishment newPub = afterModified.get(i);
+            publishPluginMapping.get(pubName).update(newPub.getDedupIntervalMin(), newPub.getProperties());
+        }
+    }
+
+    private synchronized void onPolicyAdded(List<String> addedPolicyIds, String pubName) {
+        if (addedPolicyIds == null || pubName == null) return;
+
+        for (String policyId : addedPolicyIds) {
+            if (policyPublishPluginMapping.get(policyId) == null) {
+                policyPublishPluginMapping.put(policyId, new ArrayList<>());
+            }
+            List<String> publishIds = policyPublishPluginMapping.get(policyId);
+            publishIds.add(pubName);
+        }
+    }
+
+    private synchronized void onPolicyDeleted(List<String> deletedPolicyIds, String pubName) {
+        if (deletedPolicyIds == null || pubName == null) return;
+
+        for (String policyId : deletedPolicyIds) {
+            List<String> publishIds = policyPublishPluginMapping.get(policyId);
+            publishIds.remove(pubName);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
new file mode 100644
index 0000000..054d679
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultDeduplicator implements AlertDeduplicator {
+	private long dedupIntervalMin;
+	private volatile Map<EventUniq, Long> events = new HashMap<>();
+	private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
+
+	public enum AlertDeduplicationStatus{
+		NEW,
+		DUPLICATED,
+		IGNORED
+	}
+	
+	public DefaultDeduplicator() {
+		this.dedupIntervalMin = 0;
+	}
+
+	public DefaultDeduplicator(String intervalMin) {
+		setDedupIntervalMin(intervalMin);
+	}
+	
+	public DefaultDeduplicator(long intervalMin) {
+		this.dedupIntervalMin = intervalMin;
+	}
+	
+	public void clearOldCache() {
+		List<EventUniq> removedkeys = new ArrayList<>();
+		for (Entry<EventUniq, Long> entry : events.entrySet()) {
+			EventUniq entity = entry.getKey();
+			if (System.currentTimeMillis() - 7 * DateUtils.MILLIS_PER_DAY > entity.createdTime) {
+				removedkeys.add(entry.getKey());
+			}
+		}
+		for (EventUniq alertKey : removedkeys) {
+			events.remove(alertKey);
+		}
+	}
+	
+	public AlertDeduplicationStatus checkDedup(EventUniq key) {
+		long current = key.timestamp;
+		if(!events.containsKey(key)) {
+			events.put(key, current);
+			return AlertDeduplicationStatus.NEW;
+		}
+		
+		long last = events.get(key);
+		if(current - last >= dedupIntervalMin * DateUtils.MILLIS_PER_MINUTE) {
+			events.put(key, current);
+			return AlertDeduplicationStatus.IGNORED;
+		}
+		
+		return AlertDeduplicationStatus.DUPLICATED;
+	}
+	
+	public AlertStreamEvent dedup(AlertStreamEvent event) {
+        if (event == null) return null;
+		clearOldCache();
+		AlertStreamEvent result = null;
+		AlertDeduplicationStatus status = checkDedup(new EventUniq(event.getStreamId(), event.getPolicyId(), event.getCreatedTime()));
+		if (!status.equals(AlertDeduplicationStatus.DUPLICATED)) {
+			result = event;
+		} else if(LOG.isDebugEnabled()){
+			LOG.debug("Alert event is skipped because it's duplicated: {}", event.toString());
+		}
+		return result;
+	}
+
+	@Override
+	public void setDedupIntervalMin(String newDedupIntervalMin) {
+		if (newDedupIntervalMin == null || newDedupIntervalMin.isEmpty()) {
+			dedupIntervalMin = 0;
+            return;
+		}
+		try {
+			Period period = Period.parse(newDedupIntervalMin);
+			this.dedupIntervalMin = period.toStandardMinutes().getMinutes();
+		} catch (Exception e) {
+			LOG.warn("Fail to pares deDupIntervalMin, will disable deduplication instead", e);
+			this.dedupIntervalMin = 0;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
new file mode 100644
index 0000000..df472d0
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+/**
+ * 
+ */
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * @since Mar 19, 2015
+ */
+public class EventUniq {
+	public String streamId;
+	public String policyId;
+	public Long timestamp;	 // event's createTimestamp
+	public long createdTime; // created time, for cache removal;
+
+	public EventUniq(String streamId, String policyId, long timestamp) {
+		this.streamId = streamId;
+		this.timestamp = timestamp;
+		this.policyId = policyId;
+		this.createdTime = System.currentTimeMillis();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof EventUniq) {
+			EventUniq au = (EventUniq) obj;
+			return (this.streamId.equalsIgnoreCase(au.streamId) & this.policyId.equalsIgnoreCase(au.policyId));
+		}
+		return false;
+	}
+
+	@Override
+	public int hashCode() {
+		return new HashCodeBuilder()
+				.append(streamId)
+				.append(policyId)
+				.build();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
new file mode 100644
index 0000000..e8964a8
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+
+/**
+ * The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
+ */
+public class KafkaProducerManager {
+	public static final KafkaProducerManager INSTANCE = new KafkaProducerManager();
+
+	public KafkaProducer<String, Object> getProducer(String brokerList) {
+		Properties configMap = new Properties();
+		configMap.put("bootstrap.servers", brokerList);
+		configMap.put("metadata.broker.list", brokerList);
+		configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		configMap.put("request.required.acks", "1");	     
+		configMap.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+		configMap.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+		KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap);
+		return producer;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java
new file mode 100644
index 0000000..c165686
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.publisher.impl;
+
+/**
+ * Object that holds the status of Notification Posted to Notification Plugin  
+ */
+public class PublishStatus {
+	public boolean successful;
+	public String errorMessage;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
new file mode 100644
index 0000000..e1f3e9c
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
@@ -0,0 +1,32 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.router;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * Since 5/2/16.
+ */
+public interface AlertBoltSpecListener {
+    void onAlertBoltSpecChange(AlertBoltSpec spec, Map<String, StreamDefinition> sds);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
new file mode 100644
index 0000000..aa40dc5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
@@ -0,0 +1,32 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.router;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * Since 5/3/16.
+ */
+public interface SpoutSpecListener {
+    void onSpoutSpecChange(SpoutSpec spec, Map<String, StreamDefinition> sds);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
new file mode 100644
index 0000000..07518d9
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
@@ -0,0 +1,85 @@
+package org.apache.eagle.alert.engine.router;
+
+import java.io.Serializable;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * <b></b>
+ * 1. Group by SingleStream[stream_1.col1]
+ *
+ * Shuffle(stream_1,[col1])
+ *
+ * <b></b>
+ * 2. Group by SingleStream[stream_1.col1,stream_1.col2]
+ *
+ * Shuffle(stream_1,[col1,col2])
+ *
+ * <b></b>
+ * 3. Group by JoinedStream[stream_1.col1,stream_1.col2,stream_2.col3]
+ *
+ * Shuffle(stream_1.col1,stream_1.col2) + Global(stream_2.col3)
+ */
+public class StreamRoute implements Serializable{
+    private static final long serialVersionUID = 4649184902196034940L;
+
+    private String targetComponentId;
+    private int partitionKey;
+    private String partitionType;
+
+    public String getTargetComponentId() {
+        return targetComponentId;
+    }
+
+    public void setTargetComponentId(String targetComponentId) {
+        this.targetComponentId = targetComponentId;
+    }
+
+    public StreamRoute(String targetComponentId, int partitionKey, StreamPartition.Type type){
+        this.setTargetComponentId(targetComponentId);
+        this.setPartitionKey(partitionKey);
+        this.setPartitionType(type);
+    }
+
+    public int getPartitionKey() {
+        return partitionKey;
+    }
+
+    public void setPartitionKey(int partitionKey) {
+        this.partitionKey = partitionKey;
+    }
+
+    public StreamPartition.Type getPartitionType() {
+        return StreamPartition.Type.valueOf(partitionType);
+    }
+
+    public void setPartitionType(StreamPartition.Type partitionType) {
+        this.partitionType = partitionType.name();
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder().append(partitionKey).append(partitionType).append(targetComponentId).build();
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Route[target=%s, key=%s, type=%s]", this.targetComponentId, this.partitionKey, this.partitionType);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
new file mode 100644
index 0000000..6dcd312
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.eagle.alert.engine.router;
+
+import java.util.List;
+
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.router.impl.BasicStreamRoutePartitioner;
+
+public class StreamRoutePartitionFactory {
+    /**
+     * TODO: Decouple different StreamRoutePartitioner implementation from BasicStreamRoutePartitioner
+     *
+     * @param outputComponentIds
+     * @param streamDefinition
+     * @param partition
+     * @return
+     */
+    public static StreamRoutePartitioner createRoutePartitioner(List<String> outputComponentIds, StreamDefinition streamDefinition, StreamPartition partition) {
+        return new BasicStreamRoutePartitioner(outputComponentIds, streamDefinition, partition);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
new file mode 100644
index 0000000..213abef
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.router;
+
+import java.util.List;
+
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+public interface StreamRoutePartitioner {
+    /**
+     * @param event
+     * @return
+     */
+    List<StreamRoute> partition(StreamEvent event);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
new file mode 100644
index 0000000..abc465d
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
@@ -0,0 +1,30 @@
+package org.apache.eagle.alert.engine.router;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface StreamRouteSpecListener {
+    void onStreamRouterSpecChange(Collection<StreamRouterSpec> added,
+                                  Collection<StreamRouterSpec> removed,
+                                  Collection<StreamRouterSpec> modified,
+                                  Map<String, StreamDefinition> sds);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
new file mode 100644
index 0000000..da7fb7e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
@@ -0,0 +1,30 @@
+package org.apache.eagle.alert.engine.router;
+
+import java.io.Serializable;
+
+import org.apache.eagle.alert.engine.PartitionedEventCollector;
+import org.apache.eagle.alert.engine.StreamContext;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface StreamRouter extends StreamSortSpecListener, Serializable {
+    void prepare(StreamContext context, PartitionedEventCollector outputCollector);
+    void nextEvent(PartitionedEvent event);
+    String getName();
+    void close();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
new file mode 100644
index 0000000..cd4bfd3
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
@@ -0,0 +1,33 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.router;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.RouterSpec;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * Since 5/1/16.
+ * Listen to change on StreamRouterBoltSpec
+ */
+public interface StreamRouterBoltSpecListener {
+    void onStreamRouteBoltSpecChange(RouterSpec spec, Map<String, StreamDefinition> sds);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
new file mode 100644
index 0000000..2229099
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.router;
+
+import org.apache.eagle.alert.engine.PartitionedEventCollector;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.sorter.StreamTimeClockListener;
+
+public interface StreamSortHandler extends StreamTimeClockListener {
+    /**
+     *
+     * @param streamId
+     * @param streamSortSpecSpec
+     * @param outputCollector
+     */
+    void prepare(String streamId, StreamSortSpec streamSortSpecSpec, PartitionedEventCollector outputCollector);
+
+    /**
+     * @param event StreamEvent
+     */
+    void nextEvent(PartitionedEvent event);
+
+    /**
+     *
+     */
+    void close();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
new file mode 100644
index 0000000..affa979
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
@@ -0,0 +1,28 @@
+package org.apache.eagle.alert.engine.router;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface StreamSortSpecListener {
+    void onStreamSortSpecChange(Map<StreamPartition, StreamSortSpec> added,
+                                Map<StreamPartition, StreamSortSpec> removed,
+                                Map<StreamPartition, StreamSortSpec> changed);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java
new file mode 100644
index 0000000..afb9a6f
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.router.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.router.StreamRoute;
+import org.apache.eagle.alert.engine.router.StreamRoutePartitioner;
+
+public class BasicStreamRoutePartitioner implements StreamRoutePartitioner {
+    private final List<String> outputComponentIds;
+    private final StreamDefinition streamDefinition;
+    private final StreamPartition streamPartition;
+
+    public BasicStreamRoutePartitioner(List<String> outputComponentIds, StreamDefinition streamDefinition, StreamPartition partition) {
+        this.outputComponentIds = outputComponentIds;
+        this.streamDefinition = streamDefinition;
+        this.streamPartition = partition;
+    }
+
+    @Override
+    public List<StreamRoute> partition(StreamEvent event) {
+        switch (this.streamPartition.getType()){
+            case GLOBAL:
+                return routeToAll(event);
+            case GROUPBY:
+                return routeByGroupByKey(event);
+            default:
+                return routeByShuffle(event);
+        }
+    }
+
+    protected List<StreamRoute> routeByGroupByKey(StreamEvent event) {
+        int partitionKey = new HashCodeBuilder().append(event.getData(streamDefinition,this.streamPartition.getColumns())).build();
+        String selectedOutputStream = outputComponentIds.get(Math.abs(partitionKey) % this.outputComponentIds.size());
+        return Collections.singletonList(new StreamRoute(selectedOutputStream, partitionKey, StreamPartition.Type.GROUPBY));
+    }
+
+    protected List<StreamRoute> routeByShuffle(StreamEvent event) {
+        long random = System.currentTimeMillis();
+        int hash = Math.abs((int)random);
+        return Arrays.asList(new StreamRoute(outputComponentIds.get(hash % outputComponentIds.size()),-1,StreamPartition.Type.SHUFFLE));
+    }
+
+    protected List<StreamRoute> routeToAll(StreamEvent event) {
+        if(_globalRoutingKeys!=null) {
+            _globalRoutingKeys = new ArrayList<>();
+            for (String targetId : outputComponentIds) {
+                _globalRoutingKeys.add(new StreamRoute(targetId, -1, StreamPartition.Type.GLOBAL));
+            }
+        }
+        return _globalRoutingKeys;
+    }
+
+    private List<StreamRoute> _globalRoutingKeys = null;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java
new file mode 100644
index 0000000..d0bf012
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.router.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import storm.trident.partition.GlobalGrouping;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+
+public class RoutePhysicalGrouping implements CustomStreamGrouping {
+    private static final long serialVersionUID = -511915083994148362L;
+    private final static Logger LOG = LoggerFactory.getLogger(RoutePhysicalGrouping.class);
+    private List<Integer> outdegreeTasks;
+    private ShuffleGrouping shuffleGroupingDelegate;
+    private GlobalGrouping globalGroupingDelegate;
+    private Map<String,Integer> connectedTargetIds;
+
+    @Override
+    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+        this.outdegreeTasks = new ArrayList<>(targetTasks);
+        shuffleGroupingDelegate = new ShuffleGrouping();
+        shuffleGroupingDelegate.prepare(context,stream,targetTasks);
+        globalGroupingDelegate = new GlobalGrouping();
+        globalGroupingDelegate.prepare(context,stream,targetTasks);
+        connectedTargetIds = new HashMap<>();
+        for(Integer targetId:targetTasks){
+            String targetComponentId = context.getComponentId(targetId);
+            connectedTargetIds.put(targetComponentId,targetId);
+        }
+        LOG.info("OutDegree components: [{}]", StringUtils.join(connectedTargetIds.values(),","));
+    }
+
+    @Override
+    public List<Integer> chooseTasks(int taskId, List<Object> values) {
+        Object routingKeyObj = values.get(0);
+        if(routingKeyObj!=null){
+            PartitionedEvent partitionedEvent = (PartitionedEvent) routingKeyObj;
+            if (partitionedEvent.getPartition().getType() == StreamPartition.Type.GLOBAL) {
+                return globalGroupingDelegate.chooseTasks(taskId,values);
+            } else if (partitionedEvent.getPartition().getType() == StreamPartition.Type.GROUPBY) {
+                return Collections.singletonList(outdegreeTasks.get((int)(partitionedEvent.getPartitionKey() % this.outdegreeTasks.size())));
+            }
+            // Shuffle by defaults
+            return shuffleGroupingDelegate.chooseTasks(taskId, values);
+        }
+
+        LOG.warn("Illegal null StreamRoute, throw event");
+        return Collections.emptyList();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java
new file mode 100644
index 0000000..c73854a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.router.impl;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+
+/**
+ * NOTE: This is copy from storm 1.0.0 code. DON'T modify it.
+ * 
+ * @since May 4, 2016
+ *
+ */
+public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
+    private static final long serialVersionUID = 5035497345182141765L;
+    private Random random;
+    private ArrayList<List<Integer>> choices;
+    private AtomicInteger current;
+
+    @Override
+    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+        random = new Random();
+        choices = new ArrayList<List<Integer>>(targetTasks.size());
+        for (Integer i: targetTasks) {
+            choices.add(Arrays.asList(i));
+        }
+        Collections.shuffle(choices, random);
+        current = new AtomicInteger(0);
+    }
+
+    @Override
+    public List<Integer> chooseTasks(int taskId, List<Object> values) {
+        int rightNow;
+        int size = choices.size();
+        while (true) {
+            rightNow = current.incrementAndGet();
+            if (rightNow < size) {
+                return choices.get(rightNow);
+            } else if (rightNow == size) {
+                current.set(0);
+                //This should be thread safe so long as ArrayList does not have any internal state that can be messed up by multi-treaded access.
+                Collections.shuffle(choices, random);
+                return choices.get(0);
+            }
+            //race condition with another thread, and we lost
+            // try again
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
new file mode 100644
index 0000000..db235a7
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
@@ -0,0 +1,229 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.router.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
+import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.engine.PartitionedEventCollector;
+import org.apache.eagle.alert.engine.StreamContext;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.router.StreamRoute;
+import org.apache.eagle.alert.engine.router.StreamRoutePartitionFactory;
+import org.apache.eagle.alert.engine.router.StreamRoutePartitioner;
+import org.apache.eagle.alert.engine.router.StreamRouteSpecListener;
+import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
+import org.apache.eagle.alert.utils.StreamIdConversion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+
+import com.google.common.collect.Lists;
+
+/**
+ * After sorting, one stream's message will be routed based on its StreamPartition
+ * One stream may have multiple StreamPartitions based on how this stream is grouped by
+ *
+ * TODO: Add metric statistics
+ */
+public class StreamRouterBoltOutputCollector implements PartitionedEventCollector, StreamRouteSpecListener {
+    private final static Logger LOG = LoggerFactory.getLogger(StreamRouterBoltOutputCollector.class);
+    private final OutputCollector outputCollector;
+    private final Object outputLock = new Object();
+    private final List<String> outputStreamIds;
+    private final StreamContext streamContext;
+    private final PartitionedEventSerializer serializer;
+    private volatile Map<StreamPartition,StreamRouterSpec> routeSpecMap;
+    private volatile Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap;
+    private final String sourceId;
+
+    public StreamRouterBoltOutputCollector(String sourceId, OutputCollector outputCollector, List<String> outputStreamIds, StreamContext streamContext, PartitionedEventSerializer serializer){
+        this.sourceId = sourceId;
+        this.outputCollector = outputCollector;
+        this.routeSpecMap = new HashMap<>();
+        this.routePartitionerMap = new HashMap<>();
+        this.outputStreamIds = outputStreamIds;
+        this.streamContext = streamContext;
+        this.serializer = serializer;
+    }
+
+    public void emit(PartitionedEvent event) {
+        try {
+            this.streamContext.counter().scope("send_count").incr();
+            StreamPartition partition = event.getPartition();
+            StreamRouterSpec routerSpec = routeSpecMap.get(partition);
+            if (routerSpec == null) {
+                if (LOG.isDebugEnabled()) {
+                    // Don't know how to route stream, if it's correct, it's better to filter useless stream in spout side
+                    LOG.debug("Drop event {} as StreamPartition {} is not pointed to any router metadata {}", event, event.getPartition(), routeSpecMap);
+                }
+                this.drop(event);
+                return;
+            }
+
+            if (routePartitionerMap.get(routerSpec.getPartition()) == null) {
+                LOG.error("Partitioner for " + routerSpec + " is null");
+                synchronized (outputLock) {
+                    this.streamContext.counter().scope("fail_count").incr();
+                    this.outputCollector.fail(event.getAnchor());
+                }
+                return;
+            }
+
+            StreamEvent newEvent = event.getEvent().copy();
+
+            // Get handler for the partition
+            List<StreamRoutePartitioner> queuePartitioners = routePartitionerMap.get(partition);
+
+            synchronized (outputLock) {
+                for (StreamRoutePartitioner queuePartitioner : queuePartitioners) {
+                    List<StreamRoute> streamRoutes = queuePartitioner.partition(newEvent);
+                    // it is possible that one event can be sent to multiple slots in one slotqueue if that is All grouping
+                    for (StreamRoute streamRoute : streamRoutes) {
+                        String targetStreamId = StreamIdConversion.generateStreamIdBetween(sourceId, streamRoute.getTargetComponentId());
+                        try {
+                            PartitionedEvent emittedEvent = new PartitionedEvent(newEvent, routerSpec.getPartition(), streamRoute.getPartitionKey());
+                            // Route Target Stream id instead of component id
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Emitted to stream {} with message {}", targetStreamId, emittedEvent);
+                            }
+                            if(this.serializer == null) {
+                                outputCollector.emit(targetStreamId, event.getAnchor(), Collections.singletonList(emittedEvent));
+                            } else {
+                                outputCollector.emit(targetStreamId, event.getAnchor(), Collections.singletonList(serializer.serialize(emittedEvent)));
+                            }
+                            this.streamContext.counter().scope("emit_count").incr();
+                        } catch (RuntimeException ex) {
+                            this.streamContext.counter().scope("fail_count").incr();
+                            LOG.error("Failed to emit to {} with {}", targetStreamId, newEvent, ex);
+                            throw ex;
+                        }
+                    }
+                }
+                outputCollector.ack(event.getAnchor());
+            }
+        } catch (Exception ex){
+            LOG.error(ex.getMessage(),ex);
+            synchronized (outputLock) {
+                this.streamContext.counter().scope("fail_count").incr();
+                this.outputCollector.fail(event.getAnchor());
+            }
+        }
+    }
+
+    @Override
+    public void onStreamRouterSpecChange(Collection<StreamRouterSpec> added,
+                                         Collection<StreamRouterSpec> removed,
+                                         Collection<StreamRouterSpec> modified,
+                                         Map<String, StreamDefinition> sds){
+        Map<StreamPartition,StreamRouterSpec> copyRouteSpecMap = new HashMap<>(routeSpecMap);
+        Map<StreamPartition, List<StreamRoutePartitioner>> copyRoutePartitionerMap = new HashMap<>(routePartitionerMap);
+
+        // added StreamRouterSpec i.e. there is a new StreamPartition
+        for(StreamRouterSpec spec : added){
+            if(copyRouteSpecMap.containsKey(spec.getPartition())){
+                LOG.error("Metadata calculation error: add existing StreamRouterSpec " + spec);
+            }else{
+                inplaceAdd(copyRouteSpecMap, copyRoutePartitionerMap, spec, sds);
+            }
+        }
+
+        // removed StreamRouterSpec i.e. there is a deleted StreamPartition
+        for(StreamRouterSpec spec : removed){
+            if(!copyRouteSpecMap.containsKey(spec.getPartition())){
+                LOG.error("Metadata calculation error: remove non-existing StreamRouterSpec " + spec);
+            }else{
+                inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
+            }
+        }
+
+        // modified StreamRouterSpec, i.e. there is modified StreamPartition, for example WorkSlotQueue assignment is changed
+        for(StreamRouterSpec spec : modified){
+            if(!copyRouteSpecMap.containsKey(spec.getPartition())){
+                LOG.error("Metadata calculation error: modify nonexisting StreamRouterSpec " + spec);
+            }else{
+                inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
+                inplaceAdd(copyRouteSpecMap, copyRoutePartitionerMap, spec, sds);
+            }
+        }
+
+        // switch
+        routeSpecMap = copyRouteSpecMap;
+        routePartitionerMap = copyRoutePartitionerMap;
+    }
+
+    private void inplaceRemove(Map<StreamPartition,StreamRouterSpec> routeSpecMap,
+                               Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap,
+                               StreamRouterSpec toBeRemoved){
+        routeSpecMap.remove(toBeRemoved.getPartition());
+        routePartitionerMap.remove(toBeRemoved.getPartition());
+    }
+
+    private void inplaceAdd(Map<StreamPartition,StreamRouterSpec> routeSpecMap,
+                            Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap,
+                            StreamRouterSpec toBeAdded, Map<String, StreamDefinition> sds){
+        routeSpecMap.put(toBeAdded.getPartition(), toBeAdded);
+        try {
+            List<StreamRoutePartitioner> routePartitioners = calculatePartitioner(toBeAdded, sds);
+            routePartitionerMap.put(toBeAdded.getPartition(), routePartitioners);
+        }catch(Exception e){
+            LOG.error("ignore this failure StreamRouterSpec " + toBeAdded + ", with error" + e.getMessage(),e);
+            routeSpecMap.remove(toBeAdded.getPartition());
+            routePartitionerMap.remove(toBeAdded.getPartition());
+        }
+    }
+
+    private List<StreamRoutePartitioner> calculatePartitioner(StreamRouterSpec streamRouterSpec, Map<String, StreamDefinition> sds) throws Exception{
+        List<StreamRoutePartitioner> routePartitioners = new ArrayList<>();
+        for (PolicyWorkerQueue pwq : streamRouterSpec.getTargetQueue()) {
+            routePartitioners.add(StreamRoutePartitionFactory.createRoutePartitioner(
+                    Lists.transform(pwq.getWorkers(), WorkSlot::getBoltId),
+                    sds.get(streamRouterSpec.getPartition().getStreamId()),
+                    streamRouterSpec.getPartition()));
+        }
+        return routePartitioners;
+    }
+
+    @Override
+    public void drop(PartitionedEvent event) {
+        synchronized (outputLock) {
+            this.streamContext.counter().scope("drop_count").incr();
+            if (event.getAnchor() != null) {
+                this.outputCollector.ack(event.getAnchor());
+            }else{
+                throw new IllegalStateException(event.toString()+" was not acked as anchor is null");
+            }
+        }
+    }
+
+    public void flush() {
+
+    }
+}
\ No newline at end of file


Mime
View raw message