eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject [10/19] incubator-eagle git commit: EAGLE-324: Init branch-v0.5
Date Wed, 01 Jun 2016 05:56:25 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
new file mode 100644
index 0000000..5008dbf
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.router.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.eagle.alert.engine.PartitionedEventCollector;
+import org.apache.eagle.alert.engine.StreamContext;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.router.StreamRouter;
+import org.apache.eagle.alert.engine.router.StreamSortHandler;
+import org.apache.eagle.alert.engine.sorter.StreamTimeClockManager;
+import org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl;
+import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockManagerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link StreamRouter} implementation that hands each event to the {@link StreamSortHandler}
+ * registered for the event's partition, and emits the event directly to the output collector
+ * when no handler takes it.
+ *
+ * <p>{@link #prepare(StreamContext, PartitionedEventCollector)} creates the clock manager and the
+ * handler map, so it has to run before events or sort-spec changes are delivered.
+ */
+public class StreamRouterImpl implements StreamRouter {
+    private static final long serialVersionUID = -4640125063690900014L;
+    private static final Logger LOG = LoggerFactory.getLogger(StreamRouterImpl.class);
+    private final String name;
+    // onStreamSortSpecChange edits a copy of this map and then reassigns the field,
+    // so a published map instance is never modified in place.
+    private volatile Map<StreamPartition, StreamSortHandler> streamSortHandlers;
+    private PartitionedEventCollector outputCollector;
+    private StreamTimeClockManager streamTimeClockManager;
+    private StreamContext context;
+
+    /**
+     * @param name This name should be formed by topologyId + router id, which is built by topology builder
+     */
+    public StreamRouterImpl(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the name given at construction time
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * Closes every registered sort handler and then the clock manager.
+     * Safe to call even if {@link #prepare} was never invoked.
+     */
+    @Override
+    public void close() {
+        if (streamSortHandlers != null) {
+            streamSortHandlers.values().forEach(StreamSortHandler::close);
+        }
+        if (streamTimeClockManager != null) {
+            streamTimeClockManager.close();
+        }
+    }
+
+    /**
+     * Initializes the router with a fresh clock manager and an empty handler map.
+     *
+     * @param context         provides the counters updated while routing
+     * @param outputCollector receives pass-through events and is handed to every sort handler
+     */
+    public void prepare(StreamContext context, PartitionedEventCollector outputCollector) {
+        this.streamTimeClockManager = new StreamTimeClockManagerImpl();
+        this.streamSortHandlers = new HashMap<>();
+        this.outputCollector = outputCollector;
+        this.context = context;
+    }
+
+    /**
+     * Routes one event and then advances the stream clock.
+     *
+     * <p>Counters: {@code receive_count} for every event, {@code sort_count} when a sort handler
+     * took the event, {@code direct_count} when it was emitted without sorting.
+     *
+     * TODO: Potential improvement: if StreamSortHandler is expensive, we can use DISRUPTOR to buffer
+     *
+     * @param event StreamEvent
+     */
+    public void nextEvent(PartitionedEvent event) {
+        this.context.counter().scope("receive_count").incr();
+        if (dispatchToSortHandler(event)) {
+            // Count only events that really went through a sort handler.
+            this.context.counter().scope("sort_count").incr();
+        } else {
+            this.context.counter().scope("direct_count").incr();
+            // Pass through directly if no need to sort
+            outputCollector.emit(event);
+        }
+        // Update stream clock time if moving forward and trigger all tick listeners
+        streamTimeClockManager.onTimeUpdate(event.getStreamId(), event.getTimestamp());
+    }
+
+    /**
+     * Offers the event to the sort handler of its partition.
+     *
+     * @param event input event
+     * @return {@code true} if a handler accepted the event; {@code false} if the event has a
+     *         non-positive timestamp or no handler is registered for its partition
+     */
+    private boolean dispatchToSortHandler(PartitionedEvent event) {
+        if (event.getTimestamp() <= 0) {
+            return false;
+        }
+
+        StreamSortHandler sortHandler = streamSortHandlers.get(event.getPartition());
+        if (sortHandler == null) {
+            if (event.isSortRequired()) {
+                LOG.warn("Stream sort handler required has not been loaded so emmit directly: {}", event);
+                this.context.counter().scope("miss_sort_count").incr();
+            }
+            return false;
+        }
+        sortHandler.nextEvent(event);
+        return true;
+    }
+
+    /**
+     * Applies a sort-spec delta. The changes are made on a copy of the handler map, which replaces
+     * the current map once all three groups have been processed.
+     *
+     * @param added   specs for partitions that get a new handler; may be {@code null}
+     * @param removed specs for partitions whose handler is closed and dropped; may be {@code null}
+     * @param changed specs for partitions whose handler is replaced by a new one; may be {@code null}
+     */
+    @Override
+    public void onStreamSortSpecChange(Map<StreamPartition, StreamSortSpec> added,
+            Map<StreamPartition, StreamSortSpec> removed,
+            Map<StreamPartition, StreamSortSpec> changed) {
+        synchronized (streamTimeClockManager) {
+            Map<StreamPartition, StreamSortHandler> copy = new HashMap<>(this.streamSortHandlers);
+
+            // add new StreamSortSpec
+            if (added != null) {
+                for (Entry<StreamPartition, StreamSortSpec> spec : added.entrySet()) {
+                    StreamPartition partition = spec.getKey();
+                    if (copy.containsKey(partition)) {
+                        LOG.error("Metadata calculation error: Duplicated StreamSortSpec {}", spec);
+                    } else {
+                        StreamSortHandler handler = newPreparedHandler(partition, spec.getValue());
+                        copy.put(partition, handler);
+                        streamTimeClockManager.registerListener(
+                                streamTimeClockManager.createStreamTimeClock(partition.getStreamId()), handler);
+                    }
+                }
+            }
+
+            // remove StreamSortSpec
+            if (removed != null) {
+                for (Entry<StreamPartition, StreamSortSpec> spec : removed.entrySet()) {
+                    StreamPartition partition = spec.getKey();
+                    if (copy.containsKey(partition)) {
+                        retire(copy.remove(partition));
+                    } else {
+                        LOG.error("Metadata calculation error: remove nonexisting StreamSortSpec {}", spec.getValue());
+                    }
+                }
+            }
+
+            // modify StreamSortSpec
+            if (changed != null) {
+                for (Entry<StreamPartition, StreamSortSpec> spec : changed.entrySet()) {
+                    StreamPartition partition = spec.getKey();
+                    if (copy.containsKey(partition)) {
+                        retire(copy.remove(partition));
+                        StreamSortHandler handler = newPreparedHandler(partition, spec.getValue());
+                        copy.put(partition, handler);
+                        streamTimeClockManager.registerListener(partition.getStreamId(), handler);
+                    } else {
+                        LOG.error("Metadata calculation error: modify non-existing StreamSortSpec {}", spec.getValue());
+                    }
+                }
+            }
+
+            // atomic switch
+            this.streamSortHandlers = copy;
+        }
+    }
+
+    /** Creates a window sort handler for the partition and prepares it with the given spec. */
+    private StreamSortHandler newPreparedHandler(StreamPartition partition, StreamSortSpec sortSpec) {
+        StreamSortHandler handler = new StreamSortWindowHandlerImpl();
+        handler.prepare(partition.getStreamId(), sortSpec, this.outputCollector);
+        return handler;
+    }
+
+    /** Closes a handler and unregisters it from the clock manager. */
+    private void retire(StreamSortHandler handler) {
+        handler.close();
+        streamTimeClockManager.removeListener(handler);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
new file mode 100644
index 0000000..cc819ba
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.runner;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.utils.AlertConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+/**
+ * Base class for the stream bolts in this package. It keeps the metadata change-notify service,
+ * the configuration and the Storm output collector, and forwards Storm's {@code prepare} call to
+ * {@link #internalPrepare}.
+ */
+@SuppressWarnings({"rawtypes", "serial"})
+public abstract class AbstractStreamBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamBolt.class);
+    private IMetadataChangeNotifyService changeNotifyService;
+    private Config config;
+    private List<String> outputStreamIds;
+    protected OutputCollector collector;
+
+    /**
+     * @param changeNotifyService service passed on to {@link #internalPrepare}; checked for null in {@link #prepare}
+     * @param config              configuration passed on to {@link #internalPrepare}
+     */
+    public AbstractStreamBolt(IMetadataChangeNotifyService changeNotifyService, Config config){
+        this.config = config;
+        this.changeNotifyService = changeNotifyService;
+    }
+
+    /**
+     * Records the stream ids that {@link #declareOutputFields} will declare.
+     *
+     * @param outputStreamIds ids of the output streams
+     */
+    public void declareOutputStreams(List<String> outputStreamIds){
+        this.outputStreamIds = outputStreamIds;
+    }
+
+    /**
+     * @return the ids given to {@link #declareOutputStreams}, or {@code null} if it was never called
+     */
+    protected List<String> getOutputStreamIds(){
+        return outputStreamIds;
+    }
+
+    /**
+     * Stores the collector and delegates the remaining setup to {@link #internalPrepare}.
+     * Fails with a {@link NullPointerException} when no change-notify service was supplied.
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        Preconditions.checkNotNull(changeNotifyService, "IMetadataChangeNotifyService is not set yet");
+        this.collector = collector;
+        internalPrepare(collector, changeNotifyService, config, context);
+    }
+
+    /**
+     * Hook for subclass-specific initialization, for example registering for metadata changes.
+     *
+     * @param collector       the Storm output collector received in {@link #prepare}
+     * @param metadataManager the change-notify service given to the constructor
+     * @param config          the configuration given to the constructor
+     * @param context         the Storm topology context received in {@link #prepare}
+     */
+    public abstract void internalPrepare(
+            OutputCollector collector,
+            IMetadataChangeNotifyService metadataManager,
+            Config config, TopologyContext context);
+
+    @Override
+    public void cleanup() {
+        super.cleanup();
+    }
+
+    /**
+     * Declares one stream per configured output stream id, each carrying the single field
+     * {@code AlertConstants.FIELD_0}; without configured ids the default stream is declared
+     * with that same field.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        if (this.outputStreamIds == null) {
+            declarer.declare(new Fields(AlertConstants.FIELD_0));
+            return;
+        }
+        LOG.info("declare streams: {} ", outputStreamIds);
+        this.outputStreamIds.forEach(
+                streamId -> declarer.declareStream(streamId, new Fields(AlertConstants.FIELD_0)));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
new file mode 100755
index 0000000..30ff5f0
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.runner;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.engine.AlertStreamCollector;
+import org.apache.eagle.alert.engine.StreamContext;
+import org.apache.eagle.alert.engine.StreamContextImpl;
+import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.coordinator.MetadataType;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
+import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorWrapper;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.router.AlertBoltSpecListener;
+import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
+import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
+import org.apache.eagle.alert.engine.serialization.Serializers;
+import org.apache.eagle.alert.utils.AlertConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+
+import com.typesafe.config.Config;
+
+/**
+ * Since 5/1/16.
+ * This is container for hosting all policies belonging to the same monitoredStream
+ * MonitoredStream refers to tuple of {dataSource, streamId, groupBy}
+ * The container is also called {@link WorkSlot}
+ *
+ * <p>Each incoming tuple is deserialized into a {@link PartitionedEvent}, evaluated by the
+ * {@link PolicyGroupEvaluator}, and then acked or failed. Policy and stream-definition updates
+ * arrive through {@link #onAlertBoltSpecChange}.
+ */
+public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListener,SerializationMetadataProvider {
+    private static final Logger LOG = LoggerFactory.getLogger(AlertBolt.class);
+    private static final long serialVersionUID = -4132297691448945672L;
+    private PolicyGroupEvaluator policyGroupEvaluator;
+    private AlertStreamCollector alertOutputCollector;
+    private String boltId;
+    // Created in internalPrepare; guards ack/fail here and is also handed to the output collector wrapper.
+    private volatile Object outputLock;
+    // mapping from policy name to PolicyDefinition
+    private volatile Map<String, PolicyDefinition> cachedPolicies = new HashMap<>(); // for one streamGroup, there are multiple policies
+
+    private StreamContext streamContext;
+    // Stream definitions from the latest spec change; null until the first change arrives.
+    private volatile Map<String, StreamDefinition> sdf;
+    private PartitionedEventSerializer serializer;
+
+    /**
+     * @param boltId               key used to look up this bolt's policies in an {@link AlertBoltSpec}
+     * @param policyGroupEvaluator evaluator that receives events and policy changes
+     * @param config               configuration passed to the base class
+     * @param changeNotifyService  metadata change service passed to the base class
+     */
+    public AlertBolt(String boltId, PolicyGroupEvaluator policyGroupEvaluator, Config config, IMetadataChangeNotifyService changeNotifyService){
+        super(changeNotifyService, config);
+        this.boltId = boltId;
+        this.policyGroupEvaluator = policyGroupEvaluator;
+    }
+
+    /**
+     * Converts a tuple value into a {@link PartitionedEvent}.
+     *
+     * @param object either a serialized event as {@code byte[]} or an already materialized event
+     * @return the event
+     * @throws IOException if the serializer fails on the byte array
+     */
+    PartitionedEvent deserialize(Object object) throws IOException {
+        // byte[] in higher priority
+        if (object instanceof byte[]) {
+            return serializer.deserialize((byte[]) object);
+        }
+        return (PartitionedEvent) object;
+    }
+
+    /**
+     * Evaluates one tuple: acks it on success, logs and fails it on any exception, and always
+     * flushes the alert output collector afterwards.
+     */
+    @Override
+    public void execute(Tuple input) {
+        this.streamContext.counter().scope("execute_count").incr();
+        try {
+            policyGroupEvaluator.nextEvent(deserialize(input.getValueByField(AlertConstants.FIELD_0)).withAnchor(input));
+            synchronized (outputLock) {
+                this.collector.ack(input);
+            }
+            this.streamContext.counter().scope("ack_count").incr();
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage(), ex);
+            synchronized (outputLock) {
+                this.streamContext.counter().scope("fail_count").incr();
+                this.collector.fail(input);
+            }
+        } finally {
+            alertOutputCollector.flush();
+        }
+    }
+
+    /**
+     * Builds the stream context, serializer and output collector wrapper, initializes the
+     * evaluator, and registers this bolt with the metadata change service.
+     */
+    @Override
+    public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService metadataChangeNotifyService, Config config, TopologyContext context) {
+        // instantiate output lock object
+        outputLock = new Object();
+        streamContext = new StreamContextImpl(config,context.registerMetric("eagle.evaluator",new MultiCountMetric(),60),context);
+        serializer = Serializers.newPartitionedEventSerializer(this);
+        alertOutputCollector = new AlertBoltOutputCollectorWrapper(collector, outputLock,streamContext);
+        policyGroupEvaluator.init(streamContext, alertOutputCollector);
+        metadataChangeNotifyService.registerListener(this);
+        metadataChangeNotifyService.init(config, MetadataType.ALERT_BOLT);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(AlertConstants.FIELD_0, AlertConstants.FIELD_1));
+    }
+
+    /**
+     * Closes the evaluator, flushes and closes the alert output collector, then runs the base cleanup.
+     */
+    @Override
+    public void cleanup() {
+        policyGroupEvaluator.close();
+        alertOutputCollector.flush();
+        alertOutputCollector.close();
+        super.cleanup();
+    }
+
+    /**
+     * Diffs the policies assigned to this bolt against the cached ones, forwards the
+     * added/removed/modified sets to the evaluator, and then caches the new policies and stream
+     * definitions. Nothing is changed when the spec has no entry for this bolt.
+     *
+     * @param spec the new bolt spec
+     * @param sds  stream definitions keyed by stream id
+     */
+    @Override
+    public void onAlertBoltSpecChange(AlertBoltSpec spec, Map<String, StreamDefinition> sds) {
+        List<PolicyDefinition> newPolicies = spec.getBoltPoliciesMap().get(boltId);
+        if (newPolicies == null) {
+            LOG.info("no policy with AlertBoltSpec {} for this bolt {}", spec, boltId);
+            return;
+        }
+
+        Map<String, PolicyDefinition> newPoliciesMap = new HashMap<>();
+        newPolicies.forEach(p -> newPoliciesMap.put(p.getName(), p));
+        MapComparator<String, PolicyDefinition> comparator = new MapComparator<>(newPoliciesMap, cachedPolicies);
+        comparator.compare();
+
+        policyGroupEvaluator.onPolicyChange(comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), sds);
+
+        // switch
+        cachedPolicies = newPoliciesMap;
+        sdf = sds;
+    }
+
+    /**
+     * Looks up a stream definition received with the latest spec change.
+     *
+     * @param streamId id of the stream
+     * @return the definition, or {@code null} if the id is unknown or no definitions have been
+     *         received yet
+     */
+    @Override
+    public StreamDefinition getStreamDefinition(String streamId) {
+        // Read the volatile field once; it is null before the first spec change.
+        Map<String, StreamDefinition> definitions = this.sdf;
+        return definitions == null ? null : definitions.get(streamId);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
new file mode 100644
index 0000000..0a239e2
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.runner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.PublishSpec;
+import org.apache.eagle.alert.engine.StreamContextImpl;
+import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.coordinator.MetadataType;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
+import org.apache.eagle.alert.engine.publisher.AlertPublisher;
+import org.apache.eagle.alert.utils.AlertConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+
+import com.typesafe.config.Config;
+
+/**
+ * Bolt that passes each incoming {@link AlertStreamEvent} to the {@link AlertPublisher} and keeps
+ * the publisher's publishments in sync with {@link PublishSpec} updates.
+ */
+@SuppressWarnings("serial")
+public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPublishSpecListener {
+    private static final Logger LOG = LoggerFactory.getLogger(AlertPublisherBolt.class);
+    private final AlertPublisher alertPublisher;
+    // Publishments from the latest spec, keyed by name; replaced as a whole on every spec change.
+    private volatile Map<String, Publishment> cachedPublishments = new HashMap<>();
+    private StreamContextImpl streamContext;
+
+    /**
+     * @param alertPublisher     publisher that receives the alert events; initialized here with {@code config}
+     * @param config             configuration passed to the publisher and the base class
+     * @param coordinatorService metadata change service passed to the base class
+     */
+    public AlertPublisherBolt(AlertPublisher alertPublisher, Config config, IMetadataChangeNotifyService coordinatorService){
+        super(coordinatorService, config);
+        this.alertPublisher = alertPublisher;
+        this.alertPublisher.init(config);
+    }
+
+    /**
+     * Registers this bolt for publish-spec changes and creates the stream context whose counters
+     * are registered under the metric name {@code eagle.publisher}.
+     */
+    @Override
+    public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService coordinatorService, Config config, TopologyContext context) {
+        coordinatorService.registerListener(this);
+        coordinatorService.init(config, MetadataType.ALERT_PUBLISH_BOLT);
+        streamContext = new StreamContextImpl(config,context.registerMetric("eagle.publisher",new MultiCountMetric(),60),context);
+    }
+
+    /**
+     * Publishes the alert carried in {@code AlertConstants.FIELD_1} and acks the tuple. On any
+     * exception the error is logged and reported to Storm.
+     *
+     * <p>Counters: {@code receive_count} per tuple, {@code ack_count} per acked tuple,
+     * {@code fail_count} per exception.
+     */
+    @Override
+    public void execute(Tuple input) {
+        try {
+            // scope(...) only selects the counter; incr() is what actually counts.
+            streamContext.counter().scope("receive_count").incr();
+            alertPublisher.nextEvent((AlertStreamEvent) input.getValueByField(AlertConstants.FIELD_1));
+            this.collector.ack(input);
+            streamContext.counter().scope("ack_count").incr();
+        } catch (Exception ex) {
+            streamContext.counter().scope("fail_count").incr();
+            LOG.error(ex.getMessage(), ex);
+            // NOTE(review): the tuple is neither acked nor failed on this path — confirm that is intended.
+            collector.reportError(ex);
+        }
+    }
+
+    /** Closes the publisher and then runs the base cleanup. */
+    @Override
+    public void cleanup() {
+        alertPublisher.close();
+        super.cleanup();
+    }
+
+    /** Declares an empty field list for the default stream. */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields());
+    }
+
+    /**
+     * Diffs the publishments of the new spec against the cached ones, passes the added, removed
+     * and modified publishments (plus the previous versions of the modified ones) to the
+     * publisher, and then caches the new publishments. Does nothing when the spec or its
+     * publishment list is {@code null}.
+     *
+     * @param pubSpec the new publish spec
+     * @param sds     stream definitions; not used by this method
+     */
+    @Override
+    public void onAlertPublishSpecChange(PublishSpec pubSpec, Map<String, StreamDefinition> sds) {
+        if (pubSpec == null) {
+            return;
+        }
+
+        List<Publishment> newPublishments = pubSpec.getPublishments();
+        if (newPublishments == null) {
+            LOG.info("no publishments with PublishSpec {} for this topology", pubSpec);
+            return;
+        }
+
+        Map<String, Publishment> newPublishmentsMap = new HashMap<>();
+        newPublishments.forEach(p -> newPublishmentsMap.put(p.getName(), p));
+        MapComparator<String, Publishment> comparator = new MapComparator<>(newPublishmentsMap, cachedPublishments);
+        comparator.compare();
+
+        // Previous versions of the modified publishments, in the same order as getModified().
+        List<Publishment> beforeModified = new ArrayList<>();
+        comparator.getModified().forEach(p -> beforeModified.add(cachedPublishments.get(p.getName())));
+        alertPublisher.onPublishChange(comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), beforeModified);
+
+        // switch
+        cachedPublishments = newPublishmentsMap;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java
new file mode 100644
index 0000000..04595b1
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java
@@ -0,0 +1,72 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.runner;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
+
+/**
+ * Since 5/2/16.
+ *
+ * <p>Computes the difference between two maps by key: values whose key exists only in the first
+ * map are "added", values whose key exists only in the second map are "removed", and values of
+ * the first map whose key exists in both maps but whose values are not equal are "modified".
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class MapComparator<K, V> {
+    private Map<K, V> map1;
+    private Map<K, V> map2;
+    private List<V> added = new ArrayList<>();
+    private List<V> removed = new ArrayList<>();
+    private List<V> modified = new ArrayList<>();
+
+    /**
+     * @param map1 the new state
+     * @param map2 the old state to compare against
+     */
+    public MapComparator(Map<K, V> map1, Map<K, V> map2){
+        this.map1 = map1;
+        this.map2 = map2;
+    }
+
+    /**
+     * Fills the added, removed and modified lists from the current contents of both maps.
+     * Results of an earlier call are discarded, so calling this repeatedly does not duplicate
+     * entries. Null values are compared null-safely.
+     */
+    public void compare(){
+        added.clear();
+        removed.clear();
+        modified.clear();
+
+        for (Map.Entry<K, V> entry : map1.entrySet()) {
+            K key = entry.getKey();
+            V newValue = entry.getValue();
+            if (!map2.containsKey(key)) {
+                added.add(newValue);
+            } else if (!java.util.Objects.equals(newValue, map2.get(key))) {
+                // Key is in both maps but the values differ: report the value from map1.
+                modified.add(newValue);
+            }
+        }
+
+        for (Map.Entry<K, V> entry : map2.entrySet()) {
+            if (!map1.containsKey(entry.getKey())) {
+                removed.add(entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * @return values of the first map whose keys are absent from the second map
+     */
+    public List<V> getAdded(){
+        return added;
+    }
+
+    /**
+     * @return values of the second map whose keys are absent from the first map
+     */
+    public List<V> getRemoved(){
+        return removed;
+    }
+
+    /**
+     * @return values of the first map whose keys are in both maps with unequal values
+     */
+    public List<V> getModified(){
+        return modified;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
new file mode 100644
index 0000000..1a6267b
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.runner;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.metric.IMetricSystem;
+import org.apache.eagle.alert.metric.MetricSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.metric.api.IMetricsConsumer;
+import backtype.storm.task.IErrorReporter;
+import backtype.storm.task.TopologyContext;
+
+import com.codahale.metrics.Gauge;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+/**
+ * Storm metrics consumer that publishes the data points of all tasks through a single,
+ * shared metric system. Each data point is exposed as a gauge whose name carries the
+ * source component id and task id, so gauges of different tasks do not collide.
+ */
+public class StormMetricConsumer implements IMetricsConsumer {
+    public static final Logger LOG = LoggerFactory.getLogger(StormMetricConsumer.class);
+    private String topologyName;
+    private IMetricSystem metricSystem;
+    private String topologyId;
+
+    /**
+     * Loads and starts the metric system from the configuration string passed as registration argument,
+     * tagging it with the topology name and the storm id.
+     *
+     * @param stormConf            storm configuration (not used)
+     * @param registrationArgument configuration rendered as a string, parsed with typesafe config
+     * @param context              topology context, used to look up the storm id
+     * @param errorReporter        error reporter (not used)
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
+        Config config = ConfigFactory.parseString((String) registrationArgument, ConfigParseOptions.defaults());
+        topologyName = config.getString("topology.name");
+        topologyId = context.getStormId();
+        metricSystem = MetricSystem.load(config);
+        // plain map instead of double-brace initialization, which would create an anonymous
+        // HashMap subclass holding a reference to this consumer
+        HashMap<String, Object> tags = new HashMap<>();
+        tags.put("topologyName", topologyName);
+        tags.put("topologyId", topologyId);
+        metricSystem.tags(tags);
+        metricSystem.start();
+    }
+
+    /**
+     * Registers or updates one gauge per data point (one per map entry for map-valued data points),
+     * removes every metric that was not part of this batch, triggers a report and finally resets
+     * the gauges created by this consumer.
+     *
+     * @param taskInfo   source task of the data points
+     * @param dataPoints data points to report
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+        synchronized (metricSystem) {
+            // hash set gives constant-time membership checks in the removal filter below
+            java.util.Set<String> reportedNames = new java.util.HashSet<>();
+            for(DataPoint dataPoint:dataPoints){
+                if(dataPoint.value instanceof Map) {
+                    Map<String,Object> values = (Map<String, Object>) dataPoint.value;
+                    for(Map.Entry<String,Object> entry:values.entrySet()){
+                        String metricName = buildMetricName(taskInfo, dataPoint.name, entry.getKey());
+                        reportedNames.add(metricName);
+                        updateGauge(metricName, entry.getValue());
+                    }
+                } else {
+                    String metricName = buildMetricName(taskInfo, dataPoint.name);
+                    reportedNames.add(metricName);
+                    updateGauge(metricName, dataPoint.value);
+                }
+            }
+            // drop metrics which did not show up in this batch
+            metricSystem.registry().removeMatching((name, metric) -> !reportedNames.contains(name));
+            metricSystem.report();
+            metricSystem.registry().getGauges().values().forEach((gauge -> {
+                if(gauge instanceof DataPointGauge){
+                    ((DataPointGauge)gauge).reset();
+                }
+            }));
+            LOG.info("Reported {} metric data points from {} [{}]",dataPoints.size(),taskInfo.srcComponentId,taskInfo.srcTaskId);
+        }
+    }
+
+    /**
+     * Sets the value of the gauge registered under the given name, registering a new gauge
+     * (and logging the registration) only when none exists yet.
+     */
+    @SuppressWarnings("rawtypes")
+    private void updateGauge(String metricName, Object value) {
+        Gauge gauge = metricSystem.registry().getGauges().get(metricName);
+        if(gauge == null) {
+            LOG.info("Register metric {}", metricName);
+            metricSystem.registry().register(metricName, new DataPointGauge(value));
+        } else {
+            ((DataPointGauge) gauge).setValue(value);
+        }
+    }
+
+    /**
+     * Gauge holding the latest value of a data point; the value is set back to 0 by {@link #reset()}.
+     */
+    private static class DataPointGauge implements Gauge<Object> {
+        private Object value;
+        public DataPointGauge(Object initialValue){
+            this.value = initialValue;
+        }
+
+        @Override
+        public Object getValue() {
+            return value;
+        }
+
+        public void setValue(Object value){
+            this.value = value;
+        }
+
+        public void reset(){
+            this.value = 0;
+        }
+    }
+
+    /**
+     * Builds the metric name as {@code <name parts joined by '.'>.<componentId>.<taskId>};
+     * any '/' inside the name parts is replaced by '.'.
+     */
+    private String buildMetricName(TaskInfo taskInfo,String ... name ){
+        return String.join(".",StringUtils.join(name,".").replace("/","."),taskInfo.srcComponentId,taskInfo.srcTaskId+"");
+    }
+
+    /**
+     * Stops the shared metric system.
+     */
+    @Override
+    public void cleanup() {
+        metricSystem.stop();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
new file mode 100644
index 0000000..c18a44f
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.runner;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.metric.IMetricSystem;
+import org.apache.eagle.alert.metric.MetricSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.metric.api.IMetricsConsumer;
+import backtype.storm.task.IErrorReporter;
+import backtype.storm.task.TopologyContext;
+
+import com.codahale.metrics.Gauge;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+/**
+ * Storm metrics consumer that keeps one metric system instance per source task.
+ * Component and task are attached to each metric system as tags, so metric names
+ * themselves only consist of the data point name.
+ */
+public class StormMetricTaggedConsumer implements IMetricsConsumer {
+    public static final Logger LOG = LoggerFactory.getLogger(StormMetricTaggedConsumer.class);
+    private String topologyName;
+    private Map<String,MetricSystem> metricSystems;
+    private String stormId;
+    private Config config;
+
+    /**
+     * Parses the configuration passed as registration argument and remembers topology name and storm id.
+     * Metric systems are created lazily in {@link #handleDataPoints(TaskInfo, Collection)}.
+     *
+     * @param stormConf            storm configuration (not used)
+     * @param registrationArgument configuration rendered as a string, parsed with typesafe config
+     * @param context              topology context, used to look up the storm id
+     * @param errorReporter        error reporter (not used)
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
+        this.config = ConfigFactory.parseString((String) registrationArgument, ConfigParseOptions.defaults());
+        topologyName = config.getString("topology.name");
+        stormId = context.getStormId();
+        metricSystems = new HashMap<>();
+    }
+
+    /**
+     * Reports the data points through the metric system of the source task, creating, tagging and
+     * starting that metric system on first use.
+     *
+     * @param taskInfo   source task of the data points
+     * @param dataPoints data points to report
+     */
+    @Override
+    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+        synchronized (metricSystems) {
+            String uniqueTaskKey = buildUniqueTaskKey(taskInfo);
+            MetricSystem metricSystem = metricSystems.get(uniqueTaskKey);
+            if(metricSystem == null){
+                metricSystem = MetricSystem.load(config);
+                metricSystems.put(uniqueTaskKey,metricSystem);
+                // plain map instead of double-brace initialization, which would create an anonymous
+                // HashMap subclass capturing this consumer and the task info
+                HashMap<String, Object> tags = new HashMap<>();
+                tags.put("topology",topologyName);
+                tags.put("stormId",stormId);
+                tags.put("component",taskInfo.srcComponentId);
+                tags.put("task",taskInfo.srcTaskId);
+                metricSystem.tags(tags);
+                metricSystem.start();
+                LOG.info("Initialized metric reporter for {}",uniqueTaskKey);
+            }
+            report(metricSystem,taskInfo,dataPoints);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Reported {} metric points from {}", dataPoints.size(), uniqueTaskKey);
+            }
+        }
+    }
+
+    /**
+     * Registers or updates one gauge per data point (one per map entry for map-valued data points),
+     * removes every metric that was not part of this batch, triggers a report and finally resets
+     * the gauges created by this consumer.
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private void report(MetricSystem metricSystem,TaskInfo taskInfo,Collection<DataPoint> dataPoints){
+        // hash set gives constant-time membership checks in the removal filter below
+        java.util.Set<String> reportedNames = new java.util.HashSet<>();
+        for (DataPoint dataPoint : dataPoints) {
+            if (dataPoint.value instanceof Map) {
+                Map<String, Object> values = (Map<String, Object>) dataPoint.value;
+                for (Map.Entry<String, Object> entry : values.entrySet()) {
+                    String metricName = buildSimpleMetricName(taskInfo, dataPoint.name, entry.getKey());
+                    reportedNames.add(metricName);
+                    updateGauge(metricSystem, metricName, entry.getValue());
+                }
+            } else {
+                String metricName = buildSimpleMetricName(taskInfo, dataPoint.name);
+                reportedNames.add(metricName);
+                updateGauge(metricSystem, metricName, dataPoint.value);
+            }
+        }
+        // drop metrics which did not show up in this batch
+        metricSystem.registry().removeMatching((name, metric) -> !reportedNames.contains(name));
+        metricSystem.report();
+        metricSystem.registry().getGauges().values().forEach((gauge -> {
+            if(gauge instanceof DataPointGauge){
+                ((DataPointGauge)gauge).reset();
+            }
+        }));
+    }
+
+    /**
+     * Sets the value of the gauge registered under the given name, registering a new gauge
+     * when none exists yet.
+     */
+    @SuppressWarnings("rawtypes")
+    private static void updateGauge(MetricSystem metricSystem, String metricName, Object value) {
+        Gauge gauge = metricSystem.registry().getGauges().get(metricName);
+        if (gauge == null) {
+            metricSystem.registry().register(metricName, new DataPointGauge(value));
+            LOG.debug("Register metric {}", metricName);
+        } else {
+            ((DataPointGauge) gauge).setValue(value);
+        }
+    }
+
+    /**
+     * Gauge holding the latest value of a data point; the value is set back to 0 by {@link #reset()}.
+     */
+    private static class DataPointGauge implements Gauge<Object> {
+        private Object value;
+        public DataPointGauge(Object initialValue){
+            this.setValue(initialValue);
+        }
+
+        @Override
+        public Object getValue() {
+            return value;
+        }
+
+        public void setValue(Object value){
+            this.value = value;
+        }
+
+        public void reset(){
+            this.value = 0;
+        }
+    }
+
+    /**
+     * Builds the key identifying the metric system of a task: {@code <componentId>[<taskId>]}.
+     */
+    private static String buildUniqueTaskKey(TaskInfo taskInfo){
+        return String.format("%s[%s]",taskInfo.srcComponentId,taskInfo.srcTaskId);
+    }
+
+    /**
+     * Joins the name parts with '.' and replaces any '/' by '.'. The task info is not part of the
+     * name, as component and task are carried by the tags of the metric system.
+     */
+    private static String buildSimpleMetricName(TaskInfo taskInfo,String ... name ){
+        return StringUtils.join(name,".").replace("/",".");
+    }
+
+    /**
+     * Stops every metric system created by this consumer.
+     */
+    @Override
+    public void cleanup() {
+        metricSystems.values().forEach(IMetricSystem::stop);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
new file mode 100644
index 0000000..942ef97
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.runner;
+
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
+import org.apache.eagle.alert.coordination.model.RouterSpec;
+import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
+import org.apache.eagle.alert.engine.StreamContext;
+import org.apache.eagle.alert.engine.StreamContextImpl;
+import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.router.StreamRouter;
+import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
+import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector;
+import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
+import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
+import org.apache.eagle.alert.engine.serialization.Serializers;
+import org.apache.eagle.alert.utils.AlertConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouterBoltSpecListener, SerializationMetadataProvider{
+    private final static Logger LOG = LoggerFactory.getLogger(StreamRouterBolt.class);
+    private static final long serialVersionUID = -7611470889316430372L;
+    private StreamRouter router;
+    private StreamRouterBoltOutputCollector routeCollector;
+    // snapshot of the sort specs of the last applied router spec, keyed by stream partition
+    private volatile Map<StreamPartition, StreamSortSpec> cachedSSS = new HashMap<>();
+    // snapshot of the router specs of the last applied router spec, keyed by stream partition
+    private volatile Map<StreamPartition, StreamRouterSpec> cachedSRS = new HashMap<>();
+    // stream definitions received with the last applied router spec, keyed by stream id
+    private volatile Map<String,StreamDefinition> sdf = new HashMap<>();
+    private PartitionedEventSerializer serializer;
+    private StreamContext streamContext;
+
+    public StreamRouterBolt(StreamRouter router, Config config, IMetadataChangeNotifyService changeNotifyService) {
+        super(changeNotifyService, config);
+        this.router = router;
+    }
+
+    /**
+     * Creates the stream context, the serializer and the route collector, prepares the router and
+     * registers this bolt as listener of the metadata change notification service.
+     */
+    @Override
+    public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService changeNotifyService, Config config, TopologyContext context) {
+        MultiCountMetric routerMetric = context.registerMetric("eagle.router",new MultiCountMetric(),60);
+        streamContext = new StreamContextImpl(config,routerMetric,context);
+        serializer = Serializers.newPartitionedEventSerializer(this);
+        routeCollector = new StreamRouterBoltOutputCollector(this.router.getName(),collector,this.getOutputStreamIds(),streamContext,serializer);
+        router.prepare(streamContext, routeCollector);
+        changeNotifyService.registerListener(this);
+        changeNotifyService.init(config, MetadataType.STREAM_ROUTER_BOLT);
+    }
+
+    /**
+     * Turns a tuple value into a partitioned event: byte arrays go through the serializer,
+     * anything else is cast directly.
+     */
+    PartitionedEvent deserialize(Object object) throws IOException {
+        if(!(object instanceof byte[])) {
+            return (PartitionedEvent) object;
+        }
+        // byte[] in higher priority
+        return serializer.deserialize((byte[]) object);
+    }
+
+    /**
+     * Hands the event carried by the tuple to the router; on any exception the failure counter
+     * is incremented, the error is logged and the tuple is failed.
+     */
+    @Override
+    public void execute(Tuple input) {
+        try {
+            streamContext.counter().scope("execute_count").incr();
+            Object payload = input.getValueByField(AlertConstants.FIELD_0);
+            router.nextEvent(deserialize(payload).withAnchor(input));
+        } catch (Exception ex) {
+            streamContext.counter().scope("fail_count").incr();
+            LOG.error(ex.getMessage(),ex);
+            collector.fail(input);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        router.close();
+        super.cleanup();
+    }
+
+    /**
+     * Compares the new router spec with the cached metadata snapshot, notifies the router about
+     * added, removed and modified sort specs and the route collector about added, removed and
+     * modified router specs, then replaces the cached snapshots.
+     *
+     * @param spec new router spec
+     * @param sds  stream definitions keyed by stream id
+     */
+    @Override
+    public void onStreamRouteBoltSpecChange(RouterSpec spec, Map<String, StreamDefinition> sds) {
+        sanityCheck(spec);
+        applySortSpecChange(spec);
+        applyRouterSpecChange(spec, sds);
+        sdf = sds;
+    }
+
+    /**
+     * Figures out added, removed and modified StreamSortSpec, notifies the router and switches the cache.
+     */
+    @SuppressWarnings("unchecked")
+    private void applySortSpecChange(RouterSpec spec) {
+        Map<StreamPartition, StreamSortSpec> newSSS = new HashMap<>();
+        for (StreamRouterSpec routerSpec : spec.getRouterSpecs()) {
+            // partitions without sort spec are not tracked
+            if (routerSpec.getPartition().getSortSpec() != null) {
+                newSSS.put(routerSpec.getPartition(), routerSpec.getPartition().getSortSpec());
+            }
+        }
+
+        Set<StreamPartition> newStreamIds = newSSS.keySet();
+        Set<StreamPartition> cachedStreamIds = cachedSSS.keySet();
+        Collection<StreamPartition> addedStreamIds = CollectionUtils.subtract(newStreamIds, cachedStreamIds);
+        Collection<StreamPartition> removedStreamIds = CollectionUtils.subtract(cachedStreamIds, newStreamIds);
+        Collection<StreamPartition> commonStreamIds = CollectionUtils.intersection(newStreamIds, cachedStreamIds);
+
+        Map<StreamPartition, StreamSortSpec> added = new HashMap<>();
+        for (StreamPartition partition : addedStreamIds) {
+            added.put(partition, newSSS.get(partition));
+        }
+        Map<StreamPartition, StreamSortSpec> removed = new HashMap<>();
+        for (StreamPartition partition : removedStreamIds) {
+            removed.put(partition, cachedSSS.get(partition));
+        }
+        Map<StreamPartition, StreamSortSpec> modified = new HashMap<>();
+        for (StreamPartition partition : commonStreamIds) {
+            // unequal specs mean the StreamSortSpec changed for this partition
+            if (!newSSS.get(partition).equals(cachedSSS.get(partition))) {
+                modified.put(partition, newSSS.get(partition));
+            }
+        }
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("added StreamSortSpec " + added);
+            LOG.debug("removed StreamSortSpec " + removed);
+            LOG.debug("modified StreamSortSpec " + modified);
+        }
+        router.onStreamSortSpecChange(added, removed, modified);
+        // switch cache
+        cachedSSS = newSSS;
+    }
+
+    /**
+     * Figures out added, removed and modified StreamRouterSpec, notifies the route collector and switches the cache.
+     */
+    @SuppressWarnings("unchecked")
+    private void applyRouterSpecChange(RouterSpec spec, Map<String, StreamDefinition> sds) {
+        Map<StreamPartition, StreamRouterSpec> newSRS = new HashMap<>();
+        for (StreamRouterSpec routerSpec : spec.getRouterSpecs()) {
+            newSRS.put(routerSpec.getPartition(), routerSpec);
+        }
+
+        Set<StreamPartition> newStreamPartitions = newSRS.keySet();
+        Set<StreamPartition> cachedStreamPartitions = cachedSRS.keySet();
+        Collection<StreamPartition> addedStreamPartitions = CollectionUtils.subtract(newStreamPartitions, cachedStreamPartitions);
+        Collection<StreamPartition> removedStreamPartitions = CollectionUtils.subtract(cachedStreamPartitions, newStreamPartitions);
+        Collection<StreamPartition> commonStreamPartitions = CollectionUtils.intersection(newStreamPartitions, cachedStreamPartitions);
+
+        Collection<StreamRouterSpec> addedRouterSpecs = new ArrayList<>();
+        for (StreamPartition partition : addedStreamPartitions) {
+            addedRouterSpecs.add(newSRS.get(partition));
+        }
+        Collection<StreamRouterSpec> removedRouterSpecs = new ArrayList<>();
+        for (StreamPartition partition : removedStreamPartitions) {
+            removedRouterSpecs.add(cachedSRS.get(partition));
+        }
+        Collection<StreamRouterSpec> modifiedRouterSpecs = new ArrayList<>();
+        for (StreamPartition partition : commonStreamPartitions) {
+            // unequal specs mean the StreamRouterSpec changed for this partition
+            if (!newSRS.get(partition).equals(cachedSRS.get(partition))) {
+                modifiedRouterSpecs.add(newSRS.get(partition));
+            }
+        }
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("added StreamRouterSpec " + addedRouterSpecs);
+            LOG.debug("removed StreamRouterSpec " + removedRouterSpecs);
+            LOG.debug("modified StreamRouterSpec " + modifiedRouterSpecs);
+        }
+
+        routeCollector.onStreamRouterSpecChange(addedRouterSpecs, removedRouterSpecs, modifiedRouterSpecs, sds);
+        // switch cache
+        cachedSRS = newSRS;
+    }
+
+    /**
+     * Verifies that the number of distinct worker bolts requested by the spec does not exceed the
+     * number of output streams of this bolt. In correlation cases, multiple streams will go to the
+     * same queue for correlation policy, hence workers are counted distinctly.
+     *
+     * @param spec router spec to check
+     * @throws IllegalStateException if more distinct workers are requested than output streams exist
+     */
+    private void sanityCheck(RouterSpec spec){
+        Set<String> totalRequestedSlots = new HashSet<>();
+        for(StreamRouterSpec routerSpec : spec.getRouterSpecs()){
+            for(PolicyWorkerQueue queue : routerSpec.getTargetQueue()){
+                queue.getWorkers().forEach(worker -> totalRequestedSlots.add(worker.getBoltId()));
+            }
+        }
+        if(totalRequestedSlots.size() <= getOutputStreamIds().size()){
+            return;
+        }
+        String error = String.format("Requested slots are not consistent with provided slots, %s, %s", totalRequestedSlots, getOutputStreamIds());
+        LOG.error(error);
+        throw new IllegalStateException(error);
+    }
+
+    /**
+     * @param streamId stream id to look up
+     * @return the stream definition received with the last applied router spec, or null if unknown
+     */
+    @Override
+    public StreamDefinition getStreamDefinition(String streamId) {
+        return sdf.get(streamId);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
new file mode 100755
index 0000000..cef94c7
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
@@ -0,0 +1,215 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.runner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
+import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
+import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
+import org.apache.eagle.alert.engine.spout.CorrelationSpout;
+import org.apache.eagle.alert.utils.AlertConstants;
+import org.apache.eagle.alert.utils.StreamIdConversion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+
+/**
+ * Builds and submits the alert engine topology.
+ * By default
+ * 1. one spout with multiple tasks
+ * 2. multiple router bolts with each bolt having exactly one task
+ * 3. multiple alert bolts with each bolt having exactly one task
+ * 4. one publish bolt with multiple tasks
+ */
+public class UnitTopologyRunner {
+    private static final Logger LOG = LoggerFactory.getLogger(UnitTopologyRunner.class);
+    public final static String spoutName = "alertEngineSpout";
+    private final static String streamRouterBoltNamePrefix = "streamRouterBolt";
+    private final static String alertBoltNamePrefix = "alertBolt";
+    public final static String alertPublishBoltName = "alertPublishBolt";
+
+    // configuration keys
+    public final static String TOTAL_WORKER_NUM = "topology.numOfTotalWorkers";
+    public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
+    public final static String ROUTER_TASK_NUM = "topology.numOfRouterBolts";
+    public final static String ALERT_TASK_NUM = "topology.numOfAlertBolts";
+    public final static String PUBLISH_TASK_NUM = "topology.numOfPublishTasks";
+    public final static String LOCAL_MODE = "topology.localMode";
+    public final static String MESSAGE_TIMEOUT_SECS = "topology.messageTimeoutSecs";
+    // message timeout used when MESSAGE_TIMEOUT_SECS is not configured
+    public final static int DEFAULT_MESSAGE_TIMEOUT_SECS = 3600;
+
+    private final IMetadataChangeNotifyService metadataChangeNotifyService;
+
+    public UnitTopologyRunner(IMetadataChangeNotifyService metadataChangeNotifyService){
+        this.metadataChangeNotifyService = metadataChangeNotifyService;
+    }
+
+    /**
+     * Wires spout, router bolts, alert bolts and the publish bolt into a storm topology.
+     *
+     * @param topologyId        topology id handed to the spout
+     * @param numOfSpoutTasks   parallelism and number of tasks of the spout
+     * @param numOfRouterBolts  number of router bolts to create
+     * @param numOfAlertBolts   number of alert bolts to create
+     * @param numOfPublishTasks number of tasks of the publish bolt
+     * @param config            configuration handed to spout and bolts
+     * @return the assembled topology
+     */
+    public StormTopology buildTopology(String topologyId,
+                              int numOfSpoutTasks,
+                              int numOfRouterBolts,
+                              int numOfAlertBolts,
+                              int numOfPublishTasks,
+                              Config config) {
+
+        StreamRouterImpl[] routers = new StreamRouterImpl[numOfRouterBolts];
+        StreamRouterBolt[] routerBolts = new StreamRouterBolt[numOfRouterBolts];
+        PolicyGroupEvaluatorImpl[] evaluators = new PolicyGroupEvaluatorImpl[numOfAlertBolts];
+        AlertBolt[] alertBolts = new AlertBolt[numOfAlertBolts];
+        AlertPublisherImpl publisher;
+        AlertPublisherBolt publisherBolt;
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+
+        // construct Spout object
+        CorrelationSpout spout = new CorrelationSpout(config, topologyId, getMetadataChangeNotifyService(), numOfRouterBolts, spoutName, streamRouterBoltNamePrefix);
+        builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
+
+        // construct StreamRouterBolt objects
+        for(int i=0; i<numOfRouterBolts; i++){
+            routers[i] = new StreamRouterImpl(streamRouterBoltNamePrefix + i);
+            routerBolts[i] = new StreamRouterBolt(routers[i], config, getMetadataChangeNotifyService());
+        }
+
+        // construct AlertBolt objects
+        for(int i=0; i<numOfAlertBolts; i++){
+            evaluators[i] = new PolicyGroupEvaluatorImpl(alertBoltNamePrefix + i);
+            alertBolts[i] = new AlertBolt(alertBoltNamePrefix+i, evaluators[i], config, getMetadataChangeNotifyService());
+        }
+
+        // construct AlertPublishBolt object
+        publisher = new AlertPublisherImpl(alertPublishBoltName);
+        publisherBolt = new AlertPublisherBolt(publisher, config, getMetadataChangeNotifyService());
+
+        // connect spout and router bolt, also define output streams for downstreaming alert bolt
+        for(int i=0; i<numOfRouterBolts; i++){
+            String boltName = streamRouterBoltNamePrefix + i;
+
+            // every router bolt gets one output stream per alert bolt
+            String streamId = StreamIdConversion.generateStreamIdBetween(spoutName, boltName);
+            List<String> outputStreamIds = new ArrayList<>(numOfAlertBolts);
+            for(int j=0; j<numOfAlertBolts; j++){
+                String sid = StreamIdConversion.generateStreamIdBetween(boltName, alertBoltNamePrefix+j);
+                outputStreamIds.add(sid);
+            }
+            routerBolts[i].declareOutputStreams(outputStreamIds);
+
+            // TODO potentially one route bolt may have multiple tasks, so that is field grouping by groupby fields
+            // that means we need a separate field to become groupby field
+            builder.setBolt(boltName, routerBolts[i]).fieldsGrouping(spoutName, streamId, new Fields());
+        }
+
+        // connect router bolt and alert bolt, also define output streams for downstreaming alert publish bolt
+        for(int i=0; i<numOfAlertBolts; i++){
+            String boltName = alertBoltNamePrefix + i;
+            BoltDeclarer boltDeclarer = builder.setBolt(boltName, alertBolts[i]);
+            for(int j=0; j<numOfRouterBolts; j++) {
+                String streamId = StreamIdConversion.generateStreamIdBetween(streamRouterBoltNamePrefix+j, boltName);
+                boltDeclarer.fieldsGrouping(streamRouterBoltNamePrefix+j, streamId, new Fields());
+            }
+        }
+
+        // connect alert bolt and alert publish bolt, this is the last bolt in the pipeline
+        BoltDeclarer boltDeclarer = builder.setBolt(alertPublishBoltName, publisherBolt).setNumTasks(numOfPublishTasks);
+        for(int i=0; i<numOfAlertBolts; i++) {
+            boltDeclarer.fieldsGrouping(alertBoltNamePrefix+i, new Fields(AlertConstants.FIELD_0));
+        }
+
+        return builder.createTopology();
+    }
+
+    /**
+     * Builds the topology and submits it, either to an in-process local cluster or to a storm cluster.
+     * In local mode this method blocks the calling thread by sleeping for {@code Long.MAX_VALUE} milliseconds.
+     * The metrics consumer is registered only when the configuration has a "metric" path.
+     *
+     * @param topologyId        name under which the topology is submitted
+     * @param numOfTotalWorkers number of storm workers
+     * @param numOfSpoutTasks   parallelism and number of tasks of the spout
+     * @param numOfRouterBolts  number of router bolts
+     * @param numOfAlertBolts   number of alert bolts
+     * @param numOfPublishTasks number of tasks of the publish bolt
+     * @param config            configuration handed to spout, bolts and metrics consumer
+     * @param localMode         true to run in a local cluster, false to submit to a storm cluster
+     * @throws IllegalStateException if submitting to the storm cluster fails
+     */
+    public void run(String topologyId,
+                    int numOfTotalWorkers,
+                    int numOfSpoutTasks,
+                    int numOfRouterBolts,
+                    int numOfAlertBolts,
+                    int numOfPublishTasks,
+                    Config config,
+                    boolean localMode) {
+        backtype.storm.Config stormConfig = new backtype.storm.Config();
+        // TODO: Configurable metric consumer instance number
+
+        int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS)?config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS;
+        LOG.info("Set topology.message.timeout.secs as {}",messageTimeoutSecs);
+        stormConfig.setMessageTimeoutSecs(messageTimeoutSecs);
+
+        if(config.hasPath("metric")) {
+            // the whole configuration is passed to the consumer as its registration argument
+            stormConfig.registerMetricsConsumer(StormMetricTaggedConsumer.class, config.root().render(ConfigRenderOptions.concise()),1);
+        }
+
+        stormConfig.setNumWorkers(numOfTotalWorkers);
+        StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config);
+
+        if(localMode) {
+            LOG.info("Submitting as local mode");
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology(topologyId, stormConfig, topology);
+            Utils.sleep(Long.MAX_VALUE);
+        }else{
+            LOG.info("Submitting as cluster mode");
+            try {
+                StormSubmitter.submitTopologyWithProgressBar(topologyId, stormConfig, topology);
+            } catch(Exception ex) {
+                // log the topology id rather than the topology object itself
+                LOG.error("fail submitting topology {}", topologyId, ex);
+                throw new IllegalStateException(ex);
+            }
+        }
+    }
+
+    /**
+     * Runs the topology using "topology.name" from the configuration as topology id.
+     *
+     * @param config configuration providing the topology name and all topology settings
+     */
+    public void run(Config config) {
+        String topologyId = config.getString("topology.name");
+        run(topologyId,config);
+    }
+
+    /**
+     * Runs the topology with all sizing parameters and the local mode flag read from the configuration.
+     *
+     * @param topologyId name under which the topology is submitted
+     * @param config     configuration providing the topology settings
+     */
+    public void run(String topologyId,Config config) {
+        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+        int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
+        int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
+        int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
+        boolean localMode = config.getBoolean(LOCAL_MODE);
+        int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM);
+        run(topologyId,numOfTotalWorkers, numOfSpoutTasks,numOfRouterBolts,numOfAlertBolts,numOfPublishTasks,config, localMode);
+    }
+
+    /**
+     * Builds the topology with all sizing parameters read from the configuration.
+     *
+     * @param topologyId topology id handed to the spout
+     * @param config     configuration providing the topology settings
+     * @return the assembled topology
+     */
+    public StormTopology buildTopology(String topologyId,Config config) {
+        // same keys as run(String, Config), referenced through the shared constants
+        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+        int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
+        int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
+        int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
+        return buildTopology(topologyId,numOfSpoutTasks,numOfRouterBolts,numOfAlertBolts,numOfPublishTasks,config);
+    }
+
+    public IMetadataChangeNotifyService getMetadataChangeNotifyService() {
+        return metadataChangeNotifyService;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
new file mode 100644
index 0000000..a3487d3
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
@@ -0,0 +1,71 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.scheme;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+
+/**
+ * Storm {@link Scheme} that parses a raw message as JSON into a {@link Map}.
+ * Expects the payload to be a flat JSON object.
+ */
+public class JsonScheme implements Scheme {
+    private static final long serialVersionUID = -8352896475656975577L;
+    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(JsonScheme.class);
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    /** Topic name emitted as the first element of every deserialized tuple. */
+    private final String topic;
+
+    /**
+     * @param topic topic name to attach to every deserialized message
+     */
+    public JsonScheme(String topic) {
+        this.topic = topic;
+    }
+
+    /**
+     * Declares a single output field named {@code f1}.
+     *
+     * NOTE(review): {@link #deserialize(byte[])} returns two values (topic, map) while only
+     * one field is declared here -- confirm how the consuming spout uses these fields.
+     */
+    @Override
+    public Fields getOutputFields() {
+        return new Fields("f1");
+    }
+
+    /**
+     * Parses the given bytes as JSON.
+     *
+     * @param ser raw message bytes, may be {@code null}
+     * @return a two-element list of the topic and the parsed map, or {@code null} when
+     *         {@code ser} is {@code null} or parsing fails with an {@link IOException}
+     */
+    @Override
+    @SuppressWarnings("rawtypes")
+    public List<Object> deserialize(byte[] ser) {
+        if (ser == null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Content is null, ignore");
+            }
+            return null;
+        }
+        try {
+            Map map = mapper.readValue(ser, Map.class);
+            return Arrays.asList(topic, map);
+        } catch (IOException e) {
+            // Decode with the charset constant: no checked exception, so the parse failure
+            // is always logged together with its cause instead of risking being replaced
+            // by a secondary decoding error.
+            LOG.error("Failed to deserialize as JSON: {}",
+                    new String(ser, java.nio.charset.StandardCharsets.UTF_8), e);
+            return null;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
new file mode 100644
index 0000000..1182e3f
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
@@ -0,0 +1,74 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.scheme;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.eagle.alert.coordination.model.StreamNameSelector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Strategy that derives the stream name for a message tuple.
+ * <p>
+ * Resolution order: an explicitly configured stream name wins; otherwise the name is
+ * built by formatting the configured tuple fields with the configured format string;
+ * otherwise a default stream name is returned.
+ *
+ * Since 5/5/16.
+ */
+public class JsonStringStreamNameSelector implements StreamNameSelector {
+    private static final Logger LOG = LoggerFactory.getLogger(JsonStringStreamNameSelector.class);
+    private static final String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName";
+    private static final String FIELD_NAMES_TO_INFER_STREAM_NAME_PROPERTY = "fieldNamesToInferStreamName";
+    private static final String STREAM_NAME_FORMAT = "streamNameFormat";
+
+    private String userProvidedStreamName;
+    private String[] fieldNamesToInferStreamName;
+    private String streamNameFormat;
+
+    /**
+     * @param prop properties providing the optional fixed stream name, the comma-separated
+     *             field names and the format string
+     */
+    public JsonStringStreamNameSelector(Properties prop) {
+        this.userProvidedStreamName = prop.getProperty(USER_PROVIDED_STREAM_NAME_PROPERTY);
+
+        final String commaSeparatedFields = prop.getProperty(FIELD_NAMES_TO_INFER_STREAM_NAME_PROPERTY);
+        this.fieldNamesToInferStreamName =
+                commaSeparatedFields == null ? null : commaSeparatedFields.split(",");
+
+        this.streamNameFormat = prop.getProperty(STREAM_NAME_FORMAT);
+        if (this.streamNameFormat == null) {
+            LOG.warn("no stream name format found, this might cause default stream name be used which is dis-encouraged. Possibly this is a mis-configuration.");
+        }
+    }
+
+    /**
+     * @param tuple message fields keyed by field name
+     * @return the configured stream name, or the formatted name built from the configured
+     *         fields, or {@code "defaultStringStream"} when neither is available
+     */
+    @Override
+    public String getStreamName(Map<String, Object> tuple) {
+        if (userProvidedStreamName != null) {
+            return userProvidedStreamName;
+        }
+
+        if (fieldNamesToInferStreamName == null || streamNameFormat == null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("can not find the stream name for data source. Use the default stream, possibly this means mis-configuration of datasource!");
+            }
+            return "defaultStringStream";
+        }
+
+        // Collect the tuple values in the configured field order as format arguments.
+        final Object[] formatArgs = new Object[fieldNamesToInferStreamName.length];
+        int position = 0;
+        for (String fieldName : fieldNamesToInferStreamName) {
+            formatArgs[position++] = tuple.get(fieldName);
+        }
+        return String.format(streamNameFormat, formatArgs);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
new file mode 100644
index 0000000..89d2e76
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
@@ -0,0 +1,67 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.scheme;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+/**
+ * Storm {@link Scheme} used for parsing a plain string message: the bytes are decoded
+ * as UTF-8 and wrapped into a map together with the current time.
+ */
+public class PlainStringScheme implements Scheme {
+    private static final long serialVersionUID = 5969724968671646310L;
+    private static final Logger LOG = LoggerFactory.getLogger(PlainStringScheme.class);
+    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
+    public static final String STRING_SCHEME_KEY = "str";
+
+    /** Topic name emitted as the first element of every deserialized tuple. */
+    private final String topic;
+
+    /**
+     * @param topic topic name to attach to every deserialized message
+     */
+    public PlainStringScheme(String topic){
+        this.topic = topic;
+    }
+
+    /**
+     * Decodes the given bytes as a UTF-8 string.
+     *
+     * @param buff bytes to decode, must not be {@code null}
+     * @return the decoded string
+     */
+    public static String deserializeString(byte[] buff) {
+        return new String(buff, UTF8_CHARSET);
+    }
+
+    /**
+     * Declares a single output field named by {@link #STRING_SCHEME_KEY}.
+     */
+    @Override
+    public Fields getOutputFields() {
+        return new Fields(STRING_SCHEME_KEY);
+    }
+
+    /**
+     * Wraps the decoded message into a map with the keys {@code "value"} (the string)
+     * and {@code "timestamp"} (the time of deserialization in epoch milliseconds).
+     *
+     * @param ser raw message bytes, may be {@code null}
+     * @return the topic and the map as tuple values, or {@code null} when {@code ser}
+     *         is {@code null} (same contract as {@code JsonScheme})
+     */
+    @Override
+    public List<Object> deserialize(byte[] ser) {
+        // A null payload used to fail with a NullPointerException while decoding;
+        // skip it the same way JsonScheme does.
+        if (ser == null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Content is null, ignore");
+            }
+            return null;
+        }
+        Map<String, Object> m = new HashMap<>();
+        m.put("value", deserializeString(ser));
+        m.put("timestamp", System.currentTimeMillis());
+        return new Values(topic, m);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
new file mode 100644
index 0000000..61ec943
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
@@ -0,0 +1,49 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.scheme;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.eagle.alert.coordination.model.StreamNameSelector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stream name selector that always answers with one fixed name: the configured
+ * {@code userProvidedStreamName} property, or a default name when it is not set.
+ *
+ * Since 5/3/16.
+ */
+public class PlainStringStreamNameSelector implements StreamNameSelector {
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(PlainStringStreamNameSelector.class);
+    private static final String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName";
+    private static final String DEFAULT_STRING_STREAM_NAME = "defaultStringStream";
+
+    private String streamName;
+
+    /**
+     * @param prop properties that may carry the {@code userProvidedStreamName} entry
+     */
+    public PlainStringStreamNameSelector(Properties prop){
+        // The two-argument lookup falls back to the default when the property is absent.
+        this.streamName = prop.getProperty(USER_PROVIDED_STREAM_NAME_PROPERTY, DEFAULT_STRING_STREAM_NAME);
+    }
+
+    /**
+     * @param tuple ignored; the name does not depend on the message content
+     * @return the stream name resolved at construction time
+     */
+    @Override
+    public String getStreamName(Map<String, Object> tuple) {
+        return this.streamName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
new file mode 100644
index 0000000..5ba1080
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.serialization;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.serialization.impl.StreamEventSerializer;
+import org.apache.eagle.alert.engine.serialization.impl.StreamPartitionDigestSerializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Serializer for {@link PartitionedEvent} that writes the partition key, then the
+ * stream event, then the stream partition via a digest-based partition serializer.
+ *
+ * TODO: It seems the added complexity doesn't bring enough of a performance improvement.
+ *
+ * @see PartitionedEvent
+ */
+@Deprecated
+public class PartitionedEventDigestSerializer implements Serializer<PartitionedEvent> {
+    private final Serializer<StreamEvent> streamEventSerializer;
+    private final Serializer<StreamPartition> streamPartitionSerializer;
+
+    /**
+     * @param serializationMetadataProvider metadata source handed to the stream event serializer
+     */
+    public PartitionedEventDigestSerializer(SerializationMetadataProvider serializationMetadataProvider){
+        this.streamPartitionSerializer = StreamPartitionDigestSerializer.INSTANCE;
+        this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider);
+    }
+
+    /**
+     * Writes the entity as: partition key (long), stream event, stream partition.
+     *
+     * @param entity     event to write
+     * @param dataOutput sink to write to
+     * @throws IOException if writing to {@code dataOutput} fails
+     */
+    @Override
+    public void serialize(PartitionedEvent entity, DataOutput dataOutput) throws IOException {
+        // The write order defines the wire layout and must mirror deserialize().
+        final long partitionKey = entity.getPartitionKey();
+        dataOutput.writeLong(partitionKey);
+        streamEventSerializer.serialize(entity.getEvent(), dataOutput);
+        streamPartitionSerializer.serialize(entity.getPartition(), dataOutput);
+    }
+
+    /**
+     * Reads an entity in the layout produced by {@link #serialize(PartitionedEvent, DataOutput)}.
+     * The partition's stream id is set from the deserialized event.
+     *
+     * @param dataInput source to read from
+     * @return the reconstructed event
+     * @throws IOException if reading from {@code dataInput} fails
+     */
+    @Override
+    public PartitionedEvent deserialize(DataInput dataInput) throws IOException {
+        // Consume the three sections in the exact order they were written.
+        final long partitionKey = dataInput.readLong();
+        final StreamEvent streamEvent = streamEventSerializer.deserialize(dataInput);
+        final StreamPartition streamPartition = streamPartitionSerializer.deserialize(dataInput);
+        streamPartition.setStreamId(streamEvent.getStreamId());
+
+        final PartitionedEvent result = new PartitionedEvent();
+        result.setPartitionKey(partitionKey);
+        result.setEvent(streamEvent);
+        result.setPartition(streamPartition);
+        return result;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
new file mode 100644
index 0000000..c518e40
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.serialization;
+
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
+import java.io.IOException;
+
+/**
+ * Converts {@link PartitionedEvent} instances to and from byte arrays.
+ */
+public interface PartitionedEventSerializer {
+    /**
+     * Serializes a partitioned event into a byte array.
+     *
+     * @param entity the event to serialize
+     * @return the serialized bytes
+     * @throws IOException declared so implementations can report serialization failures
+     */
+    byte[] serialize(PartitionedEvent entity) throws IOException;
+
+    /**
+     * Rebuilds a partitioned event from a byte array; presumably the inverse of
+     * {@link #serialize(PartitionedEvent)} -- confirm against the implementations.
+     *
+     * @param bytes the serialized bytes
+     * @return the deserialized event
+     * @throws IOException declared so implementations can report deserialization failures
+     */
+    PartitionedEvent deserialize(byte[] bytes) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
new file mode 100644
index 0000000..71b274d
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.serialization;
+
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * Integration interface that provides stream definitions to serializers.
+ */
+public interface SerializationMetadataProvider {
+    /**
+     * Looks up the definition of a stream.
+     *
+     * @param streamId identifier of the stream to look up
+     * @return the StreamDefinition, or null if it does not exist
+     */
+    StreamDefinition getStreamDefinition(String streamId);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
new file mode 100644
index 0000000..c2f87d0
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.serialization;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Writes values of type {@code V} to a {@link DataOutput} and reads them back from a
+ * {@link DataInput}.
+ *
+ * @param <V> type of the values handled by this serializer
+ */
+public interface Serializer<V> {
+    /**
+     * Writes {@code value} to {@code dataOutput}.
+     *
+     * @param value      the value to write
+     * @param dataOutput the sink to write to
+     * @throws IOException declared so implementations can report write failures
+     */
+    void serialize(V value,DataOutput dataOutput) throws IOException;
+
+    /**
+     * Reads a value from {@code dataInput}.
+     *
+     * @param dataInput the source to read from
+     * @return the value read
+     * @throws IOException declared so implementations can report read failures
+     */
+    V deserialize(DataInput dataInput) throws IOException;
+}
\ No newline at end of file


Mime
View raw message