eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [22/84] [partial] eagle git commit: Clean repo for eagle site
Date Mon, 03 Apr 2017 11:54:30 GMT
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
deleted file mode 100755
index 144f9aa..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.runner;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.engine.StreamContextImpl;
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.MetadataType;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.PublishPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
-import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorWrapper;
-import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.router.AlertBoltSpecListener;
-import org.apache.eagle.alert.engine.router.impl.StormOutputCollector;
-import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
-import org.apache.eagle.alert.engine.utils.SingletonExecutor;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-
-/**
- * Since 5/1/16.
- * This is container for hosting all policies belonging to the same monitoredStream
- * MonitoredStream refers to tuple of {dataSource, streamId, groupby}
- * The container is also called {@link WorkSlot}
- */
-public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListener, SerializationMetadataProvider {
-    private static final Logger LOG = LoggerFactory.getLogger(AlertBolt.class);
-    private static final long serialVersionUID = -4132297691448945672L;
-    private PolicyGroupEvaluator policyGroupEvaluator;
-    private AlertBoltOutputCollectorWrapper alertOutputCollector;
-    private String boltId;
-    private boolean logEventEnabled;
-    private volatile Object outputLock;
-    // mapping from policy name to PolicyDefinition
-    private volatile Map<String, PolicyDefinition> cachedPolicies = new HashMap<>(); // for one streamGroup, there are multiple policies
-
-    private volatile Set<PublishPartition> cachedPublishPartitions = new HashSet<>();
-
-    private AlertBoltSpec spec;
-
-    public AlertBolt(String boltId, Config config, IMetadataChangeNotifyService changeNotifyService) {
-        super(boltId, changeNotifyService, config);
-        this.boltId = boltId;
-        this.policyGroupEvaluator = new PolicyGroupEvaluatorImpl(boltId + "-evaluator_stage1"); // use bolt id as evaluatorId.
-        // TODO next stage evaluator
-
-        if (config.hasPath("topology.logEventEnabled")) {
-            logEventEnabled = config.getBoolean("topology.logEventEnabled");
-        }
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        this.streamContext.counter().incr("execute_count");
-        try {
-            PartitionedEvent pe = deserialize(input.getValueByField(AlertConstants.FIELD_0));
-            if (logEventEnabled) {
-                LOG.info("Alert bolt {} received event: {}", boltId, pe.getEvent());
-            }
-            String streamEventVersion = pe.getEvent().getMetaVersion();
-
-            if (streamEventVersion == null) {
-                // if stream event version is null, need to initialize it
-                pe.getEvent().setMetaVersion(specVersion);
-            } else if (streamEventVersion != null && !streamEventVersion.equals(specVersion)) {
-                if (specVersion != null && streamEventVersion != null
-                    && specVersion.contains("spec_version_") && streamEventVersion.contains("spec_version_")) {
-                    // check if specVersion is older than stream_event_version
-                    // Long timestamp_of_specVersion = Long.valueOf(specVersion.split("spec_version_")[1]);
-                    // Long timestamp_of_streamEventVersion = Long.valueOf(stream_event_version.split("spec_version_")[1]);
-                    long timestampOfSpecVersion = Long.valueOf(specVersion.substring(13));
-                    long timestampOfStreamEventVersion = Long.valueOf(streamEventVersion.substring(13));
-                    specVersionOutofdate = timestampOfSpecVersion < timestampOfStreamEventVersion;
-                    if (!specVersionOutofdate) {
-                        pe.getEvent().setMetaVersion(specVersion);
-                    }
-                }
-
-                String message = String.format("Spec Version [%s] of AlertBolt is %s Stream Event Version [%s]!", specVersion, specVersionOutofdate ? "older than" : "newer than", streamEventVersion);
-                LOG.warn(message);
-
-                // send out metrics for meta conflict
-                this.streamContext.counter().incr("meta_conflict");
-
-                ExecutorService executors = SingletonExecutor.getExecutorService();
-                executors.submit(() -> {
-                    // if spec version is out-of-date, need to refresh it
-                    if (specVersionOutofdate) {
-                        try {
-                            IMetadataServiceClient client = new MetadataServiceClientImpl(this.getConfig());
-                            String topologyId = spec.getTopologyName();
-                            AlertBoltSpec latestSpec = client.getVersionedSpec().getAlertSpecs().get(topologyId);
-                            if (latestSpec != null) {
-                                spec = latestSpec;
-                            }
-                        } catch (Exception e) {
-                            LOG.error(e.toString());
-                        }
-
-                    }
-                });
-
-            }
-
-            policyGroupEvaluator.nextEvent(pe.withAnchor(input));
-            synchronized (outputLock) {
-                this.collector.ack(input);
-            }
-            this.streamContext.counter().incr("ack_count");
-        } catch (Exception ex) {
-            LOG.error(ex.getMessage(), ex);
-            synchronized (outputLock) {
-                this.streamContext.counter().incr("fail_count");
-                this.collector.fail(input);
-            }
-        } finally {
-            alertOutputCollector.flush();
-        }
-    }
-
-    @Override
-    public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService metadataChangeNotifyService, Config config, TopologyContext context) {
-        // instantiate output lock object
-        outputLock = new Object();
-        streamContext = new StreamContextImpl(config, context.registerMetric("eagle.evaluator", new MultiCountMetric(), 60), context);
-        alertOutputCollector = new AlertBoltOutputCollectorWrapper(new StormOutputCollector(collector), outputLock, streamContext);
-        policyGroupEvaluator.init(streamContext, alertOutputCollector);
-        metadataChangeNotifyService.registerListener(this);
-        metadataChangeNotifyService.init(config, MetadataType.ALERT_BOLT);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(AlertConstants.FIELD_0, AlertConstants.FIELD_1));
-    }
-
-    @Override
-    public void cleanup() {
-        policyGroupEvaluator.close();
-        alertOutputCollector.flush();
-        alertOutputCollector.close();
-        super.cleanup();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public synchronized void onAlertBoltSpecChange(AlertBoltSpec spec, Map<String, StreamDefinition> sds) {
-        List<PolicyDefinition> newPolicies = spec.getBoltPoliciesMap().get(boltId);
-        if (newPolicies == null) {
-            LOG.info("no new policy with AlertBoltSpec {} for this bolt {}", spec, boltId);
-            return;
-        }
-
-        Map<String, PolicyDefinition> newPoliciesMap = new HashMap<>();
-        newPolicies.forEach(p -> newPoliciesMap.put(p.getName(), p));
-        MapComparator<String, PolicyDefinition> comparator = new MapComparator<>(newPoliciesMap, cachedPolicies);
-        comparator.compare();
-
-        MapComparator<String, StreamDefinition> streamComparator = new MapComparator<>(sds, sdf);
-        streamComparator.compare();
-
-        List<StreamDefinition> addOrUpdatedStreams = streamComparator.getAdded();
-        addOrUpdatedStreams.addAll(streamComparator.getModified());
-        List<PolicyDefinition> cachedPoliciesTemp = new ArrayList<>(cachedPolicies.values());
-        addOrUpdatedStreams.forEach(s -> {
-            cachedPoliciesTemp.stream().filter(p -> p.getInputStreams().contains(s.getStreamId())
-                || p.getOutputStreams().contains(s.getStreamId())).forEach(p -> {
-                    if (comparator.getModified().stream().filter(x -> x.getName().equals(p.getName())).count() <= 0
-                        && comparator.getAdded().stream().filter(x -> x.getName().equals(p.getName())).count() <= 0) {
-                        comparator.getModified().add(p);
-                    }
-                });
-            ;
-        });
-
-        policyGroupEvaluator.onPolicyChange(spec.getVersion(), comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), sds);
-
-        // update alert output collector
-        Set<PublishPartition> newPublishPartitions = new HashSet<>();
-        spec.getPublishPartitions().forEach(p -> {
-            if (newPolicies.stream().filter(o -> o.getName().equals(p.getPolicyId())).count() > 0) {
-                newPublishPartitions.add(p);
-            }
-        });
-
-        Collection<PublishPartition> addedPublishPartitions = CollectionUtils.subtract(newPublishPartitions, cachedPublishPartitions);
-        Collection<PublishPartition> removedPublishPartitions = CollectionUtils.subtract(cachedPublishPartitions, newPublishPartitions);
-        Collection<PublishPartition> modifiedPublishPartitions = CollectionUtils.intersection(newPublishPartitions, cachedPublishPartitions);
-
-        LOG.debug("added PublishPartition " + addedPublishPartitions);
-        LOG.debug("removed PublishPartition " + removedPublishPartitions);
-        LOG.debug("modified PublishPartition " + modifiedPublishPartitions);
-
-        alertOutputCollector.onAlertBoltSpecChange(addedPublishPartitions, removedPublishPartitions, modifiedPublishPartitions);
-
-        // switch
-        cachedPolicies = newPoliciesMap;
-        cachedPublishPartitions = newPublishPartitions;
-        sdf = sds;
-        specVersion = spec.getVersion();
-        this.spec = spec;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
deleted file mode 100644
index 44a5fe9..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.runner;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.engine.StreamContextImpl;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
-import org.apache.eagle.alert.engine.publisher.AlertPublisher;
-import org.apache.eagle.alert.engine.publisher.AlertStreamFilter;
-import org.apache.eagle.alert.engine.publisher.PipeStreamFilter;
-import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
-import org.apache.eagle.alert.engine.publisher.template.AlertTemplateEngine;
-import org.apache.eagle.alert.engine.publisher.template.AlertTemplateProvider;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPublishSpecListener {
-    private static final Logger LOG = LoggerFactory.getLogger(AlertPublisherBolt.class);
-    private final AlertPublisher alertPublisher;
-    private volatile Map<String, Publishment> cachedPublishments = new HashMap<>();
-    private volatile Map<String, PolicyDefinition> policyDefinitionMap;
-    private volatile Map<String, StreamDefinition> streamDefinitionMap;
-    private AlertTemplateEngine alertTemplateEngine;
-
-    private boolean logEventEnabled;
-    private TopologyContext context;
-    private AlertStreamFilter alertFilter;
-
-    public AlertPublisherBolt(String alertPublisherName, Config config, IMetadataChangeNotifyService coordinatorService) {
-        super(alertPublisherName, coordinatorService, config);
-        this.alertPublisher = new AlertPublisherImpl(alertPublisherName);
-
-        if (config != null && config.hasPath("topology.logEventEnabled")) {
-            logEventEnabled = config.getBoolean("topology.logEventEnabled");
-        }
-    }
-
-    @Override
-    public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService coordinatorService, Config config, TopologyContext context) {
-        coordinatorService.registerListener(this);
-        coordinatorService.init(config, MetadataType.ALERT_PUBLISH_BOLT);
-        this.alertPublisher.init(config, stormConf);
-        streamContext = new StreamContextImpl(config, context.registerMetric("eagle.publisher", new MultiCountMetric(), 60), context);
-        this.context = context;
-        this.alertTemplateEngine = AlertTemplateProvider.createAlertTemplateEngine();
-        this.alertTemplateEngine.init(config);
-        this.alertFilter = new PipeStreamFilter(new AlertContextEnrichFilter(this), new AlertTemplateFilter(alertTemplateEngine));
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        try {
-            streamContext.counter().incr("receive_count");
-            PublishPartition partition = (PublishPartition) input.getValueByField(AlertConstants.FIELD_0);
-            AlertStreamEvent event = (AlertStreamEvent) input.getValueByField(AlertConstants.FIELD_1);
-            if (logEventEnabled) {
-                LOG.info("Alert publish bolt {}/{} with partition {} received event: {}", this.getBoltId(), this.context.getThisTaskId(), partition, event);
-            }
-            AlertStreamEvent filteredEvent = alertFilter.filter(event);
-            if (filteredEvent != null) {
-                alertPublisher.nextEvent(partition, filteredEvent);
-            }
-            this.collector.ack(input);
-            streamContext.counter().incr("ack_count");
-        } catch (Throwable ex) {
-            streamContext.counter().incr("fail_count");
-            LOG.error(ex.getMessage(), ex);
-            collector.reportError(ex);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        alertPublisher.close();
-        super.cleanup();
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields());
-    }
-
-    @Override
-    public synchronized void onAlertPublishSpecChange(PublishSpec pubSpec, Map<String, StreamDefinition> sds) {
-        if (pubSpec == null) {
-            return;
-        }
-        this.streamDefinitionMap = sds;
-
-        List<Publishment> newPublishments = pubSpec.getPublishments();
-        if (newPublishments == null) {
-            LOG.info("no publishments with PublishSpec {} for this topology", pubSpec);
-            return;
-        }
-
-        Map<String, Publishment> newPublishmentsMap = new HashMap<>();
-        newPublishments.forEach(p -> newPublishmentsMap.put(p.getName(), p));
-        MapComparator<String, Publishment> comparator = new MapComparator<>(newPublishmentsMap, cachedPublishments);
-        comparator.compare();
-
-        List<Publishment> beforeModified = new ArrayList<>();
-        comparator.getModified().forEach(p -> beforeModified.add(cachedPublishments.get(p.getName())));
-        alertPublisher.onPublishChange(comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), beforeModified);
-
-        // switch
-        cachedPublishments = newPublishmentsMap;
-        specVersion = pubSpec.getVersion();
-    }
-
-    @Override
-    public void onAlertPolicyChange(Map<String, PolicyDefinition> pds, Map<String, StreamDefinition> sds) {
-        List<String> policyToRemove = new ArrayList<>();
-        if (this.policyDefinitionMap != null) {
-            policyToRemove.addAll(this.policyDefinitionMap.keySet().stream().filter(policyId -> !pds.containsKey(policyId)).collect(Collectors.toList()));
-        }
-
-        this.policyDefinitionMap = pds;
-        this.streamDefinitionMap = sds;
-
-        for (Map.Entry<String, PolicyDefinition> entry : pds.entrySet()) {
-            try {
-                this.alertTemplateEngine.register(entry.getValue());
-            } catch (Throwable throwable) {
-                LOG.error("Failed to register policy {} in template engine", entry.getKey(), throwable);
-            }
-        }
-
-        for (String policyId : policyToRemove) {
-            try {
-                this.alertTemplateEngine.unregister(policyId);
-            } catch (Throwable throwable) {
-                LOG.error("Failed to unregister policy {} from template engine", policyId, throwable);
-            }
-        }
-    }
-
-    private class AlertContextEnrichFilter implements AlertStreamFilter {
-        private final AlertPublisherBolt alertPublisherBolt;
-
-        private AlertContextEnrichFilter(AlertPublisherBolt alertPublisherBolt) {
-            this.alertPublisherBolt = alertPublisherBolt;
-        }
-
-        /**
-         * TODO: Refactor wrapAlertPublishEvent into alertTemplateEngine and remove extraData from AlertStreamEvent.
-         */
-        @Override
-        public AlertStreamEvent filter(AlertStreamEvent event) {
-            event.ensureAlertId();
-            Map<String, Object> extraData = new HashMap<>();
-            List<String> appIds = new ArrayList<>();
-            if (alertPublisherBolt.policyDefinitionMap == null || alertPublisherBolt.streamDefinitionMap == null) {
-                LOG.warn("policyDefinitions or streamDefinitions in publisher bolt have not been initialized");
-            } else {
-                PolicyDefinition policyDefinition = alertPublisherBolt.policyDefinitionMap.get(event.getPolicyId());
-                if (alertPublisherBolt.policyDefinitionMap != null && policyDefinition != null) {
-                    for (String inputStreamId : policyDefinition.getInputStreams()) {
-                        StreamDefinition sd = alertPublisherBolt.streamDefinitionMap.get(inputStreamId);
-                        if (sd != null) {
-                            extraData.put(AlertPublishEvent.SITE_ID_KEY, sd.getSiteId());
-                            appIds.add(sd.getStreamSource());
-                        }
-                    }
-                    extraData.put(AlertPublishEvent.APP_IDS_KEY, appIds);
-                    extraData.put(AlertPublishEvent.POLICY_VALUE_KEY, policyDefinition.getDefinition().getValue());
-                    event.setSeverity(policyDefinition.getAlertSeverity());
-                    event.setCategory(policyDefinition.getAlertCategory());
-                }
-                event.setContext(extraData);
-            }
-            return event;
-        }
-    }
-
-    private class AlertTemplateFilter implements AlertStreamFilter {
-        private final AlertTemplateEngine alertTemplateEngine;
-
-        private AlertTemplateFilter(AlertTemplateEngine alertTemplateEngine) {
-            this.alertTemplateEngine = alertTemplateEngine;
-        }
-
-        @Override
-        public AlertStreamEvent filter(AlertStreamEvent event) {
-            return this.alertTemplateEngine.filter(event);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java
deleted file mode 100644
index 5c65d91..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.runner;
-
-import org.apache.commons.collections.CollectionUtils;
-
-import java.util.*;
-
-/**
- * Since 5/2/16.
- */
-public class MapComparator<K, V> {
-    private Map<K, V> map1;
-    private Map<K, V> map2;
-    private List<V> added = new ArrayList<>();
-    private List<V> removed = new ArrayList<>();
-    private List<V> modified = new ArrayList<>();
-
-    public MapComparator(Map<K, V> map1, Map<K, V> map2) {
-        this.map1 = map1;
-        this.map2 = map2;
-    }
-
-    @SuppressWarnings("unchecked")
-    public void compare() {
-        Set<K> keys1 = map1.keySet();
-        Set<K> keys2 = map2.keySet();
-        Collection<K> addedKeys = CollectionUtils.subtract(keys1, keys2);
-        Collection<K> removedKeys = CollectionUtils.subtract(keys2, keys1);
-        Collection<K> modifiedKeys = CollectionUtils.intersection(keys1, keys2);
-
-        addedKeys.forEach(k -> added.add(map1.get(k)));
-        removedKeys.forEach(k -> removed.add(map2.get(k)));
-        modifiedKeys.forEach(k -> {
-            if (!map1.get(k).equals(map2.get(k))) {
-                modified.add(map1.get(k));
-            }
-        });
-    }
-
-    public List<V> getAdded() {
-        return added;
-    }
-
-    public List<V> getRemoved() {
-        return removed;
-    }
-
-    public List<V> getModified() {
-        return modified;
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
deleted file mode 100644
index 771a667..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.runner;
-
-import org.apache.eagle.alert.metric.IMetricSystem;
-import org.apache.eagle.alert.metric.MetricSystem;
-import backtype.storm.metric.api.IMetricsConsumer;
-import backtype.storm.task.IErrorReporter;
-import backtype.storm.task.TopologyContext;
-import com.codahale.metrics.Gauge;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * Share same metric system.
- */
-public class StormMetricConsumer implements IMetricsConsumer {
-    public static final Logger LOG = LoggerFactory.getLogger(StormMetricConsumer.class);
-    private String topologyName;
-    private IMetricSystem metricSystem;
-    private String topologyId;
-
-    @SuppressWarnings( {"serial", "rawtypes"})
-    @Override
-    public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
-        Config config = ConfigFactory.parseString((String) registrationArgument, ConfigParseOptions.defaults());
-        topologyName = config.getString("appId");
-        topologyId = context.getStormId();
-        metricSystem = MetricSystem.load(config);
-        metricSystem.tags(new HashMap<String, Object>() {
-            {
-                put("appId", topologyName);
-                put("stormId", topologyId);
-            }
-        });
-        metricSystem.start();
-    }
-
-    @SuppressWarnings( {"unchecked", "rawtypes"})
-    @Override
-    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
-        synchronized (metricSystem) {
-            List<String> metricList = new LinkedList<>();
-            for (DataPoint dataPoint : dataPoints) {
-                if (dataPoint.value instanceof Map) {
-                    Map<String, Object> values = (Map<String, Object>) dataPoint.value;
-                    for (Map.Entry<String, Object> entry : values.entrySet()) {
-                        String metricName = buildMetricName(taskInfo, dataPoint.name, entry.getKey());
-                        metricList.add(metricName);
-                        Gauge gauge = metricSystem.registry().getGauges().get(metricName);
-                        if (gauge == null) {
-                            LOG.info("Register metric {}", metricName);
-                            gauge = new DataPointGauge(entry.getValue());
-                            metricSystem.registry().register(metricName, gauge);
-                        } else {
-                            ((DataPointGauge) gauge).setValue(entry.getValue());
-                        }
-                    }
-                } else {
-                    String metricName = buildMetricName(taskInfo, dataPoint.name);
-                    metricList.add(metricName);
-                    LOG.info("Register metric {}", metricName);
-                    Gauge gauge = metricSystem.registry().getGauges().get(metricName);
-                    if (gauge == null) {
-                        LOG.info("Register metric {}", metricName);
-                        gauge = new DataPointGauge(dataPoint.value);
-                        metricSystem.registry().register(metricName, gauge);
-                    } else {
-                        ((DataPointGauge) gauge).setValue(dataPoint.value);
-                    }
-                }
-            }
-            metricSystem.registry().removeMatching((name, metric) -> metricList.indexOf(name) < 0);
-            metricSystem.report();
-            metricSystem.registry().getGauges().values().forEach((gauge -> {
-                if (gauge instanceof DataPointGauge) {
-                    ((DataPointGauge) gauge).reset();
-                }
-            }));
-            LOG.info("Reported {} metric data points from {} [{}]", dataPoints.size(), taskInfo.srcComponentId, taskInfo.srcTaskId);
-        }
-    }
-
-    private class DataPointGauge implements Gauge<Object> {
-        private Object value;
-
-        public DataPointGauge(Object initialValue) {
-            this.value = initialValue;
-        }
-
-        @Override
-        public Object getValue() {
-            return value;
-        }
-
-        public void setValue(Object value) {
-            this.value = value;
-        }
-
-        public void reset() {
-            this.value = 0;
-        }
-    }
-
-    private String buildMetricName(TaskInfo taskInfo, String... name) {
-        return String.join(".", StringUtils.join(name, ".").replace("/", "."), taskInfo.srcComponentId, taskInfo.srcTaskId + "");
-    }
-
-    @Override
-    public void cleanup() {
-        metricSystem.stop();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
deleted file mode 100644
index 3c13ff7..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.runner;
-
-import org.apache.eagle.alert.metric.IMetricSystem;
-import org.apache.eagle.alert.metric.MetricSystem;
-import backtype.storm.metric.api.IMetricsConsumer;
-import backtype.storm.task.IErrorReporter;
-import backtype.storm.task.TopologyContext;
-import com.codahale.metrics.Gauge;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eagle.alert.metric.MetricConfigs;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * Per MetricSystem instance per task.
- */
-public class StormMetricTaggedConsumer implements IMetricsConsumer {
-    public static final Logger LOG = LoggerFactory.getLogger(StormMetricTaggedConsumer.class);
-    private final Map<String, MetricSystem> metricSystems = new HashMap<>();
-    private Config config;
-    private String metricNamePrefix;
-    private Map<String, Object> baseTags = new HashMap<>();
-
-    @SuppressWarnings("rawtypes")
-    @Override
-    public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
-        this.config = ConfigFactory.parseString((String) registrationArgument, ConfigParseOptions.defaults());
-
-        if (config.hasPath("appId")) {
-            baseTags.put("appId", config.getString("appId"));
-        }
-
-        if (config.hasPath("siteId")) {
-            baseTags.put("siteId", config.getString("siteId"));
-        }
-
-        baseTags.put("appExecId", context.getStormId());
-
-        if (config.hasPath(MetricConfigs.METRIC_PREFIX_CONF)) {
-            metricNamePrefix = config.getString(MetricConfigs.METRIC_PREFIX_CONF);
-        }
-    }
-
-    @SuppressWarnings("serial")
-    @Override
-    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
-        synchronized (metricSystems) {
-            String uniqueTaskKey = buildUniqueTaskKey(taskInfo);
-            MetricSystem metricSystem = metricSystems.get(uniqueTaskKey);
-            if (metricSystem == null) {
-                metricSystem = MetricSystem.load(config);
-                metricSystems.put(uniqueTaskKey, metricSystem);
-                metricSystem.tags(baseTags);
-                metricSystem.tags(new HashMap<String, Object>() {
-                    {
-                        put("componentId", taskInfo.srcComponentId);
-                        put("taskId", taskInfo.srcTaskId);
-                    }
-                });
-                metricSystem.start();
-                LOG.info("Initialized metric reporter for {}", uniqueTaskKey);
-            }
-            report(metricSystem, taskInfo, dataPoints);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Reported {} metric points from {}", dataPoints.size(), uniqueTaskKey);
-            }
-        }
-    }
-
-    @SuppressWarnings( {"unchecked", "rawtypes"})
-    private void report(MetricSystem metricSystem, TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
-        List<String> metricList = new LinkedList<>();
-        for (DataPoint dataPoint : dataPoints) {
-            if (dataPoint.value instanceof Map) {
-                Map<String, Object> values = (Map<String, Object>) dataPoint.value;
-                for (Map.Entry<String, Object> entry : values.entrySet()) {
-                    String metricName = buildSimpleMetricName(metricNamePrefix, taskInfo, dataPoint.name, entry.getKey());
-                    metricList.add(metricName);
-                    Gauge gauge = metricSystem.registry().getGauges().get(metricName);
-                    if (gauge == null) {
-                        gauge = new DataPointGauge(entry.getValue());
-                        metricSystem.registry().register(metricName, gauge);
-                        LOG.debug("Register metric {}", metricName);
-                    } else {
-                        ((DataPointGauge) gauge).setValue(entry.getValue());
-                    }
-                }
-            } else {
-                String metricName = buildSimpleMetricName(metricNamePrefix, taskInfo, dataPoint.name);
-                metricList.add(metricName);
-                Gauge gauge = metricSystem.registry().getGauges().get(metricName);
-                if (gauge == null) {
-                    gauge = new DataPointGauge(dataPoint.value);
-                    metricSystem.registry().register(metricName, gauge);
-                    LOG.debug("Register metric {}", metricName);
-                } else {
-                    ((DataPointGauge) gauge).setValue(dataPoint.value);
-                }
-            }
-        }
-        metricSystem.registry().removeMatching((name, metric) -> metricList.indexOf(name) < 0);
-        metricSystem.report();
-        metricSystem.registry().getGauges().values().forEach((gauge -> {
-            if (gauge instanceof DataPointGauge) {
-                ((DataPointGauge) gauge).reset();
-            }
-        }));
-    }
-
-    private static class DataPointGauge implements Gauge<Object> {
-        private Object value;
-
-        public DataPointGauge(Object initialValue) {
-            this.setValue(initialValue);
-        }
-
-        @Override
-        public Object getValue() {
-            return value;
-        }
-
-        public void setValue(Object value) {
-            this.value = value;
-        }
-
-        public void reset() {
-            this.value = 0;
-        }
-    }
-
-    private static String buildUniqueTaskKey(TaskInfo taskInfo) {
-        return String.format("%s[%s]", taskInfo.srcComponentId, taskInfo.srcTaskId);
-    }
-
-    private static String buildSimpleMetricName(String prefix, TaskInfo taskInfo, String... name) {
-        String metricName = String.join(".", StringUtils.join(name, ".").replace("/", ".")).replace("__", "");
-        if (prefix == null) {
-            return metricName;
-        } else {
-            return String.format("%s%s", prefix, metricName);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        metricSystems.values().forEach(IMetricSystem::stop);
-        metricSystems.clear();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
deleted file mode 100644
index e37b680..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.runner;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.engine.StreamContextImpl;
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.MetadataType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.router.StreamRouter;
-import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
-import org.apache.eagle.alert.engine.router.impl.StormOutputCollector;
-import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector;
-import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
-import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-
-public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouterBoltSpecListener, SerializationMetadataProvider {
-    private static final Logger LOG = LoggerFactory.getLogger(StreamRouterBolt.class);
-    private static final long serialVersionUID = -7611470889316430372L;
-    private StreamRouter router;
-    private StreamRouterBoltOutputCollector routeCollector;
-    // mapping from StreamPartition to StreamSortSpec
-    private volatile Map<StreamPartition, StreamSortSpec> cachedSSS = new HashMap<>();
-    // mapping from StreamPartition(streamId, groupbyspec) to StreamRouterSpec
-    private volatile Map<StreamPartition, List<StreamRouterSpec>> cachedSRS = new HashMap<>();
-
-    public StreamRouterBolt(String boltId, Config config, IMetadataChangeNotifyService changeNotifyService) {
-        super(boltId, changeNotifyService, config);
-        this.router = new StreamRouterImpl(boltId + "-router");
-    }
-
-
-    @Override
-    public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService changeNotifyService, Config config, TopologyContext context) {
-        streamContext = new StreamContextImpl(config, context.registerMetric("eagle.router", new MultiCountMetric(), 60), context);
-        routeCollector = new StreamRouterBoltOutputCollector(getBoltId(), new StormOutputCollector(collector, serializer), this.getOutputStreamIds(), streamContext);
-        router.prepare(streamContext, routeCollector);
-        changeNotifyService.registerListener(this);
-        changeNotifyService.init(config, MetadataType.STREAM_ROUTER_BOLT);
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        try {
-            this.streamContext.counter().incr("execute_count");
-            this.router.nextEvent(deserialize(input.getValueByField(AlertConstants.FIELD_0)).withAnchor(input));
-        } catch (Exception ex) {
-            this.streamContext.counter().incr("fail_count");
-            LOG.error(ex.getMessage(), ex);
-            this.collector.fail(input);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        this.router.close();
-        super.cleanup();
-    }
-
-    /**
-     * Compare with metadata snapshot cache to generate diff like added, removed and modified between different versions.
-     *
-     * @param spec
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public synchronized void onStreamRouteBoltSpecChange(RouterSpec spec, Map<String, StreamDefinition> sds) {
-        sanityCheck(spec);
-
-        // figure out added, removed, modified StreamSortSpec
-        Map<StreamPartition, StreamSortSpec> newSSS = spec.makeSSS();
-
-        Set<StreamPartition> newStreamIds = newSSS.keySet();
-        Set<StreamPartition> cachedStreamIds = cachedSSS.keySet();
-        Collection<StreamPartition> addedStreamIds = CollectionUtils.subtract(newStreamIds, cachedStreamIds);
-        Collection<StreamPartition> removedStreamIds = CollectionUtils.subtract(cachedStreamIds, newStreamIds);
-        Collection<StreamPartition> modifiedStreamIds = CollectionUtils.intersection(newStreamIds, cachedStreamIds);
-
-        Map<StreamPartition, StreamSortSpec> added = new HashMap<>();
-        Map<StreamPartition, StreamSortSpec> removed = new HashMap<>();
-        Map<StreamPartition, StreamSortSpec> modified = new HashMap<>();
-        addedStreamIds.forEach(s -> added.put(s, newSSS.get(s)));
-        removedStreamIds.forEach(s -> removed.put(s, cachedSSS.get(s)));
-        modifiedStreamIds.forEach(s -> {
-            if (!newSSS.get(s).equals(cachedSSS.get(s))) { // this means StreamSortSpec is changed for one specific streamId
-                modified.put(s, newSSS.get(s));
-            }
-        });
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("added StreamSortSpec " + added);
-            LOG.debug("removed StreamSortSpec " + removed);
-            LOG.debug("modified StreamSortSpec " + modified);
-        }
-        router.onStreamSortSpecChange(added, removed, modified);
-        // switch cache
-        cachedSSS = newSSS;
-
-        // figure out added, removed, modified StreamRouterSpec
-        Map<StreamPartition, List<StreamRouterSpec>> newSRS = spec.makeSRS();
-
-        Set<StreamPartition> newStreamPartitions = newSRS.keySet();
-        Set<StreamPartition> cachedStreamPartitions = cachedSRS.keySet();
-
-        Collection<StreamPartition> addedStreamPartitions = CollectionUtils.subtract(newStreamPartitions, cachedStreamPartitions);
-        Collection<StreamPartition> removedStreamPartitions = CollectionUtils.subtract(cachedStreamPartitions, newStreamPartitions);
-        Collection<StreamPartition> modifiedStreamPartitions = CollectionUtils.intersection(newStreamPartitions, cachedStreamPartitions);
-
-        Collection<StreamRouterSpec> addedRouterSpecs = new ArrayList<>();
-        Collection<StreamRouterSpec> removedRouterSpecs = new ArrayList<>();
-        Collection<StreamRouterSpec> modifiedRouterSpecs = new ArrayList<>();
-        addedStreamPartitions.forEach(s -> addedRouterSpecs.addAll(newSRS.get(s)));
-        removedStreamPartitions.forEach(s -> removedRouterSpecs.addAll(cachedSRS.get(s)));
-        modifiedStreamPartitions.forEach(s -> {
-            if (!CollectionUtils.isEqualCollection(newSRS.get(s), cachedSRS.get(s))) { // this means StreamRouterSpec is changed for one specific StreamPartition
-                modifiedRouterSpecs.addAll(newSRS.get(s));
-            }
-        });
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("added StreamRouterSpec " + addedRouterSpecs);
-            LOG.debug("removed StreamRouterSpec " + removedRouterSpecs);
-            LOG.debug("modified StreamRouterSpec " + modifiedRouterSpecs);
-        }
-
-        routeCollector.onStreamRouterSpecChange(addedRouterSpecs, removedRouterSpecs, modifiedRouterSpecs, sds);
-        // switch cache
-        cachedSRS = newSRS;
-        sdf = sds;
-        specVersion = spec.getVersion();
-    }
-
-    /**
-     * in correlation cases, multiple streams will go to the same queue for correlation policy.
-     *
-     * @param spec
-     */
-    private void sanityCheck(RouterSpec spec) {
-        Set<String> totalRequestedSlots = new HashSet<>();
-        for (StreamRouterSpec s : spec.getRouterSpecs()) {
-            for (PolicyWorkerQueue q : s.getTargetQueue()) {
-                List<String> workers = new ArrayList<>();
-                q.getWorkers().forEach(w -> workers.add(w.getBoltId()));
-                totalRequestedSlots.addAll(workers);
-            }
-        }
-        if (totalRequestedSlots.size() > getOutputStreamIds().size()) {
-            String error = String.format("Requested slots are not consistent with provided slots, %s, %s", totalRequestedSlots, getOutputStreamIds());
-            LOG.error(error);
-            throw new IllegalStateException(error);
-        }
-    }
-
-    public StreamRouter getStreamRouter() {
-        return router;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
deleted file mode 100755
index 3f06f66..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.runner;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.spout.CorrelationSpout;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-
-/**
- * By default
- * 1. one spout with multiple tasks
- * 2. multiple router bolts with each bolt having exactly one task
- * 3. multiple alert bolts with each bolt having exactly one task
- * 4. one publish bolt with multiple tasks
- */
-public class UnitTopologyRunner {
-    private static final Logger LOG = LoggerFactory.getLogger(UnitTopologyRunner.class);
-    public static final String spoutName = "alertEngineSpout";
-    private static final String streamRouterBoltNamePrefix = "streamRouterBolt";
-    private static final String alertBoltNamePrefix = "alertBolt";
-    public static final String alertPublishBoltName = "alertPublishBolt";
-
-    public static final String TOTAL_WORKER_NUM = "topology.numOfTotalWorkers";
-    public static final String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
-    public static final String ROUTER_TASK_NUM = "topology.numOfRouterBolts";
-    public static final String ALERT_TASK_NUM = "topology.numOfAlertBolts";
-    public static final String PUBLISH_TASK_NUM = "topology.numOfPublishTasks";
-    public static final String PUBLISH_EXECUTOR_NUM = "topology.numOfPublishExecutors";
-    public static final String LOCAL_MODE = "topology.localMode";
-    public static final String MESSAGE_TIMEOUT_SECS = "topology.messageTimeoutSecs";
-    public static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 3600;
-
-    private final IMetadataChangeNotifyService metadataChangeNotifyService;
-    private backtype.storm.Config givenStormConfig = null;
-
-    public UnitTopologyRunner(IMetadataChangeNotifyService metadataChangeNotifyService) {
-        this.metadataChangeNotifyService = metadataChangeNotifyService;
-    }
-
-    public UnitTopologyRunner(ZKMetadataChangeNotifyService changeNotifyService, backtype.storm.Config stormConfig) {
-        this(changeNotifyService);
-        this.givenStormConfig = stormConfig;
-    }
-
-    // -----------------------------
-    // Storm Topology Submit Helper
-    // -----------------------------
-
-    private void run(String topologyId,
-                     int numOfTotalWorkers,
-                     int numOfSpoutTasks,
-                     int numOfRouterBolts,
-                     int numOfAlertBolts,
-                     int numOfPublishExecutors,
-                     int numOfPublishTasks,
-                     Config config,
-                     boolean localMode) {
-
-        backtype.storm.Config stormConfig = givenStormConfig == null ? new backtype.storm.Config() : givenStormConfig;
-        // TODO: Configurable metric consumer instance number
-
-        int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS) ? config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS;
-        LOG.info("Set topology.message.timeout.secs as {}", messageTimeoutSecs);
-        stormConfig.setMessageTimeoutSecs(messageTimeoutSecs);
-
-        if (config.hasPath("metric")) {
-            stormConfig.registerMetricsConsumer(StormMetricTaggedConsumer.class, config.root().render(ConfigRenderOptions.concise()), 1);
-        }
-
-        stormConfig.setNumWorkers(numOfTotalWorkers);
-        StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config).createTopology();
-
-        if (localMode) {
-            LOG.info("Submitting as local mode");
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology(topologyId, stormConfig, topology);
-            Utils.sleep(Long.MAX_VALUE);
-        } else {
-            LOG.info("Submitting as cluster mode");
-            try {
-                StormSubmitter.submitTopologyWithProgressBar(topologyId, stormConfig, topology);
-            } catch (Exception ex) {
-                LOG.error("fail submitting topology {}", topology, ex);
-                throw new IllegalStateException(ex);
-            }
-        }
-    }
-
-    public void run(String topologyId, Config config) {
-        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
-        int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
-        int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
-        int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM);
-        int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
-        boolean localMode = config.getBoolean(LOCAL_MODE);
-        int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM);
-        run(topologyId, numOfTotalWorkers, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config, localMode);
-    }
-
-    public IMetadataChangeNotifyService getMetadataChangeNotifyService() {
-        return metadataChangeNotifyService;
-    }
-
-    // ---------------------------
-    // Build Storm Topology
-    // ---------------------------
-
-    public TopologyBuilder buildTopology(String topologyId,
-                                       int numOfSpoutTasks,
-                                       int numOfRouterBolts,
-                                       int numOfAlertBolts,
-                                       int numOfPublishExecutors,
-                                       int numOfPublishTasks,
-                                       Config config) {
-        StreamRouterBolt[] routerBolts = new StreamRouterBolt[numOfRouterBolts];
-        AlertBolt[] alertBolts = new AlertBolt[numOfAlertBolts];
-
-        TopologyBuilder builder = new TopologyBuilder();
-
-        // construct Spout object
-        CorrelationSpout spout = new CorrelationSpout(config, topologyId, getMetadataChangeNotifyService(), numOfRouterBolts, spoutName, streamRouterBoltNamePrefix);
-        builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
-
-        // construct StreamRouterBolt objects
-        for (int i = 0; i < numOfRouterBolts; i++) {
-            routerBolts[i] = new StreamRouterBolt(streamRouterBoltNamePrefix + i, config, getMetadataChangeNotifyService());
-        }
-
-        // construct AlertBolt objects
-        for (int i = 0; i < numOfAlertBolts; i++) {
-            alertBolts[i] = new AlertBolt(alertBoltNamePrefix + i, config, getMetadataChangeNotifyService());
-        }
-
-        // construct AlertPublishBolt object
-        AlertPublisherBolt publisherBolt = new AlertPublisherBolt(alertPublishBoltName, config, getMetadataChangeNotifyService());
-
-        // connect spout and router bolt, also define output streams for downstreaming alert bolt
-        for (int i = 0; i < numOfRouterBolts; i++) {
-            String boltName = streamRouterBoltNamePrefix + i;
-
-            // define output streams, which are based on
-            String streamId = StreamIdConversion.generateStreamIdBetween(spoutName, boltName);
-            List<String> outputStreamIds = new ArrayList<>(numOfAlertBolts);
-            for (int j = 0; j < numOfAlertBolts; j++) {
-                String sid = StreamIdConversion.generateStreamIdBetween(boltName, alertBoltNamePrefix + j);
-                outputStreamIds.add(sid);
-            }
-            routerBolts[i].declareOutputStreams(outputStreamIds);
-
-            /**
-             * TODO potentially one route bolt may have multiple tasks, so that is field grouping by groupby fields
-             * that means we need a separate field to become groupby field
-             */
-            builder.setBolt(boltName, routerBolts[i]).fieldsGrouping(spoutName, streamId, new Fields()).setNumTasks(1);
-        }
-
-        // connect router bolt and alert bolt, also define output streams for downstreaming alert publish bolt
-        for (int i = 0; i < numOfAlertBolts; i++) {
-            String boltName = alertBoltNamePrefix + i;
-            BoltDeclarer boltDeclarer = builder.setBolt(boltName, alertBolts[i]).setNumTasks(1);
-            for (int j = 0; j < numOfRouterBolts; j++) {
-                String streamId = StreamIdConversion.generateStreamIdBetween(streamRouterBoltNamePrefix + j, boltName);
-                boltDeclarer.fieldsGrouping(streamRouterBoltNamePrefix + j, streamId, new Fields());
-            }
-        }
-
-        // connect alert bolt and alert publish bolt, this is the last bolt in the pipeline
-        BoltDeclarer boltDeclarer = builder.setBolt(alertPublishBoltName, publisherBolt, numOfPublishExecutors).setNumTasks(numOfPublishTasks);
-        for (int i = 0; i < numOfAlertBolts; i++) {
-            boltDeclarer.fieldsGrouping(alertBoltNamePrefix + i, new Fields(AlertConstants.FIELD_0));
-        }
-
-        return builder;
-    }
-
-    public TopologyBuilder buildTopology(String topologyId, Config config) {
-        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
-        int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
-        int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
-        int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM);
-        int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
-
-        return buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config);
-    }
-
-    // ---------------------------
-    // Build Topology Metadata
-    // ---------------------------
-
-    public static Topology buildTopologyMetadata(String topologyId, Config config) {
-        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
-        int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
-        int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
-        int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM);
-        int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
-
-        return buildTopologyMetadata(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config);
-    }
-
-    public static Topology buildTopologyMetadata(String topologyId,
-                                                 int numOfSpoutTasks,
-                                                 int numOfRouterBolts,
-                                                 int numOfAlertBolts,
-                                                 int numOfPublishExecutors,
-                                                 int numOfPublishTasks,
-                                                 Config config) {
-        Topology topology = new Topology();
-        topology.setName(topologyId);
-        topology.setNumOfSpout(numOfSpoutTasks);
-        topology.setNumOfAlertBolt(numOfAlertBolts);
-        topology.setNumOfGroupBolt(numOfRouterBolts);
-        topology.setNumOfPublishBolt(numOfPublishTasks);
-
-        // Set Spout ID
-        topology.setSpoutId(spoutName);
-
-        // Set Router (Group) ID
-        Set<String> streamRouterBoltNames = new TreeSet<>();
-        for (int i = 0; i < numOfRouterBolts; i++) {
-            streamRouterBoltNames.add(streamRouterBoltNamePrefix + i);
-        }
-        topology.setGroupNodeIds(streamRouterBoltNames);
-
-        // Set Alert Bolt ID
-        Set<String> alertBoltIds = new TreeSet<>();
-        for (int i = 0; i < numOfAlertBolts; i++) {
-            alertBoltIds.add(alertBoltNamePrefix + i);
-        }
-        topology.setAlertBoltIds(alertBoltIds);
-
-        // Set Publisher ID
-        topology.setPubBoltId(alertPublishBoltName);
-
-        // TODO: Load bolts' parallelism from configuration, currently keep 1 by default.
-
-        topology.setSpoutParallelism(1);
-        topology.setGroupParallelism(1);
-        topology.setAlertParallelism(1);
-
-        return topology;
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
deleted file mode 100644
index db461d8..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.scheme;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.tuple.Fields;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Expects flat Json scheme.
- */
-public class JsonScheme implements Scheme {
-    private static final long serialVersionUID = -8352896475656975577L;
-    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(JsonScheme.class);
-    private static final ObjectMapper mapper = new ObjectMapper();
-
-    private String topic;
-
-    @SuppressWarnings("rawtypes")
-    public JsonScheme(String topic, Map conf) {
-        this.topic = topic;
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return new Fields("f1");
-    }
-
-    @Override
-    @SuppressWarnings("rawtypes")
-    public List<Object> deserialize(byte[] ser) {
-        try {
-            if (ser != null) {
-                Map map = mapper.readValue(ser, Map.class);
-                return Arrays.asList(topic, map);
-            } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Content is null, ignore");
-                }
-            }
-        } catch (IOException e) {
-            try {
-                LOG.error("Failed to deserialize as JSON: {}", new String(ser, "UTF-8"), e);
-            } catch (Exception ex) {
-                LOG.error(ex.getMessage(), ex);
-            }
-        }
-        return null;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
deleted file mode 100644
index 4e02edb..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.scheme;
-
-import org.apache.eagle.alert.coordination.model.StreamNameSelector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * A strategy to get stream name from message tuple.
- * @since 5/5/16.
- */
-public class JsonStringStreamNameSelector implements StreamNameSelector {
-    private static final  Logger LOG = LoggerFactory.getLogger(JsonStringStreamNameSelector.class);
-    public static final  String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName";
-    public static final  String FIELD_NAMES_TO_INFER_STREAM_NAME_PROPERTY = "fieldNamesToInferStreamName";
-    public static final  String STREAM_NAME_FORMAT = "streamNameFormat";
-
-    private String userProvidedStreamName;
-    private String[] fieldNamesToInferStreamName;
-    private String streamNameFormat;
-
-    public JsonStringStreamNameSelector(Properties prop) {
-        userProvidedStreamName = prop.getProperty(USER_PROVIDED_STREAM_NAME_PROPERTY);
-        String fields = prop.getProperty(FIELD_NAMES_TO_INFER_STREAM_NAME_PROPERTY);
-        if (fields != null) {
-            fieldNamesToInferStreamName = fields.split(",");
-        }
-        streamNameFormat = prop.getProperty(STREAM_NAME_FORMAT);
-        if (streamNameFormat == null) {
-            LOG.warn("no stream name format found, this might cause default stream name be used which is dis-encouraged. Possibly this is a mis-configuration.");
-        }
-    }
-
-    @Override
-    public String getStreamName(Map<String, Object> tuple) {
-        if (userProvidedStreamName != null) {
-            return userProvidedStreamName;
-        } else if (fieldNamesToInferStreamName != null && streamNameFormat != null) {
-            Object[] args = new Object[fieldNamesToInferStreamName.length];
-            for (int i = 0; i < fieldNamesToInferStreamName.length; i++) {
-                Object colValue = tuple.get(fieldNamesToInferStreamName[i]);
-                args[i] = colValue;
-            }
-            return String.format(streamNameFormat, args);
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("can not find the stream name for data source. Use the default stream, possibly this means mis-configuration of datasource!");
-        }
-        return "defaultStringStream";
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
deleted file mode 100644
index 57c8897..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.scheme;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * used for parsing plain string.
- */
-public class PlainStringScheme implements Scheme {
-    private static final long serialVersionUID = 5969724968671646310L;
-    @SuppressWarnings("unused")
-    private static final Logger LOG = LoggerFactory.getLogger(PlainStringScheme.class);
-    private String topic;
-
-    @SuppressWarnings("rawtypes")
-    public PlainStringScheme(String topic, Map conf) {
-        this.topic = topic;
-    }
-
-    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
-    public static final String STRING_SCHEME_KEY = "str";
-
-    public static String deserializeString(byte[] buff) {
-        return new String(buff, UTF8_CHARSET);
-    }
-
-    public Fields getOutputFields() {
-        return new Fields(STRING_SCHEME_KEY);
-    }
-
-    @SuppressWarnings( {"unchecked", "rawtypes"})
-    @Override
-    public List<Object> deserialize(byte[] ser) {
-        Map m = new HashMap<>();
-        m.put("value", deserializeString(ser));
-        m.put("timestamp", System.currentTimeMillis());
-        return new Values(topic, m);
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
deleted file mode 100644
index 0b88483..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.scheme;
-
-import org.apache.eagle.alert.coordination.model.StreamNameSelector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Since 5/3/16.
- */
-public class PlainStringStreamNameSelector implements StreamNameSelector {
-    @SuppressWarnings("unused")
-    private static final Logger LOG = LoggerFactory.getLogger(PlainStringStreamNameSelector.class);
-    private static final String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName";
-    private static final String DEFAULT_STRING_STREAM_NAME = "defaultStringStream";
-
-    private String streamName;
-
-    public PlainStringStreamNameSelector(Properties prop) {
-        streamName = prop.getProperty(USER_PROVIDED_STREAM_NAME_PROPERTY);
-        if (streamName == null) {
-            streamName = DEFAULT_STRING_STREAM_NAME;
-        }
-    }
-
-    @Override
-    public String getStreamName(Map<String, Object> tuple) {
-        return streamName;
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
deleted file mode 100644
index 1e8f440..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.serialization.impl.StreamEventSerializer;
-import org.apache.eagle.alert.engine.serialization.impl.StreamPartitionDigestSerializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * TODO: Seems the complexity doesn't bring enough performance improvement.
- *
- * @see PartitionedEvent
- */
-@Deprecated
-public class PartitionedEventDigestSerializer implements Serializer<PartitionedEvent> {
-    private final Serializer<StreamEvent> streamEventSerializer;
-    private final Serializer<StreamPartition> streamPartitionSerializer;
-
-    public PartitionedEventDigestSerializer(SerializationMetadataProvider serializationMetadataProvider) {
-        this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider);
-        this.streamPartitionSerializer = StreamPartitionDigestSerializer.INSTANCE;
-    }
-
-    @Override
-    public void serialize(PartitionedEvent entity, DataOutput dataOutput) throws IOException {
-        dataOutput.writeLong(entity.getPartitionKey());
-        streamEventSerializer.serialize(entity.getEvent(), dataOutput);
-        streamPartitionSerializer.serialize(entity.getPartition(), dataOutput);
-    }
-
-    @Override
-    public PartitionedEvent deserialize(DataInput dataInput) throws IOException {
-        PartitionedEvent event = new PartitionedEvent();
-        event.setPartitionKey(dataInput.readLong());
-        StreamEvent streamEvent = streamEventSerializer.deserialize(dataInput);
-        event.setEvent(streamEvent);
-        StreamPartition partition = streamPartitionSerializer.deserialize(dataInput);
-        partition.setStreamId(streamEvent.getStreamId());
-        event.setPartition(partition);
-        return event;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
deleted file mode 100644
index 428ad34..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization;
-
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
-import java.io.IOException;
-
-public interface PartitionedEventSerializer {
-
-    byte[] serialize(PartitionedEvent entity) throws IOException;
-
-    PartitionedEvent deserialize(byte[] bytes) throws IOException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
deleted file mode 100644
index ef190b4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization;
-
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException;
-
-/**
- * Integration interface to provide stream definition for serializer.
- */
-public interface SerializationMetadataProvider {
-    /**
-     * @param streamId
-     * @return StreamDefinition or null if it does not exist.
-     */
-    StreamDefinition getStreamDefinition(String streamId) throws StreamNotDefinedException;
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
deleted file mode 100644
index 599152e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-public interface Serializer<V> {
-    void serialize(V value, DataOutput dataOutput) throws IOException;
-
-    V deserialize(DataInput dataInput) throws IOException;
-}
\ No newline at end of file


Mime
View raw message