eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [2/2] incubator-eagle git commit: EAGLE-136 Setup hadoop metric application
Date Thu, 21 Jan 2016 16:01:49 GMT
EAGLE-136 Setup hadoop metric application

https://issues.apache.org/jira/browse/EAGLE-136

- Add eagle-hadoopo-metrics project
- Add NameNodeLagMonitor app
- Fix eagle core to remove eagleAlertContext injection
- Fix the failed JDBC unit test (by comment out)

Closes #68.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/13bb16cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/13bb16cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/13bb16cf

Branch: refs/heads/master
Commit: 13bb16cf44cb41ae0be235bd7f5fcfcf27c86bea
Parents: d00513c
Author: Hao Chen <hao@apache.org>
Authored: Fri Jan 22 00:00:05 2016 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Fri Jan 22 00:00:05 2016 +0800

----------------------------------------------------------------------
 .../siddhi/SiddhiAlertAPIEntityRender.java      |   8 +-
 .../eagle/alert/cep/TestSiddhiEvaluator.java    |  13 +-
 .../dao/TestSiddhiStreamMetadataUtils.java      |  15 +-
 .../eagle/alert/siddhi/TestSiddhiStream.java    |   2 +-
 .../aggregate/AggregateExecutorFactory.java     |   7 +-
 .../impl/aggregate/SimpleAggregateExecutor.java |  32 +-
 .../core/StreamAggregateExpansion.scala         |   2 +-
 .../eagle/datastream/core/StreamProducer.scala  |   7 +-
 .../eagle/datastream/core/StreamProtocol.scala  |   2 +-
 .../datastream/TestSimpleAggregateExecutor.java |  72 ++++
 .../src/test/resources/application.conf         |   2 +-
 .../apache/eagle/ml/MLAlgorithmEvaluator.java   |   5 +-
 .../org/apache/eagle/ml/MLPolicyEvaluator.java  |  35 +-
 .../apache/eagle/policy/common/Constants.java   |   5 +-
 .../policy/dao/AlertDefinitionDAOImpl.java      |  15 +-
 .../dao/PolicyDefinitionEntityDAOImpl.java      |   8 +-
 .../policy/executor/PolicyProcessExecutor.java  |  24 +-
 .../siddhi/SiddhiOutputStreamCallback.java      |  68 +++
 .../policy/siddhi/SiddhiPolicyEvaluator.java    | 424 ++++++++++---------
 .../policy/siddhi/SiddhiQueryCallbackImpl.java  |  41 +-
 .../siddhi/SiddhiStreamMetadataUtils.java       |   2 +-
 .../eagle/storage/jdbc/TestJdbcStorage.java     |   8 +-
 eagle-hadoop-metric/pom.xml                     |  38 ++
 .../metric/HadoopJmxMetricDeserializer.java     |  56 +++
 .../eagle/hadoop/metric/NameNodeLagMonitor.java |  67 +++
 .../src/main/resources/application.conf         |  86 ++++
 .../src/main/resources/log4j.properties         |  40 ++
 .../src/main/resources/namenodelag.yaml         |  18 +
 .../src/main/resources/namenodelage-init.sh     | 194 +++++++++
 .../metric/HadoopJmxMetricDeserializerTest.java |  40 ++
 .../hadoop/metric/TestHadoopMetricSiddhiQL.java | 241 +++++++++++
 .../persist/test/PersistTopoTestMain2.java      |   4 +-
 .../UserProfileMLAlgorithmEvaluator.java        |  13 +-
 pom.xml                                         |   2 +
 34 files changed, 1257 insertions(+), 339 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java
index a67bae5..254d84c 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java
@@ -17,6 +17,9 @@
 package org.apache.eagle.alert.siddhi;
 
 import com.typesafe.config.Config;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
 import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.common.metric.AlertContext;
@@ -24,9 +27,6 @@ import org.apache.eagle.policy.PolicyEvaluationContext;
 import org.apache.eagle.policy.ResultRender;
 import org.apache.eagle.policy.common.Constants;
 import org.apache.eagle.policy.common.UrlBuilder;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
 import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator;
 import org.apache.eagle.policy.siddhi.SiddhiQueryCallbackImpl;
 import org.apache.eagle.policy.siddhi.StreamMetadataManager;
@@ -69,7 +69,7 @@ public class SiddhiAlertAPIEntityRender implements ResultRender<AlertDefinitionA
 
 		for (int index = 0; index < rets.size(); index++) {
 			//attrRenameList.get(0) -> "eagleAlertContext". We need to skip "eagleAlertContext", index is from 1 for attRenameList.
-			context.addProperty(attrRenameList.get(index + 1), rets.get(index));
+			context.addProperty(attrRenameList.get(index), rets.get(index));
 		}
 
 		StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
index 3aad31b..b29e4c1 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
@@ -89,13 +89,11 @@ public class TestSiddhiEvaluator {
 			put("allowed", "true");
 		}};
         final SiddhiPolicyDefinition policyDef = new SiddhiPolicyDefinition();
-        policyDef.setType("SiddhiCEPEngine");
+        policyDef.setType("siddhiCEPEngine");
         String expression = "from hdfsAuditLogEventStream[cmd=='open'] " +
 							"select * " +
 							"insert into outputStream ;";
         policyDef.setExpression(expression);
