eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject incubator-eagle git commit: EAGLE-464 : init draft implementation of multiple staged state check
Date Tue, 16 Aug 2016 09:12:25 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/develop d3a7e480a -> 58f66a857


EAGLE-464 : init draft implementation of multiple staged state check

Initial draft of alerting on state: adds a state definition to the policy metadata.

Move the input/output streams into the definition.
Assume the intermediate stream is defined in the system stream definitions.
Chain the Siddhi definitions, using an index to identify the next handler.

Author: ralphsu
Reviewer: yonzhang

This closes #337


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/58f66a85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/58f66a85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/58f66a85

Branch: refs/heads/develop
Commit: 58f66a857f12ab0676b5e091dc5cdeda0904fca9
Parents: d3a7e48
Author: Ralph, Su <suliangfei@gmail.com>
Authored: Tue Aug 16 17:12:05 2016 +0800
Committer: Ralph, Su <suliangfei@gmail.com>
Committed: Tue Aug 16 17:12:05 2016 +0800

----------------------------------------------------------------------
 .../engine/coordinator/PolicyDefinition.java    |  45 +++++-
 .../evaluator/CompositePolicyHandler.java       |  95 ++++++++++++
 .../engine/evaluator/PolicyStreamHandlers.java  |  27 +++-
 ...ertBoltOutputCollectorThreadSafeWrapper.java |  18 ++-
 .../evaluator/impl/AlertStreamCallback.java     |  94 ++++++++++++
 .../impl/PolicyGroupEvaluatorImpl.java          |  28 ++--
 .../evaluator/impl/SiddhiPolicyHandler.java     |  99 ++++++------
 .../impl/SiddhiPolicyStateHandler.java          |  59 ++++++++
 .../engine/publisher/AlertDeduplicator.java     |   1 +
 .../SiddhiCEPPolicyEventHandlerTest.java        |   8 +-
 .../alert/engine/router/TestAlertBolt.java      |   9 +-
 .../alert/engine/siddhi/SiddhiPolicyTest.java   |  14 +-
 .../engine/statecheck/TestStateCheckPolicy.java | 151 +++++++++++++++++++
 .../src/test/resources/log4j.properties         |   2 +-
 .../statecheck/application-statecheck.conf      |  60 ++++++++
 .../test/resources/statecheck/datasources.json  |  19 +++
 .../src/test/resources/statecheck/policies.json |  38 +++++
 .../test/resources/statecheck/publishments.json |  28 ++++
 .../resources/statecheck/streamdefinitions.json |  98 ++++++++++++
 .../test/resources/statecheck/topologies.json   |  31 ++++
 20 files changed, 818 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index 95de4c9..37e17a7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -16,12 +16,14 @@
  */
 package org.apache.eagle.alert.engine.coordinator;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.ListUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
