eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [15/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5
Date Thu, 02 Jun 2016 07:07:54 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
deleted file mode 100644
index efe29bc..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
-import org.apache.eagle.alert.engine.publisher.PublishConstants;
-import org.apache.eagle.alert.engine.publisher.email.AlertEmailGenerator;
-import org.apache.eagle.alert.engine.publisher.email.AlertEmailGeneratorBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-public class AlertEmailPublisher implements AlertPublishPlugin { // Publish plugin that de-duplicates alerts and emails them through an AlertEmailGenerator.
-
-    private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisher.class);
-    private AlertEmailGenerator emailGenerator; // null when properties are absent or sender/recipients are missing
-    private AlertDeduplicator deduplicator;
-    private Map<String, String> emailConfig; // copy of the properties last used to build the generator; null until properties are seen
-    private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
-    private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
-    private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute, in ms; passed as the pool's keep-alive time
-    private transient ThreadPoolExecutor executorPool; // created in init() and handed to the email generator builder
-    private PublishStatus status; // outcome of the most recent send in onAlert(); null before the first send
-
-    @Override // Creates the executor pool and the deduplicator; the generator is built only when the publishment has properties.
-    public void init(Config config, Publishment publishment) throws Exception {
-        executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // NOTE(review): with an unbounded queue the pool never grows past the core size, so the max size is effectively unused
-        LOG.info(" Creating Email Generator... ");
-        if (publishment.getProperties() != null) { // otherwise emailConfig and emailGenerator stay null
-            emailConfig = new HashMap<>(publishment.getProperties());
-            emailGenerator = createEmailGenerator(emailConfig);
-        }
-        deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin());
-    }
-
-    @Override // De-duplicates the event, sends it by email and records the outcome in status.
-    public void onAlert(AlertStreamEvent event) throws Exception {
-        if(emailGenerator == null) { // misconfigured publisher: the alert is dropped with a warning and status is left untouched
-            LOG.warn("emailGenerator is null due to the incorrect configurations");
-            return;
-        }
-        event = dedup(event);
-        if(event == null) { // dedup() returned null, i.e. the event is to be skipped
-            return;
-        }
-        boolean isSuccess = emailGenerator.sendAlertEmail(event);
-        PublishStatus status = new PublishStatus(); // local shadows the field; it is published to the field on the last line
-        if(!isSuccess) {
-            status.errorMessage = "Failed to send email";
-            status.successful = false;
-        } else {
-            status.errorMessage = "";
-            status.successful = true;
-        }
-        this.status = status;
-    }
-
-    @Override // Applies the new dedup interval and rebuilds the generator when the properties differ from the stored copy.
-    public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
-        deduplicator.setDedupIntervalMin(dedupIntervalMin);
-        if (pluginProperties != null && ! emailConfig.equals(pluginProperties)) { // NOTE(review): emailConfig is null if init() saw no properties, so this dereference would throw NPE
-            emailConfig = new HashMap<>(pluginProperties);
-            emailGenerator = createEmailGenerator(pluginProperties); // may become null if sender/recipients are missing
-        }
-    }
-
-    @Override // Shuts down the executor pool via shutdown().
-    public void close() {
-        this.executorPool.shutdown(); // NOTE(review): executorPool is only assigned in init(); close() before init() would throw NPE
-    }
-
-    @Override // Returns the status recorded by the last onAlert() send, or null if none happened yet.
-    public PublishStatus getStatus() {
-        return this.status;
-    }
-
-    @Override // Delegates to the configured deduplicator.
-    public AlertStreamEvent dedup(AlertStreamEvent event) {
-        return deduplicator.dedup(event);
-    }
-
-    /** Builds the email generator, defaulting the template to "ALERT_DEFAULT.vm" and the subject to "No subject".
-     * @param notificationConfig properties read for template, subject, sender and recipients; also passed to the builder as mail properties
-     * @return the built generator, or null (after a warning) when sender or recipients is missing
-     */
-    private AlertEmailGenerator createEmailGenerator(Map<String, String> notificationConfig) {
-        String tplFileName = notificationConfig.get(PublishConstants.TEMPLATE);
-        if (tplFileName == null || tplFileName.equals("")) {
-            tplFileName = "ALERT_DEFAULT.vm";
-        }
-        String subject = notificationConfig.get(PublishConstants.SUBJECT);
-        if (subject == null) { // unlike the template, an empty subject is kept as-is
-            subject = "No subject";
-        }
-        String sender = notificationConfig.get(PublishConstants.SENDER);
-        String recipients = notificationConfig.get(PublishConstants.RECIPIENTS);
-        if(sender == null || recipients == null) {
-            LOG.warn("email sender or recipients is null");
-            return null;
-        }
-        AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder().
-                withMailProps(notificationConfig).
-                withSubject(subject).
-                withSender(sender).
-                withRecipients(recipients).
-                withTplFile(tplFileName).
-                withExecutorPool(this.executorPool).build();
-        return gen;
-    }
-
-    @Override // Hash is derived from the class name only, so every instance has the same hash code.
-    public int hashCode(){
-        return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
-    }
-
-    @Override // Any two AlertEmailPublisher instances compare equal, regardless of their configuration.
-    public boolean equals(Object o){
-        if(o == this)
-            return true;
-        if(!(o instanceof AlertEmailPublisher))
-            return false;
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
deleted file mode 100644
index ea65298..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
-import org.apache.eagle.alert.engine.publisher.PublishConstants;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-public class AlertKafkaPublisher implements AlertPublishPlugin { // Publish plugin that de-duplicates alerts and sends each one to a Kafka topic.
-
-    private static final Logger LOG = LoggerFactory.getLogger(AlertKafkaPublisher.class);
-    private AlertDeduplicator deduplicator;
-    private PublishStatus status; // outcome of the most recent send in onAlert(); null before the first send
-    @SuppressWarnings("rawtypes")
-    private KafkaProducer producer; // null when init() saw no properties or a later update() failed to create one
-    private String brokerList;
-    private String topic;
-
-    private final static long MAX_TIMEOUT_MS =60000; // upper bound, in ms, for waiting on a send's Future in onAlert()
-
-    @Override // Creates the deduplicator and, when the publishment has properties, the producer and topic.
-    public void init(Config config, Publishment publishment) throws Exception {
-        deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin());
-        if (publishment.getProperties() != null) {
-            Map<String, String> kafkaConfig = new HashMap<>(publishment.getProperties());
-            brokerList = kafkaConfig.get(PublishConstants.BROKER_LIST).trim(); // NOTE(review): throws NPE when the broker-list property is absent
-            producer = KafkaProducerManager.INSTANCE.getProducer(brokerList);
-            topic = kafkaConfig.get(PublishConstants.TOPIC).trim(); // NOTE(review): throws NPE when the topic property is absent
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override // De-duplicates the event, sends it to Kafka, waits for the result and records it in status.
-    public void onAlert(AlertStreamEvent event) throws Exception {
-        if (producer == null) { // misconfigured publisher: the alert is dropped with a warning and status is left untouched
-            LOG.warn("KafkaProducer is null due to the incorrect configurations");
-            return;
-        }
-        event = dedup(event);
-        if(event == null) { // dedup() returned null, i.e. the event is to be skipped
-            return;
-        }
-        PublishStatus status = new PublishStatus(); // local shadows the field; it is published to the field on the last line
-        try {
-            Future<?> future = producer.send(createRecord(event, topic));
-            future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS); // blocks the caller until the send completes or the timeout elapses
-            status.successful = true;
-            status.errorMessage = "";
-            LOG.info("Successfully send message to Kafka: " + brokerList);
-        } catch (InterruptedException | ExecutionException e) { // NOTE(review): the interrupt flag is not restored after InterruptedException
-            status.successful = false;
-            status.errorMessage = String.format("Failed to send message to %s, due to:%s", brokerList, e);
-            LOG.error(status.errorMessage, e);
-        } catch (Exception ex ) { // everything else, including the TimeoutException that future.get can throw
-            LOG.error("fail writing alert to Kafka bus", ex);
-            status.successful = false;
-            status.errorMessage = ex.getMessage();
-        }
-        this.status = status;
-    }
-
-    @Override // Applies the new dedup interval and topic; replaces the producer when the broker list changed.
-    public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
-        deduplicator.setDedupIntervalMin(dedupIntervalMin);
-        String newBrokerList = pluginProperties.get(PublishConstants.BROKER_LIST).trim(); // NOTE(review): no null check on pluginProperties or on the looked-up value
-        String newTopic = pluginProperties.get(PublishConstants.TOPIC).trim();
-        if (!newBrokerList.equals(this.brokerList)) {
-            producer.close(); // NOTE(review): producer can be null here (see the field comment), which would throw NPE
-            brokerList = newBrokerList;
-            KafkaProducer newProducer = null;
-            try {
-                newProducer = KafkaProducerManager.INSTANCE.getProducer(brokerList);
-            } catch (Exception e) {
-                LOG.error("Create KafkaProducer failed with configurations: {}", pluginProperties); // NOTE(review): e is not passed to the logger, so the cause is lost
-            }
-            producer = newProducer; // stays null on failure; onAlert() then warns and drops alerts
-        }
-        topic = newTopic;
-    }
-
-    @Override // Closes the Kafka producer.
-    public void close() {
-        producer.close(); // NOTE(review): throws NPE when producer is null
-    }
-
-    /**
-     * Creates the Kafka ProducerRecord for an alert event, addressed to {@code topic}.
-     * @param event alert whose toString() becomes the record value
-     * @return ProducerRecord built from (topic, value) only
-     * @throws Exception declared, but nothing in the visible body throws a checked exception
-     */
-    private ProducerRecord<String, String> createRecord(AlertStreamEvent event, String topic) throws Exception {
-        ProducerRecord<String, String>  record  = new ProducerRecord<>(topic, event.toString());
-        return record;
-    }
-
-    @Override // Returns the status recorded by the last onAlert() send, or null if none happened yet.
-    public PublishStatus getStatus() {
-        return this.status;
-    }
-
-    @Override // Delegates to the configured deduplicator.
-    public AlertStreamEvent dedup(AlertStreamEvent event) {
-        return this.deduplicator.dedup(event);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
deleted file mode 100644
index f538088..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-/** Factory that reflectively instantiates and initializes {@link AlertPublishPlugin} implementations.
- * @since 5/11/16
- */
-public class AlertPublishPluginsFactory {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AlertPublishPluginsFactory.class);
-
-    public static AlertPublishPlugin createNotificationPlugin(Publishment publishment, Config config) { // Returns null when the class cannot be loaded or instantiated.
-        AlertPublishPlugin plugin = null;
-        String publisherType = publishment.getType(); // used as the class name given to Class.forName
-        try {
-            plugin = (AlertPublishPlugin) Class.forName(publisherType).newInstance();
-            plugin.init(config, publishment); // NOTE(review): if init() throws, the already-assigned plugin is still returned below
-        } catch (Exception ex) {
-            LOG.error("Error in loading AlertPublisherPlugin class: ", ex);
-            //throw new IllegalStateException(ex);
-        }
-        return plugin;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
deleted file mode 100644
index fce22f1..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.collections.ListUtils;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
-import org.apache.eagle.alert.engine.publisher.AlertPublisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-public class AlertPublisherImpl implements AlertPublisher { // Routes each alert event to the publish plugins subscribed to the event's policy.
-    private static final long serialVersionUID = 4809983246198138865L;
-    private final static Logger LOG = LoggerFactory.getLogger(AlertPublisherImpl.class);
-    private final String name;
-
-    private volatile Map<String, List<String>> policyPublishPluginMapping = new ConcurrentHashMap<>(1); // policy id -> names of the publishments subscribed to it
-    private volatile Map<String, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1); // publishment name -> its plugin instance
-    private Config config; // stored by init() and passed to the plugin factory
-
-    public AlertPublisherImpl(String name) {
-        this.name = name;
-    }
-
-    @Override // Only stores the config; plugins are created later in onPublishChange().
-    public void init(Config config) {
-        this.config = config;
-    }
-
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    @Override // Logs the event at debug level, then dispatches it to the subscribed plugins.
-    public void nextEvent(AlertStreamEvent event) {
-        if(LOG.isDebugEnabled())
-            LOG.debug(event.toString());
-        notifyAlert(event);
-    }
-
-    private void notifyAlert(AlertStreamEvent event) { // Calls onAlert() on every plugin mapped to the event's policy; failures are logged and skipped.
-        String policyId = event.getPolicyId();
-        if(policyId == null || !policyPublishPluginMapping.containsKey(policyId)) {
-            LOG.warn("Policy {} does NOT subscribe any publishments", policyId);
-            return;
-        }
-        for(String id: policyPublishPluginMapping.get(policyId)) { // NOTE(review): iterates an ArrayList that onPolicyAdded/onPolicyDeleted mutate; confirm these never run concurrently
-            AlertPublishPlugin plugin = publishPluginMapping.get(id); // may be null; the resulting NPE is caught and logged by the catch below
-            try {
-                if(LOG.isDebugEnabled()) LOG.debug("Execute alert publisher " + plugin.getClass().getCanonicalName());
-                plugin.onAlert(event);
-            } catch (Exception ex) {
-                LOG.error("Fail invoking publisher's onAlert, continue ", ex);
-            }
-        }
-    }
-
-    @Override // Closes every registered plugin.
-    public void close() {
-        publishPluginMapping.values().forEach(plugin -> plugin.close());
-    }
-
-    @Override // Applies added, removed and modified publishments to both mappings; null lists are treated as empty.
-    public void onPublishChange(List<Publishment> added,
-                                List<Publishment> removed,
-                                List<Publishment> afterModified,
-                                List<Publishment> beforeModified) {
-        if (added == null) added = new ArrayList<>();
-        if (removed == null) removed = new ArrayList<>();
-        if (afterModified == null) afterModified = new ArrayList<>();
-        if (beforeModified == null) beforeModified = new ArrayList<>();
-
-        if (afterModified.size() != beforeModified.size()) { // the two lists are paired by index below; on mismatch nothing at all is applied
-            LOG.warn("beforeModified size != afterModified size");
-            return;
-        }
-
-        for (Publishment publishment : added) {
-            AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config);
-            if(plugin != null) {
-                publishPluginMapping.put(publishment.getName(), plugin);
-                onPolicyAdded(publishment.getPolicyIds(), publishment.getName());
-            } else {
-                LOG.error("Initialized alertPublisher {} failed due to invalid format", publishment);
-            }
-        }
-        for (Publishment publishment : removed) {
-            String pubName = publishment.getName();
-            onPolicyDeleted(publishment.getPolicyIds(), pubName);
-            publishPluginMapping.get(pubName).close(); // NOTE(review): throws NPE if no plugin was registered under this name (e.g. its creation failed)
-            publishPluginMapping.remove(publishment.getName());
-        }
-        for (int i = 0; i < afterModified.size(); i++) {
-            String pubName = afterModified.get(i).getName();
-            List<String> newPolicies = afterModified.get(i).getPolicyIds();
-            List<String> oldPolicies = beforeModified.get(i).getPolicyIds();
-
-            if (! newPolicies.equals(oldPolicies)) { // NOTE(review): throws NPE if the new publishment has null policy ids
-                List<String> deletedPolicies = ListUtils.subtract(oldPolicies, newPolicies);
-                onPolicyDeleted(deletedPolicies, pubName);
-                List<String> addedPolicies = ListUtils.subtract(newPolicies, oldPolicies);
-                onPolicyAdded(addedPolicies, pubName);
-            }
-            Publishment newPub = afterModified.get(i);
-            publishPluginMapping.get(pubName).update(newPub.getDedupIntervalMin(), newPub.getProperties()); // NOTE(review): same NPE risk when the plugin is not registered
-        }
-    }
-
-    private synchronized void onPolicyAdded(List<String> addedPolicyIds, String pubName) { // Appends pubName to the list of each given policy, creating the list on first use.
-        if (addedPolicyIds == null || pubName == null) return;
-
-        for (String policyId : addedPolicyIds) {
-            if (policyPublishPluginMapping.get(policyId) == null) {
-                policyPublishPluginMapping.put(policyId, new ArrayList<>());
-            }
-            List<String> publishIds = policyPublishPluginMapping.get(policyId);
-            publishIds.add(pubName); // no duplicate check: adding the same name twice yields two entries
-        }
-    }
-
-    private synchronized void onPolicyDeleted(List<String> deletedPolicyIds, String pubName) { // Removes pubName from the list of each given policy.
-        if (deletedPolicyIds == null || pubName == null) return;
-
-        for (String policyId : deletedPolicyIds) {
-            List<String> publishIds = policyPublishPluginMapping.get(policyId); // NOTE(review): null for an unmapped policy, so the next line would throw NPE
-            publishIds.remove(pubName); // an emptied list stays in the map, so containsKey() in notifyAlert() remains true
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
deleted file mode 100644
index 054d679..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.lang.time.DateUtils;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
-import org.joda.time.Period;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DefaultDeduplicator implements AlertDeduplicator { // Suppresses alerts for the same stream/policy key that recur within a configurable interval.
-	private long dedupIntervalMin; // dedup window in minutes; with 0 any repeat whose timestamp is not older than the stored one passes
-	private volatile Map<EventUniq, Long> events = new HashMap<>(); // key -> event timestamp of the last alert let through; NOTE(review): plain HashMap, only the reference is volatile
-	private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
-
-	public enum AlertDeduplicationStatus{ // result of checkDedup()
-		NEW, // key not seen before
-		DUPLICATED, // key seen and the interval has not elapsed; dedup() drops the event
-		IGNORED // key seen but the interval has elapsed; dedup() still lets the event through
-	}
-	
-	public DefaultDeduplicator() {
-		this.dedupIntervalMin = 0;
-	}
-
-	public DefaultDeduplicator(String intervalMin) { // interval given as text, parsed by setDedupIntervalMin()
-		setDedupIntervalMin(intervalMin);
-	}
-	
-	public DefaultDeduplicator(long intervalMin) { // interval given directly in minutes
-		this.dedupIntervalMin = intervalMin;
-	}
-	
-	public void clearOldCache() { // Drops cache entries whose key object was created more than 7 days ago.
-		List<EventUniq> removedkeys = new ArrayList<>(); // collected first so the map is not modified while iterating
-		for (Entry<EventUniq, Long> entry : events.entrySet()) {
-			EventUniq entity = entry.getKey();
-			if (System.currentTimeMillis() - 7 * DateUtils.MILLIS_PER_DAY > entity.createdTime) { // NOTE(review): HashMap.put keeps the first key object, so createdTime is the first sighting even after refreshes
-				removedkeys.add(entry.getKey());
-			}
-		}
-		for (EventUniq alertKey : removedkeys) {
-			events.remove(alertKey);
-		}
-	}
-	
-	public AlertDeduplicationStatus checkDedup(EventUniq key) { // Classifies the key against the cache and records its timestamp unless it is DUPLICATED.
-		long current = key.timestamp;
-		if(!events.containsKey(key)) {
-			events.put(key, current);
-			return AlertDeduplicationStatus.NEW;
-		}
-		
-		long last = events.get(key);
-		if(current - last >= dedupIntervalMin * DateUtils.MILLIS_PER_MINUTE) { // compares event timestamps, not wall-clock time
-			events.put(key, current);
-			return AlertDeduplicationStatus.IGNORED;
-		}
-		
-		return AlertDeduplicationStatus.DUPLICATED;
-	}
-	
-	public AlertStreamEvent dedup(AlertStreamEvent event) { // Returns the event itself when it should be published, or null when it is null or DUPLICATED.
-        if (event == null) return null;
-		clearOldCache(); // runs a full scan of the cache on every event
-		AlertStreamEvent result = null;
-		AlertDeduplicationStatus status = checkDedup(new EventUniq(event.getStreamId(), event.getPolicyId(), event.getCreatedTime()));
-		if (!status.equals(AlertDeduplicationStatus.DUPLICATED)) {
-			result = event;
-		} else if(LOG.isDebugEnabled()){
-			LOG.debug("Alert event is skipped because it's duplicated: {}", event.toString());
-		}
-		return result;
-	}
-
-	@Override // Parses the interval text into whole minutes; null, empty or unparsable input sets the interval to 0.
-	public void setDedupIntervalMin(String newDedupIntervalMin) {
-		if (newDedupIntervalMin == null || newDedupIntervalMin.isEmpty()) {
-			dedupIntervalMin = 0;
-            return;
-		}
-		try {
-			Period period = Period.parse(newDedupIntervalMin); // Joda-Time period text, presumably ISO-8601 such as "PT5M" - TODO confirm expected format
-			this.dedupIntervalMin = period.toStandardMinutes().getMinutes();
-		} catch (Exception e) {
-			LOG.warn("Fail to pares deDupIntervalMin, will disable deduplication instead", e);
-			this.dedupIntervalMin = 0;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
deleted file mode 100644
index df472d0..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-/**
- * 
- */
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-/** Dedup cache key: equality is by streamId and policyId only, compared case-insensitively.
- * @since Mar 19, 2015
- */
-public class EventUniq {
-	public String streamId;
-	public String policyId;
-	public Long timestamp;	 // event's createTimestamp; not part of equals/hashCode
-	public long createdTime; // wall-clock time this key object was constructed, for cache removal; not part of equals/hashCode
-
-	public EventUniq(String streamId, String policyId, long timestamp) {
-		this.streamId = streamId;
-		this.timestamp = timestamp;
-		this.policyId = policyId;
-		this.createdTime = System.currentTimeMillis();
-	}
-
-	@Override // NOTE(review): case-insensitive here while hashCode() hashes the raw strings, so equal keys differing in case can hash differently
-	public boolean equals(Object obj) {
-		if (obj instanceof EventUniq) {
-			EventUniq au = (EventUniq) obj;
-			return (this.streamId.equalsIgnoreCase(au.streamId) & this.policyId.equalsIgnoreCase(au.policyId)); // non-short-circuit '&'; throws NPE if this.streamId or this.policyId is null
-		}
-		return false;
-	}
-
-	@Override // Hash of streamId and policyId as given (case-sensitive).
-	public int hashCode() {
-		return new HashCodeBuilder()
-				.append(streamId)
-				.append(policyId)
-				.build();
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
deleted file mode 100644
index e8964a8..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import java.util.Properties;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-
-/**
- * Factory for Kafka producers configured with String key and value serializers.
- *
- * <p>The producer is thread safe, and sharing a single producer instance across threads will
- * generally be faster than having multiple instances. Note that {@link #getProducer(String)}
- * builds a new producer on every call, so callers should hold on to the returned instance
- * rather than requesting one repeatedly.
- */
-public class KafkaProducerManager {
-	public static final KafkaProducerManager INSTANCE = new KafkaProducerManager();
-
-	/**
-	 * Creates a new producer for the given brokers.
-	 *
-	 * @param brokerList broker addresses, stored under both {@code bootstrap.servers} and
-	 *                   {@code metadata.broker.list}
-	 * @return a newly constructed producer; nothing in this class closes it
-	 */
-	public KafkaProducer<String, Object> getProducer(String brokerList) {
-		final Properties props = new Properties();
-
-		// Broker addresses and acknowledgement setting.
-		props.put("bootstrap.servers", brokerList);
-		props.put("metadata.broker.list", brokerList);
-		props.put("request.required.acks", "1");
-
-		// Keys and values are written as Strings.
-		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-		// NOTE(review): deserializer entries look like consumer-side settings; confirm they are needed here.
-		props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
-		props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
-
-		return new KafkaProducer<>(props);
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java
deleted file mode 100644
index c165686..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-/**
- * Object that holds the status of a notification posted to a notification plugin.
- */
-public class PublishStatus {
-	// Whether the publish succeeded.
-	public boolean successful;
-	// Error description; presumably only meaningful when successful is false (confirm with callers).
-	public String errorMessage;
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
deleted file mode 100644
index e1f3e9c..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.router;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
- * Listener that receives an {@link AlertBoltSpec} together with a map of
- * {@link StreamDefinition}s.
- *
- * Since 5/2/16.
- */
-public interface AlertBoltSpecListener {
-    /**
-     * Callback for an alert bolt spec change, as the method name indicates.
-     *
-     * @param spec the alert bolt spec; presumably the updated one (confirm with the caller)
-     * @param sds  stream definitions keyed by String; presumably the stream id (TODO confirm)
-     */
-    void onAlertBoltSpecChange(AlertBoltSpec spec, Map<String, StreamDefinition> sds);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
deleted file mode 100644
index aa40dc5..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.router;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
- * Listener that receives a {@link SpoutSpec} together with a map of
- * {@link StreamDefinition}s.
- *
- * Since 5/3/16.
- */
-public interface SpoutSpecListener {
-    /**
-     * Callback for a spout spec change, as the method name indicates.
-     *
-     * @param spec the spout spec; presumably the updated one (confirm with the caller)
-     * @param sds  stream definitions keyed by String; presumably the stream id (TODO confirm)
-     */
-    void onSpoutSpecChange(SpoutSpec spec, Map<String, StreamDefinition> sds);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
deleted file mode 100644
index 07518d9..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.apache.eagle.alert.engine.router;
-
-import java.io.Serializable;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * <b></b>
- * 1. Group by SingleStream[stream_1.col1]
- *
- * Shuffle(stream_1,[col1])
- *
- * <b></b>
- * 2. Group by SingleStream[stream_1.col1,stream_1.col2]
- *
- * Shuffle(stream_1,[col1,col2])
- *
- * <b></b>
- * 3. Group by JoinedStream[stream_1.col1,stream_1.col2,stream_2.col3]
- *
- * Shuffle(stream_1.col1,stream_1.col2) + Global(stream_2.col3)
- */
-public class StreamRoute implements Serializable{
-    private static final long serialVersionUID = 4649184902196034940L;
-
-    private String targetComponentId;
-    private int partitionKey;
-    // Stored as the enum constant's name rather than the enum itself.
-    private String partitionType;
-
-    /**
-     * @return id of the component this route points to
-     */
-    public String getTargetComponentId() {
-        return targetComponentId;
-    }
-
-    public void setTargetComponentId(String targetComponentId) {
-        this.targetComponentId = targetComponentId;
-    }
-
-    /**
-     * @param targetComponentId id of the component this route points to
-     * @param partitionKey      partition key carried by the route
-     * @param type              partition type; must not be null because its name is stored
-     */
-    public StreamRoute(String targetComponentId, int partitionKey, StreamPartition.Type type){
-        this.setTargetComponentId(targetComponentId);
-        this.setPartitionKey(partitionKey);
-        this.setPartitionType(type);
-    }
-
-    public int getPartitionKey() {
-        return partitionKey;
-    }
-
-    public void setPartitionKey(int partitionKey) {
-        this.partitionKey = partitionKey;
-    }
-
-    /**
-     * @return the partition type, resolved from its stored name
-     */
-    public StreamPartition.Type getPartitionType() {
-        return StreamPartition.Type.valueOf(partitionType);
-    }
-
-    /**
-     * @param partitionType partition type; only its {@code name()} is kept
-     */
-    public void setPartitionType(StreamPartition.Type partitionType) {
-        this.partitionType = partitionType.name();
-    }
-
-    /**
-     * Value equality over the same three fields that {@link #hashCode()} combines, so that
-     * routes behave consistently as hash-collection members.
-     */
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (!(obj instanceof StreamRoute)) {
-            return false;
-        }
-        StreamRoute that = (StreamRoute) obj;
-        return this.partitionKey == that.partitionKey
-                && java.util.Objects.equals(this.partitionType, that.partitionType)
-                && java.util.Objects.equals(this.targetComponentId, that.targetComponentId);
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder().append(partitionKey).append(partitionType).append(targetComponentId).build();
-    }
-
-    @Override
-    public String toString() {
-        return String.format("Route[target=%s, key=%s, type=%s]", this.targetComponentId, this.partitionKey, this.partitionType);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
deleted file mode 100644
index 6dcd312..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import java.util.List;
-
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.router.impl.BasicStreamRoutePartitioner;
-
-public class StreamRoutePartitionFactory {
-    /**
-     * Creates the partitioner for the given targets, stream definition and partition.
-     * The result is currently always a {@link BasicStreamRoutePartitioner}.
-     *
-     * TODO: Decouple different StreamRoutePartitioner implementation from BasicStreamRoutePartitioner
-     *
-     * @param outputComponentIds ids of the candidate target components
-     * @param streamDefinition   definition of the stream being routed
-     * @param partition          partition spec handed to the partitioner
-     * @return a new partitioner built from the three arguments
-     */
-    public static StreamRoutePartitioner createRoutePartitioner(List<String> outputComponentIds, StreamDefinition streamDefinition, StreamPartition partition) {
-        final StreamRoutePartitioner partitioner =
-                new BasicStreamRoutePartitioner(outputComponentIds, streamDefinition, partition);
-        return partitioner;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
deleted file mode 100644
index 213abef..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import java.util.List;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-
-public interface StreamRoutePartitioner {
-    /**
-     * Computes the routes for a single event.
-     *
-     * @param event the event to route
-     * @return the {@link StreamRoute}s selected for the event
-     */
-    List<StreamRoute> partition(StreamEvent event);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
deleted file mode 100644
index abc465d..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.eagle.alert.engine.router;
-
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public interface StreamRouteSpecListener {
-    /**
-     * Callback for a change in stream router specs, delivered as three separate collections.
-     *
-     * @param added    specs that were added
-     * @param removed  specs that were removed
-     * @param modified specs that were modified
-     * @param sds      stream definitions keyed by String; presumably the stream id (TODO confirm)
-     */
-    void onStreamRouterSpecChange(Collection<StreamRouterSpec> added,
-                                  Collection<StreamRouterSpec> removed,
-                                  Collection<StreamRouterSpec> modified,
-                                  Map<String, StreamDefinition> sds);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
deleted file mode 100644
index da7fb7e..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.eagle.alert.engine.router;
-
-import java.io.Serializable;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-// Serializable component that consumes PartitionedEvents and also listens for stream sort spec changes.
-public interface StreamRouter extends StreamSortSpecListener, Serializable {
-    /**
-     * Supplies the context and output collector; presumably called once before any event
-     * is passed to {@link #nextEvent(PartitionedEvent)} (confirm against the caller).
-     */
-    void prepare(StreamContext context, PartitionedEventCollector outputCollector);
-    /** Hands one event to the router. */
-    void nextEvent(PartitionedEvent event);
-    /** @return the name of this router */
-    String getName();
-    /** Closes the router; presumably releases whatever prepare() set up (confirm in implementations). */
-    void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
deleted file mode 100644
index cd4bfd3..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.router;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
- * Since 5/1/16.
- * Listen to change on StreamRouterBoltSpec
- */
-public interface StreamRouterBoltSpecListener {
-    /**
-     * Callback for a router bolt spec change, as the method name indicates.
-     *
-     * @param spec the router spec; presumably the updated one (confirm with the caller)
-     * @param sds  stream definitions keyed by String; presumably the stream id (TODO confirm)
-     */
-    void onStreamRouteBoltSpecChange(RouterSpec spec, Map<String, StreamDefinition> sds);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
deleted file mode 100644
index 2229099..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClockListener;
-
-// Handler that receives PartitionedEvents for one stream and also acts as a StreamTimeClockListener.
-public interface StreamSortHandler extends StreamTimeClockListener {
-    /**
-     * Supplies the handler with its stream id, sort spec and output collector.
-     *
-     * @param streamId           id of the stream this handler is prepared for
-     * @param streamSortSpecSpec sort spec for that stream
-     * @param outputCollector    collector for output events; presumably the sorted ones (confirm in implementations)
-     */
-    void prepare(String streamId, StreamSortSpec streamSortSpecSpec, PartitionedEventCollector outputCollector);
-
-    /**
-     * Hands one event to the handler.
-     *
-     * @param event StreamEvent
-     */
-    void nextEvent(PartitionedEvent event);
-
-    /**
-     * Closes the handler.
-     */
-    void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
deleted file mode 100644
index affa979..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.eagle.alert.engine.router;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public interface StreamSortSpecListener {
-    /**
-     * Callback for a change in stream sort specs, delivered as three maps keyed by partition.
-     *
-     * @param added   sort specs that were added, by stream partition
-     * @param removed sort specs that were removed, by stream partition
-     * @param changed sort specs that were changed, by stream partition
-     */
-    void onStreamSortSpecChange(Map<StreamPartition, StreamSortSpec> added,
-                                Map<StreamPartition, StreamSortSpec> removed,
-                                Map<StreamPartition, StreamSortSpec> changed);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java
deleted file mode 100644
index afb9a6f..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router.impl;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.router.StreamRoute;
-import org.apache.eagle.alert.engine.router.StreamRoutePartitioner;
-
-/**
- * Picks target components for an event according to the partition type:
- * GLOBAL routes to every target, GROUPBY routes by a hash of the partition columns,
- * and any other type routes to a single target chosen from the current time.
- */
-public class BasicStreamRoutePartitioner implements StreamRoutePartitioner {
-    private final List<String> outputComponentIds;
-    private final StreamDefinition streamDefinition;
-    private final StreamPartition streamPartition;
-
-    /**
-     * @param outputComponentIds ids of the candidate target components
-     * @param streamDefinition   stream definition used to read the partition columns from an event
-     * @param partition          partition spec supplying the type and the group-by columns
-     */
-    public BasicStreamRoutePartitioner(List<String> outputComponentIds, StreamDefinition streamDefinition, StreamPartition partition) {
-        this.outputComponentIds = outputComponentIds;
-        this.streamDefinition = streamDefinition;
-        this.streamPartition = partition;
-    }
-
-    /**
-     * @param event the event to route
-     * @return the routes selected for the event according to the partition type
-     */
-    @Override
-    public List<StreamRoute> partition(StreamEvent event) {
-        switch (this.streamPartition.getType()){
-            case GLOBAL:
-                return routeToAll(event);
-            case GROUPBY:
-                return routeByGroupByKey(event);
-            default:
-                return routeByShuffle(event);
-        }
-    }
-
-    /**
-     * Routes to one target selected by the hash of the event's partition-column data.
-     */
-    protected List<StreamRoute> routeByGroupByKey(StreamEvent event) {
-        int partitionKey = new HashCodeBuilder().append(event.getData(streamDefinition,this.streamPartition.getColumns())).build();
-        String selectedOutputStream = outputComponentIds.get(targetIndex(partitionKey));
-        return Collections.singletonList(new StreamRoute(selectedOutputStream, partitionKey, StreamPartition.Type.GROUPBY));
-    }
-
-    /**
-     * Routes to one target selected from the current time in milliseconds.
-     * NOTE(review): events arriving within the same millisecond go to the same target.
-     */
-    protected List<StreamRoute> routeByShuffle(StreamEvent event) {
-        int timeHash = (int) System.currentTimeMillis();
-        return Arrays.asList(new StreamRoute(outputComponentIds.get(targetIndex(timeHash)),-1,StreamPartition.Type.SHUFFLE));
-    }
-
-    /**
-     * Routes to every target. The route list is built on first use and reused afterwards.
-     */
-    protected List<StreamRoute> routeToAll(StreamEvent event) {
-        // Build only when not built yet. The previous "!= null" check never ran, so null was returned.
-        if(_globalRoutingKeys==null) {
-            List<StreamRoute> routes = new ArrayList<>();
-            for (String targetId : outputComponentIds) {
-                routes.add(new StreamRoute(targetId, -1, StreamPartition.Type.GLOBAL));
-            }
-            _globalRoutingKeys = routes;
-        }
-        return _globalRoutingKeys;
-    }
-
-    /**
-     * Maps a hash onto an index into outputComponentIds. Taking the remainder before the
-     * absolute value gives the same index as Math.abs(hash) % size for every hash except
-     * Integer.MIN_VALUE, where Math.abs stays negative and the old form produced a negative index.
-     */
-    private int targetIndex(int hash) {
-        return Math.abs(hash % this.outputComponentIds.size());
-    }
-
-    private List<StreamRoute> _globalRoutingKeys = null;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java
deleted file mode 100644
index d0bf012..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router.impl;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import storm.trident.partition.GlobalGrouping;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-
-/**
- * Custom stream grouping that picks the target task(s) for a tuple from the partition type of the
- * {@link PartitionedEvent} found in the tuple's first field: GLOBAL and SHUFFLE are delegated to the
- * corresponding grouping implementations, GROUPBY selects a task from the event's partition key.
- */
-public class RoutePhysicalGrouping implements CustomStreamGrouping {
-    private static final long serialVersionUID = -511915083994148362L;
-    private final static Logger LOG = LoggerFactory.getLogger(RoutePhysicalGrouping.class);
-    // Private copy of the target task ids handed to prepare(); indexed by partition key for GROUPBY.
-    private List<Integer> outdegreeTasks;
-    private ShuffleGrouping shuffleGroupingDelegate;
-    private GlobalGrouping globalGroupingDelegate;
-    // Component id -> task id of the connected targets; only used for logging in prepare().
-    private Map<String,Integer> connectedTargetIds;
-
-    /**
-     * Records the target tasks and prepares both delegate groupings with the same arguments.
-     *
-     * @param context     topology context used to resolve the component id of each target task
-     * @param stream      the stream this grouping is attached to
-     * @param targetTasks ids of the tasks that may receive tuples
-     */
-    @Override
-    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
-        this.outdegreeTasks = new ArrayList<>(targetTasks);
-        shuffleGroupingDelegate = new ShuffleGrouping();
-        shuffleGroupingDelegate.prepare(context,stream,targetTasks);
-        globalGroupingDelegate = new GlobalGrouping();
-        globalGroupingDelegate.prepare(context,stream,targetTasks);
-        connectedTargetIds = new HashMap<>();
-        for(Integer targetId:targetTasks){
-            String targetComponentId = context.getComponentId(targetId);
-            connectedTargetIds.put(targetComponentId,targetId);
-        }
-        LOG.info("OutDegree components: [{}]", StringUtils.join(connectedTargetIds.values(),","));
-    }
-
-    /**
-     * Chooses the target tasks for one tuple.
-     *
-     * @param taskId id of the emitting task
-     * @param values tuple values; the first element is expected to be a {@link PartitionedEvent}
-     * @return the chosen task ids, or an empty list when the first value is null
-     */
-    @Override
-    public List<Integer> chooseTasks(int taskId, List<Object> values) {
-        Object routingKeyObj = values.get(0);
-        if(routingKeyObj!=null){
-            PartitionedEvent partitionedEvent = (PartitionedEvent) routingKeyObj;
-            if (partitionedEvent.getPartition().getType() == StreamPartition.Type.GLOBAL) {
-                return globalGroupingDelegate.chooseTasks(taskId,values);
-            } else if (partitionedEvent.getPartition().getType() == StreamPartition.Type.GROUPBY) {
-                // Partition keys are raw hash codes and may be negative. Java's % keeps the sign of the
-                // dividend, so the remainder must be made non-negative before it is used as an index;
-                // non-negative keys still map to the same task as before.
-                long partitionKey = partitionedEvent.getPartitionKey();
-                int taskIndex = (int) Math.abs(partitionKey % this.outdegreeTasks.size());
-                return Collections.singletonList(outdegreeTasks.get(taskIndex));
-            }
-            // Shuffle by defaults
-            return shuffleGroupingDelegate.chooseTasks(taskId, values);
-        }
-
-        LOG.warn("Illegal null StreamRoute, throw event");
-        return Collections.emptyList();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java
deleted file mode 100644
index c73854a..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router.impl;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-
-/**
- * Grouping that hands out the target tasks one at a time from a shuffled list and reshuffles the
- * list each time its end is reached.
- *
- * NOTE: This is a copy of the Storm 1.0.0 code. DON'T modify it.
- *
- * @since May 4, 2016
- */
-public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
-    private static final long serialVersionUID = 5035497345182141765L;
-    private Random random;
-    // One single-element list per target task, so chooseTasks can return an entry directly.
-    private ArrayList<List<Integer>> choices;
-    // Position of the most recently returned entry in choices.
-    private AtomicInteger current;
-
-    /**
-     * Builds the shuffled list of single-task choices and resets the position counter to 0.
-     *
-     * @param context     not used by this implementation
-     * @param stream      not used by this implementation
-     * @param targetTasks ids of the tasks that may receive tuples
-     */
-    @Override
-    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
-        random = new Random();
-        choices = new ArrayList<List<Integer>>(targetTasks.size());
-        for (Integer i: targetTasks) {
-            choices.add(Arrays.asList(i));
-        }
-        Collections.shuffle(choices, random);
-        current = new AtomicInteger(0);
-    }
-
-    /**
-     * Returns the next single-task choice, wrapping around and reshuffling at the end of the list.
-     *
-     * @param taskId not used by this implementation
-     * @param values not used by this implementation
-     * @return a single-element list holding the chosen task id
-     */
-    @Override
-    public List<Integer> chooseTasks(int taskId, List<Object> values) {
-        int rightNow;
-        int size = choices.size();
-        while (true) {
-            // Claim the next position atomically; the first call after a reset claims index 1.
-            rightNow = current.incrementAndGet();
-            if (rightNow < size) {
-                return choices.get(rightNow);
-            } else if (rightNow == size) {
-                // This caller claimed the position just past the end: it resets the counter,
-                // reshuffles, and takes index 0 itself.
-                current.set(0);
-                //This should be thread safe so long as ArrayList does not have any internal state that can be messed up by multi-threaded access.
-                Collections.shuffle(choices, random);
-                return choices.get(0);
-            }
-            //race condition with another thread, and we lost
-            // try again
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
deleted file mode 100644
index db235a7..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.router.impl;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.router.StreamRoute;
-import org.apache.eagle.alert.engine.router.StreamRoutePartitionFactory;
-import org.apache.eagle.alert.engine.router.StreamRoutePartitioner;
-import org.apache.eagle.alert.engine.router.StreamRouteSpecListener;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.task.OutputCollector;
-
-import com.google.common.collect.Lists;
-
-/**
- * Collector of the stream router bolt: after sorting, each event is routed according to its
- * {@link StreamPartition}. One stream may have several StreamPartitions, depending on how the
- * stream is grouped.
- *
- * Routing metadata lives in two maps that are replaced as a whole by
- * {@link #onStreamRouterSpecChange}; every call on the underlying {@link OutputCollector} is made
- * while holding {@code outputLock}.
- *
- * TODO: Add metric statistics
- */
-public class StreamRouterBoltOutputCollector implements PartitionedEventCollector, StreamRouteSpecListener {
-    private final static Logger LOG = LoggerFactory.getLogger(StreamRouterBoltOutputCollector.class);
-    private final OutputCollector outputCollector;
-    // Guards all emit/ack/fail calls on outputCollector.
-    private final Object outputLock = new Object();
-    private final List<String> outputStreamIds;
-    private final StreamContext streamContext;
-    // Optional: when null, PartitionedEvent objects are emitted as they are.
-    private final PartitionedEventSerializer serializer;
-    // Both maps are copied, modified and swapped on spec changes; they are never mutated in place.
-    private volatile Map<StreamPartition,StreamRouterSpec> routeSpecMap;
-    private volatile Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap;
-    private final String sourceId;
-
-    /**
-     * @param sourceId        id of the emitting component, used to derive target stream ids
-     * @param outputCollector collector that events are emitted, acked and failed on
-     * @param outputStreamIds ids of the output streams
-     * @param streamContext   context providing the counters updated by this collector
-     * @param serializer      serializer applied to emitted events, or null to emit them unserialized
-     */
-    public StreamRouterBoltOutputCollector(String sourceId, OutputCollector outputCollector, List<String> outputStreamIds, StreamContext streamContext, PartitionedEventSerializer serializer){
-        this.sourceId = sourceId;
-        this.outputCollector = outputCollector;
-        this.routeSpecMap = new HashMap<>();
-        this.routePartitionerMap = new HashMap<>();
-        this.outputStreamIds = outputStreamIds;
-        this.streamContext = streamContext;
-        this.serializer = serializer;
-    }
-
-    /**
-     * Routes one event to every target selected by the partitioners of its StreamPartition and acks
-     * its anchor. Events without routing metadata are dropped (and acked); any failure fails the anchor.
-     *
-     * @param event the partitioned event to route
-     */
-    public void emit(PartitionedEvent event) {
-        try {
-            this.streamContext.counter().scope("send_count").incr();
-            StreamPartition partition = event.getPartition();
-            StreamRouterSpec routerSpec = routeSpecMap.get(partition);
-            if (routerSpec == null) {
-                if (LOG.isDebugEnabled()) {
-                    // Don't know how to route stream, if it's correct, it's better to filter useless stream in spout side
-                    LOG.debug("Drop event {} as StreamPartition {} is not pointed to any router metadata {}", event, event.getPartition(), routeSpecMap);
-                }
-                this.drop(event);
-                return;
-            }
-
-            // Get handler for the partition. The volatile map is read exactly once: reading it again
-            // after the null check could observe a map swapped in by onStreamRouterSpecChange and
-            // return null for an entry that had just been checked.
-            List<StreamRoutePartitioner> queuePartitioners = routePartitionerMap.get(routerSpec.getPartition());
-            if (queuePartitioners == null) {
-                LOG.error("Partitioner for " + routerSpec + " is null");
-                synchronized (outputLock) {
-                    this.streamContext.counter().scope("fail_count").incr();
-                    this.outputCollector.fail(event.getAnchor());
-                }
-                return;
-            }
-
-            StreamEvent newEvent = event.getEvent().copy();
-
-            synchronized (outputLock) {
-                for (StreamRoutePartitioner queuePartitioner : queuePartitioners) {
-                    List<StreamRoute> streamRoutes = queuePartitioner.partition(newEvent);
-                    // it is possible that one event can be sent to multiple slots in one slotqueue if that is All grouping
-                    for (StreamRoute streamRoute : streamRoutes) {
-                        String targetStreamId = StreamIdConversion.generateStreamIdBetween(sourceId, streamRoute.getTargetComponentId());
-                        try {
-                            PartitionedEvent emittedEvent = new PartitionedEvent(newEvent, routerSpec.getPartition(), streamRoute.getPartitionKey());
-                            // Route Target Stream id instead of component id
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Emitted to stream {} with message {}", targetStreamId, emittedEvent);
-                            }
-                            if(this.serializer == null) {
-                                outputCollector.emit(targetStreamId, event.getAnchor(), Collections.singletonList(emittedEvent));
-                            } else {
-                                outputCollector.emit(targetStreamId, event.getAnchor(), Collections.singletonList(serializer.serialize(emittedEvent)));
-                            }
-                            this.streamContext.counter().scope("emit_count").incr();
-                        } catch (RuntimeException ex) {
-                            // Only log the failing target here. The rethrown exception reaches the outer
-                            // handler, which increments fail_count and fails the anchor; counting here as
-                            // well would record the same failure twice.
-                            LOG.error("Failed to emit to {} with {}", targetStreamId, newEvent, ex);
-                            throw ex;
-                        }
-                    }
-                }
-                outputCollector.ack(event.getAnchor());
-            }
-        } catch (Exception ex){
-            LOG.error(ex.getMessage(),ex);
-            synchronized (outputLock) {
-                this.streamContext.counter().scope("fail_count").incr();
-                this.outputCollector.fail(event.getAnchor());
-            }
-        }
-    }
-
-    /**
-     * Applies a routing metadata change. The changes are applied to copies of the current maps,
-     * which then replace the live maps.
-     *
-     * @param added    specs for new StreamPartitions
-     * @param removed  specs for deleted StreamPartitions
-     * @param modified specs whose StreamPartition already exists but whose content changed
-     * @param sds      stream definitions keyed by stream id, used to build the partitioners
-     */
-    @Override
-    public void onStreamRouterSpecChange(Collection<StreamRouterSpec> added,
-                                         Collection<StreamRouterSpec> removed,
-                                         Collection<StreamRouterSpec> modified,
-                                         Map<String, StreamDefinition> sds){
-        Map<StreamPartition,StreamRouterSpec> copyRouteSpecMap = new HashMap<>(routeSpecMap);
-        Map<StreamPartition, List<StreamRoutePartitioner>> copyRoutePartitionerMap = new HashMap<>(routePartitionerMap);
-
-        // added StreamRouterSpec i.e. there is a new StreamPartition
-        for(StreamRouterSpec spec : added){
-            if(copyRouteSpecMap.containsKey(spec.getPartition())){
-                LOG.error("Metadata calculation error: add existing StreamRouterSpec " + spec);
-            }else{
-                inplaceAdd(copyRouteSpecMap, copyRoutePartitionerMap, spec, sds);
-            }
-        }
-
-        // removed StreamRouterSpec i.e. there is a deleted StreamPartition
-        for(StreamRouterSpec spec : removed){
-            if(!copyRouteSpecMap.containsKey(spec.getPartition())){
-                LOG.error("Metadata calculation error: remove non-existing StreamRouterSpec " + spec);
-            }else{
-                inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
-            }
-        }
-
-        // modified StreamRouterSpec, i.e. there is modified StreamPartition, for example WorkSlotQueue assignment is changed
-        for(StreamRouterSpec spec : modified){
-            if(!copyRouteSpecMap.containsKey(spec.getPartition())){
-                LOG.error("Metadata calculation error: modify nonexisting StreamRouterSpec " + spec);
-            }else{
-                inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
-                inplaceAdd(copyRouteSpecMap, copyRoutePartitionerMap, spec, sds);
-            }
-        }
-
-        // switch
-        routeSpecMap = copyRouteSpecMap;
-        routePartitionerMap = copyRoutePartitionerMap;
-    }
-
-    /** Removes the spec's partition from both given maps. */
-    private void inplaceRemove(Map<StreamPartition,StreamRouterSpec> routeSpecMap,
-                               Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap,
-                               StreamRouterSpec toBeRemoved){
-        routeSpecMap.remove(toBeRemoved.getPartition());
-        routePartitionerMap.remove(toBeRemoved.getPartition());
-    }
-
-    /**
-     * Adds the spec and its partitioners to the given maps. If the partitioners cannot be built,
-     * the failure is logged and the spec is left out of both maps.
-     */
-    private void inplaceAdd(Map<StreamPartition,StreamRouterSpec> routeSpecMap,
-                            Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap,
-                            StreamRouterSpec toBeAdded, Map<String, StreamDefinition> sds){
-        routeSpecMap.put(toBeAdded.getPartition(), toBeAdded);
-        try {
-            List<StreamRoutePartitioner> routePartitioners = calculatePartitioner(toBeAdded, sds);
-            routePartitionerMap.put(toBeAdded.getPartition(), routePartitioners);
-        }catch(Exception e){
-            LOG.error("ignore this failure StreamRouterSpec " + toBeAdded + ", with error" + e.getMessage(),e);
-            routeSpecMap.remove(toBeAdded.getPartition());
-            routePartitionerMap.remove(toBeAdded.getPartition());
-        }
-    }
-
-    /** Creates one partitioner per target queue of the spec, over the bolt ids of that queue's workers. */
-    private List<StreamRoutePartitioner> calculatePartitioner(StreamRouterSpec streamRouterSpec, Map<String, StreamDefinition> sds) throws Exception{
-        List<StreamRoutePartitioner> routePartitioners = new ArrayList<>();
-        for (PolicyWorkerQueue pwq : streamRouterSpec.getTargetQueue()) {
-            routePartitioners.add(StreamRoutePartitionFactory.createRoutePartitioner(
-                    Lists.transform(pwq.getWorkers(), WorkSlot::getBoltId),
-                    sds.get(streamRouterSpec.getPartition().getStreamId()),
-                    streamRouterSpec.getPartition()));
-        }
-        return routePartitioners;
-    }
-
-    /**
-     * Drops an event: counts it and acks its anchor so that it is not replayed.
-     *
-     * @param event the event to drop
-     * @throws IllegalStateException if the event has no anchor to ack
-     */
-    @Override
-    public void drop(PartitionedEvent event) {
-        synchronized (outputLock) {
-            this.streamContext.counter().scope("drop_count").incr();
-            if (event.getAnchor() != null) {
-                this.outputCollector.ack(event.getAnchor());
-            }else{
-                throw new IllegalStateException(event.toString()+" was not acked as anchor is null");
-            }
-        }
-    }
-
-    /** No-op: this collector does not buffer events. */
-    public void flush() {
-
-    }
-}
\ No newline at end of file


Mime
View raw message