-        SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity> evaluator = new SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity>(config, "testPolicy", policyDef, new String[]{"hdfsAuditLogEventStream"});
-		PolicyEvaluationContext context = new PolicyEvaluationContext();
 
 		PolicyDefinitionDAO alertDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(null, null)) {
 			@Override
@@ -112,8 +110,9 @@ public class TestSiddhiEvaluator {
 		};
 		alertExecutor.prepareConfig(config);
 		alertExecutor.init();
+
+		PolicyEvaluationContext<AlertDefinitionAPIEntity, AlertAPIEntity> context = new PolicyEvaluationContext<>();
 		context.alertExecutor = alertExecutor;
-		context.evaluator = evaluator;
 		context.policyId = "testPolicy";
 		context.resultRender = new SiddhiAlertAPIEntityRender();
 		context.outputCollector = new Collector<Tuple2<String, AlertAPIEntity>> () {
@@ -122,7 +121,11 @@ public class TestSiddhiEvaluator {
 				alertCount++;
 			}
 		};
-		evaluator.evaluate(new ValuesArray(context, "hdfsAuditLogEventStream", data1));
+
+		SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity> evaluator =
+				new SiddhiPolicyEvaluator<>(config, context, policyDef, new String[]{"hdfsAuditLogEventStream"}, false);
+
+		evaluator.evaluate(new ValuesArray(context.outputCollector, "hdfsAuditLogEventStream", data1));
 		Thread.sleep(2 * 1000);
 		Assert.assertEquals(alertCount, 1);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
index fe45f80..1a30c71 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
@@ -16,23 +16,22 @@
  */
 package org.apache.eagle.alert.dao;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import junit.framework.Assert;
-
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
 import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
 import org.apache.eagle.policy.siddhi.SiddhiStreamMetadataUtils;
 import org.apache.eagle.policy.siddhi.StreamMetadataManager;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
 public class TestSiddhiStreamMetadataUtils {
 	@Test
-	public void test() throws Exception{
+	public void test() throws Exception {
         Config config = ConfigFactory.load();
         StreamMetadataManager.getInstance().reset();
         StreamMetadataManager.getInstance().init(config, new AlertStreamSchemaDAO() {
@@ -45,7 +44,7 @@ public class TestSiddhiStreamMetadataUtils {
             }
         });
 		String siddhiStreamDef = SiddhiStreamMetadataUtils.convertToStreamDef("testStreamName");
-		Assert.assertEquals("define stream " + "testStreamName" + "(eagleAlertContext object, attrName1 string,attrName2 long);", siddhiStreamDef);
+		Assert.assertEquals("define stream " + "testStreamName" + "(attrName1 string,attrName2 long);", siddhiStreamDef);
 	}
 	
 	private AlertStreamSchemaEntity generateStreamMetadataAPIEntity(final String attrName, String attrType){

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
index 1967265..0027bce 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
@@ -25,7 +25,7 @@ public class TestSiddhiStream {
 	
 	@Test
 	public void test() {
-		String rule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select user,timestamp,resource,sensitivityType insert into outputStream;";
+		String rule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select user,timestamp,resource,define stream hdfsAuditLogEventStream(eagleAlertContext object, allowed string,cmd string,dst string,host string,securityZone string,sensitivityType string,src string,timestamp long,user string); @info(name = 'query') from hdfsAuditLogEventStream[cmd=='open'] select * insert into outputStream ; insert into outputStream;";
 		Assert.assertFalse(SiddhiPolicyEvaluator.addContextFieldIfNotExist(rule).equals(rule));
 		
 		rule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select    * insert into outputStream;";

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
index d92cde8..5093685 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
@@ -44,12 +44,13 @@ public class AggregateExecutorFactory {
 	public static final AggregateExecutorFactory Instance = new AggregateExecutorFactory();
 
 
-	public IPolicyExecutor[] createExecutors(String cql,List<String> upStreamNames) throws Exception {
+	public IPolicyExecutor[] createExecutors(List<String> streamNames, String cql) throws Exception {
 		int numPartitions = 1; //loadExecutorConfig(config, executorId, partitionerCls);
 
 		IPolicyExecutor[] executors = new IPolicyExecutor[numPartitions];
+		String[] upStreams = streamNames.toArray(new String[0]);
 		for (int i = 0; i < numPartitions ; i++ ) {
-			executors[i] = new SimpleAggregateExecutor(cql, "siddhiCEPEngine", i, numPartitions,upStreamNames);
+			executors[i] = new SimpleAggregateExecutor(upStreams, cql, "siddhiCEPEngine", i, numPartitions);
 		}
 
 		return executors;
@@ -113,4 +114,4 @@ public class AggregateExecutorFactory {
 		}
 		return alertExecutors;
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
index 6593b32..1d8eecc 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
@@ -42,6 +42,8 @@ import java.util.List;
 import java.util.Map;
 
 /**
+ * Only one policy for one simple aggregate executor
+ *
  * Created on 1/10/16.
  */
 public class SimpleAggregateExecutor
@@ -53,25 +55,19 @@ public class SimpleAggregateExecutor
     private final String cql;
     private final int partitionSeq;
     private final int totalPartitionNum;
-    private final String[] sourceStreams;
 
+    private final String[] upStreamNames;
     private String policyId;
     private String executorId;
     private Config config;
     private AggregateDefinitionAPIEntity aggDef;
     private PolicyEvaluator<AggregateDefinitionAPIEntity> evaluator;
 
-    public SimpleAggregateExecutor(String cql, String policyType, int partitionSeq, int totalPartitionNum,List<String> sourceStreams) {
+    public SimpleAggregateExecutor(String[] upStreams, String cql, String policyType, int partitionSeq, int totalPartitionNum) {
         this.cql = cql;
         this.partitionSeq = partitionSeq;
+        this.upStreamNames = upStreams;
         this.totalPartitionNum = totalPartitionNum;
-
-        if(sourceStreams == null){
-            this.sourceStreams = new String[]{Constants.EAGLE_DEFAULT_POLICY_NAME};
-        }else{
-            this.sourceStreams = sourceStreams.toArray(new String[sourceStreams.size()]);
-        }
-
         // create an fixed definition policy api entity, and indicate it has full definition
         aggDef = new AggregateDefinitionAPIEntity();
         aggDef.setTags(new HashMap<String, String>());
@@ -132,11 +128,15 @@ public class SimpleAggregateExecutor
         }
 
         PolicyEvaluator<AggregateDefinitionAPIEntity> pe;
+        PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> context = new PolicyEvaluationContext<>();
+        context.policyId = alertDef.getTags().get("policyId");
+        context.alertExecutor = this;
+        context.resultRender = new AggregateResultRender();
         try {
-            // Create evaluator instances
+            // create evaluator instances
             pe = (PolicyEvaluator<AggregateDefinitionAPIEntity>) evalCls
-                    .getConstructor(Config.class, String.class, AbstractPolicyDefinition.class, String[].class, boolean.class)
-                    .newInstance(config, alertDef.getTags().get(Constants.POLICY_ID), policyDef, sourceStreams, false);
+                    .getConstructor(Config.class, PolicyEvaluationContext.class, AbstractPolicyDefinition.class, String[].class, boolean.class)
+                    .newInstance(config, context, policyDef, upStreamNames, false);
         } catch (Exception ex) {
             LOG.error("Fail creating new policyEvaluator", ex);
             LOG.warn("Broken policy definition and stop running : " + alertDef.getPolicyDef());
@@ -153,13 +153,7 @@ public class SimpleAggregateExecutor
         if (LOG.isDebugEnabled()) LOG.debug("Current policyEvaluators: " + evaluator);
 
         try {
-            PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> evaluationContext = new PolicyEvaluationContext<>();
-            evaluationContext.alertExecutor = this;
-            evaluationContext.policyId = policyId;
-            evaluationContext.evaluator = evaluator;
-            evaluationContext.outputCollector = collector;
-            evaluationContext.resultRender = new AggregateResultRender();
-            evaluator.evaluate(new ValuesArray(evaluationContext, input.get(1), input.get(2)));
+            evaluator.evaluate(new ValuesArray(collector, input.get(1), input.get(2)));
         } catch (Exception ex) {
             LOG.error("Got an exception, but continue to run " + input.get(2).toString(), ex);
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
index c5acab6..9564a0d 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
@@ -43,7 +43,7 @@ class StreamAggregateExpansion(config: Config) extends StreamAlertExpansion(conf
         val newStreamProducers = rewriteWithStreamOutputWrapper(current, dag, toBeAddedEdges, toBeRemovedVertex, upStreamNames)
 
         val analyzeExecutors = if (cepQl != null) {
-          AggregateExecutorFactory.Instance.createExecutors(cepQl,upStreamNames)
+          AggregateExecutorFactory.Instance.createExecutors(upStreamNames, cepQl)
         } else {
           AggregateExecutorFactory.Instance.createExecutors(config, upStreamNames, analyzerId)
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
index 18acd9c..6445643 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
@@ -24,7 +24,6 @@ import com.typesafe.config.Config
 import org.apache.eagle.alert.entity.AlertAPIEntity
 import org.apache.eagle.datastream.{FlatMapperWrapper, Collector, FlatMapper}
 import org.apache.eagle.partition.PartitionStrategy
-import org.apache.eagle.policy.common.Constants
 import org.jgrapht.experimental.dag.DirectedAcyclicGraph
 import org.slf4j.LoggerFactory
 
@@ -213,12 +212,12 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
     ret
   }
 
-  def aggregate(cql : String, strategy: PartitionStrategy): StreamProducer[T] = {
-    val ret= AggregateProducer(util.Arrays.asList(Constants.EAGLE_DEFAULT_POLICY_NAME), null, cql, strategy)
+  def aggregateDirect(upStreamNames: java.util.List[String], cql : String, strategy: PartitionStrategy): StreamProducer[T] = {
+    val ret= AggregateProducer(upStreamNames, null, cql, strategy)
     connect(this, ret)
     ret
   }
-  
+
   def persist(executorId : String, storageType: StorageType.StorageType) : StreamProducer[T] = {
     val ret = PersistProducer(executorId, storageType)
     connect(this, ret)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
index 1330f06..b54b21f 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
@@ -159,7 +159,7 @@ trait StreamProtocol[+T <: Any]{
 
   def aggregate(upStreamNames: java.util.List[String], executorId :String, strategy:PartitionStrategy): StreamProducer[T]
 
-  def aggregate(cql : String, strategy:PartitionStrategy): StreamProducer[T]
+  def aggregateDirect(upStreamNames: java.util.List[String], cql : String, strategy:PartitionStrategy): StreamProducer[T]
 
   def persist(executorId : String, storageType: StorageType.StorageType): StreamProducer[T]
   

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java
new file mode 100644
index 0000000..974cd19
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.dataproc.impl.aggregate.SimpleAggregateExecutor;
+import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Created on 1/20/16.
+ */
+public class TestSimpleAggregateExecutor {
+
+    @Test
+    public void test() throws Exception {
+        SimpleAggregateExecutor sae = new SimpleAggregateExecutor(new String[]{"s1"},
+                "define stream s1(eagleAlertContext object, timestamp long, metric string);" +
+                        " @info(name='query')" +
+                        " from s1 select * insert into tmp;"
+                ,
+                "siddhiCEPEngine",
+                0,
+                1);
+
+        Config config = ConfigFactory.empty();
+        sae.prepareConfig(config);
+        sae.init();
+
+        List<Object> tuple = new ArrayList<>(3);
+        tuple.add(0, "groupbykey");
+        tuple.add(1, "s1");
+        SortedMap value = new TreeMap();
+        value.put("timestamp", System.currentTimeMillis());
+        value.put("metric", "name-of-the-metric");
+        tuple.add(2, value);
+
+        final AtomicInteger count = new AtomicInteger();
+        sae.flatMap(tuple, new Collector<Tuple2<String, AggregateEntity>>(){
+            @Override
+            public void collect(Tuple2<String, AggregateEntity> stringAggregateEntityTuple2) {
+                System.out.print(stringAggregateEntityTuple2.f0());
+                count.incrementAndGet();
+            }
+        });
+
+        Thread.sleep(3000);
+        Assert.assertEquals(1, count.get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
index 7b80481..17dc471 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
@@ -23,7 +23,7 @@
 		}
 	},
 	"dataSourceConfig": {
-		"topic" : "sample_event",
+		"topic" : "nn_jmx_metric_sandbox",
 		"zkConnection" : "localhost:2181",
 		"zkConnectionTimeoutMS" : 15000,
 		"consumerGroupId" : "EagleConsumer",

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAlgorithmEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAlgorithmEvaluator.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAlgorithmEvaluator.java
index 4ca5efe..800b6e0 100644
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAlgorithmEvaluator.java
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAlgorithmEvaluator.java
@@ -16,9 +16,10 @@
  */
 package org.apache.eagle.ml;
 
+import com.typesafe.config.Config;
 import org.apache.eagle.dataproc.core.ValuesArray;
 import org.apache.eagle.ml.model.MLAlgorithm;
-import com.typesafe.config.Config;
+import org.apache.eagle.policy.PolicyEvaluationContext;
 
 /**
  * Machine Learning Algorithm Evaluator
@@ -29,7 +30,7 @@ public interface MLAlgorithmEvaluator {
      *
      * @param algorithm MLAlgorithm instance
      */
-    public void init(MLAlgorithm algorithm, Config config);
+    public void init(MLAlgorithm algorithm, Config config, PolicyEvaluationContext context);
 
     /**
      * Evaluate input user profile model

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
index ff88a7a..52b8e25 100644
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
@@ -16,35 +16,32 @@
  */
 package org.apache.eagle.ml;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
+import com.typesafe.config.Config;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.config.AbstractPolicyDefinition;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
 import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.policy.PolicyEvaluator;
-import org.apache.eagle.policy.PolicyManager;
 import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
 import org.apache.eagle.dataproc.core.ValuesArray;
 import org.apache.eagle.ml.impl.MLAnomalyCallbackImpl;
 import org.apache.eagle.ml.model.MLAlgorithm;
 import org.apache.eagle.ml.model.MLPolicyDefinition;
 import org.apache.eagle.ml.utils.MLReflectionUtils;
+import org.apache.eagle.policy.PolicyEvaluationContext;
+import org.apache.eagle.policy.PolicyEvaluator;
+import org.apache.eagle.policy.PolicyManager;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.policy.config.AbstractPolicyDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.typesafe.config.Config;
+import java.util.*;
 
 public class MLPolicyEvaluator implements PolicyEvaluator<AlertDefinitionAPIEntity> {
 	private static Logger LOG = LoggerFactory.getLogger(MLPolicyEvaluator.class);
     private volatile MLRuntime mlRuntime;
-	private String policyName;
 	private Config config;
 	private Map<String,String> context;
+	private final PolicyEvaluationContext<AlertDefinitionAPIEntity, AlertAPIEntity> evalContext;
 
 	private class MLRuntime{
 		MLPolicyDefinition mlPolicyDef;
@@ -52,8 +49,8 @@ public class MLPolicyEvaluator implements PolicyEvaluator<AlertDefinitionAPIEnti
 		List<MLAnomalyCallback> mlAnomalyCallbacks = new ArrayList<>();
 	}
 
-	public MLPolicyEvaluator(Config config, String policyName, AbstractPolicyDefinition policyDef, String[] sourceStreams){
-		this(config, policyName, policyDef, sourceStreams, false);
+	public MLPolicyEvaluator(Config config, PolicyEvaluationContext<AlertDefinitionAPIEntity, AlertAPIEntity> evalContext, AbstractPolicyDefinition policyDef, String[] sourceStreams){
+		this(config, evalContext, policyDef, sourceStreams, false);
 	}
 
 	/**
@@ -62,10 +59,10 @@ public class MLPolicyEvaluator implements PolicyEvaluator<AlertDefinitionAPIEnti
 	 * @param sourceStreams
 	 * @param needValidation
 	 */
-	public MLPolicyEvaluator(Config config, String policyName, AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean needValidation){
+	public MLPolicyEvaluator(Config config, PolicyEvaluationContext<AlertDefinitionAPIEntity, AlertAPIEntity> evalContext, AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean needValidation){
 		this.config = config;
-        this.policyName = policyName;
-        LOG.info("Initializing policy named: "+policyName);
+		this.evalContext = evalContext;
+        LOG.info("Initializing policy named: " + evalContext.policyId);
         this.context = new HashMap<>();
         this.context.put(Constants.SOURCE_STREAMS, StringUtils.join(sourceStreams,","));
 		this.init(policyDef);
@@ -98,7 +95,7 @@ public class MLPolicyEvaluator implements PolicyEvaluator<AlertDefinitionAPIEnti
 			int i = 0; 
 			for(MLAlgorithm algorithm:mlAlgorithms){
                 MLAlgorithmEvaluator mlAlgorithmEvaluator = MLReflectionUtils.newMLAlgorithmEvaluator(algorithm);
-                mlAlgorithmEvaluator.init(algorithm,config);
+                mlAlgorithmEvaluator.init(algorithm, config, evalContext);
 				runtime.mlAlgorithmEvaluators[i] =  mlAlgorithmEvaluator;
 				LOG.info("mlAlgorithmEvaluator: " + mlAlgorithmEvaluator.toString());
 						mlAlgorithmEvaluator.register(callbackImpl);
@@ -153,7 +150,7 @@ public class MLPolicyEvaluator implements PolicyEvaluator<AlertDefinitionAPIEnti
 	}
 
     public String getPolicyName() {
-		return policyName;
+		return evalContext.policyId;
 	}
 	
 	public Map<String, String> getAdditionalContext() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
index 0499431..c5d8fea 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
@@ -38,7 +38,7 @@ public class Constants {
 	public static final String ALERT_SOURCE = "alertSource";
 	public static final String ALERT_MESSAGE = "alertMessage";
 	public static final String SUBJECT = "subject";
-	public static final String ALERT_EXECUTOR_ID = PolicyDefinitionEntityDAOImpl.ALERT_EXECUTOR_ID;// "alertExecutorId";
+	public static final String ALERT_EXECUTOR_ID = "alertExecutorId";
 	public static final String POLICY_NAME = "policyName";
 	public static final String POLICY_ID = "policyId";
     public static final String SOURCE_STREAMS = "sourceStreams";
@@ -56,8 +56,7 @@ public class Constants {
 	public static final String PARTITIONER = "partitioner";
 	public static final String SOURCE = "source";
 	public static final String PARTITIONSEQ = "partitionSeq";
-	// policy definition status
-	public static final String EAGLE_DEFAULT_POLICY_NAME = "eagleQuery";
+	public static final String EXECUTOR_ID = "executorId";
 
 	public enum policyType {
 		siddhiCEPEngine,

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java
index 39c5284..8694f52 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java
@@ -16,20 +16,20 @@
  */
 package org.apache.eagle.policy.dao;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.policy.common.Constants;
 import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.policy.common.Constants;
 import org.apache.eagle.service.client.EagleServiceConnector;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Utility methods to load alert definitions for a program
  */
@@ -74,7 +74,8 @@ public class AlertDefinitionDAOImpl implements PolicyDefinitionDAO<AlertDefiniti
 		List<AlertDefinitionAPIEntity> list = findActivePolicies(site, dataSource);
 		Map<String, Map<String, AlertDefinitionAPIEntity>> map = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
 			for (AlertDefinitionAPIEntity entity : list) {
-				String executorID = entity.getTags().get(Constants.ALERT_EXECUTOR_ID);
+				String executorID = entity.getTags().containsKey(Constants.EXECUTOR_ID) ? entity.getTags().get(Constants.EXECUTOR_ID)
+						: entity.getTags().get(Constants.ALERT_EXECUTOR_ID);
 				if (map.get(executorID) == null) {
 					map.put(executorID, new HashMap<String, AlertDefinitionAPIEntity>());
 				}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionEntityDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionEntityDAOImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionEntityDAOImpl.java
index c1bd24e..ccc6c2b 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionEntityDAOImpl.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionEntityDAOImpl.java
@@ -16,8 +16,9 @@
  */
 package org.apache.eagle.policy.dao;
 
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.policy.common.Constants;
 import org.apache.eagle.service.client.EagleServiceConnector;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
@@ -38,7 +39,6 @@ public class PolicyDefinitionEntityDAOImpl<T extends AbstractPolicyDefinitionEnt
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(PolicyDefinitionEntityDAOImpl.class);
-	public static final String ALERT_EXECUTOR_ID = "alertExecutorId";
 	private final EagleServiceConnector connector;
 	private final String servicePointName;
 
@@ -82,8 +82,8 @@ public class PolicyDefinitionEntityDAOImpl<T extends AbstractPolicyDefinitionEnt
 		Map<String, Map<String, T>> map = new HashMap<String, Map<String, T>>();
 		for (T entity : list) {
 			// support both executorId and legacy alertExecutorId
-			String executorID = entity.getTags().containsKey("executorId") ? entity.getTags().get("executorId")
-					: entity.getTags().get(ALERT_EXECUTOR_ID);
+			String executorID = entity.getTags().containsKey(Constants.EXECUTOR_ID) ? entity.getTags().get(Constants.EXECUTOR_ID)
+					: entity.getTags().get(Constants.ALERT_EXECUTOR_ID);
 
 			if (map.get(executorID) == null) {
 				map.put(executorID, new HashMap<String, T>());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
index fdbcda0..4405f0f 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
@@ -234,12 +234,16 @@ public abstract class PolicyProcessExecutor<T extends AbstractPolicyDefinitionEn
 			LOG.error("Fail initial alert policy def: "+alertDef.getPolicyDef(), ex);
 		}
 		PolicyEvaluator<T> pe;
+		PolicyEvaluationContext<T, K> context = new PolicyEvaluationContext<>();
+		context.policyId = alertDef.getTags().get("policyId");
+		context.alertExecutor = this;
+		context.resultRender = this.getResultRender();
 		try {
-			// Create evaluator instances
+			// create evaluator instance
 			pe = (PolicyEvaluator<T>) evalCls
-					.getConstructor(Config.class, String.class, AbstractPolicyDefinition.class, String[].class, boolean.class)
-					.newInstance(config, alertDef.getTags().get("policyId"), policyDef, sourceStreams, needValidation);
-		}catch(Exception ex){
+					.getConstructor(Config.class, PolicyEvaluationContext.class, AbstractPolicyDefinition.class, String[].class, boolean.class)
+					.newInstance(config, context, policyDef, sourceStreams, needValidation);
+		} catch(Exception ex) {
 			LOG.error("Fail creating new policyEvaluator", ex);
 			LOG.warn("Broken policy definition and stop running : " + alertDef.getPolicyDef());
 			throw new IllegalStateException(ex);
@@ -268,10 +272,6 @@ public abstract class PolicyProcessExecutor<T extends AbstractPolicyDefinitionEn
 			return true;
 		return false;
 	}
-	
-	private long trim(long value, long granularity) {
-		return value / granularity * granularity;
-	}
 
 	private void updateCounter(String name, Map<String, String> dimensions, double value) {
 		long current = System.currentTimeMillis();
@@ -323,13 +323,7 @@ public abstract class PolicyProcessExecutor<T extends AbstractPolicyDefinitionEn
                     PolicyEvaluator<T> evaluator = entry.getValue();
                     updateCounter(EAGLE_POLICY_EVAL_COUNT, getDimensions(policyId));
                     try {
-                        PolicyEvaluationContext<T, K> evaluationContext = new PolicyEvaluationContext<T, K>();
-                        evaluationContext.alertExecutor = this;
-                        evaluationContext.policyId = policyId;
-                        evaluationContext.evaluator = evaluator;
-                        evaluationContext.outputCollector = outputCollector;
-						evaluationContext.resultRender = getResultRender();
-                        evaluator.evaluate(new ValuesArray(evaluationContext, input.get(1), input.get(2)));
+                        evaluator.evaluate(new ValuesArray(outputCollector, input.get(1), input.get(2)));
                     }
                     catch (Exception ex) {
                         LOG.error("Got an exception, but continue to run " + input.get(2).toString(), ex);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiOutputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiOutputStreamCallback.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiOutputStreamCallback.java
new file mode 100644
index 0000000..22506aa
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiOutputStreamCallback.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.siddhi;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.policy.PolicyEvaluationContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Siddhi stream call back
+ *
+ * Created on 1/20/16.
+ */
+public class SiddhiOutputStreamCallback<T extends AbstractPolicyDefinitionEntity, K> extends StreamCallback {
+
+    public static final Logger LOG = LoggerFactory.getLogger(SiddhiOutputStreamCallback.class);
+
+    private SiddhiPolicyEvaluator<T, K> evaluator;
+    public Config config;
+
+    public SiddhiOutputStreamCallback(Config config, SiddhiPolicyEvaluator<T, K> evaluator) {
+        this.config = config;
+        this.evaluator = evaluator;
+    }
+
+    @Override
+    public void receive(Event[] events) {
+        long timeStamp = System.currentTimeMillis();
+        List<K> alerts = new LinkedList<>();
+        PolicyEvaluationContext<T, K> siddhiContext =  null;
+
+        for (Event event : events) {
+            Object[] data = event.getData();
+            List<Object> returns = SiddhiQueryCallbackImpl.getOutputObject(event.getData());
+            K alert = siddhiContext.resultRender.render(config, returns, siddhiContext, timeStamp);
+            alerts.add(alert);
+
+            if (siddhiContext == null) {
+                siddhiContext = (PolicyEvaluationContext<T, K>) data[0];
+            }
+        }
+
+        if (siddhiContext != null) {
+            siddhiContext.alertExecutor.onEvalEvents(siddhiContext, alerts);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
index 69d65d5..a4ab081 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
@@ -21,6 +21,8 @@ import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
 import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
 import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
 import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.policy.PolicyEvaluationContext;
 import org.apache.eagle.policy.PolicyEvaluator;
 import org.apache.eagle.policy.PolicyManager;
 import org.apache.eagle.policy.common.Constants;
@@ -33,7 +35,6 @@ import org.wso2.siddhi.core.query.output.callback.QueryCallback;
 import org.wso2.siddhi.core.stream.input.InputHandler;
 import org.wso2.siddhi.query.api.execution.query.Query;
 import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
-import org.wso2.siddhi.query.compiler.exception.SiddhiParserException;
 
 import java.lang.reflect.Field;
 import java.util.*;
@@ -42,243 +43,248 @@ import java.util.*;
  * when policy is updated or deleted, SiddhiManager.shutdown should be invoked to release resources.
  * during this time, synchronization is important
  */
-public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K> implements PolicyEvaluator<T>{
+public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K> implements PolicyEvaluator<T> {
 
-	private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyEvaluator.class);
+    private final static String EXECUTION_PLAN_NAME = "query";
+    private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyEvaluator.class);
 
-	private volatile SiddhiRuntime siddhiRuntime;
-	private String[] sourceStreams;
-	private boolean needValidation;
-	private String policyId;
-	private Config config;
-	private final static String EXECUTION_PLAN_NAME = "query";
+    private volatile SiddhiRuntime siddhiRuntime;
+    private final String[] sourceStreams;
+    private final boolean needValidation;
+    private final Config config;
+    private final PolicyEvaluationContext<T, K> context;
 
-	/**
-	 * everything dependent on policyDef should be together and switched in runtime
-	 */
-	public static class SiddhiRuntime{
-		QueryCallback callback;
-		Map<String, InputHandler> siddhiInputHandlers;
-		SiddhiManager siddhiManager;
-		SiddhiPolicyDefinition policyDef;
-		List<String> outputFields;
-		String executionPlanName;
-	}
+    /**
+     * everything dependent on policyDef should be together and switched in runtime
+     */
+    public static class SiddhiRuntime {
+        QueryCallback queryCallback;
+        Map<String, InputHandler> siddhiInputHandlers;
+        SiddhiManager siddhiManager;
+        SiddhiPolicyDefinition policyDef;
+        List<String> outputFields;
+        String executionPlanName;
+    }
 
-	public SiddhiPolicyEvaluator(Config config, String policyName, AbstractPolicyDefinition policyDef, String[] sourceStreams){
-		this(config, policyName, policyDef, sourceStreams, false);
-	}
+    public SiddhiPolicyEvaluator(Config config, PolicyEvaluationContext<T, K> context, AbstractPolicyDefinition policyDef, String[] sourceStreams) {
+        this(config, context, policyDef, sourceStreams, false);
+    }
 
-	public SiddhiPolicyEvaluator(Config config, String policyId, AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean needValidation){
-		this.config = config;
-		this.policyId = policyId;
-		this.needValidation = needValidation;
-		this.sourceStreams = sourceStreams;
-		init(policyDef);
-	}
+    public SiddhiPolicyEvaluator(Config config, PolicyEvaluationContext<T, K> context, AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean needValidation) {
+        this.config = config;
+        this.context = context;
+        this.context.evaluator = this;
+        this.needValidation = needValidation;
+        this.sourceStreams = sourceStreams;
+        init(policyDef);
+    }
 
-	public void init(AbstractPolicyDefinition policyDef){
-		siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition)policyDef);
-	}
+    public void init(AbstractPolicyDefinition policyDef) {
+        siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition) policyDef);
+    }
 
-	public static String addContextFieldIfNotExist(String expression) {
-		// select fieldA, fieldB --> select eagleAlertContext, fieldA, fieldB
-		int pos = expression.indexOf("select ") + 7;
-		int index = pos;
-		boolean isSelectStarPattern = true;
-		while(index < expression.length()) {
-			if (expression.charAt(index) == ' ') index++;
-			else if (expression.charAt(index) == '*') break;
-			else {
-				isSelectStarPattern = false;
-				break;
-			}
-		}
-		if (isSelectStarPattern) return expression;
-		StringBuilder sb = new StringBuilder();
-		sb.append(expression.substring(0, pos));
-		sb.append(SiddhiStreamMetadataUtils.EAGLE_ALERT_CONTEXT_FIELD + ",");
-		sb.append(expression.substring(pos, expression.length()));
-		return sb.toString();
-	}
-
-	private SiddhiRuntime createSiddhiRuntime(SiddhiPolicyDefinition policyDef){
-		SiddhiManager siddhiManager = new SiddhiManager();
-		Map<String, InputHandler> siddhiInputHandlers = new HashMap<>();
+    public static String addContextFieldIfNotExist(String expression) {
+        // select fieldA, fieldB --> select eagleAlertContext, fieldA, fieldB
+        int pos = expression.indexOf("select ") + 7;
+        int index = pos;
+        boolean isSelectStarPattern = true;
+        while (index < expression.length()) {
+            if (expression.charAt(index) == ' ') index++;
+            else if (expression.charAt(index) == '*') break;
+            else {
+                isSelectStarPattern = false;
+                break;
+            }
+        }
+        if (isSelectStarPattern) return expression;
+        StringBuilder sb = new StringBuilder();
+        sb.append(expression.substring(0, pos));
+        sb.append(SiddhiStreamMetadataUtils.EAGLE_ALERT_CONTEXT_FIELD + ",");
+        sb.append(expression.substring(pos, expression.length()));
+        return sb.toString();
+    }
 
-		// compose execution plan sql
-		String executionPlan = policyDef.getExpression();
-		if (!policyDef.isContainsDefinition()) {
-			StringBuilder sb = new StringBuilder();
-			for (String sourceStream : sourceStreams) {
-				String streamDef = SiddhiStreamMetadataUtils.convertToStreamDef(sourceStream);
-				LOG.info("Siddhi stream definition : " + streamDef);
-				sb.append(streamDef);
-			}
+    private SiddhiRuntime createSiddhiRuntime(SiddhiPolicyDefinition policyDef) {
+        SiddhiManager siddhiManager = new SiddhiManager();
+        Map<String, InputHandler> siddhiInputHandlers = new HashMap<String, InputHandler>();
+        SiddhiRuntime runtime = new SiddhiRuntime();
 
-			String expression = addContextFieldIfNotExist(policyDef.getExpression());
-			executionPlan = sb.toString() + " @info(name = '" + EXECUTION_PLAN_NAME + "') " + expression;
-		}
+        // compose execution plan sql
+        String executionPlan = policyDef.getExpression();
+        if (!policyDef.isContainsDefinition()) {
+            StringBuilder sb = new StringBuilder();
+            for (String sourceStream : sourceStreams) {
+                String streamDef = SiddhiStreamMetadataUtils.convertToStreamDef(sourceStream);
+                LOG.info("Siddhi stream definition : " + streamDef);
+                sb.append(streamDef);
+            }
 
-		ExecutionPlanRuntime executionPlanRuntime = null;
-		try {
-				executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
-		}catch (SiddhiParserException ex){
-			LOG.error("Failed to parse: "+executionPlan,ex);
-			throw ex;
-		}
+            String expression = policyDef.getExpression();
+            executionPlan = sb.toString() + " @info(name = '" + EXECUTION_PLAN_NAME + "') " + expression;
+        }
 
-		for(String sourceStream : sourceStreams){
-			siddhiInputHandlers.put(sourceStream, executionPlanRuntime.getInputHandler(sourceStream));
-		}
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
 
-		executionPlanRuntime.start();
+        for (String sourceStream : sourceStreams) {
+            siddhiInputHandlers.put(sourceStream, executionPlanRuntime.getInputHandler(sourceStream));
+        }
+        executionPlanRuntime.start();
 
-		QueryCallback callback = new SiddhiQueryCallbackImpl<T, K>(config, this);
+        LOG.info("Siddhi query: " + executionPlan);
 
-		LOG.info("Siddhi query: " + executionPlan);
-		executionPlanRuntime.addCallback(EXECUTION_PLAN_NAME, callback);
+        attachCallback(runtime, executionPlanRuntime, context);
 
-		List<String> outputFields = new ArrayList<String>();
-		try {
-	        Field field = QueryCallback.class.getDeclaredField(EXECUTION_PLAN_NAME);
-	        field.setAccessible(true);
-	        Query query = (Query)field.get(callback);
-	        List<OutputAttribute> list = query.getSelector().getSelectionList();
-	        for (OutputAttribute output : list) {
-	        	outputFields.add(output.getRename());
-	        }
-		}
-		catch (Exception ex) {
-			LOG.error("Got an Exception when initial outputFields ", ex);
-		}
-		SiddhiRuntime runtime = new SiddhiRuntime();
-		runtime.siddhiInputHandlers = siddhiInputHandlers;
-		runtime.siddhiManager = siddhiManager;
-		runtime.callback = callback;
-		runtime.policyDef = policyDef;
-		runtime.outputFields = outputFields;
-		runtime.executionPlanName = executionPlanRuntime.getName();
-		return runtime;
-	}
+        runtime.siddhiInputHandlers = siddhiInputHandlers;
+        runtime.siddhiManager = siddhiManager;
+        runtime.policyDef = policyDef;
+        runtime.executionPlanName = executionPlanRuntime.getName();
+        return runtime;
+    }
 
-	/**
-	 * 1. input has 3 fields, first is siddhi context, second is streamName, the last one is map of attribute name/value
-	 * 2. runtime check for input data (This is very expensive, so we ignore for now)
-	 *     the size of input map should be equal to size of attributes which stream metadata defines
-	 *     the attribute names should be equal to attribute names which stream metadata defines
-	 *     the input field cannot be null
-	 */
-	@SuppressWarnings({ "rawtypes" })
-	@Override
-	public void evaluate(ValuesArray data) throws Exception {
-		if(LOG.isDebugEnabled()) LOG.debug("Siddhi policy evaluator consumers data :" + data);
-        Object siddhiAlertContext = data.get(0);
-		String streamName = (String)data.get(1);
-		SortedMap map = (SortedMap)data.get(2);
-		validateEventInRuntime(streamName, map);
-		synchronized(siddhiRuntime){
-			//insert siddhiAlertContext into the first field
-			List<Object> input = new ArrayList<>();
-			input.add(siddhiAlertContext);
-			// input.add(streamName);
-			putAttrsIntoInputStream(input, streamName, map);
+    private void attachCallback(SiddhiRuntime runtime, ExecutionPlanRuntime executionPlanRuntime, PolicyEvaluationContext<T, K> context) {
+        List<String> outputFields = new ArrayList<>();
+//        String outputStreamName = config.getString("alertExecutorConfigs." + executorId + "." + "outputStream");
+//        if (StringUtils.isNotEmpty(outputStreamName)) {
+//            StreamCallback streamCallback = new SiddhiOutputStreamCallback<>(config, this);
+//            executionPlanRuntime.addCallback(outputStreamName, streamCallback);
+//            runtime.outputStreamCallback = streamCallback;
+//            // find output attribute from stream call back
+//            try {
+//                Field field = StreamCallback.class.getDeclaredField("streamDefinition");
+//                field.setAccessible(true);
+//                AbstractDefinition outStreamDef = (AbstractDefinition) field.get(streamCallback);
+//                outputFields = Arrays.asList(outStreamDef.getAttributeNameArray());
+//            } catch (Exception ex) {
+//                LOG.error("Got an Exception when initial outputFields ", ex);
+//            }
+//        } else {
+            QueryCallback callback = new SiddhiQueryCallbackImpl<T, K>(config, context);
+            executionPlanRuntime.addCallback(EXECUTION_PLAN_NAME, callback);
+            runtime.queryCallback = callback;
+            // find output attribute from query call back
             try {
-                InputHandler inputHandler = siddhiRuntime.siddhiInputHandlers.get(streamName);
-				if(inputHandler == null) throw new NullPointerException("InputHandler for stream ["+streamName+"] is not found");
-				inputHandler.send(input.toArray(new Object[input.size()]));
-            }catch (InterruptedException ex){
-                LOG.error("Got exception "+ex.getMessage(),ex);
+                Field field = QueryCallback.class.getDeclaredField(EXECUTION_PLAN_NAME);
+                field.setAccessible(true);
+                Query query = (Query) field.get(callback);
+                List<OutputAttribute> list = query.getSelector().getSelectionList();
+                for (OutputAttribute output : list) {
+                    outputFields.add(output.getRename());
+                }
+            } catch (Exception ex) {
+                LOG.error("Got an Exception when initial outputFields ", ex);
             }
-		}
-	}
+//        }
+        runtime.outputFields = outputFields;
+    }
 
-	/**
-	 * This is a heavy operation, we should avoid to use.
-     *
+    /**
+     * 1. input has 3 fields, first is siddhi context, second is streamName, the last one is map of attribute name/value
+     * 2. runtime check for input data (This is very expensive, so we ignore for now)
+     * the size of input map should be equal to size of attributes which stream metadata defines
+     * the attribute names should be equal to attribute names which stream metadata defines
+     * the input field cannot be null
+     */
+    @SuppressWarnings({"rawtypes"})
+    @Override
+    public void evaluate(ValuesArray data) throws Exception {
+        if (LOG.isDebugEnabled()) LOG.debug("Siddhi policy evaluator consumers data :" + data);
+        Collector outputCollector = (Collector) data.get(0);
+        String streamName = (String) data.get(1);
+        SortedMap map = (SortedMap) data.get(2);
+        validateEventInRuntime(streamName, map);
+        synchronized (siddhiRuntime) {
+            // retain the collector in the context. This assignment is idempotent
+            context.outputCollector = outputCollector;
+
+            List<Object> input = new ArrayList<>();
+            putAttrsIntoInputStream(input, streamName, map);
+            siddhiRuntime.siddhiInputHandlers.get(streamName).send(input.toArray(new Object[0]));
+        }
+    }
+
+    /**
+     * This is a heavy operation, we should avoid to use.
+     * <p/>
      * This validation method will skip invalid fields in event which are not declared in stream schema otherwise it will cause exception for siddhi engine.
      *
+     * @param sourceStream source steam id
+     * @param data         input event
      * @see <a href="https://issues.apache.org/jira/browse/EAGLE-49">https://issues.apache.org/jira/browse/EAGLE-49</a>
-     *
-	 * @param sourceStream source steam id
-	 * @param data input event
-	 */
-	private void validateEventInRuntime(String sourceStream, SortedMap data){
-		if(!needValidation)
-			return;
-		SortedMap<String, AlertStreamSchemaEntity> map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(sourceStream);
-        if(!map.keySet().equals(data.keySet())){
+     */
+    private void validateEventInRuntime(String sourceStream, SortedMap data) {
+        if (!needValidation)
+            return;
+        SortedMap<String, AlertStreamSchemaEntity> map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(sourceStream);
+        if (!map.keySet().equals(data.keySet())) {
             Set<Object> badKeys = new TreeSet<>();
-            for(Object key:data.keySet()) if(!map.containsKey(key)) badKeys.add(key);
-            LOG.warn(String.format("Ignore invalid fields %s in event: %s from stream: %s, valid fields are: %s", badKeys.toString(),data.toString(), sourceStream,map.keySet().toString()));
-            for(Object key:badKeys) data.remove(key);
+            for (Object key : data.keySet()) if (!map.containsKey(key)) badKeys.add(key);
+            LOG.warn(String.format("Ignore invalid fields %s in event: %s from stream: %s, valid fields are: %s", badKeys.toString(), data.toString(), sourceStream, map.keySet().toString()));
+            for (Object key : badKeys) data.remove(key);
         }
-	}
+    }
 
-	private void putAttrsIntoInputStream(List<Object> input, String streamName, SortedMap map) {
-		if(!needValidation) {
-			input.addAll(map.values());
-			return;
-		}
-		for (Object key : map.keySet()) {
-			Object value = map.get(key);
-			if (value == null) {
-				input.add(SiddhiStreamMetadataUtils.getAttrDefaultValue(streamName, (String)key));
-			}
-			else input.add(value);
-		}
-	}
+    private void putAttrsIntoInputStream(List<Object> input, String streamName, SortedMap map) {
+        if (!needValidation) {
+            input.addAll(map.values());
+            return;
+        }
+        for (Object key : map.keySet()) {
+            Object value = map.get(key);
+            if (value == null) {
+                input.add(SiddhiStreamMetadataUtils.getAttrDefaultValue(streamName, (String) key));
+            } else input.add(value);
+        }
+    }
 
-	@Override
-	public void onPolicyUpdate(T newAlertDef) {
-		AbstractPolicyDefinition policyDef = null;
-		try {
-			policyDef = JsonSerDeserUtils.deserialize(newAlertDef.getPolicyDef(),
-					AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(newAlertDef.getTags().get(Constants.POLICY_TYPE)));
-		}
-		catch (Exception ex) {
-			LOG.error("Initial policy def error, ", ex);
-		}
-		SiddhiRuntime previous = siddhiRuntime;
-		siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition)policyDef);
-		synchronized(previous){
-			previous.siddhiManager.getExecutionPlanRuntime(previous.executionPlanName).shutdown();
-		}
-	}
+    @Override
+    public void onPolicyUpdate(T newAlertDef) {
+        AbstractPolicyDefinition policyDef = null;
+        try {
+            policyDef = JsonSerDeserUtils.deserialize(newAlertDef.getPolicyDef(),
+                    AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(newAlertDef.getTags().get(Constants.POLICY_TYPE)));
+        } catch (Exception ex) {
+            LOG.error("Initial policy def error, ", ex);
+        }
+        SiddhiRuntime previous = siddhiRuntime;
+        siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition) policyDef);
+        synchronized (previous) {
+            previous.siddhiManager.getExecutionPlanRuntime(previous.executionPlanName).shutdown();
+        }
+    }
 
-	@Override
-	public void onPolicyDelete(){
-		synchronized(siddhiRuntime){
-			LOG.info("Going to shutdown siddhi execution plan, planName: " + siddhiRuntime.executionPlanName);
-			siddhiRuntime.siddhiManager.getExecutionPlanRuntime(siddhiRuntime.executionPlanName).shutdown();
-			LOG.info("Siddhi execution plan " + siddhiRuntime.executionPlanName + " is successfully shutdown ");
-		}
-	}
+    @Override
+    public void onPolicyDelete() {
+        synchronized (siddhiRuntime) {
+            LOG.info("Going to shutdown siddhi execution plan, planName: " + siddhiRuntime.executionPlanName);
+            siddhiRuntime.siddhiManager.getExecutionPlanRuntime(siddhiRuntime.executionPlanName).shutdown();
+            LOG.info("Siddhi execution plan " + siddhiRuntime.executionPlanName + " is successfully shutdown ");
+        }
+    }
 
-	@Override
-	public String toString(){
-		return siddhiRuntime.policyDef.toString();
-	}
+    @Override
+    public String toString() {
+        return siddhiRuntime.policyDef.toString();
+    }
 
-	public String[] getStreamNames() {
-		return sourceStreams;
-	}
+    public String[] getStreamNames() {
+        return sourceStreams;
+    }
 
-	public Map<String, String> getAdditionalContext() {
-		Map<String, String> context = new HashMap<String, String>();
-		StringBuilder sourceStreams = new StringBuilder();
-		for (String streamName : getStreamNames()) {
-			sourceStreams.append(streamName + ",");
-		}
-		if (sourceStreams.length() > 0) {
-			sourceStreams.deleteCharAt(sourceStreams.length() - 1);
-		}
-		context.put(Constants.SOURCE_STREAMS, sourceStreams.toString());
-		context.put(Constants.POLICY_ID, policyId);
-		return context;
-	}
+    public Map<String, String> getAdditionalContext() {
+        Map<String, String> context = new HashMap<String, String>();
+        StringBuilder sourceStreams = new StringBuilder();
+        for (String streamName : getStreamNames()) {
+            sourceStreams.append(streamName + ",");
+        }
+        if (sourceStreams.length() > 0) {
+            sourceStreams.deleteCharAt(sourceStreams.length() - 1);
+        }
+        context.put(Constants.SOURCE_STREAMS, sourceStreams.toString());
+        context.put(Constants.POLICY_ID, this.context.policyId);
+        return context;
+    }
 
-	public List<String> getOutputStreamAttrNameList() {
-		return siddhiRuntime.outputFields;
-	}
+    public List<String> getOutputStreamAttrNameList() {
+        return siddhiRuntime.outputFields;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java
index 9060920..43422f8 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java
@@ -16,10 +16,9 @@
  */
 package org.apache.eagle.policy.siddhi;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
-import org.apache.eagle.policy.PolicyEvaluationContext;
 import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.policy.PolicyEvaluationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.wso2.siddhi.core.event.Event;
@@ -37,14 +36,14 @@ import java.util.List;
  */
 public class SiddhiQueryCallbackImpl<T extends AbstractPolicyDefinitionEntity, K> extends QueryCallback{
 
-	private SiddhiPolicyEvaluator<T, K> evaluator;
-	public static final Logger LOG = LoggerFactory.getLogger(SiddhiQueryCallbackImpl.class);
-	public static final ObjectMapper mapper = new ObjectMapper();	
-	public Config config;
-	
-	public SiddhiQueryCallbackImpl(Config config, SiddhiPolicyEvaluator<T, K> evaluator) {
-		this.config = config;		
-		this.evaluator = evaluator;
+	private static final Logger LOG = LoggerFactory.getLogger(SiddhiQueryCallbackImpl.class);
+
+	private final Config config;
+	private final PolicyEvaluationContext<T, K> siddhiEvaluateContext;
+
+	public SiddhiQueryCallbackImpl(Config config, PolicyEvaluationContext<T, K> siddhiContext) {
+		this.config = config;
+		this.siddhiEvaluateContext = siddhiContext;
 	}
 	
 	public static List<String> convertToString(List<Object> data) {
@@ -72,14 +71,14 @@ public class SiddhiQueryCallbackImpl<T extends AbstractPolicyDefinitionEntity, K
 	}
 
 	public static List<Object> getOutputObject(Object[] data) {
-		List<Object> rets = new ArrayList<Object>();
-		boolean isFirst = true;
+		List<Object> rets = new ArrayList<>(data.length);
+//		boolean isFirst = true;
 		for (Object object : data) {
-			// The first field is siddhiAlertContext, skip it
-			if (isFirst) {
-				isFirst = false;
-				continue;
-			}
+//			// The first field is siddhiAlertContext, skip it
+//			if (isFirst) {
+//				isFirst = false;
+//				continue;
+//			}
 			rets.add(object);
 		}
 		return rets;
@@ -88,11 +87,9 @@ public class SiddhiQueryCallbackImpl<T extends AbstractPolicyDefinitionEntity, K
 	@SuppressWarnings("unchecked")
 	@Override
 	public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-		Object[] data = inEvents[0].getData();
-		PolicyEvaluationContext<T, K> siddhiAlertContext = (PolicyEvaluationContext<T, K>)data[0];
 		List<Object> rets = getOutputObject(inEvents[0].getData());
-		K alert = siddhiAlertContext.resultRender.render(config, rets, siddhiAlertContext, timeStamp);
-		SiddhiEvaluationHandler<T, K> handler = siddhiAlertContext.alertExecutor;
-		handler.onEvalEvents(siddhiAlertContext, Arrays.asList(alert));
+		K alert = siddhiEvaluateContext.resultRender.render(config, rets, siddhiEvaluateContext, timeStamp);
+		SiddhiEvaluationHandler<T, K> handler = siddhiEvaluateContext.alertExecutor;
+		handler.onEvalEvents(siddhiEvaluateContext, Arrays.asList(alert));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
index 33c7bc9..e4c3481 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
@@ -49,7 +49,7 @@ public class SiddhiStreamMetadataUtils {
 	public static String convertToStreamDef(String streamName){
 		SortedMap<String, AlertStreamSchemaEntity> map = getAttrMap(streamName);
 		StringBuilder sb = new StringBuilder();
-		sb.append(EAGLE_ALERT_CONTEXT_FIELD + " object, ");
+//		sb.append(EAGLE_ALERT_CONTEXT_FIELD + " object, ");
 		for(Map.Entry<String, AlertStreamSchemaEntity> entry : map.entrySet()){
             appendAttributeNameType(sb, entry.getKey(), entry.getValue().getAttrType());
 		}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java
index 43b3998..c0ba13c 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java
@@ -16,6 +16,9 @@
  */
 package org.apache.eagle.storage.jdbc;
 
+import junit.framework.Assert;
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.log.entity.meta.EntityDefinition;
 import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
 import org.apache.eagle.log.entity.test.TestTimeSeriesAPIEntity;
@@ -26,9 +29,6 @@ import org.apache.eagle.storage.operation.CompiledQuery;
 import org.apache.eagle.storage.operation.RawQuery;
 import org.apache.eagle.storage.result.ModifyResult;
 import org.apache.eagle.storage.result.QueryResult;
-import org.apache.eagle.common.DateTimeUtil;
-import junit.framework.Assert;
-import org.apache.commons.lang.time.StopWatch;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -65,7 +65,7 @@ public class TestJdbcStorage {
         Assert.assertNotNull(result);
     }
 
-    @Test
+//    @Test
     public void testReadByComplexQuery() throws QueryCompileException, IOException {
         RawQuery rawQuery = new RawQuery();
         rawQuery.setQuery("TestTimeSeriesAPIEntity[@cluster=\"cluster\" AND @field4 > 1000 AND @field7 CONTAINS \"subtext\" OR @jobID =\"jobID\" ]{@field1,@field2}");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-hadoop-metric/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/pom.xml b/eagle-hadoop-metric/pom.xml
new file mode 100644
index 0000000..63eb88b
--- /dev/null
+++ b/eagle-hadoop-metric/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>eagle-parent</artifactId>
+        <groupId>eagle</groupId>
+        <version>0.3.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>eagle-hadoop-metric</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>eagle</groupId>
+            <artifactId>eagle-stream-process-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricDeserializer.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricDeserializer.java
new file mode 100644
index 0000000..27a416d
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricDeserializer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.hadoop.metric;
+
+import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
+import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.Properties;
+import java.util.SortedMap;
+
+/**
+ * Created on 1/19/16.
+ */
+public class HadoopJmxMetricDeserializer implements SpoutKafkaMessageDeserializer {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopJmxMetricDeserializer.class);
+
+    private Properties props;
+
+    public  HadoopJmxMetricDeserializer(Properties props){
+        this.props = props;
+    }
+
+
+    // convert to a map of <key, map<>>
+    @Override
+    public Object deserialize(byte[] arg0) {
+        try {
+            String content = new String(arg0, Charset.defaultCharset().name());
+            Map<String, Object> metricItem = JsonSerDeserUtils.deserialize(content, SortedMap.class);
+            return metricItem;
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOG.error("unrecognizable content", e);
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/NameNodeLagMonitor.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/NameNodeLagMonitor.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/NameNodeLagMonitor.java
new file mode 100644
index 0000000..7c574f8
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/NameNodeLagMonitor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.hadoop.metric;
+
+import backtype.storm.spout.SchemeAsMultiScheme;
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
+import org.apache.eagle.datastream.ExecutionEnvironments;
+import org.apache.eagle.datastream.core.StreamProducer;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created on 1/12/16.
+ */
+public class NameNodeLagMonitor {
+
+    public static void main(String[] args) {
+        StormExecutionEnvironment env = ExecutionEnvironments.get(args, StormExecutionEnvironment.class);
+        String streamName = "hadoopJmxMetricEventStream";
+        StreamProducer sp = env.fromSpout(createProvider(env.getConfig())).withOutputFields(2).parallelism(1).nameAs(streamName);
+        sp.alertWithConsumer(streamName, "hadoopJmxMetricAlertExecutor");
+
+        env.execute();
+    }
+
+    // create a tuple kafka source
+    private static KafkaSourcedSpoutProvider createProvider(Config config) {
+        String deserClsName = config.getString("dataSourceConfig.deserializerClass");
+        final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
+            @Override
+            public List<Object> deserialize(byte[] ser) {
+                Object tmp = deserializer.deserialize(ser);
+                Map<String, Object> map = (Map<String, Object>)tmp;
+                if(tmp == null) return null;
+                // this is the key to be grouped by
+                return Arrays.asList(String.format("%s-%s", map.get("host"), map.get("metric")), tmp);
+            }
+        };
+
+        KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() {
+            @Override
+            public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
+                return new SchemeAsMultiScheme(scheme);
+            }
+        };
+        return provider;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/13bb16cf/eagle-hadoop-metric/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/application.conf b/eagle-hadoop-metric/src/main/resources/application.conf
new file mode 100644
index 0000000..aa7e340
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/resources/application.conf
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  "envContextConfig" : {
+    "env" : "storm",
+    "mode" : "local",
+    "topologyName" : "nameNodeLagTopology",
+    "stormConfigFile" : "namenodelage.yaml",
+    "parallelismConfig" : {
+      "kafkaMsgConsumer" : 1,
+      "hadoopJmxMetricAlertExecutor*" : 1
+    }
+  },
+  "dataSourceConfig": {
+    "topic" : "nn_jmx_metric_sandbox",
+    "zkConnection" : "localhost:2181",
+    "zkConnectionTimeoutMS" : 15000,
+    "consumerGroupId" : "EagleConsumer",
+    "fetchSize" : 1048586,
+    "deserializerClass" : "org.apache.eagle.hadoop.metric.HadoopJmxMetricDeserializer",
+    "transactionZKServers" : "localhost",
+    "transactionZKPort" : 2181,
+    "transactionZKRoot" : "/consumers",
+    "transactionStateUpdateMS" : 2000
+  },
+  "alertExecutorConfigs" : {
+     "hadoopJmxMetricAlertExecutor" : {
+       "parallelism" : 1,
+       "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+       "needValidation" : "true"
+     }
+  },
+  "persistExecutorConfigs" {
+    "persistExecutor1" : {
+      "kafka": {
+        "bootstrap_servers" : "localhost",
+        "topics" : {
+          "defaultOutput" : "downSampleOutput"
+        }
+      }
+    }
+  },
+  "aggregateExecutorConfigs" : {
+    "aggregateExecutor1" : {
+      "parallelism" : 1,
+      "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+      "needValidation" : "true"
+    }
+  },
+  "eagleProps" : {
+    "site" : "sandbox",
+    "dataSource": "hadoopJmxMetricDataSource",
+  	"dataJoinPollIntervalSec" : 30,
+    "mailHost" : "mailHost.com",
+    "mailSmtpPort":"25",
+    "mailDebug" : "true",
+    "balancePartitionEnabled" : true,
+    #"partitionRefreshIntervalInMin" : 60,
+    #"kafkaStatisticRangeInMin" : 60,
+    "eagleService": {
+      "host": "localhost",
+      "port": 9099,
+      "username": "admin",
+      "password": "secret"
+    }
+    "readHdfsUserCommandPatternFrom" : "file"
+  },
+  "dynamicConfigSource" : {
+  	"enabled" : true,
+  	"initDelayMillis" : 0,
+  	"delayMillis" : 30000
+  }
+}


Mime
View raw message