+import java.util.Objects;
 
 /**
  * @since Apr 5, 2016
@@ -36,6 +38,7 @@ public class PolicyDefinition implements Serializable{
     private List<String> outputStreams = new ArrayList<String>();
 
     private Definition definition;
+    private Definition stateDefinition;
     private PolicyStatus policyStatus = PolicyStatus.ENABLED;
 
     // one stream only have one partition in one policy, since we don't support stream alias
@@ -80,6 +83,14 @@ public class PolicyDefinition implements Serializable{
         return definition;
     }
 
+    public Definition getStateDefinition() {
+        return stateDefinition;
+    }
+
+    public void setStateDefinition(Definition stateDefinition) {
+        this.stateDefinition = stateDefinition;
+    }
+
     public void setDefinition(Definition definition) {
         this.definition = definition;
     }
@@ -137,6 +148,7 @@ public class PolicyDefinition implements Serializable{
                 CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams) &&
                 CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams) &&
                 another.definition.equals(this.definition) &&
+                Objects.equals(this.definition, another.definition) &&
                 CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec) 
 //                && another.parallelismHint == this.parallelismHint
                 ) {
@@ -151,6 +163,9 @@ public class PolicyDefinition implements Serializable{
         public String type;
         public String value;
 
+        private List<String> inputStreams = new ArrayList<String>();
+        private List<String> outputStreams = new ArrayList<String>();
+
         public Definition(String type,String value){
             this.type = type;
             this.value = value;
@@ -173,8 +188,10 @@ public class PolicyDefinition implements Serializable{
             if(!(that instanceof Definition))
                 return false;
             Definition another = (Definition)that;
-            if(another.type.equals(this.type) &&
-                    another.value.equals(this.value))
+            if(another.type.equals(this.type)
+                    && another.value.equals(this.value)
+                    && ListUtils.isEqualList(another.inputStreams, this.inputStreams)
+                    && ListUtils.isEqualList(another.outputStreams, this.outputStreams))
                 return true;
             return false;
         }
@@ -195,9 +212,25 @@ public class PolicyDefinition implements Serializable{
             this.value = value;
         }
 
+        public void setInputStreams(List<String> inputStreams) {
+            this.inputStreams = inputStreams;
+        }
+
+        public void setOutputStreams(List<String> outputStreams) {
+            this.outputStreams = outputStreams;
+        }
+
+        public List<String> getInputStreams() {
+            return inputStreams;
+        }
+
+        public List<String> getOutputStreams() {
+            return outputStreams;
+        }
+
         @Override
         public String toString() {
-            return String.format("{type=\"%s\",value=\"%s\"",type,value);
+            return String.format("{type=\"%s\",value=\"%s\", inputStreams=\"%s\", outputStreams=\"%s\" }",type,value, inputStreams, outputStreams);
         }
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
new file mode 100644
index 0000000..047ee6f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created on 7/27/16.
+ */
+public class CompositePolicyHandler implements PolicyStreamHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(CompositePolicyHandler.class);
+
+    private PolicyStreamHandler policyHandler;
+    private PolicyStreamHandler stateHandler;
+    private List<PolicyStreamHandler> handlers = new ArrayList<>();
+
+    private Collector<AlertStreamEvent> collector;
+
+    private Map<String, StreamDefinition> sds;
+
+    public CompositePolicyHandler(Map<String, StreamDefinition> sds) {
+        this.sds = sds;
+    }
+
+    @Override
+    public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
+        this.collector = collector;
+        // TODO: create two handlers
+        policyHandler = PolicyStreamHandlers.createHandler(context.getPolicyDefinition().getDefinition().type, sds);
+        policyHandler.prepare(collector, context);
+        handlers.add(policyHandler);
+
+        if (context.getPolicyDefinition().getStateDefinition() != null) {
+            stateHandler = PolicyStreamHandlers.createStateHandler(context.getPolicyDefinition().getStateDefinition().type, sds);
+            stateHandler.prepare(collector, context);
+            handlers.add(stateHandler);
+        }
+    }
+
+    @Override
+    public void send(StreamEvent event) throws Exception {
+//        policyHandler.send(event);
+        send(event, 0);
+    }
+
+    // send event to index of stream handler
+    public void send(StreamEvent event, int idx) throws Exception {
+        if (handlers.size() > idx) {
+            handlers.get(idx).send(event);
+        } else if (event instanceof AlertStreamEvent) {
+            if (LOG.isDebugEnabled())
+                LOG.debug("Emit new alert event: {}", event);
+            collector.emit((AlertStreamEvent) event); // for alert stream events, emit if no handler found.
+        } else {
+            // nothing found. LOG, and throw exception
+            LOG.error("non-alert-stream-event {} send with index {}, but the handler is not found!", event, idx);
+            throw new Exception(String.format("event %s send with idx %d can not found expecting handler!", event, idx));
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        for (PolicyStreamHandler handler : handlers) {
+            try {
+                handler.close();
+            } catch (Exception e) {
+                LOG.error("close handler {} failed, continue to run.", handler);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
index fec8ff6..93327b7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
@@ -19,23 +19,34 @@ package org.apache.eagle.alert.engine.evaluator;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler;
 import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
+import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyStateHandler;
 import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler;
 
 import java.util.Map;
 
+/**
+ * TODO/FIXME: to support multiple stage definition in single policy. The methods in this class is not good to understand now.(Hard code of 0/1).
+ */
 public class PolicyStreamHandlers {
-    public static final String SIDDHI_ENGINE ="siddhi";
-    public static final String NO_DATA_ALERT_ENGINE ="nodataalert";
-    public static final String ABSENCE_ALERT_ENGINE ="absencealert";
+    public static final String SIDDHI_ENGINE = "siddhi";
+    public static final String NO_DATA_ALERT_ENGINE = "nodataalert";
+    public static final String ABSENCE_ALERT_ENGINE = "absencealert";
 
-    public static PolicyStreamHandler createHandler(String type, Map<String, StreamDefinition> sds){
-        if(SIDDHI_ENGINE.equals(type)) {
-            return new SiddhiPolicyHandler(sds);
-        }else if(NO_DATA_ALERT_ENGINE.equals(type)){
+    public static PolicyStreamHandler createHandler(String type, Map<String, StreamDefinition> sds) {
+        if (SIDDHI_ENGINE.equals(type)) {
+            return new SiddhiPolicyHandler(sds, 0);// // FIXME: 8/2/16 
+        } else if (NO_DATA_ALERT_ENGINE.equals(type)) {
             return new NoDataPolicyHandler(sds);
-        }else if(ABSENCE_ALERT_ENGINE.equals(type)){
+        } else if (ABSENCE_ALERT_ENGINE.equals(type)) {
             return new AbsencePolicyHandler(sds);
         }
         throw new IllegalArgumentException("Illegal policy stream handler type " + type);
     }
+
+    public static PolicyStreamHandler createStateHandler(String type, Map<String, StreamDefinition> sds) {
+        if (SIDDHI_ENGINE.equals(type)) {
+            return new SiddhiPolicyStateHandler(sds, 1); //// FIXME: 8/2/16
+        }
+        throw new IllegalArgumentException("Illegal policy state handler type " + type);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
index 3aa079e..ca1f622 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
@@ -48,37 +48,39 @@ public class AlertBoltOutputCollectorThreadSafeWrapper implements AlertStreamCol
     private final AutoAlertFlusher flusher;
     private final static int MAX_ALERT_DELAY_SECS = 10;
 
-    public AlertBoltOutputCollectorThreadSafeWrapper(OutputCollector outputCollector){
+    public AlertBoltOutputCollectorThreadSafeWrapper(OutputCollector outputCollector) {
         this.delegate = outputCollector;
         this.queue = new LinkedBlockingQueue<>();
         this.flusher = new AutoAlertFlusher(this);
-        this.flusher.setName(Thread.currentThread().getName()+"-alertFlusher");
+        this.flusher.setName(Thread.currentThread().getName() + "-alertFlusher");
         this.flusher.start();
     }
 
-    private static class AutoAlertFlusher extends Thread{
+    private static class AutoAlertFlusher extends Thread {
         private final AlertBoltOutputCollectorThreadSafeWrapper collector;
         private boolean stopped = false;
         private final static Logger LOG = LoggerFactory.getLogger(AutoAlertFlusher.class);
 
-        private AutoAlertFlusher(AlertBoltOutputCollectorThreadSafeWrapper collector){
+        private AutoAlertFlusher(AlertBoltOutputCollectorThreadSafeWrapper collector) {
             this.collector = collector;
         }
 
         @Override
         public void run() {
             LOG.info("Starting");
-            while(!this.stopped){
-                if(System.currentTimeMillis() - collector.lastFlushTime.get() >= MAX_ALERT_DELAY_SECS * 1000L){
+            while (!this.stopped) {
+                if (System.currentTimeMillis() - collector.lastFlushTime.get() >= MAX_ALERT_DELAY_SECS * 1000L) {
                     this.collector.flush();
                 }
                 try {
                     Thread.sleep(5000);
-                } catch (InterruptedException ignored) {}
+                } catch (InterruptedException ignored) {
+                }
             }
             LOG.info("Stopped");
         }
-        public void shutdown(){
+
+        public void shutdown() {
             LOG.info("Stopping");
             this.stopped = true;
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
new file mode 100644
index 0000000..50e3377
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.impl;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.CompositePolicyHandler;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+
+/**
+ * Created on 8/2/16.
+ */
+public class AlertStreamCallback extends StreamCallback {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AlertStreamCallback.class);
+    private final String outputStream;
+    private final Collector<AlertStreamEvent> collector;
+    private final PolicyHandlerContext context;
+    private final StreamDefinition definition;
+
+    private int currentIndex;
+
+    public AlertStreamCallback(String outputStream,
+                               StreamDefinition streamDefinition,
+                               Collector<AlertStreamEvent> collector,
+                               PolicyHandlerContext context,
+                               int currentIndex) {
+        this.outputStream = outputStream;
+        this.collector = collector;
+        this.context = context;
+        this.definition = streamDefinition;
+        this.currentIndex = currentIndex;
+    }
+
+    /**
+     * Possibly more than one event will be triggered for alerting
+     *
+     * @param events
+     */
+    @Override
+    public void receive(Event[] events) {
+        String policyName = context.getPolicyDefinition().getName();
+        CompositePolicyHandler handler = ((PolicyGroupEvaluatorImpl)context.getPolicyEvaluator()).getPolicyHandler(policyName);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Generated {} alerts from policy '{}' in {}, index of definiton {} ", events.length, policyName, context.getPolicyEvaluatorId(), currentIndex);
+        }
+        for (Event e : events) {
+            AlertStreamEvent event = new AlertStreamEvent();
+            event.setTimestamp(e.getTimestamp());
+            event.setData(e.getData());
+            event.setStreamId(outputStream);
+            event.setPolicy(context.getPolicyDefinition());
+            if (this.context.getPolicyEvaluator() != null) {
+                event.setCreatedBy(context.getPolicyEvaluator().getName());
+            }
+            event.setCreatedTime(System.currentTimeMillis());
+            event.setSchema(definition);
+
+            if (LOG.isDebugEnabled())
+                LOG.debug("Generate new alert event: {}", event);
+            try {
+                if (handler == null) {
+                    // extreme case: the handler is removed from the evaluator. Just emit.
+                    if (LOG.isDebugEnabled()) LOG.debug(" handler not found when callback received event, directly emit. policy removed? ");
+                    collector.emit(event);
+                } else {
+                    handler.send(event, currentIndex + 1);
+                }
+            } catch (Exception ex) {
+                LOG.error(String.format("send event %s to index %d failed with exception. ",event, currentIndex), ex);
+            }
+        }
+        context.getPolicyCounter().scope(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "alert_count")).incrBy(events.length);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
index 228b7fb..068df90 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
@@ -20,10 +20,10 @@ import org.apache.eagle.alert.engine.AlertStreamCollector;
 import org.apache.eagle.alert.engine.StreamContext;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.CompositePolicyHandler;
 import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
 import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
 import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +41,7 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
     // mapping from policy name to PolicyDefinition
     private volatile Map<String,PolicyDefinition> policyDefinitionMap = new HashMap<>();
     // mapping from policy name to PolicyStreamHandler
-    private volatile Map<String,PolicyStreamHandler> policyStreamHandlerMap = new HashMap<>();
+    private volatile Map<String,CompositePolicyHandler> policyStreamHandlerMap = new HashMap<>();
     private String policyEvaluatorId;
     private StreamContext context;
 
@@ -82,7 +82,7 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
      */
     private void dispatch(PartitionedEvent partitionedEvent){
         boolean handled = false;
-        for(Map.Entry<String,PolicyStreamHandler> policyStreamHandler: policyStreamHandlerMap.entrySet()){
+        for(Map.Entry<String,CompositePolicyHandler> policyStreamHandler: policyStreamHandlerMap.entrySet()){
             if(isAcceptedByPolicy(partitionedEvent,policyDefinitionMap.get(policyStreamHandler.getKey()))){
                 try {
                     handled = true;
@@ -102,16 +102,16 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
         }
     }
 
-    private static boolean isAcceptedByPolicy(PartitionedEvent event, PolicyDefinition policy){
-        return policy.getInputStreams() != null
-                && policy.getInputStreams().contains(event.getEvent().getStreamId())
-                && policy.getPartitionSpec().contains(event.getPartition());
+    private static boolean isAcceptedByPolicy(PartitionedEvent event, PolicyDefinition policy) {
+        return policy.getPartitionSpec().contains(event.getPartition()) &&
+                ( policy.getInputStreams().contains(event.getEvent().getStreamId()) ||
+                    policy.getDefinition().getInputStreams().contains(event.getEvent().getStreamId()) );
     }
 
     @Override
     public void onPolicyChange(List<PolicyDefinition> added, List<PolicyDefinition> removed, List<PolicyDefinition> modified, Map<String, StreamDefinition> sds) {
         Map<String,PolicyDefinition> copyPolicies = new HashMap<>(policyDefinitionMap);
-        Map<String,PolicyStreamHandler> copyHandlers = new HashMap<>(policyStreamHandlerMap);
+        Map<String,CompositePolicyHandler> copyHandlers = new HashMap<>(policyStreamHandlerMap);
         for(PolicyDefinition pd : added){
             inplaceAdd(copyPolicies, copyHandlers, pd, sds);
         }
@@ -131,12 +131,12 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
         this.policyStreamHandlerMap = copyHandlers;
     }
 
-    private void inplaceAdd(Map<String, PolicyDefinition> policies, Map<String, PolicyStreamHandler> handlers, PolicyDefinition policy, Map<String, StreamDefinition> sds) {
+    private void inplaceAdd(Map<String, PolicyDefinition> policies, Map<String, CompositePolicyHandler> handlers, PolicyDefinition policy, Map<String, StreamDefinition> sds) {
         if(handlers.containsKey(policy.getName())){
             LOG.error("metadata calculation error, try to add existing PolicyDefinition " + policy);
         }else {
             policies.put(policy.getName(), policy);
-            PolicyStreamHandler handler = PolicyStreamHandlers.createHandler(policy.getDefinition().type, sds);
+            CompositePolicyHandler handler = new CompositePolicyHandler(sds);
             try {
                 PolicyHandlerContext context = new PolicyHandlerContext();
                 context.setPolicyCounter(this.context.counter());
@@ -153,7 +153,7 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
         }
     }
 
-    private void inplaceRemove(Map<String, PolicyDefinition> policies, Map<String, PolicyStreamHandler> handlers, PolicyDefinition policy)  {
+    private void inplaceRemove(Map<String, PolicyDefinition> policies, Map<String, CompositePolicyHandler> handlers, PolicyDefinition policy)  {
         if(handlers.containsKey(policy.getName())) {
             PolicyStreamHandler handler = handlers.get(policy.getName());
             try {
@@ -169,4 +169,10 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
             LOG.error("metadata calculation error, try to remove nonexisting PolicyDefinition: "+policy);
         }
     }
+
+
+    public CompositePolicyHandler getPolicyHandler(String policy) {
+        return policyStreamHandlerMap.get(policy);
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
index b9d1eb2..481e3af 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
@@ -16,8 +16,6 @@
  */
 package org.apache.eagle.alert.engine.evaluator.impl;
 
-import java.util.Map;
-
 import org.apache.eagle.alert.engine.Collector;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
@@ -30,9 +28,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.wso2.siddhi.core.ExecutionPlanRuntime;
 import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
 import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
+
+import java.util.List;
+import java.util.Map;
 
 public class SiddhiPolicyHandler implements PolicyStreamHandler {
     private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyHandler.class);
@@ -42,65 +41,36 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler {
     private PolicyDefinition policy;
     private PolicyHandlerContext context;
 
-    public SiddhiPolicyHandler(Map<String, StreamDefinition> sds){
+    private int currentIndex = 0; // the index of current definition statement inside the policy definition
+
+    public SiddhiPolicyHandler(Map<String, StreamDefinition> sds, int index){
         this.sds = sds;
+        this.currentIndex = index;
     }
 
-    private static String generateExecutionPlan(PolicyDefinition policyDefinition, Map<String, StreamDefinition> sds) throws StreamDefinitionNotFoundException {
+    protected String generateExecutionPlan(PolicyDefinition policyDefinition, Map<String, StreamDefinition> sds) throws StreamDefinitionNotFoundException {
         StringBuilder builder = new StringBuilder();
-        for(String inputStream:policyDefinition.getInputStreams()) {
-            builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream)));
-            builder.append("\n");
+        PolicyDefinition.Definition coreDefinition = policyDefinition.getDefinition();
+        // init if not present
+        if (coreDefinition.getInputStreams() == null || coreDefinition.getInputStreams().isEmpty()) {
+            coreDefinition.setInputStreams(policyDefinition.getInputStreams());
         }
-        builder.append(policyDefinition.getDefinition().value);
-        if(LOG.isDebugEnabled()) LOG.debug("Generated siddhi execution plan: {} from policy: {}", builder.toString(),policyDefinition);
-        return builder.toString();
-    }
-
-    private static class AlertStreamCallback extends StreamCallback{
-        private final String outputStream;
-        private final Collector<AlertStreamEvent> collector;
-        private final PolicyHandlerContext context;
-        private final StreamDefinition definition;
-
-        public AlertStreamCallback(String outputStream, StreamDefinition streamDefinition, Collector<AlertStreamEvent> collector, PolicyHandlerContext context){
-            this.outputStream = outputStream;
-            this.collector = collector;
-            this.context = context;
-            this.definition = streamDefinition;
+        if (coreDefinition.getOutputStreams() == null || coreDefinition.getOutputStreams().isEmpty()) {
+            coreDefinition.setOutputStreams(policyDefinition.getOutputStreams());
         }
 
-        /**
-         * Possibly more than one event will be triggered for alerting
-         * @param events
-         */
-        @Override
-        public void receive(Event[] events) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Generated {} alerts from policy '{}' in {}", events.length,context.getPolicyDefinition().getName(), context.getPolicyEvaluatorId());
-            }
-            for(Event e : events) {
-                AlertStreamEvent event = new AlertStreamEvent();
-                event.setTimestamp(e.getTimestamp());
-                event.setData(e.getData());
-                event.setStreamId(outputStream);
-                event.setPolicy(context.getPolicyDefinition());
-                if (this.context.getPolicyEvaluator() != null) {
-                    event.setCreatedBy(context.getPolicyEvaluator().getName());
-                }
-                event.setCreatedTime(System.currentTimeMillis());
-                event.setSchema(definition);
-                if(LOG.isDebugEnabled())
-                    LOG.debug("Generate new alert event: {}", event);
-                collector.emit(event);
-            }
-            context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"alert_count")).incrBy(events.length);
+        for(String inputStream : coreDefinition.getInputStreams()) {
+            builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream)));
+            builder.append("\n");
         }
+        builder.append(coreDefinition.value);
+        if(LOG.isDebugEnabled()) LOG.debug("Generated siddhi execution plan: {} from definition: {}", builder.toString(), coreDefinition);
+        return builder.toString();
     }
 
     @Override
     public void prepare(final Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
-        LOG.info("Initializing handler for policy {}: {}",context.getPolicyDefinition(),this);
+        LOG.info("Initializing handler for policy {}",context.getPolicyDefinition());
         this.policy = context.getPolicyDefinition();
         this.siddhiManager = new SiddhiManager();
         String plan = generateExecutionPlan(policy, sds);
@@ -111,14 +81,17 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler {
             LOG.error("Failed to create siddhi runtime for policy: {}, siddhi plan: \n\n{}\n",context.getPolicyDefinition().getName(),plan,parserException);
             throw parserException;
         }
-        for(final String outputStream:policy.getOutputStreams()){
-            if(executionRuntime.getStreamDefinitionMap().containsKey(outputStream)) {
+
+        // add output stream callback
+        List<String> outputStreams = getOutputStreams(policy);
+        for(final String outputStream: outputStreams) {
+            if (executionRuntime.getStreamDefinitionMap().containsKey(outputStream)) {
                 this.executionRuntime.addCallback(outputStream,
                         new AlertStreamCallback(
-                        outputStream, SiddhiDefinitionAdapter.convertFromSiddiDefinition(executionRuntime.getStreamDefinitionMap().get(outputStream))
-                        ,collector, context));
+                                outputStream, SiddhiDefinitionAdapter.convertFromSiddiDefinition(executionRuntime.getStreamDefinitionMap().get(outputStream))
+                                , collector, context, currentIndex));
             } else {
-                throw new IllegalStateException("Undefined output stream "+outputStream);
+                throw new IllegalStateException("Undefined output stream " + outputStream);
             }
         }
         this.executionRuntime.start();
@@ -126,6 +99,10 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler {
         LOG.info("Initialized policy handler for policy: {}",policy.getName());
     }
 
+    protected List<String> getOutputStreams(PolicyDefinition policy) {
+        return policy.getOutputStreams().isEmpty() ? policy.getDefinition().getOutputStreams() : policy.getOutputStreams();
+    }
+
     public void send(StreamEvent event) throws Exception {
         context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"receive_count")).incr();
         String streamId = event.getStreamId();
@@ -143,7 +120,7 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler {
         }
     }
 
-    public void close() {
+    public void close() throws Exception {
         LOG.info("Closing handler for policy {}",this.policy.getName());
         this.executionRuntime.shutdown();
         LOG.info("Shutdown siddhi runtime {}",this.executionRuntime.getName());
@@ -151,4 +128,12 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler {
         LOG.info("Shutdown siddhi manager {}",this.siddhiManager);
         LOG.info("Closed handler for policy {}",this.policy.getName());
     }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("SiddhiPolicyHandler for policy: ");
+        sb.append(this.policy == null ? "" : this.policy.getName());
+        return sb.toString();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java
new file mode 100644
index 0000000..43b8f30
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.impl;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created on 7/27/16.
+ */
+public class SiddhiPolicyStateHandler extends SiddhiPolicyHandler {
+
+    private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyStateHandler.class);
+
+    public SiddhiPolicyStateHandler(Map<String, StreamDefinition> sds, int index) {
+        super(sds, index);
+    }
+
+    @Override
+    protected String generateExecutionPlan(PolicyDefinition policyDefinition, Map<String, StreamDefinition> sds) throws StreamDefinitionNotFoundException {
+        StringBuilder builder = new StringBuilder();
+        PolicyDefinition.Definition stateDefiniton = policyDefinition.getStateDefinition();
+        for(String inputStream : stateDefiniton.getInputStreams()) { // the state stream follows the output stream of the policy definition
+            builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream)));
+            builder.append("\n");
+        }
+        builder.append(stateDefiniton.value);
+        if(LOG.isDebugEnabled()) LOG.debug("Generated siddhi state execution plan: {} from definiton: {}", builder.toString(), stateDefiniton);
+        return builder.toString();
+    }
+
+    @Override
+    protected List<String> getOutputStreams(PolicyDefinition policy) {
+        return policy.getStateDefinition().getOutputStreams();
+    }
+
+    // TODO: add more validation in prepare
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
index b894cc0..ed04d5a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
@@ -21,6 +21,7 @@ import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 
 /**
  * Dedup Eagle entities.
+ * TODO: need to support de-dup on field values
  */
 public interface AlertDeduplicator {
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
index ab19801..140b72d 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.eagle.alert.engine.Collector;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
 import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
 import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
 import org.apache.eagle.alert.engine.mock.MockStreamCollector;
@@ -59,12 +60,13 @@ public class SiddhiCEPPolicyEventHandlerTest {
         SiddhiPolicyHandler handler;
         MockStreamCollector collector;
 
-        handler = new SiddhiPolicyHandler(createDefinition("sampleStream_1","sampleStream_2"));
+        handler = new SiddhiPolicyHandler(createDefinition("sampleStream_1","sampleStream_2"), 0);
         collector = new MockStreamCollector();
         PolicyDefinition policyDefinition = MockSampleMetadataFactory.createSingleMetricSamplePolicy();
         PolicyHandlerContext context = new PolicyHandlerContext();
         context.setPolicyDefinition(policyDefinition);
         context.setPolicyCounter(new MultiCountMetric());
+        context.setPolicyEvaluator(new PolicyGroupEvaluatorImpl("evalutorId"));
         handler.prepare(collector,context);
         StreamEvent event = StreamEvent.Builder()
                 .schema(MockSampleMetadataFactory.createSampleStreamDefinition("sampleStream_1"))
@@ -105,12 +107,14 @@ public class SiddhiCEPPolicyEventHandlerTest {
             mutex.release();
         };
 
-        handler = new SiddhiPolicyHandler(ssd);
+        handler = new SiddhiPolicyHandler(ssd, 0);
         PolicyHandlerContext context = new PolicyHandlerContext();
         context.setPolicyDefinition(policyDefinition);
         context.setPolicyCounter(new MultiCountMetric());
+        context.setPolicyEvaluator(new PolicyGroupEvaluatorImpl("evalutorId"));
         handler.prepare(collector,context);
 
+
         long ts_1 = System.currentTimeMillis();
         long ts_2 = System.currentTimeMillis()+1;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
index 9e82cc7..9d2bb38 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -165,7 +165,7 @@ public class TestAlertBolt {
     }
 
     @NotNull
-    private AlertBolt createAlertBolt(OutputCollector collector) {
+    public static AlertBolt createAlertBolt(OutputCollector collector) {
         Config config = ConfigFactory.load();
         PolicyGroupEvaluator policyGroupEvaluator = new PolicyGroupEvaluatorImpl("testPolicyGroupEvaluatorImpl");
         TestStreamRouterBolt.MockChangeService mockChangeService = new TestStreamRouterBolt.MockChangeService();
@@ -244,10 +244,10 @@ public class TestAlertBolt {
             PolicyDefinition def = new PolicyDefinition();
             def.setName("policy-definition");
             def.setInputStreams(Arrays.asList(streamId));
-
+            def.setOutputStreams(Arrays.asList("output"));
             PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
             definition.setType(PolicyStreamHandlers.NO_DATA_ALERT_ENGINE);
-            definition.setValue("PT0M,plain,1,host,host1");
+            definition.setValue("PT0M,provided,1,host,host1");
             def.setDefinition(definition);
 
             boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
@@ -342,4 +342,5 @@ public class TestAlertBolt {
         return sp;
     }
 
-}
\ No newline at end of file
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
index f4e82bc..cb60a46 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
@@ -16,15 +16,7 @@
  */
 package org.apache.eagle.alert.engine.siddhi;
 
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.wso2.siddhi.core.ExecutionPlanRuntime;
@@ -33,6 +25,10 @@ import org.wso2.siddhi.core.event.Event;
 import org.wso2.siddhi.core.stream.input.InputHandler;
 import org.wso2.siddhi.core.stream.output.StreamCallback;
 
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * @since Jun 21, 2016
  *

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
new file mode 100644
index 0000000..8a5d616
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.statecheck;
+
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.router.TestAlertBolt;
+import org.apache.eagle.alert.engine.runner.AlertBolt;
+import org.apache.eagle.alert.utils.AlertConstants;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+/**
+ * Created on 8/4/16.
+ */
+public class TestStateCheckPolicy {
+
+    @Test
+    public void testStateCheck() throws Exception {
+        PolicyGroupEvaluatorImpl impl = new PolicyGroupEvaluatorImpl("test-statecheck-poicyevaluator");
+        AtomicBoolean verified = new AtomicBoolean(false);
+        OutputCollector collector = new OutputCollector(new IOutputCollector() {
+            @Override
+            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+                verified.set(true);
+                Assert.assertEquals("perfmon_latency_check_output2", tuple.get(0));
+                AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
+                System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
+                return null;
+            }
+
+            @Override
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+            }
+
+            @Override
+            public void ack(Tuple input) {
+            }
+
+            @Override
+            public void fail(Tuple input) {
+            }
+
+            @Override
+            public void reportError(Throwable error) {
+            }
+        });
+
+        AlertBolt alertBolt = TestAlertBolt.createAlertBolt(collector);
+        AlertBoltSpec spec = createAlertSpec();
+        Map<String, StreamDefinition> definitionMap = createStreamMap();
+        alertBolt.onAlertBoltSpecChange(spec, definitionMap);
+
+        // send data now
+        sendData(alertBolt, definitionMap, spec.getBoltPoliciesMap().values().iterator().next().get(0));
+
+        Thread.sleep(3000);
+        Assert.assertTrue(verified.get());
+    }
+
+    private void sendData(AlertBolt alertBolt, Map<String, StreamDefinition> definitionMap, PolicyDefinition policyDefinition) {
+        StreamDefinition definition = definitionMap.get("perfmon_latency_stream");
+        long base = System.currentTimeMillis();
+        for (int i = 0; i < 2; i++) {
+            long time = base + i * 1000;
+
+            Map<String, Object> mapdata = new HashMap<>();
+            mapdata.put("host", "host-1");
+            mapdata.put("timestamp", time);
+            mapdata.put("metric", "perfmon_latency");
+            mapdata.put("pool", "raptor");
+            mapdata.put("value", 1000.0 + i * 1000.0);
+            mapdata.put("colo", "phx");
+
+            StreamEvent event = StreamEvent.Builder().timestamep(time).attributes(mapdata, definition).build();
+            PartitionedEvent pEvent = new PartitionedEvent(event, policyDefinition.getPartitionSpec().get(0), 1);
+
+            GeneralTopologyContext mock = Mockito.mock(GeneralTopologyContext.class);
+
+            Mockito.when(mock.getComponentId(1)).thenReturn("taskId");
+            Mockito.when(mock.getComponentOutputFields("taskId", "test-stream-id")).thenReturn(new Fields(AlertConstants.FIELD_0));
+
+            TupleImpl ti = new TupleImpl(mock, Collections.singletonList(pEvent), 1, "test-stream-id");
+            alertBolt.execute(ti);
+        }
+    }
+
+    @NotNull
+    private Map<String, StreamDefinition> createStreamMap() throws Exception {
+        List<StreamDefinition> streams = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/streamdefinitions.json"),
+                new TypeReference<List<StreamDefinition>>() {
+                });
+        return streams.stream().collect(Collectors.toMap(StreamDefinition::getStreamId, item -> item));
+    }
+
+    private static ObjectMapper mapper = new ObjectMapper();
+    static {
+        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    private AlertBoltSpec createAlertSpec() throws Exception {
+        AlertBoltSpec spec = new AlertBoltSpec();
+
+        spec.setVersion("version1");
+        spec.setTopologyName("testTopology");
+
+        List<PolicyDefinition> policies = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/policies.json"),
+                new TypeReference<List<PolicyDefinition>>() {
+                });
+        Assert.assertTrue(policies.size() > 0);
+        spec.addBoltPolicy("alertBolt1", policies.get(0).getName());
+        spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<>(policies));
+
+        return spec;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/log4j.properties
index 8025654..164fa8e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/log4j.properties
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/log4j.properties
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-log4j.rootLogger=INFO, stdout
+log4j.rootLogger=DEBUG, stdout
 
 # standard output
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/application-statecheck.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/application-statecheck.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/application-statecheck.conf
new file mode 100644
index 0000000..73e5b30
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/application-statecheck.conf
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+  "topology" : {
+    "name" : "alertUnitTopology_1",
+    "numOfTotalWorkers": 20,
+    "numOfSpoutTasks" : 1,
+    "numOfRouterBolts" : 4,
+    "numOfAlertBolts" : 10,
+    "numOfPublishTasks" : 1,
+    "localMode" : "true"
+  },
+  "spout" : {
+    "kafkaBrokerZkQuorum": "localhost:2181",
+    "kafkaBrokerZkBasePath": "/brokers",
+    "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+    "stormKafkaTransactionZkQuorum": "",
+    "stormKafkaTransactionZkPath": "/consumers",
+    "stormKafkaEagleConsumer": "eagle_consumer",
+    "stormKafkaStateUpdateIntervalMs": 2000,
+    "stormKafkaFetchSizeBytes": 1048586,
+  },
+  "zkConfig" : {
+    "zkQuorum" : "localhost:2181",
+    "zkRoot" : "/alert",
+    "zkSessionTimeoutMs" : 10000,
+    "connectionTimeoutMs" : 10000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 3000
+  },
+  "dynamicConfigSource" : {
+    "initDelayMillis": 3000,
+    "delayMillis" : 10000
+  },
+  "metadataService": {
+	"context" : "/rest",
+	"host" : "localhost",
+	"port" : 8080
+  },
+  "coordinatorService": {
+  	"host": "localhost",
+  	"port": "8080",
+  	"context" : "/rest"
+  },
+  "kafkaProducer": {
+  	"bootstrapServers": "localhost:9092"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/datasources.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/datasources.json
new file mode 100644
index 0000000..77a280c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/datasources.json
@@ -0,0 +1,19 @@
+[
+{
+	"name": "perfmon_datasource",
+	"type": "KAFKA",
+	"properties": {
+	},
+	"topic": "perfmon_metrics",
+	"schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+	"codec": {
+		"streamNameSelectorProp": {
+			"fieldNamesToInferStreamName" : "metric",
+			"streamNameFormat":"%s"
+		},
+		"streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+		"timestampColumn": "timestamp",
+		"timestampFormat":""
+	}
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/policies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/policies.json
new file mode 100644
index 0000000..7680b9e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/policies.json
@@ -0,0 +1,38 @@
+[
+  {
+    "name": "state_check_latency",
+    "definition": {
+      "type": "siddhi",
+      "value": "from perfmon_latency_stream#window.time(5 min) select host, timestamp, metric, pool, value, colo, convert(avg(value)/1000, 'int') as latencyState group by host insert into perfmon_latency_check_output;",
+      "inputStreams": [
+        "perfmon_latency_stream"
+      ],
+      "outputStreams": [
+        "perfmon_latency_check_output"
+      ]
+    },
+    "stateDefinition": {
+      "type": "siddhi",
+      "value": "from every a = perfmon_latency_check_output -> b = perfmon_latency_check_output[host == a.host and timestamp > a.timestamp] within 5 min select a.latencyState as fromState, b.latencyState as toState, b.timestamp as happenTime insert into perfmon_latency_check_output2",
+      "value1": "from every a = perfmon_latency_check_output[latencyState >= 1] -> b = perfmon_latency_check_output[host == a.host and latencyState != a.latencyState and timestamp > a.timestamp] within 5 min select a.latencyState as fromState, b.latencyState as toState, b.timestamp as happenTime insert into perfmon_latency_check_output2",
+      "inputStreams": [
+        "perfmon_latency_check_output"
+      ],
+      "outputStreams": [
+        "perfmon_latency_check_output2"
+      ]
+    },
+    "partitionSpec": [
+      {
+        "streamId": "perfmon_latency_stream",
+        "type": "GROUPBY",
+        "columns": [
+          "host"
+        ],
+        "sortSpec": {
+          "windowPeriod": "PT0M"
+        }
+      }
+    ]
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/publishments.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/publishments.json
new file mode 100644
index 0000000..3e1910f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/publishments.json
@@ -0,0 +1,28 @@
+[
+  {
+    "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+    "name": "kafka-testAlertStream",
+    "policyIds": [
+      "state_check_latency"
+    ],
+    "dedupIntervalMin": "PT1M",
+    "properties": {
+      "kafka_broker": "localhost:9092",
+      "topic": "latency_state_kafka1"
+    },
+    "serializer": "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
+  },
+  {
+    "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+    "name": "kafka-testAlertStream_state",
+    "policyIds": [
+      "state_check_latency"
+    ],
+    "dedupIntervalMin": "PT1M",
+    "properties": {
+      "kafka_broker": "localhost:9092",
+      "topic": "latency_state_kafka2"
+    },
+    "serializer": "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
+  }
+]

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/streamdefinitions.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/streamdefinitions.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/streamdefinitions.json
new file mode 100644
index 0000000..8f9f80e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/streamdefinitions.json
@@ -0,0 +1,98 @@
+[
+  {
+    "streamId": "perfmon_latency_stream",
+    "dataSource": "perfmon_datasource",
+    "description": "the data stream for perfmon latency metrics",
+    "validate": false,
+    "timeseries": false,
+    "columns": [
+      {
+        "name": "host",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "timestamp",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true
+      },
+      {
+        "name": "metric",
+        "type": "STRING",
+        "defaultValue": "perfmon_latency",
+        "required": true
+      },
+      {
+        "name": "pool",
+        "type": "STRING",
+        "defaultValue": "raptor_general",
+        "required": true
+      },
+      {
+        "name": "value",
+        "type": "DOUBLE",
+        "defaultValue": 0.0,
+        "required": true
+      },
+      {
+        "name": "colo",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      }
+    ]
+  },
+  {
+    "streamId": "perfmon_latency_check_output",
+    "dataSource": "",
+    "description": "the data stream for perfmon latency state output",
+    "validate": false,
+    "timeseries": false,
+    "columns": [
+      {
+        "name": "host",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "timestamp",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true
+      },
+      {
+        "name": "metric",
+        "type": "STRING",
+        "defaultValue": "perfmon_latency",
+        "required": true
+      },
+      {
+        "name": "pool",
+        "type": "STRING",
+        "defaultValue": "raptor_general",
+        "required": true
+      },
+      {
+        "name": "value",
+        "type": "DOUBLE",
+        "defaultValue": 0.0,
+        "required": true
+      },
+      {
+        "name": "colo",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "latencyState",
+        "type": "INT",
+        "defaultValue": 0,
+        "required": true
+      }
+    ]
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58f66a85/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/topologies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/topologies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/topologies.json
new file mode 100644
index 0000000..411cc48
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/statecheck/topologies.json
@@ -0,0 +1,31 @@
+[
+{
+	"name": "alertUnitTopology_1",
+	"numOfSpout":1,
+	"numOfAlertBolt": 10,
+	"numOfGroupBolt": 4,
+	"spoutId": "alertEngineSpout",
+	"groupNodeIds" : [
+		"streamRouterBolt0",
+		"streamRouterBolt1",
+		"streamRouterBolt2",
+		"streamRouterBolt3"
+	],
+	"alertBoltIds": [
+		"alertBolt0",
+		"alertBolt1",
+		"alertBolt2",
+		"alertBolt3",
+		"alertBolt4",
+		"alertBolt5",
+		"alertBolt6",
+		"alertBolt7",
+		"alertBolt8",
+		"alertBolt9"
+	],
+	"pubBoltId" : "alertPublishBolt",
+	"spoutParallelism": 1,
+	"groupParallelism": 1,
+	"alertParallelism": 1
+}
+]


Mime
View raw message