eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [5/8] incubator-eagle git commit: EAGLE-79 Provide aggregation and persistence DSL support
Date Tue, 12 Jan 2016 07:47:58 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java
new file mode 100644
index 0000000..90353be
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.executor;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.policy.config.AbstractPolicyDefinition;
+import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl;
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.policy.PolicyEvaluator;
+import org.apache.eagle.policy.PolicyManager;
+import org.apache.eagle.policy.siddhi.SiddhiPolicyDefinition;
+import org.apache.eagle.policy.siddhi.StreamMetadataManager;
+import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import junit.framework.Assert;
+
+/**
+ * @since Dec 18, 2015
+ *
+ */
+public class TestPolicyExecutor {
+
+	public static class T2 extends AbstractPolicyDefinitionEntity {
+		@Override
+		public String getPolicyDef() {
+			return null;
+		}
+		@Override
+		public boolean isEnabled() {
+			return false;
+		}
+	}
+
+	// not feasible to Unit test, it requires the local service.
+	@Ignore
+	@Test
+	public void testReflectCreatePolicyEvaluator() throws Exception {
+		System.setProperty("config.resource", "/unittest.conf");
+		String policyType = Constants.policyType.siddhiCEPEngine.name();
+		Class<? extends PolicyEvaluator> evalCls = PolicyManager.getInstance().getPolicyEvaluator(policyType);
+		Config config = ConfigFactory.load();
+
+		String def = "{\"expression\":\"from hdfsAuditLogEventStream select * insert into outputStream;\",\"type\":\"siddhiCEPEngine\"}";
+		// test1 : test json deserialization
+		AbstractPolicyDefinition policyDef = null;
+		policyDef = JsonSerDeserUtils.deserialize(def, AbstractPolicyDefinition.class,
+				PolicyManager.getInstance().getPolicyModules(policyType));
+		// Assert conversion succeed
+		Assert.assertEquals(SiddhiPolicyDefinition.class, policyDef.getClass());
+
+		// make sure meta data manager initialized
+		StreamMetadataManager.getInstance().init(config, new AlertStreamSchemaDAOImpl(config));
+
+		String[] sourceStreams = new String[] { "hdfsAuditLogEventStream" };
+		// test2 : test evaluator
+		PolicyEvaluator pe = evalCls.getConstructor(Config.class, String.class, AbstractPolicyDefinition.class,
+				String[].class, boolean.class).newInstance(config, "policy-id", policyDef, sourceStreams, false);
+
+		PolicyEvaluator<AlertDefinitionAPIEntity> e1 = (PolicyEvaluator<AlertDefinitionAPIEntity>) pe;
+
+		PolicyEvaluator<T2> e2 = (PolicyEvaluator<T2>) pe;
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
index 091a6a6..188ad97 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
@@ -20,15 +20,19 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.dao.AlertDefinitionDAO;
+import org.apache.eagle.policy.DynamicPolicyLoader;
+import org.apache.eagle.policy.PolicyLifecycleMethods;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
 import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import junit.framework.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import junit.framework.Assert;
+
 public class TestDynamicPolicyLoader {
 	private final static Logger LOG = LoggerFactory.getLogger(TestDynamicPolicyLoader.class);
 
@@ -36,8 +40,8 @@ public class TestDynamicPolicyLoader {
 	public void test() throws Exception{
 		System.setProperty("config.resource", "/unittest.conf");
 		Config config = ConfigFactory.load();
-		Map<String, PolicyLifecycleMethods> policyChangeListeners = new HashMap<String, PolicyLifecycleMethods>();
-		policyChangeListeners.put("testAlertExecutorId", new PolicyLifecycleMethods() {
+		Map<String, PolicyLifecycleMethods<AlertDefinitionAPIEntity>> policyChangeListeners = new HashMap<String, PolicyLifecycleMethods<AlertDefinitionAPIEntity>>();
+		policyChangeListeners.put("testAlertExecutorId", new PolicyLifecycleMethods<AlertDefinitionAPIEntity>() {
 			@Override
 			public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
 				LOG.info("deleted : " + deleted);
@@ -62,9 +66,9 @@ public class TestDynamicPolicyLoader {
 		map.put("policyId_1", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_1", "siddhi", "policyDef_1"));
 		map.put("policyId_3", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_3", "siddhi", "policyDef_3"));
 		
-		AlertDefinitionDAO dao = new AlertDefinitionDAO() {
+		PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao = new PolicyDefinitionDAO<AlertDefinitionAPIEntity>() {
 			@Override
-			public Map<String, Map<String, AlertDefinitionAPIEntity>> findActiveAlertDefsGroupbyAlertExecutorId(
+			public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(
 					String site, String dataSource) {
 				Map<String, Map<String, AlertDefinitionAPIEntity>> currentAlertDefs = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
 				currentAlertDefs.put("testAlertExecutorId", new HashMap<String, AlertDefinitionAPIEntity>());
@@ -75,12 +79,12 @@ public class TestDynamicPolicyLoader {
 			}
 			
 			@Override
-			public List<AlertDefinitionAPIEntity> findActiveAlertDefs(String site, String dataSource) {
+			public List<AlertDefinitionAPIEntity> findActivePolicies(String site, String dataSource) {
 				return null;
 			}
 		};
 		
-		DynamicPolicyLoader loader = DynamicPolicyLoader.getInstance();
+		DynamicPolicyLoader<AlertDefinitionAPIEntity> loader = DynamicPolicyLoader.getInstanceOf(AlertDefinitionAPIEntity.class);
 		loader.init(initialAlertDefs, dao, config);
 		
 		try{
@@ -102,4 +106,5 @@ public class TestDynamicPolicyLoader {
 		entity.setPolicyDef(policyDef);
 		return entity;
 	}
+	
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
index 367187d..77aaec3 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.alert.policy;
 
+import org.apache.eagle.policy.DefaultPolicyPartitioner;
 import org.junit.Test;
 
 public class TestPolicyDistribution {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
index 579b9f0..7c02db9 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
@@ -22,13 +22,14 @@ package org.apache.eagle.alert.policy;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import junit.framework.Assert;
-import org.apache.eagle.alert.common.AlertConstants;
-import org.apache.eagle.alert.dao.AlertDefinitionDAO;
-import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl;
-import org.apache.eagle.alert.dao.AlertStreamSchemaDAO;
 import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
 import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.executor.AlertExecutor;
+import org.apache.eagle.alert.executor.AlertExecutor;
+import org.apache.eagle.policy.DefaultPolicyPartitioner;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.policy.dao.AlertDefinitionDAOImpl;
+import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
 import org.apache.eagle.service.client.EagleServiceConnector;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -44,13 +45,13 @@ public class TestPolicyDistributionUpdater {
 
     @Test
     public void testPolicyDistributionReporter() throws Exception{
-        AlertDefinitionDAO alertDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(null, null)) {
+        PolicyDefinitionDAO alertDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(null, 1)) {
             @Override
-            public Map<String, Map<String, AlertDefinitionAPIEntity>> findActiveAlertDefsGroupbyAlertExecutorId(String site, String dataSource) throws Exception {
+            public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception {
                 final AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
                 entity.setTags(new HashMap<String, String>() {{
-                    put(AlertConstants.POLICY_TYPE, "siddhiCEPEngine");
-                    put(AlertConstants.POLICY_ID, "policyId_1");
+                    put(Constants.POLICY_TYPE, "siddhiCEPEngine");
+                    put(Constants.POLICY_ID, "policyId_1");
                 }});
                 Map<String, Map<String, AlertDefinitionAPIEntity>> map = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
                 map.put("alertExecutorId_1", new HashMap<String, AlertDefinitionAPIEntity>() {{

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
index b04d206..c3bc4c9 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.alert.policy;
 
+import org.apache.eagle.policy.DefaultPolicyPartitioner;
 import org.junit.Test;
 
 public class TestPolicyPartitioner {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
new file mode 100644
index 0000000..dcb38a4
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+import java.text.MessageFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+
+/**
+ * @since Dec 23, 2015
+ *
+ */
+public class TestExternalBatchWindow {
+
+    private static SiddhiManager siddhiManager;
+    
+    @BeforeClass
+    public static void beforeClass() {
+        siddhiManager = new SiddhiManager();
+    }
+    
+    @AfterClass
+    public static void afterClass() {
+        siddhiManager.shutdown();
+    }
+
+    @Test
+    public void test02NoMsg() throws Exception {
+        ExecutionPlanRuntime runtime = simpleQueryRuntime();
+
+        final AtomicBoolean recieved = new AtomicBoolean();
+        runtime.addCallback("query", new QueryCallback() {
+
+            @Override
+            public void receive(long arg0, Event[] arg1, Event[] arg2) {
+                recieved.set(true);
+                System.out.println(arg1);
+            }
+        });
+
+        InputHandler input = runtime.getInputHandler("jmxMetric");
+
+        runtime.start();
+        // external events' timestamp less than the window, should not have event received in call back.
+        long now = System.currentTimeMillis();
+        int length = 5;
+        for (int i = 0; i < length; i++) {
+            input.send(new Object[] { 15, now + i * 1000 });
+        }
+        
+        Thread.sleep(1000);
+        Assert.assertFalse("Event happens inner external time batch window, should not have event recieved in callback!", recieved.get());
+
+        runtime.shutdown();
+    }
+
+    private ExecutionPlanRuntime simpleQueryRuntime() {
+        String query = "define stream jmxMetric(cpu int, timestamp long); " 
+                + "@info(name='query')"
+                + "from jmxMetric#window.eagle:externalTimeBatch(timestamp, 10 sec) " 
+                + "select avg(cpu) as avgCpu, count(1) as count insert into tmp;";
+
+        return siddhiManager.createExecutionPlanRuntime(query);
+    }
+
+    /**
+     * This case tries to capture the case that the window gets a chunk of events that exceeds the time batch.
+     * In this case, two next processor should be triggered.
+     */
+    @Test
+    public void test03BunchChunkExceedBatch() {
+        // TODO
+    }
+    @Test
+    public void test04MultiThread() {
+        // TODO
+    }
+
+    @Test
+    public void test05ExternalJoin() {
+        // TODO
+    }
+    
+    @Test
+    public void test06EdgeCase() throws Exception {
+        // every 10 sec
+        ExecutionPlanRuntime runtime = simpleQueryRuntime();
+
+        final AtomicInteger recCount = new AtomicInteger(0);
+//        final CountDownLatch latch = new CountDownLatch(2);// for debug
+        runtime.addCallback("query", new QueryCallback() {
+            @Override
+            public void receive(long arg0, Event[] arg1, Event[] arg2) {
+//                latch.countDown();
+                Assert.assertEquals(1, arg1.length);
+                recCount.incrementAndGet();
+                int avgCpu = (Integer)arg1[0].getData()[0];
+                if (recCount.get() == 1) {
+                    Assert.assertEquals(15, avgCpu);
+                } else if (recCount.get() == 2) {
+                    Assert.assertEquals(85, avgCpu);
+                }
+                int count = (Integer) arg1[0].getData()[1];
+                Assert.assertEquals(3, count);
+            }
+        });
+
+        InputHandler input = runtime.getInputHandler("jmxMetric");
+        runtime.start();
+        // external events' timestamp less than the window, should not have event received in call back.
+        long now = 0;
+        int length = 3;
+        for (int i = 0; i < length; i++) {
+            input.send(new Object[] { 15, now + i * 10 });
+        }
+        
+        // second round
+        // if the trigger event mix with the last window, we should see the avgValue is not expected
+        for (int i = 0; i < length; i++) {
+            input.send(new Object[] { 85, now + 10000 + i * 10 }); // the first entity of the second round
+        }
+        // to trigger second round
+        input.send(new Object[] { 10000, now + 10 * 10000 });
+        
+//        latch.await();// for debug
+
+        Thread.sleep(1000);
+        
+        Assert.assertEquals(2, recCount.get());
+    }
+
+    @Test
+    public void test07Pull76() throws Exception {
+        String defaultStream = "define stream LoginEvents (myTime long, ip string, phone string,price int);";
+
+        String query = " @info(name='pull76') "
+                + " from LoginEvents#window.eagle:externalTimeBatch(myTime, 5 sec)  "
+                + " select myTime, phone, ip, price, count(ip) as cntip , "
+                + " min(myTime) as mintime, max(myTime) as maxtime "
+                + " insert into events ;";
+        
+        ExecutionPlanRuntime runtime = siddhiManager.createExecutionPlanRuntime(defaultStream + query);
+
+        InputHandler inputHandler = runtime.getInputHandler("LoginEvents");
+
+        runtime.addCallback("pull76", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                if (inEvents != null) {
+                    System.out.println("======================== START ===============================");
+                    int i = 0;
+                    System.out.println(" Events Size:" + inEvents.length);
+                    for (i = 0; i < inEvents.length; i++) {
+                        Event e = inEvents[i];
+                        System.out.println("----------------------------");
+                        System.out.println(new Date((Long) e.getData(0)));
+                        System.out.println("IP:" + e.getData(2));
+                        System.out.println("price :" + e.getData(3));
+                        System.out.println("count :" + e.getData(4));
+                        System.out.println("mintime :" + new Date((Long) e.getData(5)) );
+                        System.out.println("maxtime :" + new Date((Long) e.getData(6)) );
+                        System.out.println("----------------------------");
+                    }
+                    System.out.println("======================== END  ===============================");
+
+                }
+            }
+        });
+        
+        
+        runtime.start();
+        
+        long start = System.currentTimeMillis();
+        Calendar c = Calendar.getInstance();
+        c.add(Calendar.HOUR, 1);
+        c.add(Calendar.SECOND, 1);
+        int i = 0;
+        for (i = 0; i <= 10000; i++) {
+            c.add(Calendar.SECOND, 1);
+            inputHandler.send(c.getTime().getTime(),
+                    new Object[] { c.getTime().getTime(), new String("192.10.1.1"), "1", new Random().nextInt(1000) });
+        }
+        long end = System.currentTimeMillis();
+        System.out.printf("End : %d ", end - start);
+
+        Thread.sleep(1000);
+        runtime.shutdown();
+    }
+    
+    @Test
+    public void test01DownSampling() throws Exception {
+        String stream = "define stream jmxMetric(cpu int, memory int, bytesIn long, bytesOut long, timestamp long);";
+        String query = "@info(name = 'downSample') " 
+                + "from jmxMetric#window.eagle:externalTimeBatch(timestamp, 10 sec) "
+                + "select "
+                + "avg(cpu) as avgCpu, max(cpu) as maxCpu, min(cpu) as minCpu, "
+                + " '|' as s, "
+                + " avg(memory) as avgMem, max(memory) as maxMem, min(memory) as minMem, "
+                + " '|' as s1, "
+                + " avg(bytesIn) as avgBytesIn, max(bytesIn) as maxBytesIn, min(bytesIn) as minBytesIn, "
+                + " '|' as s2, "
+                + " avg(bytesOut) as avgBytesOut, max(bytesOut) as maxBytesOut, min(bytesOut) as minBytesOut, " 
+                + " '|' as s3, "
+                + " timestamp as timeWindowEnds, "
+                + " '|' as s4, "
+                + " count(1) as metric_count "
+                + " INSERT INTO tmp;";
+
+        SiddhiManager sm = new SiddhiManager();
+        ExecutionPlanRuntime plan = sm.createExecutionPlanRuntime(stream + query);
+
+        InputHandler input = plan.getInputHandler("jmxMetric");
+
+        // stream call back doesn't follow the counter
+        final AtomicInteger counter = new AtomicInteger();
+        {
+            // stream callback
+            plan.addCallback("jmxMetric", new StreamCallback() {
+                @Override
+                public void receive(Event[] arg0) {
+                    counter.addAndGet(arg0.length);
+                }
+            });
+        }
+        final AtomicInteger queryWideCounter = new AtomicInteger();
+        {
+            plan.addCallback("downSample", new QueryCallback() {
+                @Override
+                public void receive(long arg0, Event[] inevents, Event[] removeevents) {
+                    int currentCount = queryWideCounter.addAndGet(inevents.length);
+                    System.out.println(MessageFormat.format("Round {0} ====", currentCount));
+                    System.out.println(" events count " + inevents.length);
+
+                    for (Event e : inevents) {
+                        Object[] tranformedData = e.getData();
+                        for (Object o : tranformedData) {
+                            System.out.print(o);
+                            System.out.print(' ');
+                        }
+                        System.out.println(" events endendend");
+                    }
+                }
+
+            });
+        }
+
+        plan.start();
+
+        int round = 4;
+        int eventsPerRound= 0;
+        long externalTs = System.currentTimeMillis();
+        for (int i = 0; i < round; i++) {
+            eventsPerRound = sendEvent(input, i, externalTs);
+            Thread.sleep(3000);
+        }
+        //
+        sendEvent(input, round, externalTs);
+
+        plan.shutdown();
+        Thread.sleep(1000);
+        Assert.assertEquals(round * eventsPerRound + eventsPerRound, counter.get());
+        Assert.assertEquals(round, queryWideCounter.get());
+    }
+
+    // one round of sending events
+    private int sendEvent(InputHandler input, int ite, long externalTs) throws Exception {
+        int len = 3;
+        Event[] events = new Event[len];
+        for (int i = 0; i < len; i++) {
+            // cpu int, memory int, bytesIn long, bytesOut long, timestamp long
+            events[i] = new Event(externalTs,
+                    new Object[] { 15 + 10 * i * ite, 1500 + 10 * i * ite, 1000L, 2000L, externalTs + ite * 10000 + i * 50 });
+        }
+
+        input.send(events);
+        return len;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
index 8dcc20b..acead47 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
@@ -18,7 +18,7 @@ package org.apache.eagle.alert.siddhi;
 
 import java.lang.reflect.Field;
 
-import org.apache.eagle.executor.AlertExecutor;
+import org.apache.eagle.alert.executor.AlertExecutor;
 import junit.framework.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
index beb790a..1967265 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.alert.siddhi;
 
+import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,6 +29,6 @@ public class TestSiddhiStream {
 		Assert.assertFalse(SiddhiPolicyEvaluator.addContextFieldIfNotExist(rule).equals(rule));
 		
 		rule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select    * insert into outputStream;";
-		Assert.assertTrue(SiddhiPolicyEvaluator.addContextFieldIfNotExist(rule).equals(rule));		
+		Assert.assertTrue(SiddhiPolicyEvaluator.addContextFieldIfNotExist(rule).equals(rule));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext b/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
index 576001b..a7e2ddb 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
@@ -30,6 +30,6 @@ trim=org.wso2.siddhi.extension.string.TrimFunctionExtension
 upper=org.wso2.siddhi.extension.string.UpperFunctionExtension
 hex=org.wso2.siddhi.extension.string.HexFunctionExtension
 unhex=org.wso2.siddhi.extension.string.UnhexFunctionExtension
-equalsIgnoreCase=org.apache.eagle.alert.siddhi.extension.EqualsIgnoreCaseExtension
-containsIgnoreCase=org.apache.eagle.alert.siddhi.extension.ContainsIgnoreCaseExtension
-regexpIgnoreCase=org.apache.eagle.alert.siddhi.extension.RegexpIgnoreCaseFunctionExtension
\ No newline at end of file
+equalsIgnoreCase=org.apache.eagle.policy.siddhi.extension.EqualsIgnoreCaseExtension
+containsIgnoreCase=org.apache.eagle.policy.siddhi.extension.ContainsIgnoreCaseExtension
+regexpIgnoreCase=org.apache.eagle.policy.siddhi.extension.RegexpIgnoreCaseFunctionExtension
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/unittest.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/unittest.conf b/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/unittest.conf
index 0148d45..35672d3 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/unittest.conf
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/unittest.conf
@@ -40,7 +40,7 @@
   "alertExecutorConfigs" : {
      "hdfsAuditLogAlertExecutor" : {
        "parallelism" : 2,
-       "partitioner" : "org.apache.eagle.alert.policy.DefaultPolicyPartitioner"
+       "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
        "needValidation" : "true"
      }
   },

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java b/eagle-core/eagle-alert/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java
index 7c7d5bc..43bcfe2 100644
--- a/eagle-core/eagle-alert/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java
+++ b/eagle-core/eagle-alert/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java
@@ -27,9 +27,9 @@ import org.slf4j.LoggerFactory;
 import org.wso2.siddhi.core.ExecutionPlanRuntime;
 import org.wso2.siddhi.core.SiddhiManager;
 
-import org.apache.eagle.alert.common.AlertConstants;
+import org.apache.eagle.policy.common.Constants;
 import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.alert.siddhi.AttributeType;
+import org.apache.eagle.policy.siddhi.AttributeType;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.common.DateTimeUtil;
 import com.fasterxml.jackson.databind.Module;
@@ -50,7 +50,7 @@ public class SiddhiAlertPolicyValidateProvider extends AlertPolicyValidateProvid
 		String startTime = "1969-01-01 00:00:00";
 		String endTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(Long.MAX_VALUE);
 		int pageSize = 1000;
-		String query = AlertConstants.ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME + "[@streamName=\"" + streamName + "\"]{*}";
+		String query = Constants.ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME + "[@streamName=\"" + streamName + "\"]{*}";
 		GenericServiceAPIResponseEntity<AlertStreamSchemaEntity> streamResponse = resource.search(query, startTime, endTime, pageSize, null, false, false, 0L, 0, true, 0, null, false);
 		List<AlertStreamSchemaEntity> list = streamResponse.getObj();
 		

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml b/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml
index 489fc34..38f55b8 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml
@@ -84,6 +84,12 @@
           <artifactId>scalatest_${scala.version}</artifactId>
           <scope>test</scope>
       </dependency>
+
+	<dependency>
+		<groupId>org.apache.kafka</groupId>
+		<artifactId>kafka-clients</artifactId>
+		<version>${kafka-clients.version}</version>
+	</dependency>
   </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java
new file mode 100644
index 0000000..b17c192
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.aggregate;
+
+import org.apache.eagle.policy.ResultRender;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.policy.PolicyPartitioner;
+import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity;
+import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
+import org.apache.eagle.policy.executor.PolicyProcessExecutor;
+
+/**
+ * Policy-driven aggregate executor: inherits the full policy life-cycle
+ * (loading, partitioning, evaluation) from {@code PolicyProcessExecutor} and
+ * only supplies the result renderer that turns evaluation output into
+ * {@link AggregateEntity} values.
+ *
+ * @since Dec 16, 2015
+ */
+public class AggregateExecutor extends PolicyProcessExecutor<AggregateDefinitionAPIEntity, AggregateEntity> {
+
+	private static final long serialVersionUID = 1L;
+
+	// Renderer handed to the base class via getResultRender(); stateless, so a
+	// single instance per executor is sufficient.
+	private ResultRender<AggregateDefinitionAPIEntity, AggregateEntity> render = new AggregateResultRender();
+
+	/**
+	 * @param executorId         id of this executor instance
+	 * @param partitioner        policy partitioning strategy
+	 * @param numPartitions      total number of parallel partitions
+	 * @param partitionSeq       zero-based index of this partition
+	 * @param alertDefinitionDao DAO for loading aggregate policy definitions
+	 * @param sourceStreams      names of the input streams consumed by this executor
+	 */
+	public AggregateExecutor(String executorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq,
+			PolicyDefinitionDAO<AggregateDefinitionAPIEntity> alertDefinitionDao, String[] sourceStreams) {
+		super(executorId, partitioner, numPartitions, partitionSeq, alertDefinitionDao, sourceStreams,
+				AggregateDefinitionAPIEntity.class);
+	}
+
+	/** @return the renderer that converts policy-evaluation output into {@link AggregateEntity}. */
+	@Override
+	public ResultRender<AggregateDefinitionAPIEntity, AggregateEntity> getResultRender() {
+		return render;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
new file mode 100644
index 0000000..e66d3fb
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.aggregate;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
+import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity;
+import org.apache.eagle.policy.DefaultPolicyPartitioner;
+import org.apache.eagle.policy.PolicyPartitioner;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
+import org.apache.eagle.policy.executor.IPolicyExecutor;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Singleton factory that builds the aggregate policy executors, either from a
+ * raw CQL expression ({@link #createExecutors(String)}) or from policy
+ * definitions stored in the Eagle service
+ * ({@link #createExecutors(Config, List, String)}).
+ *
+ * @since Dec 16, 2015
+ */
+public class AggregateExecutorFactory {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AggregateExecutorFactory.class);
+	
+	// Private constructor + eagerly-initialized public field: classic singleton.
+	private AggregateExecutorFactory() {}
+	public static final AggregateExecutorFactory Instance = new AggregateExecutorFactory();
+
+
+	/**
+	 * Creates executors that evaluate the given CQL expression directly,
+	 * without any stored policy definition.
+	 *
+	 * @param cql the aggregation expression (Siddhi CEP syntax)
+	 * @return an array of {@link SimpleAggregateExecutor}s (currently always one,
+	 *         since the partition count is hard-coded to 1 here)
+	 */
+	public IPolicyExecutor[] createExecutors(String cql) throws Exception {
+		int numPartitions = 1; //loadExecutorConfig(config, executorId, partitionerCls);
+
+		IPolicyExecutor[] executors = new IPolicyExecutor[numPartitions];
+		for (int i = 0; i < numPartitions ; i++ ) {
+			executors[i] = new SimpleAggregateExecutor(cql, "siddhiCEPEngine", i, numPartitions);
+		}
+
+		return executors;
+	}
+
+	/**
+	 * Creates executors backed by policy definitions fetched from the Eagle
+	 * service endpoint named by {@code Constants.AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME}.
+	 *
+	 * @param config      application config; consulted for the "aggregateExecutorConfigs" section
+	 * @param streamNames input stream names handed to each executor
+	 * @param executorId  id used both for config lookup and as the executor id
+	 */
+	public IPolicyExecutor[] createExecutors(Config config, List<String> streamNames, String executorId) throws Exception {
+		// partitionerCls is an in/out parameter: pre-seeded with the default and
+		// possibly overwritten by loadExecutorConfig from the config file.
+		StringBuilder partitionerCls = new StringBuilder(DefaultPolicyPartitioner.class.getCanonicalName());
+        int numPartitions = loadExecutorConfig(config, executorId, partitionerCls);
+        // NOTE(review): numPartitions stays 0 when no matching executor config is
+        // found, which makes newAggregateExecutors return an empty array — confirm
+        // that silently creating no executors is intended.
+        
+		PolicyDefinitionDAO<AggregateDefinitionAPIEntity> policyDefDao = new PolicyDefinitionEntityDAOImpl<AggregateDefinitionAPIEntity>(
+				new EagleServiceConnector(config), Constants.AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME);
+		
+		
+		return newAggregateExecutors(policyDefDao, streamNames, executorId, numPartitions, partitionerCls.toString());
+	}
+	
+	/**
+	 * Reads "aggregateExecutorConfigs.&lt;executorId&gt;" from the config and
+	 * extracts "parallelism" (returned) and "partitioner" (written into the
+	 * {@code partitionerCls} buffer, replacing its previous content).
+	 *
+	 * @return the configured parallelism (defaults to 1 when the entry exists but
+	 *         has no/zero parallelism), or 0 when no entry is found at all
+	 */
+	@SuppressWarnings("unchecked")
+	private int loadExecutorConfig(Config config, String executorId, StringBuilder partitionerCls) {
+		int numPartitions = 0;
+		String aggregateExecutorConfigsKey = "aggregateExecutorConfigs";
+        if(config.hasPath(aggregateExecutorConfigsKey)) {
+            Map<String, ConfigValue> analyzeExecutorConfigs = config.getObject(aggregateExecutorConfigsKey);
+            if(analyzeExecutorConfigs !=null && analyzeExecutorConfigs.containsKey(executorId)) {
+                Map<String, Object> alertExecutorConfig = (Map<String, Object>) analyzeExecutorConfigs.get(executorId).unwrapped();
+                int parts = 0;
+                if(alertExecutorConfig.containsKey("parallelism")) parts = (int) (alertExecutorConfig.get("parallelism"));
+                numPartitions = parts == 0 ? 1 : parts;
+                if(alertExecutorConfig.containsKey("partitioner")) {
+                	partitionerCls.setLength(0);
+                	partitionerCls.append((String) alertExecutorConfig.get("partitioner"));
+                }
+            }
+        }
+        return numPartitions;
+	}
+
+
+//	private List<String> findStreamNames(Config config, String executorId, String dataSource) throws Exception {
+//		// Get map from alertExecutorId to alert stream
+//		// (dataSource) => Map[alertExecutorId:String,streamName:List[String]]
+//		List<String> streamNames = new ArrayList<String>();
+//		// FIXME : here we reuse the executor definition. But the name alert is not ambiguous now.
+//		AlertExecutorDAOImpl alertExecutorDAO = new AlertExecutorDAOImpl(new EagleServiceConnector(config));
+//		List<AlertExecutorEntity> alertExecutorEntities = alertExecutorDAO.findAlertExecutor(dataSource,
+//				executorId);
+//		for (AlertExecutorEntity entity : alertExecutorEntities) {
+//			streamNames.add(entity.getTags().get(Constants.STREAM_NAME));
+//		}
+//		return streamNames;
+//	}
+	
+	/**
+	 * Instantiates the partitioner by reflection and builds one
+	 * {@link AggregateExecutor} per partition, all sharing the same DAO,
+	 * partitioner and source-stream list.
+	 */
+	private AggregateExecutor[] newAggregateExecutors(PolicyDefinitionDAO<AggregateDefinitionAPIEntity> alertDefDAO,
+			List<String> sourceStreams, String executorID, int numPartitions, String partitionerCls)
+					throws Exception {
+		LOG.info("Creating alert executors with executorID: " + executorID + ", numPartitions: "
+				+ numPartitions + ", Partition class is: " + partitionerCls);
+
+		// NOTE(review): Class.newInstance() propagates any checked exception the
+		// partitioner constructor throws; Class.getDeclaredConstructor().newInstance()
+		// is the modern replacement.
+		PolicyPartitioner partitioner = (PolicyPartitioner) Class.forName(partitionerCls).newInstance();
+		AggregateExecutor[] alertExecutors = new AggregateExecutor[numPartitions];
+		String[] _sourceStreams = sourceStreams.toArray(new String[0]);
+
+		for (int i = 0; i < numPartitions; i++) {
+			alertExecutors[i] = new AggregateExecutor(executorID, partitioner, numPartitions, i, alertDefDAO,
+					_sourceStreams);
+		}
+		return alertExecutors;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java
new file mode 100644
index 0000000..986885a
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.aggregate;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity;
+import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
+import org.apache.eagle.policy.PolicyEvaluationContext;
+import org.apache.eagle.policy.ResultRender;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Default {@code ResultRender} for aggregation: copies every raw evaluation
+ * output object into a new {@link AggregateEntity}, preserving order.
+ *
+ * Created on 12/29/15.
+ */
+public class AggregateResultRender implements ResultRender<AggregateDefinitionAPIEntity, AggregateEntity>, Serializable {
+
+
+    // NOTE(review): Serializable without an explicit serialVersionUID — the id is
+    // compiler-generated and may change across builds; consider declaring one.
+    /**
+     * @param config             application config (unused by this renderer)
+     * @param rets               raw output values produced by the policy evaluator
+     * @param siddhiAlertContext evaluation context (unused by this renderer)
+     * @param timestamp          event timestamp (unused by this renderer)
+     * @return a new {@link AggregateEntity} containing all values from {@code rets}
+     */
+    @Override
+    public AggregateEntity render(Config config,
+                                  List<Object> rets,
+                                  PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> siddhiAlertContext,
+                                  long timestamp) {
+        AggregateEntity result = new AggregateEntity();
+        for (Object o : rets) {
+            result.add(o);
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
new file mode 100644
index 0000000..8c01935
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.aggregate;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity;
+import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor2;
+import org.apache.eagle.datastream.Tuple2;
+import org.apache.eagle.policy.PolicyEvaluationContext;
+import org.apache.eagle.policy.PolicyEvaluator;
+import org.apache.eagle.policy.PolicyManager;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.policy.config.AbstractPolicyDefinition;
+import org.apache.eagle.policy.executor.IPolicyExecutor;
+import org.apache.eagle.policy.siddhi.SiddhiEvaluationHandler;
+import org.apache.hadoop.hbase.util.MD5Hash;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Self-contained aggregate executor that wraps a raw CQL expression in a
+ * synthetic, anonymous {@link AggregateDefinitionAPIEntity} instead of loading
+ * a policy definition from the Eagle service. Each incoming 3-field tuple
+ * (key, stream name, value map) is pushed through a Siddhi-backed
+ * {@link PolicyEvaluator}; matched results are emitted via
+ * {@link #onEvalEvents}.
+ *
+ * Created on 1/10/16.
+ */
+public class SimpleAggregateExecutor
+        extends JavaStormStreamExecutor2<String, AggregateEntity>
+        implements SiddhiEvaluationHandler<AggregateDefinitionAPIEntity, AggregateEntity>, IPolicyExecutor<AggregateDefinitionAPIEntity, AggregateEntity> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleAggregateExecutor.class);
+
+    private final String cql;
+    private final int partitionSeq;
+    private final int totalPartitionNum;
+
+    // Derived identifiers: both include an MD5 of the CQL so that identical
+    // expressions map to identical ids (see constructor).
+    private String policyId;
+    private String executorId;
+    private Config config;
+    // Synthetic policy definition built from the CQL in the constructor.
+    private AggregateDefinitionAPIEntity aggDef;
+    // Created lazily in init(); null until then.
+    private PolicyEvaluator<AggregateDefinitionAPIEntity> evaluator;
+
+    /**
+     * @param cql               aggregation expression (Siddhi CEP syntax)
+     * @param policyType        policy-type tag used to look up the evaluator class
+     * @param partitionSeq      zero-based index of this partition
+     * @param totalPartitionNum total number of parallel partitions
+     */
+    public SimpleAggregateExecutor(String cql, String policyType, int partitionSeq, int totalPartitionNum) {
+        this.cql = cql;
+        this.partitionSeq = partitionSeq;
+        this.totalPartitionNum = totalPartitionNum;
+        // Create a fixed policy definition entity and mark it as carrying the full definition.
+        aggDef = new AggregateDefinitionAPIEntity();
+        aggDef.setTags(new HashMap<String, String>());
+        aggDef.getTags().put(Constants.POLICY_TYPE, policyType);
+        // TODO make it more general, not only hard code siddhi cep support here.
+        // NOTE(review): the CQL is spliced into a JSON string via String.format;
+        // any double quote or backslash in the expression will produce invalid
+        // JSON. (The "containsDefintion" key typo is part of the wire format —
+        // do not change it here without updating the consumer.)
+        try {
+            String template = "{\"type\":\"siddhiCEPEngine\", \"expression\":\"%s\", \"containsDefintion\": true }";
+            aggDef.setPolicyDef(String.format(template, this.cql));
+        } catch (Exception e) {
+            LOG.error("simple aggregate generate policy definition failed!", e);
+        }
+        aggDef.setCreatedTime(new Date().getTime());
+        aggDef.setLastModifiedDate(new Date().getTime());
+        aggDef.setName("anonymous-aggregation-def");
+        aggDef.setOwner("anonymous");
+        aggDef.setEnabled(true);
+        aggDef.setDescription("anonymous aggregation definition");
+
+        // Stable ids derived from the expression content, not random — two
+        // executors over the same CQL share the same policy/executor id.
+        String random = MD5Hash.getMD5AsHex(cql.getBytes());
+        policyId = "anonymousAggregatePolicyId-" + random;
+        executorId= "anonymousAggregateId-" +random;
+    }
+
+    /** Stores the topology config for later use by the evaluator factory. */
+    @Override
+    public void prepareConfig(Config config) {
+        this.config = config;
+    }
+
+    /** Builds the policy evaluator; must run after prepareConfig. */
+    @Override
+    public void init() {
+        evaluator = createPolicyEvaluator(aggDef);
+    }
+
+    /**
+     * Create PolicyEvaluator instance according to policyType-mapped policy evaluator class
+     *
+     * @return PolicyEvaluator instance
+     * @throws IllegalStateException when no evaluator class is registered for the
+     *         policy type, or when the evaluator cannot be constructed
+     */
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    protected PolicyEvaluator<AggregateDefinitionAPIEntity> createPolicyEvaluator(AggregateDefinitionAPIEntity alertDef) {
+        String policyType = alertDef.getTags().get(Constants.POLICY_TYPE);
+        Class<? extends PolicyEvaluator> evalCls = PolicyManager.getInstance().getPolicyEvaluator(policyType);
+        if (evalCls == null) {
+            String msg = "No policy evaluator defined for policy type : " + policyType;
+            LOG.error(msg);
+            throw new IllegalStateException(msg);
+        }
+
+        // Deserialize the JSON policy definition built in the constructor.
+        // NOTE(review): a deserialization failure is only logged; policyDef stays
+        // null and is passed to the evaluator constructor below — confirm the
+        // evaluator tolerates a null definition.
+        AbstractPolicyDefinition policyDef = null;
+        try {
+            policyDef = JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), AbstractPolicyDefinition.class,
+                    PolicyManager.getInstance().getPolicyModules(policyType));
+        } catch (Exception ex) {
+            LOG.error("Fail initial alert policy def: " + alertDef.getPolicyDef(), ex);
+        }
+
+        PolicyEvaluator<AggregateDefinitionAPIEntity> pe;
+        try {
+            // Create evaluator instances
+            pe = (PolicyEvaluator<AggregateDefinitionAPIEntity>) evalCls
+                    .getConstructor(Config.class, String.class, AbstractPolicyDefinition.class, String[].class, boolean.class)
+                    .newInstance(config, alertDef.getTags().get(Constants.POLICY_ID), policyDef, new String[]{Constants.EAGLE_DEFAULT_POLICY_NAME}, false);
+        } catch (Exception ex) {
+            LOG.error("Fail creating new policyEvaluator", ex);
+            LOG.warn("Broken policy definition and stop running : " + alertDef.getPolicyDef());
+            throw new IllegalStateException(ex);
+        }
+        return pe;
+    }
+
+    /**
+     * Feeds one input tuple into the evaluator. The tuple must contain exactly
+     * three fields: key, stream name, and value map. Evaluation errors are
+     * logged and swallowed so that one bad event does not stop the stream.
+     */
+    @Override
+    public void flatMap(List<Object> input, Collector<Tuple2<String, AggregateEntity>> collector) {
+        if (input.size() != 3)
+            throw new IllegalStateException("AggregateExecutor always consumes exactly 3 fields: key, stream name and value(SortedMap)");
+        if (LOG.isDebugEnabled()) LOG.debug("Msg is coming " + input.get(2));
+        if (LOG.isDebugEnabled()) LOG.debug("Current policyEvaluators: " + evaluator);
+
+        try {
+            // A fresh context per tuple carries the callback (this), ids and
+            // collector through the evaluator back into onEvalEvents.
+            PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> evaluationContext = new PolicyEvaluationContext<>();
+            evaluationContext.alertExecutor = this;
+            evaluationContext.policyId = policyId;
+            evaluationContext.evaluator = evaluator;
+            evaluationContext.outputCollector = collector;
+            evaluationContext.resultRender = new AggregateResultRender();
+            evaluator.evaluate(new ValuesArray(evaluationContext, input.get(1), input.get(2)));
+        } catch (Exception ex) {
+            LOG.error("Got an exception, but continue to run " + input.get(2).toString(), ex);
+        }
+    }
+
+    /**
+     * Callback invoked by the evaluator with rendered results; emits each one to
+     * the collector as (policyId, entity).
+     */
+    @Override
+    public void onEvalEvents(PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> context, List<AggregateEntity> alerts) {
+        if (alerts != null && !alerts.isEmpty()) {
+            String policyId = context.policyId;
+            LOG.info(String.format("Detected %d alerts for policy %s", alerts.size(), policyId));
+            Collector outputCollector = context.outputCollector;
+            PolicyEvaluator<AggregateDefinitionAPIEntity> evaluator = context.evaluator;
+            for (AggregateEntity entity : alerts) {
+                // NOTE(review): synchronizing on `this` only serializes emits from
+                // this executor instance — confirm the collector itself needs it.
+                synchronized (this) {
+                    outputCollector.collect(new Tuple2(policyId, entity));
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("A new alert is triggered: " + executorId + ", partition " + partitionSeq + ", Got an alert with output context: " + entity + ", for policy " + evaluator);
+                }
+            }
+        }
+    }
+
+    /** @return the CQL-derived executor id assigned in the constructor. */
+    @Override
+    public String getExecutorId() {
+        return executorId;
+    }
+
+    /** @return the zero-based partition index of this executor. */
+    @Override
+    public int getPartitionSeq() {
+        return partitionSeq;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java
new file mode 100644
index 0000000..62830ae
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.aggregate.entity;
+
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.apache.eagle.policy.common.Constants;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * entity of stream analyze definition
+ *
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("aggregatedef")
+@ColumnFamily("f")
+@Prefix("aggregatedef")
+@Service(Constants.AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"site", "dataSource", "executorId", "policyId", "policyType"})
+@Indexes({
+	@Index(name="Index_1_aggregateExecutorId", columns = { "executorId" }, unique = true),
+})
+@SuppressWarnings("serial")
+public class AggregateDefinitionAPIEntity extends AbstractPolicyDefinitionEntity {
+
+	@Column("a")
+	private String name;
+	@Column("b")
+	private String policyDef;
+	@Column("c")
+	private String description;
+	@Column("d")
+	private boolean enabled;
+	@Column("e")
+	private String owner;
+	@Column("f")
+	private long lastModifiedDate;
+	@Column("g")
+	private long createdTime;
+
+	public String getName() {
+		return name;
+	}
+
+	public void setName(String name) {
+		this.name = name;
+	}
+
+	public String getPolicyDef() {
+		return policyDef;
+	}
+
+	public void setPolicyDef(String policyDef) {
+		this.policyDef = policyDef;
+		valueChanged("policyDef");
+	}
+
+	public String getDescription() {
+		return description;
+	}
+
+	public void setDescription(String description) {
+		this.description = description;
+		valueChanged("description");
+	}
+
+	public boolean isEnabled() {
+		return enabled;
+	}
+
+	public void setEnabled(boolean enabled) {
+		this.enabled = enabled;
+		valueChanged("enabled");
+	}
+
+	public String getOwner() {
+		return owner;
+	}
+
+	public void setOwner(String owner) {
+		this.owner = owner;
+		valueChanged("owner");
+	}
+
+	public long getLastModifiedDate() {
+		return lastModifiedDate;
+	}
+
+	public void setLastModifiedDate(long lastModifiedDate) {
+		this.lastModifiedDate = lastModifiedDate;
+		valueChanged("lastModifiedDate");
+	}
+
+	public long getCreatedTime() {
+		return createdTime;
+	}
+
+	public void setCreatedTime(long createdTime) {
+		this.createdTime = createdTime;
+		valueChanged("createdTime");
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java
new file mode 100644
index 0000000..64c20b2
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.aggregate.entity;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Event entity passed between stages during stream processing: an ordered,
+ * serializable container of arbitrary result objects produced by policy
+ * evaluation.
+ * 
+ * @since Dec 17, 2015
+ *
+ */
+public class AggregateEntity implements Serializable {
+
+	private static final long serialVersionUID = 5911351515190098292L;
+
+    private List<Object> data = new LinkedList<>();
+
+    /** Appends one result object, preserving insertion order. */
+    public void add(Object res) {
+        data.add(res);
+    }
+
+    // NOTE(review): returns the internal mutable list — callers can modify this
+    // entity's state through it; consider an unmodifiable view if that matters.
+    public List<Object> getData() {
+        return data;
+    }
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java
new file mode 100644
index 0000000..7c932d4
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.aggregate.entity;
+
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+/**
+ * Entity repository registering {@link AggregateDefinitionAPIEntity} with the
+ * Eagle entity framework so its service endpoint can be discovered.
+ *
+ * Created on 1/6/16.
+ */
+public class AggregateEntityRepository extends EntityRepository {
+    public AggregateEntityRepository() {
+        // entitySet is inherited from EntityRepository; adding the class here
+        // makes the entity known to the framework at repository construction.
+        entitySet.add(AggregateDefinitionAPIEntity.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java
new file mode 100644
index 0000000..0732639
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.persist;
+
+/**
+ * Contract used by the stream processing framework to persist entities into an
+ * underlying storage system (e.g. a Kafka topic feeding Druid).
+ *
+ * @param <T> the entity type this service persists
+ * @since Dec 19, 2015
+ */
+public interface IPersistService<T> {
+
+	/**
+	 * Persists one entity for the given logical stream.
+	 *
+	 * @param stream    logical stream name used to route the entity to its storage target
+	 * @param apiEntity the entity to persist
+	 * @return true if the entity was accepted for persistence; false if the stream
+	 *         has no configured storage target
+	 * @throws Exception if the underlying storage operation fails
+	 */
+	boolean save(String stream, T apiEntity) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java
new file mode 100644
index 0000000..ac0325d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.persist;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
+import org.apache.eagle.dataproc.impl.persist.druid.KafkaPersistService;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor2;
+import org.apache.eagle.datastream.Tuple2;
+import org.apache.eagle.datastream.core.StorageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.MessageFormat;
+import java.util.List;
+
+/**
+ * Storm stream executor that persists aggregated entities to external storage.
+ *
+ * The persist backend is selected by {@code persistType} and configured from the
+ * {@code persistExecutorConfigs.<persistExecutorId>} sub-config.
+ *
+ * TODO: currently only accept to be used after aggregation node (See the AggregateEntity reference here).
+ * @since Dec 19, 2015
+ *
+ */
+public class PersistExecutor extends JavaStormStreamExecutor2<String, AggregateEntity> {
+	
+	private static final Logger LOG = LoggerFactory.getLogger(PersistExecutor.class);
+
+	private Config config;
+	private IPersistService<AggregateEntity> persistService;
+	private String persistExecutorId;
+	private String persistType;
+
+	/**
+	 * @param persistExecutorId id used to locate this executor's sub-config
+	 * @param persistType       storage type name; must match a {@link StorageType} value
+	 */
+	public PersistExecutor(String persistExecutorId, String persistType) {
+		this.persistExecutorId = persistExecutorId;
+		this.persistType = persistType;
+	}
+
+    @Override
+	public void prepareConfig(Config config) {
+		this.config = config;
+	}
+
+    @Override
+	public void init() {
+		if (persistType.equalsIgnoreCase(StorageType.KAFKA().toString())) {
+			Config subConfig = this.config.getConfig("persistExecutorConfigs" + "." + persistExecutorId);
+			persistService = new KafkaPersistService(subConfig);
+		} else {
+			// BUGFIX: the message previously formatted the persistService field,
+			// which is always null on this branch; report the offending type instead.
+			throw new RuntimeException(String.format("Persist type '%s' not supported yet!", persistType));
+		}
+	}
+
+	@Override
+	public void flatMap(List<Object> input, Collector<Tuple2<String, AggregateEntity>> collector) {
+		if (input.size() != 2) {
+			LOG.error(String.format("Persist executor expect two elements per tuple. But actually got size %d lists!",
+					input.size()));
+			return;
+		}
+
+		String policyId = (String) input.get(0);
+		AggregateEntity entity = (AggregateEntity) input.get(1);
+		try {
+			persistService.save("defaultOutput", entity);
+		} catch (Exception e) {
+			// include the policy id so failures can be traced back to their source policy
+			LOG.error(MessageFormat.format("persist entity failed for policy {0}: {1}", policyId, entity), e);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java
new file mode 100644
index 0000000..ea61278
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.persist.druid;
+
+import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Kafka {@link Serializer} that writes an {@link AggregateEntity}'s data list
+ * as a JSON string, delegating the final byte encoding to {@link StringSerializer}.
+ *
+ * TODO: configurable null handling for serialization??
+ * Created on 1/4/16.
+ */
+public class AggregateEntitySerializer implements
+        Closeable, AutoCloseable, Serializer<AggregateEntity> {
+
+    private final StringSerializer stringSerializer = new StringSerializer();
+    private static final Logger logger = LoggerFactory.getLogger(AggregateEntitySerializer.class);
+    // ObjectMapper is thread-safe after configuration, so a single shared instance is fine.
+    private static final ObjectMapper om = new ObjectMapper();
+
+    static {
+        om.configure(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS, true);
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+
+    }
+
+    @Override
+    public byte[] serialize(String topic, AggregateEntity data) {
+        // Only the entity's data payload is serialized, not the wrapper object.
+        String str = null;
+        try {
+            str = om.writeValueAsString(data.getData());
+        } catch (IOException e) {
+            // NOTE(review): on failure this falls through with str == null, which
+            // StringSerializer passes along as a null record value (a Kafka
+            // tombstone) — confirm downstream consumers tolerate that, or rethrow.
+            logger.error("Kafka serialization for send error!", e);
+        }
+        return stringSerializer.serialize(topic, str);
+    }
+
+    @Override
+    public void close() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java
new file mode 100644
index 0000000..919b92e
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.persist.druid;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
+import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
+import org.apache.eagle.dataproc.impl.persist.IPersistService;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.*;
+import java.util.concurrent.Future;
+
+/**
+ * {@link IPersistService} implementation that publishes aggregate entities to
+ * Kafka topics (typically consumed by Druid). Each logical stream is mapped to
+ * a topic via the {@code kafka.topics} config section; entities for streams
+ * without a topic mapping are silently dropped (save returns false).
+ *
+ * TODO : support more general entity input
+ * @since Dec 21, 2015
+ *
+ */
+public class KafkaPersistService implements IPersistService<AggregateEntity> {
+
+	private static final String ACKS = "acks";
+	private static final String RETRIES = "retries";
+	private static final String BATCH_SIZE = "batchSize";
+	private static final String LINGER_MS = "lingerMs";
+	private static final String BUFFER_MEMORY = "bufferMemory";
+	private static final String KEY_SERIALIZER = "keySerializer";
+	private static final String VALUE_SERIALIZER = "valueSerializer";
+	private static final String BOOTSTRAP_SERVERS = "bootstrap_servers";
+	
+	private KafkaProducer<String, AggregateEntity> producer;
+	private final Config config;
+	// maps logical stream name -> kafka topic; sorted for deterministic iteration
+	private final SortedMap<String, String> streamTopicMap;
+	private final Properties props;
+	
+	/**
+	 * Builds the producer from the given config's {@code kafka} section. The
+	 * recognized keys translate to standard producer properties, e.g.:
+	 * <pre>
+	 * props.put("bootstrap.servers", "localhost:4242");
+	 * props.put("acks", "all");
+	 * props.put("retries", 0);
+	 * props.put("batch.size", 16384);
+	 * props.put("linger.ms", 1);
+	 * props.put("buffer.memory", 33554432);
+	 * props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+	 * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+	 * </pre>
+	 *
+	 * @param config executor sub-config; must contain a {@code kafka} section
+	 * @throws IllegalStateException if the {@code kafka} section is missing
+	 */
+	public KafkaPersistService(Config config) {
+		this.config = config;
+		Config kafkaConfig = config.getConfig("kafka");
+		if (kafkaConfig == null) {
+			throw new IllegalStateException("Druid persist service failed to find kafka configurations!");
+		}
+		props = new Properties();
+		if (kafkaConfig.hasPath(BOOTSTRAP_SERVERS)) {
+			props.put("bootstrap.servers", kafkaConfig.getString(BOOTSTRAP_SERVERS));
+		}
+		if (kafkaConfig.hasPath(ACKS)) {
+			props.put(ACKS, kafkaConfig.getString(ACKS));
+		}
+		if (kafkaConfig.hasPath(RETRIES)) {
+			props.put(RETRIES, kafkaConfig.getInt(RETRIES));
+		}
+		if (kafkaConfig.hasPath(BATCH_SIZE)) {
+			props.put("batch.size", kafkaConfig.getInt(BATCH_SIZE));
+		}
+		if (kafkaConfig.hasPath(LINGER_MS)) {
+			props.put("linger.ms", kafkaConfig.getInt(LINGER_MS));
+		}
+		if (kafkaConfig.hasPath(BUFFER_MEMORY)) {
+			props.put("buffer.memory", kafkaConfig.getLong(BUFFER_MEMORY));
+		}
+		if (kafkaConfig.hasPath(KEY_SERIALIZER)) {
+			props.put("key.serializer", kafkaConfig.getString(KEY_SERIALIZER));
+		} else {
+			props.put("key.serializer", StringSerializer.class.getCanonicalName());
+		}
+		// The value serializer is fixed to AggregateEntitySerializer; a configured
+		// VALUE_SERIALIZER is intentionally ignored until general entity input
+		// is supported (see class-level TODO).
+		props.put("value.serializer", AggregateEntitySerializer.class.getCanonicalName());
+
+		streamTopicMap = new TreeMap<>();
+		if (kafkaConfig.hasPath("topics")) {
+			Config topicConfig = kafkaConfig.getConfig("topics");
+			Set<Map.Entry<String, ConfigValue>> topics = topicConfig.entrySet();
+			for (Map.Entry<String, ConfigValue> t : topics) {
+				streamTopicMap.put(t.getKey(), (String) t.getValue().unwrapped());
+			}
+		}
+
+		producer = new KafkaProducer<>(props);
+	}
+
+	@Override
+	public boolean save(String stream, AggregateEntity apiEntity) throws Exception {
+		String topic = streamTopicMap.get(stream);
+		if (topic != null) {
+			ProducerRecord<String, AggregateEntity> record = new ProducerRecord<>(topic, apiEntity);
+			// Fire-and-forget: the future is not awaited, so broker-side failures
+			// are not reflected in the return value.
+			Future<RecordMetadata> future = producer.send(record);
+			// TODO : more for check the sending status
+			return true;
+		}
+		return false;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
index 331e51a..2dec286 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
@@ -19,11 +19,11 @@
 
 package org.apache.eagle.datastream;
 
-import com.typesafe.config.Config;
-
 import java.util.List;
 import java.util.SortedMap;
 
+import com.typesafe.config.Config;
+
 public class JavaStormExecutorForAlertWrapper extends JavaStormStreamExecutor3<String, String, SortedMap<Object, Object>>{
     private JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> delegate;
     private String streamName;
@@ -52,5 +52,9 @@ public class JavaStormExecutorForAlertWrapper extends JavaStormStreamExecutor3<S
         };
         delegate.flatMap(input, delegateCollector);
     }
+    
+    /**
+     * Returns the wrapped executor this wrapper forwards tuples to.
+     */
+    public JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> getDelegate() {
+    	return delegate;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
new file mode 100644
index 0000000..7e5f271
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.eagle.dataproc.util.ConfigOptionParser
+import org.apache.eagle.datastream.core._
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment
+
+import scala.reflect.runtime.universe._
+
+/**
+ * Execution environment factory
+ *
+ * The factory is mainly used for create or manage execution environment,
+ * and also handles the shared works like configuration, arguments for execution environment
+ *
+ * Notice: this factory class should not know any implementation like storm or spark
+ *
+ * @since 0.3.0
+ */
+object ExecutionEnvironments{
+  /**
+   * Use `'''get[StormExecutionEnvironment](config)'''` instead
+   *
+   * @param config environment configuration
+   * @return a storm execution environment built from the given config
+   */
+  @deprecated("Execution environment should not know implementation of Storm")
+  def getStorm(config : Config) = new StormExecutionEnvironment(config)
+
+  /**
+   * Use `'''get[StormExecutionEnvironment]'''` instead
+   *
+   * @return a storm execution environment built from the default classpath config
+   */
+  @deprecated("Execution environment should not know implementation of Storm")
+  def getStorm:StormExecutionEnvironment = {
+    val config = ConfigFactory.load()
+    getStorm(config)
+  }
+
+  /**
+   * Use `'''get[StormExecutionEnvironment](args)'''` instead
+   *
+   * @see get[StormExecutionEnvironment](args)
+   *
+   * @param args command line arguments parsed into a config
+   * @return a storm execution environment built from the parsed arguments
+   */
+  @deprecated("Execution environment should not know implementation of Storm")
+  def getStorm(args:Array[String]):StormExecutionEnvironment = {
+    getStorm(new ConfigOptionParser().load(args))
+  }
+
+  /**
+   * Creates an execution environment of type T from the default classpath config.
+   *
+   * @param typeTag implicit type tag used to instantiate T reflectively
+   * @tparam T execution environment type
+   * @return a new environment instance of type T
+   */
+  def get[T<:ExecutionEnvironment](implicit typeTag: TypeTag[T]): T ={
+    get[T](ConfigFactory.load())
+  }
+
+  /**
+   * Creates an execution environment of type T from the given config.
+   * T must expose a single-argument constructor taking a Config.
+   *
+   * @param config environment configuration
+   * @param typeTag implicit type tag used to instantiate T reflectively
+   * @tparam T execution environment type
+   * @return a new environment instance of type T
+   */
+  def get[T<:ExecutionEnvironment](config:Config)(implicit typeTag: TypeTag[T]): T ={
+    typeTag.mirror.runtimeClass(typeOf[T]).getConstructor(classOf[Config]).newInstance(config).asInstanceOf[T]
+  }
+
+  /**
+   * Creates an execution environment of type T from parsed command line arguments.
+   *
+   * @param args command line arguments parsed into a config
+   * @param typeTag implicit type tag used to instantiate T reflectively
+   * @tparam T execution environment type
+   * @return a new environment instance of type T
+   */
+  def get[T<:ExecutionEnvironment](args:Array[String])(implicit typeTag: TypeTag[T]): T ={
+    get[T](new ConfigOptionParser().load(args))
+  }
+
+  /**
+   * Support java style for default config
+   *
+   * @param clazz execution environment class
+   * @tparam T execution environment type
+   * @return a new environment instance of type T
+   */
+  def get[T<:ExecutionEnvironment](clazz:Class[T]):T ={
+    get[T](ConfigFactory.load(),clazz)
+  }
+
+  /**
+   * Support java style
+   * @param config command config
+   * @param clazz execution environment class
+   * @tparam T execution environment type
+   * @return a new environment instance of type T
+   */
+  def get[T<:ExecutionEnvironment](config:Config,clazz:Class[T]):T ={
+    clazz.getConstructor(classOf[Config]).newInstance(config)
+  }
+
+  /**
+   * Support java style
+   *
+   * @param args command arguments in string array
+   * @param clazz execution environment class
+   * @tparam T execution environment type
+   * @return a new environment instance of type T
+   */
+  def get[T<:ExecutionEnvironment](args:Array[String],clazz:Class[T]):T ={
+    clazz.getConstructor(classOf[Config]).newInstance(new ConfigOptionParser().load(args))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
deleted file mode 100644
index 7e5f271..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.eagle.dataproc.util.ConfigOptionParser
-import org.apache.eagle.datastream.core._
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment
-
-import scala.reflect.runtime.universe._
-
-/**
- * Execution environment factory
- *
- * The factory is mainly used for create or manage execution environment,
- * and also handles the shared works like configuration, arguments for execution environment
- *
- * Notice: this factory class should not know any implementation like storm or spark
- *
- * @since 0.3.0
- */
-object ExecutionEnvironments{
-  /**
-   * Use `'''get[StormExecutionEnvironment](config)'''` instead
-   *
-   * @param config
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm(config : Config) = new StormExecutionEnvironment(config)
-
-  /**
-   * Use `'''get[StormExecutionEnvironment]'''` instead
-   *
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm:StormExecutionEnvironment = {
-    val config = ConfigFactory.load()
-    getStorm(config)
-  }
-
-  /**
-   * Use `'''get[StormExecutionEnvironment](args)'''` instead
-   *
-   * @see get[StormExecutionEnvironment](args)
-   *
-   * @param args
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm(args:Array[String]):StormExecutionEnvironment = {
-    getStorm(new ConfigOptionParser().load(args))
-  }
-
-  /**
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](implicit typeTag: TypeTag[T]): T ={
-    get[T](ConfigFactory.load())
-  }
-
-  /**
-   *
-   * @param config
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](config:Config)(implicit typeTag: TypeTag[T]): T ={
-    typeTag.mirror.runtimeClass(typeOf[T]).getConstructor(classOf[Config]).newInstance(config).asInstanceOf[T]
-  }
-
-  /**
-   *
-   * @param args
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](args:Array[String])(implicit typeTag: TypeTag[T]): T ={
-    get[T](new ConfigOptionParser().load(args))
-  }
-
-  /**
-   * Support java style for default config
-   *
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](clazz:Class[T]):T ={
-    get[T](ConfigFactory.load(),clazz)
-  }
-
-  /**
-   * Support java style
-   * @param config command config
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](config:Config,clazz:Class[T]):T ={
-    clazz.getConstructor(classOf[Config]).newInstance(config)
-  }
-
-  /**
-   * Support java style
-   *
-   * @param args command arguments in string array
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](args:Array[String],clazz:Class[T]):T ={
-    clazz.getConstructor(classOf[Config]).newInstance(new ConfigOptionParser().load(args))
-  }
-}
\ No newline at end of file



Mime
View raw message