eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [14/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5
Date Thu, 02 Jun 2016 07:07:53 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
deleted file mode 100644
index 5008dbf..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router.impl;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.router.StreamRouter;
-import org.apache.eagle.alert.engine.router.StreamSortHandler;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClockManager;
-import org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl;
-import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockManagerImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Routes each incoming {@link PartitionedEvent} either to the {@link StreamSortHandler}
- * registered for the event's partition or, when none is registered, straight to the
- * output collector. Handlers are added, removed and replaced through
- * {@link #onStreamSortSpecChange(Map, Map, Map)}.
- */
-public class StreamRouterImpl implements StreamRouter {
-    private static final long serialVersionUID = -4640125063690900014L;
-    private final static Logger LOG = LoggerFactory.getLogger(StreamRouterImpl.class);
-    private final String name;
-    // Copy-on-write: onStreamSortSpecChange fills a fresh map and then swaps this reference
-    private volatile Map<StreamPartition,StreamSortHandler> streamSortHandlers;
-    private PartitionedEventCollector outputCollector;
-    private StreamTimeClockManager streamTimeClockManager;
-    private StreamContext context;
-
-    /**
-     * @param name This name should be formed by topologyId + router id, which is built by topology builder
-     */
-    public StreamRouterImpl(String name){
-        this.name = name;
-    }
-
-    public String getName(){
-        return this.name;
-    }
-
-    /**
-     * Closes every registered sort handler and then the clock manager.
-     * Does nothing for the parts that were never created, i.e. when
-     * {@link #prepare(StreamContext, PartitionedEventCollector)} has not been called.
-     */
-    @Override
-    public void close() {
-        Map<StreamPartition,StreamSortHandler> handlers = this.streamSortHandlers;
-        if (handlers != null) {
-            handlers.values().forEach(StreamSortHandler::close);
-        }
-        if (streamTimeClockManager != null) {
-            streamTimeClockManager.close();
-        }
-    }
-
-    /**
-     * Creates the clock manager and an empty handler map, and stores the collaborators.
-     *
-     * @param context         source of the metric counters used by this router
-     * @param outputCollector collector that receives pass-through events and is handed to sort handlers
-     */
-    public void prepare(StreamContext context, PartitionedEventCollector outputCollector) {
-        this.streamTimeClockManager = new StreamTimeClockManagerImpl();
-        this.streamSortHandlers = new HashMap<>();
-        this.outputCollector = outputCollector;
-        this.context = context;
-    }
-
-    /**
-     * Counts the event, hands it to its sort handler or emits it directly, then
-     * advances the stream clock.
-     *
-     * TODO: Potential improvement: if StreamSortHandler is expensive, we can use DISRUPTOR to buffer
-     *
-     * @param event StreamEvent
-     */
-    public void nextEvent(PartitionedEvent event) {
-        this.context.counter().scope("receive_count").incr();
-        if (dispatchToSortHandler(event)) {
-            // Only events actually handed to a sort handler count as sorted
-            this.context.counter().scope("sort_count").incr();
-        } else {
-            this.context.counter().scope("direct_count").incr();
-            // Pass through directly if no need to sort
-            outputCollector.emit(event);
-        }
-        // Update stream clock time if moving forward and trigger all tick listeners
-        streamTimeClockManager.onTimeUpdate(event.getStreamId(),event.getTimestamp());
-    }
-
-    /**
-     * @param event input event
-     * @return whether sorted
-     */
-    private boolean dispatchToSortHandler(PartitionedEvent event){
-        // Events without a positive timestamp are never sorted
-        if (event.getTimestamp() <= 0) {
-            return false;
-        }
-
-        StreamSortHandler sortHandler = streamSortHandlers.get(event.getPartition());
-        if (sortHandler != null) {
-            sortHandler.nextEvent(event);
-            return true;
-        }
-        if (event.isSortRequired()) {
-            LOG.warn("Stream sort handler required has not been loaded so emmit directly: {}", event);
-            this.context.counter().scope("miss_sort_count").incr();
-        }
-        return false;
-    }
-
-    /**
-     * Applies a sort-spec delta to a copy of the handler map and publishes the copy
-     * with a single reference assignment once all three groups are processed.
-     *
-     * @param added   partitions that need a new handler; an already-known partition is logged and skipped
-     * @param removed partitions whose handler is closed and dropped; an unknown partition is logged
-     * @param changed partitions whose handler is closed and replaced; an unknown partition is logged
-     */
-    @Override
-    public void onStreamSortSpecChange(Map<StreamPartition, StreamSortSpec> added,
-            Map<StreamPartition, StreamSortSpec> removed,
-            Map<StreamPartition, StreamSortSpec> changed) {
-        synchronized (streamTimeClockManager) {
-            Map<StreamPartition, StreamSortHandler> copy = new HashMap<>(this.streamSortHandlers);
-            // add new StreamSortSpec
-            if (added != null) {
-                for (Entry<StreamPartition, StreamSortSpec> spec : added.entrySet()) {
-                    StreamPartition partition = spec.getKey();
-                    if (copy.containsKey(partition)) {
-                        LOG.error("Metadata calculation error: Duplicated StreamSortSpec {}", spec);
-                    } else {
-                        StreamSortHandler handler = createHandler(partition, spec.getValue());
-                        copy.put(partition, handler);
-                        streamTimeClockManager.registerListener(streamTimeClockManager.createStreamTimeClock(partition.getStreamId()), handler);
-                    }
-                }
-            }
-
-            // remove StreamSortSpec
-            if (removed != null) {
-                for (Entry<StreamPartition, StreamSortSpec> spec : removed.entrySet()) {
-                    StreamPartition partition = spec.getKey();
-                    if (copy.containsKey(partition)) {
-                        retireHandler(copy.remove(partition));
-                    } else {
-                        LOG.error("Metadata calculation error: remove nonexisting StreamSortSpec {}", spec.getValue());
-                    }
-                }
-            }
-
-            // modify StreamSortSpec
-            if (changed != null) {
-                for (Entry<StreamPartition, StreamSortSpec> spec : changed.entrySet()) {
-                    StreamPartition partition = spec.getKey();
-                    if (copy.containsKey(partition)) {
-                        retireHandler(copy.remove(partition));
-                        StreamSortHandler handler = createHandler(partition, spec.getValue());
-                        copy.put(partition, handler);
-                        // NOTE(review): the "added" branch registers via createStreamTimeClock(streamId),
-                        // this one registers by stream id; confirm both overloads bind to the same clock
-                        streamTimeClockManager.registerListener(partition.getStreamId(), handler);
-                    } else {
-                        LOG.error("Metadata calculation error: modify non-existing StreamSortSpec {}", spec.getValue());
-                    }
-                }
-            }
-
-            // atomic switch
-            this.streamSortHandlers = copy;
-        }
-    }
-
-    /** Builds a window sort handler for the partition's stream and prepares it with the given spec. */
-    private StreamSortHandler createHandler(StreamPartition partition, StreamSortSpec sortSpec) {
-        StreamSortHandler handler = new StreamSortWindowHandlerImpl();
-        handler.prepare(partition.getStreamId(), sortSpec, this.outputCollector);
-        return handler;
-    }
-
-    /** Closes a handler and unregisters it from the clock manager. */
-    private void retireHandler(StreamSortHandler handler) {
-        handler.close();
-        streamTimeClockManager.removeListener(handler);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
deleted file mode 100644
index cc819ba..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.runner;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-
-/**
- * Base bolt that stores the metadata change-notify service and the configuration
- * given at construction time and passes them to the subclass from
- * {@link #prepare(Map, TopologyContext, OutputCollector)}.
- */
-@SuppressWarnings({"rawtypes", "serial"})
-public abstract class AbstractStreamBolt extends BaseRichBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamBolt.class);
-    private IMetadataChangeNotifyService changeNotifyService;
-    private Config config;
-    private List<String> outputStreamIds;
-    protected OutputCollector collector;
-
-    /**
-     * @param changeNotifyService service later handed to {@link #internalPrepare}; must be non-null by the time prepare runs
-     * @param config              configuration later handed to {@link #internalPrepare}
-     */
-    public AbstractStreamBolt(IMetadataChangeNotifyService changeNotifyService, Config config){
-        this.config = config;
-        this.changeNotifyService = changeNotifyService;
-    }
-
-    /**
-     * Records the stream ids that {@link #declareOutputFields(OutputFieldsDeclarer)} will declare.
-     *
-     * @param outputStreamIds ids of the output streams; may be null
-     */
-    public void declareOutputStreams(List<String> outputStreamIds){
-        this.outputStreamIds = outputStreamIds;
-    }
-
-    /**
-     * @return the ids given to {@link #declareOutputStreams(List)}, or null if it was never called
-     */
-    protected List<String> getOutputStreamIds(){
-        return outputStreamIds;
-    }
-
-    /**
-     * Verifies the change-notify service is present, keeps the collector and
-     * delegates the rest of the initialization to the subclass.
-     */
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        Preconditions.checkNotNull(changeNotifyService, "IMetadataChangeNotifyService is not set yet");
-        this.collector = collector;
-        internalPrepare(collector, changeNotifyService, config, context);
-    }
-
-    /**
-     * Hook for subclass-specific initialization, for example registering for
-     * metadata changes.
-     *
-     * @param collector       the collector passed to prepare
-     * @param metadataManager the service supplied to the constructor
-     * @param config          the configuration supplied to the constructor
-     * @param context         the topology context passed to prepare
-     */
-    public abstract void internalPrepare(
-            OutputCollector collector,
-            IMetadataChangeNotifyService metadataManager,
-            Config config, TopologyContext context);
-
-    @Override
-    public void cleanup() {
-        super.cleanup();
-    }
-
-    /**
-     * Declares one single-field stream per configured output stream id, or the
-     * default stream with the same single field when no ids were configured.
-     */
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        if (outputStreamIds == null) {
-            declarer.declare(new Fields(AlertConstants.FIELD_0));
-            return;
-        }
-        LOG.info("declare streams: {} ", outputStreamIds);
-        outputStreamIds.forEach(id -> declarer.declareStream(id, new Fields(AlertConstants.FIELD_0)));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
deleted file mode 100755
index 30ff5f0..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.runner;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.engine.AlertStreamCollector;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.StreamContextImpl;
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.MetadataType;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
-import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorWrapper;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.router.AlertBoltSpecListener;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
-import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
-import org.apache.eagle.alert.engine.serialization.Serializers;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-
-import com.typesafe.config.Config;
-
-/**
- * Since 5/1/16.
- * This is container for hosting all policies belonging to the same monitoredStream
- * MonitoredStream refers to tuple of {dataSource, streamId, groupby}
- * The container is also called {@link WorkSlot}
- */
-public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListener,SerializationMetadataProvider {
-    private final static Logger LOG = LoggerFactory.getLogger(AlertBolt.class);
-    private static final long serialVersionUID = -4132297691448945672L;
-    private PolicyGroupEvaluator policyGroupEvaluator;
-    private AlertStreamCollector alertOutputCollector;
-    private String boltId;
-    // Guards ack/fail on the collector; also handed to the output collector wrapper
-    private volatile Object outputLock;
-    // mapping from policy name to PolicyDefinition
-    private volatile Map<String, PolicyDefinition> cachedPolicies = new HashMap<>(); // for one streamGroup, there are multiple policies
-
-    private StreamContext streamContext;
-    // Stream definitions from the latest spec change; null until the first one arrives
-    private volatile Map<String, StreamDefinition> sdf;
-    private PartitionedEventSerializer serializer;
-
-    /**
-     * @param boltId               key used to look up this bolt's policies in an {@link AlertBoltSpec}
-     * @param policyGroupEvaluator evaluator that receives every event and every policy change
-     * @param config               configuration passed on to the base bolt
-     * @param changeNotifyService  metadata change source passed on to the base bolt
-     */
-    public AlertBolt(String boltId, PolicyGroupEvaluator policyGroupEvaluator, Config config, IMetadataChangeNotifyService changeNotifyService){
-        super(changeNotifyService, config);
-        this.boltId = boltId;
-        this.policyGroupEvaluator = policyGroupEvaluator;
-    }
-
-    /**
-     * Turns a tuple value into a {@link PartitionedEvent}.
-     *
-     * @param object either a serialized event ({@code byte[]}) or an event instance
-     * @return the event
-     * @throws IOException declared for the serializer's deserialize call
-     */
-    PartitionedEvent deserialize(Object object) throws IOException {
-        // byte[] in higher priority
-        if(object instanceof byte[]) {
-            return serializer.deserialize((byte[]) object);
-        } else {
-            return (PartitionedEvent) object;
-        }
-    }
-
-    /**
-     * Evaluates one tuple: acks it when evaluation succeeds, fails it on any
-     * exception, and always flushes the alert output collector afterwards.
-     */
-    @Override
-    public void execute(Tuple input) {
-        this.streamContext.counter().scope("execute_count").incr();
-        try {
-            policyGroupEvaluator.nextEvent(deserialize(input.getValueByField(AlertConstants.FIELD_0)).withAnchor(input));
-            synchronized (outputLock) {
-                this.collector.ack(input);
-            }
-            this.streamContext.counter().scope("ack_count").incr();
-        }catch (Exception ex) {
-            LOG.error(ex.getMessage(),ex);
-            synchronized (outputLock) {
-                this.streamContext.counter().scope("fail_count").incr();
-                this.collector.fail(input);
-            }
-        } finally {
-            alertOutputCollector.flush();
-        }
-    }
-
-    /**
-     * Creates the lock, metric context, serializer and output collector wrapper,
-     * initializes the evaluator, then registers this bolt as a metadata listener.
-     */
-    @Override
-    public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService metadataChangeNotifyService, Config config, TopologyContext context) {
-        // instantiate output lock object
-        outputLock = new Object();
-        streamContext = new StreamContextImpl(config,context.registerMetric("eagle.evaluator",new MultiCountMetric(),60),context);
-        serializer = Serializers.newPartitionedEventSerializer(this);
-        alertOutputCollector = new AlertBoltOutputCollectorWrapper(collector, outputLock,streamContext);
-        policyGroupEvaluator.init(streamContext, alertOutputCollector);
-        metadataChangeNotifyService.registerListener(this);
-        metadataChangeNotifyService.init(config, MetadataType.ALERT_BOLT);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(AlertConstants.FIELD_0, AlertConstants.FIELD_1));
-    }
-
-    @Override
-    public void cleanup() {
-        policyGroupEvaluator.close();
-        alertOutputCollector.flush();
-        alertOutputCollector.close();
-        super.cleanup();
-    }
-
-    /**
-     * Diffs this bolt's policies in the new spec against the cached ones, forwards
-     * the added/removed/modified sets to the evaluator, then replaces the cache and
-     * the stream definitions.
-     *
-     * @param spec new bolt spec; ignored when null or when it holds no entry for this bolt
-     * @param sds  stream definitions keyed by stream id
-     */
-    @Override
-    public void onAlertBoltSpecChange(AlertBoltSpec spec, Map<String, StreamDefinition> sds) {
-        // Same guard as AlertPublisherBolt#onAlertPublishSpecChange
-        if(spec == null) return;
-
-        List<PolicyDefinition> newPolicies = spec.getBoltPoliciesMap().get(boltId);
-        if(newPolicies == null) {
-            LOG.info("no policy with AlertBoltSpec {} for this bolt {}", spec, boltId);
-            return;
-        }
-
-        Map<String, PolicyDefinition> newPoliciesMap = new HashMap<>();
-        newPolicies.forEach(p -> newPoliciesMap.put(p.getName(), p));
-        MapComparator<String, PolicyDefinition> comparator = new MapComparator<>(newPoliciesMap, cachedPolicies);
-        comparator.compare();
-
-        policyGroupEvaluator.onPolicyChange(comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), sds);
-
-        // switch
-        cachedPolicies = newPoliciesMap;
-        sdf = sds;
-    }
-
-    /**
-     * @param streamId stream id to look up
-     * @return the definition from the latest spec change, or null when the id is
-     *         unknown or no spec change has been received yet
-     */
-    @Override
-    public StreamDefinition getStreamDefinition(String streamId) {
-        // Read the volatile field once; it stays null until the first spec change
-        Map<String, StreamDefinition> definitions = this.sdf;
-        return definitions == null ? null : definitions.get(streamId);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
deleted file mode 100644
index 0a239e2..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.runner;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.engine.StreamContextImpl;
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.MetadataType;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
-import org.apache.eagle.alert.engine.publisher.AlertPublisher;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-
-import com.typesafe.config.Config;
-
-/**
- * Terminal bolt that hands every incoming {@link AlertStreamEvent} to an
- * {@link AlertPublisher} and keeps the publisher's publishments in sync with
- * {@link PublishSpec} changes.
- */
-@SuppressWarnings("serial")
-public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPublishSpecListener {
-    private final static Logger LOG = LoggerFactory.getLogger(AlertPublisherBolt.class);
-    private final AlertPublisher alertPublisher;
-    // mapping from publishment name to Publishment, replaced on every spec change
-    private volatile Map<String, Publishment> cachedPublishments = new HashMap<>();
-    private StreamContextImpl streamContext;
-
-    /**
-     * Stores the publisher and initializes it with the given configuration.
-     *
-     * @param alertPublisher     publisher that receives alerts and publishment changes
-     * @param config             configuration used to initialize the publisher and passed to the base bolt
-     * @param coordinatorService metadata change source passed to the base bolt
-     */
-    public AlertPublisherBolt(AlertPublisher alertPublisher, Config config, IMetadataChangeNotifyService coordinatorService){
-        super(coordinatorService, config);
-        this.alertPublisher = alertPublisher;
-        this.alertPublisher.init(config);
-    }
-
-    /**
-     * Registers this bolt as a metadata listener and creates the metric context.
-     */
-    @Override
-    public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService coordinatorService, Config config, TopologyContext context) {
-        coordinatorService.registerListener(this);
-        coordinatorService.init(config, MetadataType.ALERT_PUBLISH_BOLT);
-        streamContext = new StreamContextImpl(config,context.registerMetric("eagle.publisher",new MultiCountMetric(),60),context);
-    }
-
-    /**
-     * Publishes the alert carried in {@code FIELD_1} and acks the tuple; on any
-     * exception the error is logged and reported and the tuple is failed.
-     */
-    @Override
-    public void execute(Tuple input) {
-        try {
-            // scope(..) only selects the counter; incr() is what records the event
-            streamContext.counter().scope("receive_count").incr();
-            alertPublisher.nextEvent((AlertStreamEvent) input.getValueByField(AlertConstants.FIELD_1));
-            this.collector.ack(input);
-            streamContext.counter().scope("ack_count").incr();
-        } catch (Exception ex){
-            streamContext.counter().scope("fail_count").incr();
-            LOG.error(ex.getMessage(),ex);
-            collector.reportError(ex);
-            // Fail explicitly so the tuple is not left pending until the message timeout
-            collector.fail(input);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        alertPublisher.close();
-        super.cleanup();
-    }
-
-    /** Declares an empty field list on the default stream. */
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields());
-    }
-
-    /**
-     * Diffs the publishments in the new spec against the cached ones, forwards the
-     * added/removed/modified sets plus the previous versions of the modified ones
-     * to the publisher, then replaces the cache.
-     *
-     * @param pubSpec new publish spec; ignored when null or when it has no publishment list
-     * @param sds     stream definitions; not used by this method
-     */
-    @Override
-    public void onAlertPublishSpecChange(PublishSpec pubSpec, Map<String, StreamDefinition> sds) {
-        if(pubSpec == null) return;
-
-        List<Publishment> newPublishments = pubSpec.getPublishments();
-        if(newPublishments == null) {
-            LOG.info("no publishments with PublishSpec {} for this topology", pubSpec);
-            return;
-        }
-
-        Map<String, Publishment> newPublishmentsMap = new HashMap<>();
-        newPublishments.forEach(p -> newPublishmentsMap.put(p.getName(), p));
-        MapComparator<String, Publishment> comparator = new MapComparator<>(newPublishmentsMap, cachedPublishments);
-        comparator.compare();
-
-        // Cached (pre-change) versions of the modified publishments, in the same order
-        List<Publishment> beforeModified = new ArrayList<>();
-        comparator.getModified().forEach(p -> beforeModified.add(cachedPublishments.get(p.getName())));
-        alertPublisher.onPublishChange(comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), beforeModified);
-
-        // switch
-        cachedPublishments = newPublishmentsMap;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java
deleted file mode 100644
index 04595b1..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.runner;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.collections.CollectionUtils;
-
-/**
- * Since 5/2/16.
- */
-public class MapComparator<K, V> {
-    private Map<K, V> map1;
-    private Map<K, V> map2;
-    private List<V> added = new ArrayList<>();
-    private List<V> removed = new ArrayList<>();
-    private List<V> modified = new ArrayList<>();
-    public MapComparator(Map<K, V> map1, Map<K, V> map2){
-        this.map1 = map1;
-        this.map2 = map2;
-    }
-
-    @SuppressWarnings("unchecked")
-    public void compare(){
-        Set<K> keys1 = map1.keySet();
-        Set<K> keys2 = map2.keySet();
-        Collection<K> addedKeys = CollectionUtils.subtract(keys1, keys2);
-        Collection<K> removedKeys = CollectionUtils.subtract(keys2, keys1);
-        Collection<K> modifiedKeys = CollectionUtils.intersection(keys1, keys2);
-
-        addedKeys.forEach(k -> added.add(map1.get(k)));
-        removedKeys.forEach(k -> removed.add(map2.get(k)));
-        modifiedKeys.forEach(k -> {
-            if(!map1.get(k).equals(map2.get(k))){
-                modified.add(map1.get(k));
-            }
-        });
-    }
-
-    public List<V> getAdded(){
-        return added;
-    }
-
-    public List<V> getRemoved(){
-        return removed;
-    }
-
-    public List<V> getModified(){
-        return modified;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
deleted file mode 100644
index 1a6267b..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.runner;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eagle.alert.metric.IMetricSystem;
-import org.apache.eagle.alert.metric.MetricSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.metric.api.IMetricsConsumer;
-import backtype.storm.task.IErrorReporter;
-import backtype.storm.task.TopologyContext;
-
-import com.codahale.metrics.Gauge;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-
-/**
- * Share same metric system
- */
-public class StormMetricConsumer implements IMetricsConsumer {
-    public static final Logger LOG = LoggerFactory.getLogger(StormMetricConsumer.class);
-    private String topologyName;
-    private IMetricSystem metricSystem;
-    private String topologyId;
-
-    @SuppressWarnings({ "serial", "rawtypes" })
-    @Override
-    public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
-        Config config = ConfigFactory.parseString((String) registrationArgument, ConfigParseOptions.defaults());
-        topologyName = config.getString("topology.name");
-        topologyId = context.getStormId();
-        metricSystem = MetricSystem.load(config);
-        metricSystem.tags(new HashMap<String,Object>(){{
-            put("topologyName",topologyName);
-            put("topologyId",topologyId);
-        }});
-        metricSystem.start();
-    }
-
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    @Override
-    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
-        synchronized (metricSystem) {
-            List<String> metricList = new LinkedList<>();
-            for(DataPoint dataPoint:dataPoints){
-                if(dataPoint.value instanceof Map) {
-                    Map<String,Object> values = (Map<String, Object>) dataPoint.value;
-                    for(Map.Entry<String,Object> entry:values.entrySet()){
-                        String metricName = buildMetricName(taskInfo, dataPoint.name, entry.getKey());
-                        metricList.add(metricName);
-                        Gauge gauge = metricSystem.registry().getGauges().get(metricName);
-                        if(gauge == null) {
-                            LOG.info("Register metric {}", metricName);
-                            gauge = new DataPointGauge(entry.getValue());
-                            metricSystem.registry().register(metricName,gauge);
-                        }else{
-                            ((DataPointGauge) gauge).setValue(entry.getValue());
-                        }
-                    }
-                } else {
-                    String metricName = buildMetricName(taskInfo, dataPoint.name);
-                    metricList.add(metricName);
-                    LOG.info("Register metric {}",metricName);
-                    Gauge gauge = metricSystem.registry().getGauges().get(metricName);
-                    if(gauge == null) {
-                        LOG.info("Register metric {}", metricName);
-                        gauge = new DataPointGauge(dataPoint.value);
-                        metricSystem.registry().register(metricName,gauge);
-                    } else {
-                        ((DataPointGauge) gauge).setValue(dataPoint.value);
-                    }
-                }
-            }
-            metricSystem.registry().removeMatching((name, metric) -> metricList.indexOf(name) < 0);
-            metricSystem.report();
-            metricSystem.registry().getGauges().values().forEach((gauge -> {
-                if(gauge instanceof DataPointGauge){
-                    ((DataPointGauge)gauge).reset();
-                }
-            }));
-            LOG.info("Reported {} metric data points from {} [{}]",dataPoints.size(),taskInfo.srcComponentId,taskInfo.srcTaskId);
-        }
-    }
-
-    private class DataPointGauge implements Gauge<Object> {
-        private Object value;
-        public DataPointGauge(Object initialValue){
-            this.value = initialValue;
-        }
-
-        @Override
-        public Object getValue() {
-            return value;
-        }
-
-        public void setValue(Object value){
-            this.value = value;
-        }
-
-        public void reset(){
-            this.value = 0;
-        }
-    }
-
-    private String buildMetricName(TaskInfo taskInfo,String ... name ){
-        return String.join(".",StringUtils.join(name,".").replace("/","."),taskInfo.srcComponentId,taskInfo.srcTaskId+"");
-    }
-
-    @Override
-    public void cleanup() {
-        metricSystem.stop();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
deleted file mode 100644
index c18a44f..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.runner;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eagle.alert.metric.IMetricSystem;
-import org.apache.eagle.alert.metric.MetricSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.metric.api.IMetricsConsumer;
-import backtype.storm.task.IErrorReporter;
-import backtype.storm.task.TopologyContext;
-
-import com.codahale.metrics.Gauge;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-
-/**
- * Per MetricSystem instance per task
- */
-public class StormMetricTaggedConsumer implements IMetricsConsumer {
-    public static final Logger LOG = LoggerFactory.getLogger(StormMetricTaggedConsumer.class);
-    private String topologyName;
-    private Map<String,MetricSystem> metricSystems;
-    private String stormId;
-    private Config config;
-
-    @SuppressWarnings("rawtypes")
-    @Override
-    public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
-        this.config = ConfigFactory.parseString((String) registrationArgument, ConfigParseOptions.defaults());
-        topologyName = config.getString("topology.name");
-        stormId = context.getStormId();
-        metricSystems = new HashMap<>();
-    }
-
-    @SuppressWarnings("serial")
-    @Override
-    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
-        synchronized (metricSystems) {
-            String uniqueTaskKey = buildUniqueTaskKey(taskInfo);
-            MetricSystem metricSystem = metricSystems.get(uniqueTaskKey);
-            if(metricSystem == null){
-                metricSystem = MetricSystem.load(config);
-                metricSystems.put(uniqueTaskKey,metricSystem);
-                metricSystem.tags(new HashMap<String,Object>(){{
-                    put("topology",topologyName);
-                    put("stormId",stormId);
-                    put("component",taskInfo.srcComponentId);
-                    put("task",taskInfo.srcTaskId);
-                }});
-                metricSystem.start();
-                LOG.info("Initialized metric reporter for {}",uniqueTaskKey);
-            }
-            report(metricSystem,taskInfo,dataPoints);
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Reported {} metric points from {}", dataPoints.size(), uniqueTaskKey);
-            }
-        }
-    }
-
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    private void report(MetricSystem metricSystem,TaskInfo taskInfo,Collection<DataPoint> dataPoints){
-        List<String> metricList = new LinkedList<>();
-        for (DataPoint dataPoint : dataPoints) {
-            if (dataPoint.value instanceof Map) {
-                Map<String, Object> values = (Map<String, Object>) dataPoint.value;
-                for (Map.Entry<String, Object> entry : values.entrySet()) {
-                    String metricName = buildSimpleMetricName(taskInfo, dataPoint.name, entry.getKey());
-                    metricList.add(metricName);
-                    Gauge gauge = metricSystem.registry().getGauges().get(metricName);
-                    if (gauge == null) {
-                        gauge = new DataPointGauge(entry.getValue());
-                        metricSystem.registry().register(metricName, gauge);
-                        LOG.debug("Register metric {}", metricName);
-                    } else {
-                        ((DataPointGauge) gauge).setValue(entry.getValue());
-                    }
-                }
-            } else {
-                String metricName = buildSimpleMetricName(taskInfo, dataPoint.name);
-                metricList.add(metricName);
-                Gauge gauge = metricSystem.registry().getGauges().get(metricName);
-                if (gauge == null) {
-                    gauge = new DataPointGauge(dataPoint.value);
-                    metricSystem.registry().register(metricName, gauge);
-                    LOG.debug("Register metric {}", metricName);
-                } else {
-                    ((DataPointGauge) gauge).setValue(dataPoint.value);
-                }
-            }
-        }
-        metricSystem.registry().removeMatching((name, metric) -> metricList.indexOf(name) < 0);
-        metricSystem.report();
-        metricSystem.registry().getGauges().values().forEach((gauge -> {
-            if(gauge instanceof DataPointGauge){
-                ((DataPointGauge)gauge).reset();
-            }
-        }));
-    }
-
-    private static class DataPointGauge implements Gauge<Object> {
-        private Object value;
-        public DataPointGauge(Object initialValue){
-            this.setValue(initialValue);
-        }
-
-        @Override
-        public Object getValue() {
-            return value;
-        }
-
-        public void setValue(Object value){
-            this.value = value;
-        }
-
-        public void reset(){
-            this.value = 0;
-        }
-    }
-
-    private static String buildUniqueTaskKey(TaskInfo taskInfo){
-        return String.format("%s[%s]",taskInfo.srcComponentId,taskInfo.srcTaskId);
-    }
-
-    private static String buildSimpleMetricName(TaskInfo taskInfo,String ... name ){
-        return String.join(".",StringUtils.join(name,".").replace("/","."));
-    }
-
-    @Override
-    public void cleanup() {
-        metricSystems.values().forEach(IMetricSystem::stop);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
deleted file mode 100644
index 942ef97..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.runner;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import com.typesafe.config.Config;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.StreamContextImpl;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.router.StreamRouter;
-import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
-import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
-import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
-import org.apache.eagle.alert.engine.serialization.Serializers;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouterBoltSpecListener, SerializationMetadataProvider{
-    private final static Logger LOG = LoggerFactory.getLogger(StreamRouterBolt.class);
-    private static final long serialVersionUID = -7611470889316430372L;
-    private StreamRouter router;
-    private StreamRouterBoltOutputCollector routeCollector;
-    // mapping from StreamPartition to StreamSortSpec
-    private volatile Map<StreamPartition, StreamSortSpec> cachedSSS = new HashMap<>();
-    // mapping from StreamPartition(streamId, groupbyspec) to StreamRouterSpec
-    private volatile Map<StreamPartition, StreamRouterSpec> cachedSRS = new HashMap<>();
-    private volatile Map<String,StreamDefinition> sdf = new HashMap<>();
-    private PartitionedEventSerializer serializer;
-
-    public StreamRouterBolt(StreamRouter router, Config config, IMetadataChangeNotifyService changeNotifyService) {
-        super(changeNotifyService, config);
-        this.router = router;
-    }
-
-    private StreamContext streamContext;
-
-    @Override
-    public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService changeNotifyService, Config config, TopologyContext context) {
-        streamContext = new StreamContextImpl(config,context.registerMetric("eagle.router",new MultiCountMetric(),60),context);
-        serializer= Serializers.newPartitionedEventSerializer(this);
-        routeCollector = new StreamRouterBoltOutputCollector(this.router.getName(),collector,this.getOutputStreamIds(),streamContext,serializer);
-        router.prepare(streamContext, routeCollector);
-        changeNotifyService.registerListener(this);
-        changeNotifyService.init(config, MetadataType.STREAM_ROUTER_BOLT);
-    }
-
-    PartitionedEvent deserialize(Object object) throws IOException {
-        // byte[] in higher priority
-        if(object instanceof byte[]) {
-            return serializer.deserialize((byte[]) object);
-        } else {
-            return (PartitionedEvent) object;
-        }
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        try {
-            this.streamContext.counter().scope("execute_count").incr();
-            this.router.nextEvent(deserialize(input.getValueByField(AlertConstants.FIELD_0)).withAnchor(input));
-        } catch (Exception ex) {
-            this.streamContext.counter().scope("fail_count").incr();
-            LOG.error(ex.getMessage(),ex);
-            this.collector.fail(input);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        this.router.close();
-        super.cleanup();
-    }
-
-    /**
-     * Compare with metadata snapshot cache to generate diff like added, removed and modified between different versions.
-     * @param spec
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public void onStreamRouteBoltSpecChange(RouterSpec spec, Map<String, StreamDefinition> sds) {
-        sanityCheck(spec);
-
-        // figure out added, removed, modified StreamSortSpec
-        Map<StreamPartition, StreamSortSpec> newSSS = new HashMap<>();
-        spec.getRouterSpecs().forEach(t ->  {
-            if (t.getPartition().getSortSpec() != null) {
-                newSSS.put(t.getPartition(), t.getPartition().getSortSpec());
-            }
-        });
-
-        Set<StreamPartition> newStreamIds = newSSS.keySet();
-        Set<StreamPartition> cachedStreamIds = cachedSSS.keySet();
-        Collection<StreamPartition> addedStreamIds = CollectionUtils.subtract(newStreamIds, cachedStreamIds);
-        Collection<StreamPartition> removedStreamIds = CollectionUtils.subtract(cachedStreamIds, newStreamIds);
-        Collection<StreamPartition> modifiedStreamIds = CollectionUtils.intersection(newStreamIds, cachedStreamIds);
-
-        Map<StreamPartition, StreamSortSpec> added = new HashMap<>();
-        Map<StreamPartition, StreamSortSpec> removed = new HashMap<>();
-        Map<StreamPartition, StreamSortSpec> modified = new HashMap<>();
-        addedStreamIds.forEach(s -> added.put(s, newSSS.get(s)));
-        removedStreamIds.forEach(s -> removed.put(s, cachedSSS.get(s)));
-        modifiedStreamIds.forEach(s -> {
-            if(!newSSS.get(s).equals(cachedSSS.get(s))){ // this means StreamSortSpec is changed for one specific streamId
-                modified.put(s, newSSS.get(s));
-            }
-        });
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("added StreamSortSpec " + added);
-            LOG.debug("removed StreamSortSpec " + removed);
-            LOG.debug("modified StreamSortSpec " + modified);
-        }
-        router.onStreamSortSpecChange(added, removed, modified);
-        // switch cache
-        cachedSSS = newSSS;
-
-        // figure out added, removed, modified StreamRouterSpec
-        Map<StreamPartition, StreamRouterSpec> newSRS = new HashMap<>();
-        spec.getRouterSpecs().forEach(t -> newSRS.put(t.getPartition(), t));
-
-        Set<StreamPartition> newStreamPartitions = newSRS.keySet();
-        Set<StreamPartition> cachedStreamPartitions = cachedSRS.keySet();
-
-        Collection<StreamPartition> addedStreamPartitions = CollectionUtils.subtract(newStreamPartitions, cachedStreamPartitions);
-        Collection<StreamPartition> removedStreamPartitions = CollectionUtils.subtract(cachedStreamPartitions, newStreamPartitions);
-        Collection<StreamPartition> modifiedStreamPartitions = CollectionUtils.intersection(newStreamPartitions, cachedStreamPartitions);
-
-        Collection<StreamRouterSpec> addedRouterSpecs = new ArrayList<>();
-        Collection<StreamRouterSpec> removedRouterSpecs = new ArrayList<>();
-        Collection<StreamRouterSpec> modifiedRouterSpecs = new ArrayList<>();
-        addedStreamPartitions.forEach(s -> addedRouterSpecs.add(newSRS.get(s)));
-        removedStreamPartitions.forEach(s -> removedRouterSpecs.add(cachedSRS.get(s)));
-        modifiedStreamPartitions.forEach(s -> {
-            if(!newSRS.get(s).equals(cachedSRS.get(s))){ // this means StreamRouterSpec is changed for one specific StreamPartition
-                modifiedRouterSpecs.add(newSRS.get(s));
-            }
-        });
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("added StreamRouterSpec " + addedRouterSpecs);
-            LOG.debug("removed StreamRouterSpec " + removedRouterSpecs);
-            LOG.debug("modified StreamRouterSpec " + modifiedRouterSpecs);
-        }
-
-        routeCollector.onStreamRouterSpecChange(addedRouterSpecs, removedRouterSpecs, modifiedRouterSpecs, sds);
-        // switch cache
-        cachedSRS = newSRS;
-        sdf = sds;
-    }
-
-    /**
-     * in correlation cases, multiple streams will go to the same queue for correlation policy
-     * @param spec
-     */
-    private void sanityCheck(RouterSpec spec){
-        Set<String> totalRequestedSlots = new HashSet<>();
-        for(StreamRouterSpec s : spec.getRouterSpecs()){
-            for(PolicyWorkerQueue q : s.getTargetQueue()){
-                List<String> workers = new ArrayList<>();
-                q.getWorkers().forEach(w -> workers.add(w.getBoltId()));
-                totalRequestedSlots.addAll(workers);
-            }
-        }
-        if(totalRequestedSlots.size() > getOutputStreamIds().size()){
-            String error = String.format("Requested slots are not consistent with provided slots, %s, %s", totalRequestedSlots, getOutputStreamIds());
-            LOG.error(error);
-            throw new IllegalStateException(error);
-        }
-    }
-
-    @Override
-    public StreamDefinition getStreamDefinition(String streamId) {
-        return this.sdf.get(streamId);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
deleted file mode 100755
index cef94c7..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.runner;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
-import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
-import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
-import org.apache.eagle.alert.engine.spout.CorrelationSpout;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-
-/**
- * By default
- * 1. one spout with multiple tasks
- * 2. multiple router bolts with each bolt having exactly one task
- * 3. multiple alert bolts with each bolt having exactly one task
- * 4. one publish bolt with multiple tasks
- */
-public class UnitTopologyRunner {
-    private static final Logger LOG = LoggerFactory.getLogger(UnitTopologyRunner.class);
-    public final static String spoutName = "alertEngineSpout";
-    private final static String streamRouterBoltNamePrefix = "streamRouterBolt";
-    private final static String alertBoltNamePrefix = "alertBolt";
-    public final static String alertPublishBoltName = "alertPublishBolt";
-
-    public final static String TOTAL_WORKER_NUM = "topology.numOfTotalWorkers";
-    public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
-    public final static String ROUTER_TASK_NUM = "topology.numOfRouterBolts";
-    public final static String ALERT_TASK_NUM = "topology.numOfAlertBolts";
-    public final static String PUBLISH_TASK_NUM = "topology.numOfPublishTasks";
-    public final static String LOCAL_MODE = "topology.localMode";
-    public final static String MESSAGE_TIMEOUT_SECS = "topology.messageTimeoutSecs";
-    public final static int DEFAULT_MESSAGE_TIMEOUT_SECS = 3600;
-
-    private final IMetadataChangeNotifyService metadataChangeNotifyService;
-
-    public UnitTopologyRunner(IMetadataChangeNotifyService metadataChangeNotifyService){
-        this.metadataChangeNotifyService = metadataChangeNotifyService;
-    }
-
-    public StormTopology buildTopology(String topologyId,
-                              int numOfSpoutTasks,
-                              int numOfRouterBolts,
-                              int numOfAlertBolts,
-                              int numOfPublishTasks,
-                              Config config) {
-
-        StreamRouterImpl[] routers = new StreamRouterImpl[numOfRouterBolts];
-        StreamRouterBolt[] routerBolts = new StreamRouterBolt[numOfRouterBolts];
-        PolicyGroupEvaluatorImpl[] evaluators = new PolicyGroupEvaluatorImpl[numOfAlertBolts];
-        AlertBolt[] alertBolts = new AlertBolt[numOfAlertBolts];
-        AlertPublisherImpl publisher;
-        AlertPublisherBolt publisherBolt;
-
-        TopologyBuilder builder = new TopologyBuilder();
-
-
-        // construct Spout object
-        CorrelationSpout spout = new CorrelationSpout(config, topologyId, getMetadataChangeNotifyService(), numOfRouterBolts, spoutName, streamRouterBoltNamePrefix);
-        builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
-
-        // construct StreamRouterBolt objects
-        for(int i=0; i<numOfRouterBolts; i++){
-            routers[i] = new StreamRouterImpl(streamRouterBoltNamePrefix + i);
-            routerBolts[i] = new StreamRouterBolt(routers[i], config, getMetadataChangeNotifyService());
-        }
-
-        // construct AlertBolt objects
-        for(int i=0; i<numOfAlertBolts; i++){
-            evaluators[i] = new PolicyGroupEvaluatorImpl(alertBoltNamePrefix + i);
-            alertBolts[i] = new AlertBolt(alertBoltNamePrefix+i, evaluators[i], config, getMetadataChangeNotifyService());
-        }
-
-        // construct AlertPublishBolt object
-        publisher = new AlertPublisherImpl(alertPublishBoltName);
-        publisherBolt = new AlertPublisherBolt(publisher, config, getMetadataChangeNotifyService());
-
-        // connect spout and router bolt, also define output streams for downstreaming alert bolt
-        for(int i=0; i<numOfRouterBolts; i++){
-            String boltName = streamRouterBoltNamePrefix + i;
-
-            // define output streams, which are based on
-            String streamId = StreamIdConversion.generateStreamIdBetween(spoutName, boltName);
-            List<String> outputStreamIds = new ArrayList<>(numOfAlertBolts);
-            for(int j=0; j<numOfAlertBolts; j++){
-                String sid = StreamIdConversion.generateStreamIdBetween(boltName, alertBoltNamePrefix+j);
-                outputStreamIds.add(sid);
-            }
-            routerBolts[i].declareOutputStreams(outputStreamIds);
-
-            /**
-             * TODO potentially one route bolt may have multiple tasks, so that is field grouping by groupby fields
-             * that means we need a separate field to become groupby field
-             */
-            builder.setBolt(boltName, routerBolts[i]).fieldsGrouping(spoutName, streamId, new Fields());
-        }
-
-        // connect router bolt and alert bolt, also define output streams for downstreaming alert publish bolt
-        for(int i=0; i<numOfAlertBolts; i++){
-            String boltName = alertBoltNamePrefix + i;
-            BoltDeclarer boltDeclarer = builder.setBolt(boltName, alertBolts[i]);
-            for(int j=0; j<numOfRouterBolts; j++) {
-                String streamId = StreamIdConversion.generateStreamIdBetween(streamRouterBoltNamePrefix+j, boltName);
-                boltDeclarer.fieldsGrouping(streamRouterBoltNamePrefix+j, streamId, new Fields());
-            }
-        }
-
-        // connect alert bolt and alert publish bolt, this is the last bolt in the pipeline
-        BoltDeclarer boltDeclarer = builder.setBolt(alertPublishBoltName, publisherBolt).setNumTasks(numOfPublishTasks);
-        for(int i=0; i<numOfAlertBolts; i++) {
-            boltDeclarer.fieldsGrouping(alertBoltNamePrefix+i, new Fields(AlertConstants.FIELD_0));
-        }
-
-        return builder.createTopology();
-    }
-
-    public void run(String topologyId,
-                    int numOfTotalWorkers,
-                    int numOfSpoutTasks,
-                    int numOfRouterBolts,
-                    int numOfAlertBolts,
-                    int numOfPublishTasks,
-                    Config config,
-                    boolean localMode) {
-        backtype.storm.Config stormConfig = new backtype.storm.Config();
-        // TODO: Configurable metric consumer instance number
-
-        int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS)?config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS;
-        LOG.info("Set topology.message.timeout.secs as {}",messageTimeoutSecs);
-        stormConfig.setMessageTimeoutSecs(messageTimeoutSecs);
-
-        if(config.hasPath("metric")) {
-            stormConfig.registerMetricsConsumer(StormMetricTaggedConsumer.class, config.root().render(ConfigRenderOptions.concise()),1);
-        }
-
-        stormConfig.setNumWorkers(numOfTotalWorkers);
-        StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config);
-
-        if(localMode) {
-            LOG.info("Submitting as local mode");
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology(topologyId, stormConfig, topology);
-            Utils.sleep(Long.MAX_VALUE);
-        }else{
-            LOG.info("Submitting as cluster mode");
-            try {
-                StormSubmitter.submitTopologyWithProgressBar(topologyId, stormConfig, topology);
-            } catch(Exception ex) {
-                LOG.error("fail submitting topology {}", topology, ex);
-                throw new IllegalStateException(ex);
-            }
-        }
-    }
-
-    public void run(Config config) {
-        String topologyId = config.getString("topology.name");
-        run(topologyId,config);
-    }
-
-    public void run(String topologyId,Config config) {
-        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
-        int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
-        int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
-        int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
-        boolean localMode = config.getBoolean(LOCAL_MODE);
-        int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM);
-        run(topologyId,numOfTotalWorkers, numOfSpoutTasks,numOfRouterBolts,numOfAlertBolts,numOfPublishTasks,config, localMode);
-    }
-
-    public StormTopology buildTopology(String topologyId,Config config) {
-        int numOfSpoutTasks = config.getInt("topology.numOfSpoutTasks");
-        int numOfRouterBolts = config.getInt("topology.numOfRouterBolts");
-        int numOfAlertBolts = config.getInt("topology.numOfAlertBolts");
-        int numOfPublishTasks = config.getInt("topology.numOfPublishTasks");
-        return buildTopology(topologyId,numOfSpoutTasks,numOfRouterBolts,numOfAlertBolts,numOfPublishTasks,config);
-    }
-
-    public IMetadataChangeNotifyService getMetadataChangeNotifyService() {
-        return metadataChangeNotifyService;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
deleted file mode 100644
index a3487d3..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.scheme;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.tuple.Fields;
-
-/**
- * Expects flat Json scheme
- */
-public class JsonScheme implements Scheme {
-    private static final long serialVersionUID = -8352896475656975577L;
-    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(JsonScheme.class);
-    private static final ObjectMapper mapper = new ObjectMapper();
-
-    private String topic;
-
-    public JsonScheme(String topic) {
-        this.topic = topic;
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return new Fields("f1");
-    }
-
-    @Override
-    @SuppressWarnings("rawtypes")
-    public List<Object> deserialize(byte[] ser) {
-        try {
-            if(ser != null ) {
-                Map map = mapper.readValue(ser, Map.class);
-                return Arrays.asList(topic, map);
-            }else{
-                if(LOG.isDebugEnabled()) LOG.debug("Content is null, ignore");
-            }
-        } catch (IOException e) {
-            try {
-                LOG.error("Failed to deserialize as JSON: {}", new String(ser, "UTF-8"), e);
-            }catch(Exception ex){
-                LOG.error(ex.getMessage(), ex);
-            }
-        }
-        return null;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
deleted file mode 100644
index 1182e3f..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.scheme;
-
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.eagle.alert.coordination.model.StreamNameSelector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A strategy to get stream name from message tuple.
- * 
- * Since 5/5/16.
- */
-public class JsonStringStreamNameSelector implements StreamNameSelector {
-    private final static Logger LOG = LoggerFactory.getLogger(JsonStringStreamNameSelector.class);
-    private final static String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName";
-    private final static String FIELD_NAMES_TO_INFER_STREAM_NAME_PROPERTY = "fieldNamesToInferStreamName";
-    private final static String STREAM_NAME_FORMAT = "streamNameFormat";
-
-    private String userProvidedStreamName;
-    private String[] fieldNamesToInferStreamName;
-    private String streamNameFormat;
-
-    public JsonStringStreamNameSelector(Properties prop) {
-        userProvidedStreamName = prop.getProperty(USER_PROVIDED_STREAM_NAME_PROPERTY);
-        String fields = prop.getProperty(FIELD_NAMES_TO_INFER_STREAM_NAME_PROPERTY);
-        if (fields != null) {
-            fieldNamesToInferStreamName = fields.split(",");
-        }
-        streamNameFormat = prop.getProperty(STREAM_NAME_FORMAT);
-        if (streamNameFormat == null) {
-            LOG.warn("no stream name format found, this might cause default stream name be used which is dis-encouraged. Possibly this is a mis-configuration.");
-        }
-    }
-
-    @Override
-    public String getStreamName(Map<String, Object> tuple) {
-        if (userProvidedStreamName != null) {
-            return userProvidedStreamName;
-        } else if (fieldNamesToInferStreamName != null && streamNameFormat != null) {
-            Object[] args = new Object[fieldNamesToInferStreamName.length];
-            for (int i = 0; i < fieldNamesToInferStreamName.length; i++) {
-                Object colValue = tuple.get(fieldNamesToInferStreamName[i]);
-                args[i] = colValue;
-            }
-            return String.format(streamNameFormat, args);
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("can not find the stream name for data source. Use the default stream, possibly this means mis-configuration of datasource!");
-        }
-        return "defaultStringStream";
-    }
-    
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
deleted file mode 100644
index 89d2e76..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.scheme;
-
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-/**
- * used for parsing plain string
- */
-public class PlainStringScheme implements Scheme {
-    private static final long serialVersionUID = 5969724968671646310L;
-    @SuppressWarnings("unused")
-    private static final Logger LOG = LoggerFactory.getLogger(PlainStringScheme.class);
-    private String topic;
-
-    public PlainStringScheme(String topic){
-        this.topic = topic;
-    }
-
-    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
-    public static final String STRING_SCHEME_KEY = "str";
-
-    public static String deserializeString(byte[] buff) {
-        return new String(buff, UTF8_CHARSET);
-    }
-
-    public Fields getOutputFields() {
-        return new Fields(STRING_SCHEME_KEY);
-    }
-
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    @Override
-    public List<Object> deserialize(byte[] ser) {
-        Map m = new HashMap<>();
-        m.put("value", deserializeString(ser));
-        m.put("timestamp", System.currentTimeMillis());
-        return new Values(topic, m);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
deleted file mode 100644
index 61ec943..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.scheme;
-
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.eagle.alert.coordination.model.StreamNameSelector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Since 5/3/16.
- */
-public class PlainStringStreamNameSelector implements StreamNameSelector {
-    @SuppressWarnings("unused")
-    private static final Logger LOG = LoggerFactory.getLogger(PlainStringStreamNameSelector.class);
-    private final static String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName";
-    private static final String DEFAULT_STRING_STREAM_NAME = "defaultStringStream";
-
-    private String streamName;
-
-    public PlainStringStreamNameSelector(Properties prop){
-        streamName = prop.getProperty(USER_PROVIDED_STREAM_NAME_PROPERTY);
-        if(streamName == null)
-            streamName = DEFAULT_STRING_STREAM_NAME;
-    }
-    @Override
-    public String getStreamName(Map<String, Object> tuple) {
-        return streamName;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
deleted file mode 100644
index 5ba1080..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.serialization.impl.StreamEventSerializer;
-import org.apache.eagle.alert.engine.serialization.impl.StreamPartitionDigestSerializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * TODO: It seems the added complexity doesn't bring enough performance improvement
- *
- * @see PartitionedEvent
- */
-@Deprecated
-public class PartitionedEventDigestSerializer implements Serializer<PartitionedEvent> {
-    private final Serializer<StreamEvent> streamEventSerializer;
-    private final Serializer<StreamPartition> streamPartitionSerializer;
-
-    public PartitionedEventDigestSerializer(SerializationMetadataProvider serializationMetadataProvider){
-        this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider);
-        this.streamPartitionSerializer = StreamPartitionDigestSerializer.INSTANCE;
-    }
-
-    @Override
-    public void serialize(PartitionedEvent entity, DataOutput dataOutput) throws IOException {
-        dataOutput.writeLong(entity.getPartitionKey());
-        streamEventSerializer.serialize(entity.getEvent(),dataOutput);
-        streamPartitionSerializer.serialize(entity.getPartition(),dataOutput);
-    }
-
-    @Override
-    public PartitionedEvent deserialize(DataInput dataInput) throws IOException {
-        PartitionedEvent event = new PartitionedEvent();
-        event.setPartitionKey(dataInput.readLong());
-        StreamEvent streamEvent = streamEventSerializer.deserialize(dataInput);
-        event.setEvent(streamEvent);
-        StreamPartition partition = streamPartitionSerializer.deserialize(dataInput);
-        partition.setStreamId(streamEvent.getStreamId());
-        event.setPartition(partition);
-        return event;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
deleted file mode 100644
index c518e40..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization;
-
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
-import java.io.IOException;
-
-public interface PartitionedEventSerializer {
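-    /**
-     * Serializes a partitioned event into a byte array.
-     * @param entity the partitioned event to serialize
-     * @return the serialized form of the event as bytes
-     * @throws IOException if the event cannot be written
-     */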
-    byte[] serialize(PartitionedEvent entity) throws IOException;
-
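-    /**
-     * Deserializes a partitioned event from a byte array.
-     * @param bytes the serialized form of a partitioned event
-     * @return the partitioned event read from the bytes
-     * @throws IOException if the bytes cannot be read
-     */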
-    PartitionedEvent deserialize(byte[] bytes) throws IOException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
deleted file mode 100644
index 71b274d..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization;
-
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
- * Integration interface to provide stream definition for serializer
- */
-public interface SerializationMetadataProvider {
-    /**
-     * @param streamId id of the stream whose definition is requested
-     * @return the StreamDefinition, or null if it does not exist
-     */
-    StreamDefinition getStreamDefinition(String streamId);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
deleted file mode 100644
index c2f87d0..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-public interface Serializer<V> {
-    void serialize(V value,DataOutput dataOutput) throws IOException;
-    V deserialize(DataInput dataInput) throws IOException;
-}
\ No newline at end of file


Mime
View raw message