eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [42/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5
Date Thu, 02 Jun 2016 07:08:21 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
new file mode 100644
index 0000000..adcd728
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
@@ -0,0 +1,98 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.state;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+
+import junit.framework.Assert;
+import org.wso2.siddhi.core.util.EventPrinter;
+
+public class TestAggregation {
+    @Test
+    public void test01DownSampling() throws Exception {
+        String stream = "define stream jmxMetric(cpu double, memory int, bytesIn int, bytesOut long, timestamp long);";
+        String query = "@info(name = 'downSample') "
+                + "from jmxMetric#window.timeBatch(1 sec) "
+                + "select "
+                + " min(cpu) as minCpu, max(cpu) as maxCpu, avg(cpu) as avgCpu, "
+                + " min(memory) as minMem, max(memory) as maxMem, avg(memory) as avgMem, "
+                + " min(bytesIn) as minBytesIn, max(bytesIn) as maxBytesIn, avg(bytesIn) as avgBytesIn, sum(bytesIn) as totalBytesIn, "
+                + " min(bytesOut) as minBytesOut, max(bytesOut) as maxBytesOut, avg(bytesOut) as avgBytesOut, sum(bytesOut) as totalBytesOut, "
+                + " timestamp as timeWindowEnds "
+                + " INSERT  INTO tmp;";
+
+        SiddhiManager sm = new SiddhiManager();
+        ExecutionPlanRuntime plan = sm.createExecutionPlanRuntime(stream + query);
+
+        final AtomicInteger counter = new AtomicInteger();
+        plan.addCallback("downSample", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+                int count = counter.incrementAndGet();
+                if (count == 1) {
+                    Assert.assertEquals(6000L, inEvents[0].getData(9));
+                } else if(count == 2) {
+                    Assert.assertEquals(6000L, inEvents[0].getData(9));
+                }
+            }
+        });
+        InputHandler input = plan.getInputHandler("jmxMetric");
+
+        plan.start();
+        sendEvent(input);
+        Thread.sleep(100);
+        sendEvent(input);
+        Thread.sleep(1000);
+        sendEvent(input);
+        Thread.sleep(1000);
+        sendEvent(input);
+        Thread.sleep(200);
+        plan.shutdown();
+    }
+
+    // send 3 events
+    private void sendEvent(InputHandler input) throws Exception {
+        int len = 3;
+        Event[] events = new Event[len];
+        for (int i = 0; i < len; i++) {
+            long externalTs = System.currentTimeMillis();
+            // cpu int, memory int, bytesIn long, bytesOut long, timestamp long
+            events[i] = new Event(externalTs + i, new Object[] {
+                    15.0,
+                    15,
+                    1000,
+                    2000L,
+                    externalTs + i
+            });
+        }
+
+        for (Event e : events) {
+            input.send(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiExpiredEvents.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiExpiredEvents.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiExpiredEvents.java
new file mode 100644
index 0000000..e0be82c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiExpiredEvents.java
@@ -0,0 +1,129 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.state;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.util.EventPrinter;
+
+/**
+ * Created by yonzhang on 11/25/15.
+ */
+public class TestSiddhiExpiredEvents {
+    @Test
+    public void testExpiredEventsInLengthWindow() throws Exception{
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "define stream TempStream (user string, cmd string);";
+        String query = "@info(name = 'query1') from TempStream#window.length(3) "
+                + " select *"
+                + " insert all events into DelayedTempStream";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
+
+        executionPlanRuntime.addCallback("query1", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        });
+
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("TempStream");
+        executionPlanRuntime.start();
+        inputHandler.send(new Object[]{"user", "open1"});
+        inputHandler.send(new Object[]{"user", "open2"});
+        inputHandler.send(new Object[]{"user", "open3"});
+        inputHandler.send(new Object[]{"user", "open4"});
+        inputHandler.send(new Object[]{"user", "open5"});
+        inputHandler.send(new Object[]{"user", "open6"});
+        Thread.sleep(1000);
+        executionPlanRuntime.shutdown();
+    }
+
+    @Test
+    public void testExpiredEventsInLengthBatchWindow() throws Exception{
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "define stream TempStream (user string, cmd string);";
+        String query = "@info(name = 'query1') from TempStream#window.lengthBatch(2) "
+                + " select *"
+                + " insert all events into DelayedTempStream";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
+
+        executionPlanRuntime.addCallback("query1", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        });
+
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("TempStream");
+        executionPlanRuntime.start();
+        inputHandler.send(new Object[]{"user", "open1"});
+        inputHandler.send(new Object[]{"user", "open2"});
+        inputHandler.send(new Object[]{"user", "open3"});
+        inputHandler.send(new Object[]{"user", "open4"});
+        inputHandler.send(new Object[]{"user", "open5"});
+        inputHandler.send(new Object[]{"user", "open6"});
+        Thread.sleep(1000);
+        executionPlanRuntime.shutdown();
+    }
+
+    @Test
+    public void testExpireEvents2() throws Exception{
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "define stream TempStream (user string, cmd string);";
+        String query = "@info(name = 'query1') from TempStream#window.length(4) "
+                + " select user, cmd, count(user) as cnt " +
+                " group by user " +
+                "having cnt > 2 "
+                + " insert all events into DelayedTempStream";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
+
+        executionPlanRuntime.addCallback("query1", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        });
+
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("TempStream");
+        executionPlanRuntime.start();
+        inputHandler.send(new Object[]{"user", "open1"});
+        inputHandler.send(new Object[]{"user", "open2"});
+        inputHandler.send(new Object[]{"user", "open3"});
+        inputHandler.send(new Object[]{"user", "open4"});
+        inputHandler.send(new Object[]{"user", "open5"});
+//        inputHandler.send(new Object[]{"user", "open6"});
+//        inputHandler.send(new Object[]{"user", "open7"});
+//        inputHandler.send(new Object[]{"user", "open8"});
+//        inputHandler.send(new Object[]{"user", "open9"});
+        Thread.sleep(1000);
+        executionPlanRuntime.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
new file mode 100644
index 0000000..131be28
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
@@ -0,0 +1,506 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.state;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.util.EventPrinter;
+import org.wso2.siddhi.core.util.persistence.InMemoryPersistenceStore;
+import org.wso2.siddhi.core.util.persistence.PersistenceStore;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+
+/**
+ * experiment Siddhi state snapshot and restore
+ */
+public class TestSiddhiStateSnapshotAndRestore {
+    private ExecutionPlanRuntime setupRuntimeForSimple(){
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "" +
+                "define stream testStream (cmd string, src string) ;";
+        String queryString = "" +
+                "@info(name = 'query1') " +
+                "from testStream[(cmd == 'rename') and (src == '/tmp/pii')] " +
+                "select cmd, src " +
+                "insert into outputStream ;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + queryString);
+
+        QueryCallback callback = new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        };
+        executionPlanRuntime.addCallback("query1", callback);
+        executionPlanRuntime.start();
+        return executionPlanRuntime;
+    }
+
+    @Test
+    public void testSimpleSiddhiQuery() throws Exception{
+        String tmpdir = System.getProperty("java.io.tmpdir");
+        System.out.println("temporary directory: " + tmpdir);
+
+        String stateFile = tmpdir + "/siddhi-state";
+        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForSimple();
+        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii"});
+        byte[] state = executionPlanRuntime.snapshot();
+        int length = state.length;
+        FileOutputStream output = new FileOutputStream(stateFile);
+        output.write(state);
+        output.close();
+        executionPlanRuntime.shutdown();
+
+        ExecutionPlanRuntime restoredRuntime = setupRuntimeForSimple();
+        FileInputStream input = new FileInputStream(stateFile);
+        byte[] restoredState = new byte[length];
+        input.read(restoredState);
+        restoredRuntime.restore(restoredState);
+        restoredRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii"});
+        input.close();
+        restoredRuntime.shutdown();
+    }
+
+    private ExecutionPlanRuntime setupRuntimeForLengthSlideWindow(){
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "define stream testStream (user string, cmd string);";
+        String query = "@info(name = 'query1') from testStream#window.length(3) "
+                + " select *"
+                + " insert all events into OutputStream";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
+
+        executionPlanRuntime.addCallback("query1", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        });
+
+        executionPlanRuntime.start();
+        return executionPlanRuntime;
+    }
+
+    @Ignore
+    public void testLengthSlideWindow() throws Exception{
+        String tmpdir = System.getProperty("java.io.tmpdir");
+        System.out.println("temporary directory: " + tmpdir);
+
+        String stateFile = tmpdir + "/siddhi-state-lengthslidewindow";
+        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForLengthSlideWindow();
+        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_1"});
+        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_2"});
+        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_3"});
+        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_4"});
+        byte[] state = executionPlanRuntime.snapshot();
+        int length = state.length;
+        FileOutputStream output = new FileOutputStream(stateFile);
+        output.write(state);
+        output.close();
+        executionPlanRuntime.shutdown();
+
+        ExecutionPlanRuntime restoredRuntime = setupRuntimeForLengthSlideWindow();
+        FileInputStream input = new FileInputStream(stateFile);
+        byte[] restoredState = new byte[length];
+        input.read(restoredState);
+        restoredRuntime.restore(restoredState);
+        restoredRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_5"});
+        input.close();
+        restoredRuntime.shutdown();
+    }
+
+    private ExecutionPlanRuntime setupRuntimeForLengthSlideWindowWithGroupby(){
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "define stream testStream (user string, cmd string);";
+        String query = "@info(name = 'query1') from testStream#window.length(50) "
+                + " select user, cmd, count(user) as cnt"
+                + " insert all events into OutputStream";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
+
+        executionPlanRuntime.addCallback("query1", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        });
+
+        executionPlanRuntime.start();
+        return executionPlanRuntime;
+    }
+
+    @Ignore
+    public void testLengthSlideWindowWithGroupby() throws Exception{
+        String tmpdir = System.getProperty("java.io.tmpdir");
+        System.out.println("temporary directory: " + tmpdir);
+
+        String stateFile = tmpdir + "/siddhi-state-lengthslidewindowwithgroupby";
+        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForLengthSlideWindowWithGroupby();
+        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_1"});
+        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_2"});
+        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_3"});
+        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_4"});
+        byte[] state = executionPlanRuntime.snapshot();
+        int length = state.length;
+        FileOutputStream output = new FileOutputStream(stateFile);
+        output.write(state);
+        output.close();
+        executionPlanRuntime.shutdown();
+
+        ExecutionPlanRuntime restoredRuntime = setupRuntimeForLengthSlideWindowWithGroupby();
+        FileInputStream input = new FileInputStream(stateFile);
+        byte[] restoredState = new byte[length];
+        input.read(restoredState);
+        restoredRuntime.restore(restoredState);
+        restoredRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_5"});
+        input.close();
+        restoredRuntime.shutdown();
+    }
+
+    private ExecutionPlanRuntime setupRuntimeForTimeSlideWindow(){
+        SiddhiManager siddhiManager = new SiddhiManager();
+        String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
+        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
+                + " select user, timeStamp " +
+                "insert all events into outputStream;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
+
+        executionPlanRuntime.addCallback("query1", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        });
+        executionPlanRuntime.start();
+        return executionPlanRuntime;
+    }
+
+    @Test
+    public void testTimeSlideWindow() throws Exception{
+        String tmpdir = System.getProperty("java.io.tmpdir");
+        System.out.println("temporary directory: " + tmpdir);
+
+        String stateFile = tmpdir + "/siddhi-state-timeslidewindow";
+        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForTimeSlideWindow();
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
+        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
+        inputHandler.send(new Object[]{curTime, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 1000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 2000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 3000, "user", "open"});
+
+        byte[] state = executionPlanRuntime.snapshot();
+        int length = state.length;
+        FileOutputStream output = new FileOutputStream(stateFile);
+        output.write(state);
+        output.close();
+        executionPlanRuntime.shutdown();
+
+        ExecutionPlanRuntime restoredRuntime = setupRuntimeForTimeSlideWindow();
+        FileInputStream input = new FileInputStream(stateFile);
+        byte[] restoredState = new byte[length];
+        input.read(restoredState);
+        restoredRuntime.restore(restoredState);
+        inputHandler = restoredRuntime.getInputHandler("testStream");
+        inputHandler.send(new Object[]{curTime + 4000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 5000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 6000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 7000, "user", "open"});
+        Thread.sleep(1000);
+        input.close();
+        restoredRuntime.shutdown();
+    }
+    
+    private ExecutionPlanRuntime setupRuntimeForExternalTimeSlideWindowWithGroupby(){
+        SiddhiManager siddhiManager = new SiddhiManager();
+        String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
+        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,30 sec)"
+                + " select user, timeStamp, count(user) as cnt"
+                + " group by user"
+                + " having cnt > 2"
+               + " insert all events into outputStream;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
+
+        executionPlanRuntime.addCallback("query1", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        });
+        executionPlanRuntime.start();
+        return executionPlanRuntime;
+    }
+
+    @Test
+    public void testExternalTimeSlideWindowWithGroupby() throws Exception{
+        String tmpdir = System.getProperty("java.io.tmpdir");
+        System.out.println("temporary directory: " + tmpdir);
+
+        String stateFile = tmpdir + "/siddhi-state-externaltimeslidewindow";
+        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby();
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
+        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
+        inputHandler.send(new Object[]{curTime, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 1000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 2000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 3000, "user", "open"});
+        Thread.sleep(1000);
+
+        byte[] state = executionPlanRuntime.snapshot();
+        int length = state.length;
+        FileOutputStream output = new FileOutputStream(stateFile);
+        output.write(state);
+        output.close();
+        executionPlanRuntime.shutdown();
+
+        ExecutionPlanRuntime restoredRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby();
+        FileInputStream input = new FileInputStream(stateFile);
+        byte[] restoredState = new byte[length];
+        input.read(restoredState);
+        restoredRuntime.restore(restoredState);
+        inputHandler = restoredRuntime.getInputHandler("testStream");
+        inputHandler.send(new Object[]{curTime + 4000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 5000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 6000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 7000, "user", "open"});
+        Thread.sleep(1000);
+        input.close();
+        restoredRuntime.shutdown();
+    }
+
+    private ExecutionPlanRuntime setupRuntimeForExternalTimeSlideWindowWithGroupby_2(SiddhiManager siddhiManager){
+        String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
+        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,300 sec)"
+                + " select user, timeStamp, count(user) as cnt"
+                + " group by user"
+                + " insert all events into outputStream;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
+
+        executionPlanRuntime.addCallback("query1", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        });
+        return executionPlanRuntime;
+    }
+
+    @Test
+    public void testExternalTimeSlideWindowWithGroupby_2() throws Exception{
+        SiddhiManager siddhiManager = new SiddhiManager();
+        PersistenceStore persistenceStore = new InMemoryPersistenceStore();
+        siddhiManager.setPersistenceStore(persistenceStore);
+
+        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby_2(siddhiManager);
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
+        executionPlanRuntime.start();
+        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
+        inputHandler.send(new Object[]{curTime, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 1000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 2000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 3000, "user", "open"});
+        Thread.sleep(100);
+        executionPlanRuntime.persist();
+        executionPlanRuntime.shutdown();
+        ExecutionPlanRuntime restoredRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby_2(siddhiManager);
+        inputHandler = restoredRuntime.getInputHandler("testStream");
+        restoredRuntime.start();
+        restoredRuntime.restoreLastRevision();
+        inputHandler.send(new Object[]{curTime + 4000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 5000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 6000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 7000, "user", "open"});
+        Thread.sleep(1000);
+        restoredRuntime.shutdown();
+    }
+
+    private ExecutionPlanRuntime setupRuntimeForInternalTimeSlideWindowWithGroupby(){
+        SiddhiManager siddhiManager = new SiddhiManager();
+        String cseEventStream = "define stream testStream (user string, cmd string);";
+        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.time(5 sec)"
+                + " select user, count(user) as cnt"
+                + " group by user"
+                + " having cnt > 2"
+                + " insert events into outputStream;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
+
+        executionPlanRuntime.addCallback("query1", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        });
+        executionPlanRuntime.start();
+        return executionPlanRuntime;
+    }
+
+    @Test
+    public void testInternalTimeSlideWindowWithGroupby() throws Exception{
+        String tmpdir = System.getProperty("java.io.tmpdir");
+        System.out.println("temporary directory: " + tmpdir);
+
+        String stateFile = tmpdir + "/siddhi-state-internaltimeslidewindow";
+        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForInternalTimeSlideWindowWithGroupby();
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
+        inputHandler.send(new Object[]{"user", "open"});
+        Thread.sleep(1000);
+        inputHandler.send(new Object[]{"user", "open"});
+        Thread.sleep(1000);
+        inputHandler.send(new Object[]{"user", "open"});
+        Thread.sleep(1000);
+        inputHandler.send(new Object[]{"user", "open"});
+
+        byte[] state = executionPlanRuntime.snapshot();
+        int length = state.length;
+        FileOutputStream output = new FileOutputStream(stateFile);
+        output.write(state);
+        output.close();
+        executionPlanRuntime.shutdown();
+
+        ExecutionPlanRuntime restoredRuntime = setupRuntimeForInternalTimeSlideWindowWithGroupby();
+        FileInputStream input = new FileInputStream(stateFile);
+        byte[] restoredState = new byte[length];
+        input.read(restoredState);
+        restoredRuntime.restore(restoredState);
+        inputHandler = restoredRuntime.getInputHandler("testStream");
+        inputHandler.send(new Object[]{"user", "open"});
+        Thread.sleep(1000);
+        inputHandler.send(new Object[]{"user", "open"});
+        Thread.sleep(1000);
+        inputHandler.send(new Object[]{"user", "open"});
+        Thread.sleep(1000);
+        inputHandler.send(new Object[]{"user", "open"});
+        Thread.sleep(1000);
+        input.close();
+        restoredRuntime.shutdown();
+    }
+
+    private int count;
+    private boolean eventArrived;
+
+    @Before
+    public void init() {
+        count = 0;
+        eventArrived = false;
+    }
+
+    /**
+     * Siddhi does not support external time window based snapshot
+     * @throws InterruptedException
+     */
+    public void persistenceTest7() throws InterruptedException {
+        PersistenceStore persistenceStore = new InMemoryPersistenceStore();
+
+        SiddhiManager siddhiManager = new SiddhiManager();
+        siddhiManager.setPersistenceStore(persistenceStore);
+
+        String executionPlan = "" +
+                "@plan:name('Test') " +
+                "" +
+                "define stream StockStream (symbol string, price float, volume int, timestamp long);" +
+                "" +
+                "@info(name = 'query1')" +
+                "from StockStream#window.externalTime(timestamp,30 sec) " +
+                "select symbol, price, sum(volume) as totalVol, count(symbol) as cnt " +
+                "group by symbol " +
+                "insert into OutStream ";
+
+        QueryCallback queryCallback = new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+                eventArrived = true;
+                for (Event inEvent : inEvents) {
+                    count++;
+                    Assert.assertTrue("IBM".equals(inEvent.getData(0)) || "WSO2".equals(inEvent.getData(0)));
+                    if (count == 5) {
+                        Assert.assertEquals(400l, inEvent.getData(2));
+                    }
+                    if (count == 6) {
+                        Assert.assertEquals(200l, inEvent.getData(2));
+                    }
+                }
+            }
+        };
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
+        executionPlanRuntime.addCallback("query1", queryCallback);
+
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("StockStream");
+        executionPlanRuntime.start();
+        long currentTime = 0;
+
+        inputHandler.send(new Object[]{"IBM", 75.1f, 100, currentTime + 1000});
+        Thread.sleep(100);
+        inputHandler.send(new Object[]{"WSO2", 75.2f, 100, currentTime + 2000});
+        Thread.sleep(100);
+        inputHandler.send(new Object[]{"IBM", 75.3f, 100, currentTime + 3000});
+
+        Thread.sleep(500);
+        Assert.assertTrue(eventArrived);
+        Assert.assertEquals(3, count);
+
+        //persisting
+        Thread.sleep(500);
+        executionPlanRuntime.persist();
+
+        //restarting execution plan
+        Thread.sleep(500);
+        executionPlanRuntime.shutdown();
+        executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
+        executionPlanRuntime.addCallback("query1", queryCallback);
+        inputHandler = executionPlanRuntime.getInputHandler("StockStream");
+        executionPlanRuntime.start();
+
+        //loading
+        executionPlanRuntime.restoreLastRevision();
+
+        inputHandler.send(new Object[]{"IBM", 75.4f, 100, currentTime + 4000});
+        Thread.sleep(100);
+        inputHandler.send(new Object[]{"IBM", 75.5f, 100, currentTime + 5000});
+        Thread.sleep(100);
+        inputHandler.send(new Object[]{"WSO2", 75.6f, 100, currentTime + 6000});
+
+        //shutdown execution plan
+        Thread.sleep(500);
+        executionPlanRuntime.shutdown();
+
+        Assert.assertEquals(count, 6);
+        Assert.assertEquals(true, eventArrived);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/application.conf
new file mode 100644
index 0000000..524b867
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/application.conf
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  "eagleProps" : {
+    "site" : "sandbox",
+    "application" : "UnitTest",
+    "eagleService": {
+      "host": "localhost",
+      "port": 38080,
+      "username": "admin",
+      "password": "secret"
+    }
+  },
+  "dynamicConfigSource" : {
+    "enabled" : true,
+    "initDelayMillis" : 0,
+    "delayMillis" : 30000,
+    "ignoreDeleteFromSource" : true
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/log4j.properties
new file mode 100644
index 0000000..71a5dac
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/log4j.properties
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, DRFA, stdout
+eagle.log.dir=./logs
+eagle.log.file=eagle.log
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/str.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/str.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/str.siddhiext
new file mode 100644
index 0000000..435b4c3
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/str.siddhiext
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+charAt=org.wso2.siddhi.extension.string.CharAtFunctionExtension
+coalesce=org.wso2.siddhi.extension.string.CoalesceFunctionExtension
+concat=org.wso2.siddhi.extension.string.ConcatFunctionExtension
+length=org.wso2.siddhi.extension.string.LengthFunctionExtension
+lower=org.wso2.siddhi.extension.string.LowerFunctionExtension
+regexp=org.wso2.siddhi.extension.string.RegexpFunctionExtension
+repeat=org.wso2.siddhi.extension.string.RepeatFunctionExtension
+replaceAll=org.wso2.siddhi.extension.string.ReplaceAllFunctionExtension
+replaceFirst=org.wso2.siddhi.extension.string.ReplaceFirstFunctionExtension
+reverse=org.wso2.siddhi.extension.string.ReverseFunctionExtension
+strcmp=org.wso2.siddhi.extension.string.StrcmpFunctionExtension
+substr=org.wso2.siddhi.extension.string.SubstrFunctionExtension
+trim=org.wso2.siddhi.extension.string.TrimFunctionExtension
+upper=org.wso2.siddhi.extension.string.UpperFunctionExtension
+hex=org.wso2.siddhi.extension.string.HexFunctionExtension
+unhex=org.wso2.siddhi.extension.string.UnhexFunctionExtension
+contains=org.wso2.siddhi.extension.string.ContainsFunctionExtension
+
+# Eagle Siddhi Extension
+equalsIgnoreCase=org.apache.eagle.policy.siddhi.extension.EqualsIgnoreCaseExtension
+containsIgnoreCase=org.apache.eagle.policy.siddhi.extension.ContainsIgnoreCaseExtension
+regexpIgnoreCase=org.apache.eagle.policy.siddhi.extension.RegexpIgnoreCaseFunctionExtension
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/unittest.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/unittest.conf b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/unittest.conf
new file mode 100644
index 0000000..1d18b67
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/unittest.conf
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  "envContextConfig" : {
+    "env" : "storm",
+    "mode" : "local",
+    "topologyName" : "auditLogProcessTopology",
+    "stormConfigFile" : "security-auditlog-storm.yaml",
+    "parallelismConfig" : {
+      "kafkaMsgConsumer" : 2,
+      "hdfsAuditLogAlertExecutor*" : 3
+    }
+  },
+  "dataSourceConfig": {
+    "flavor" : "stormkafka",
+    "topic" : "hdfs_audit_log",
+    "zkConnection" : "localhost:2181",
+    "zkConnectionTimeoutMS" : 15000,
+    "consumerGroupId" : "EagleConsumer",
+    "fetchSize" : 1048586,
+    "deserializerClass" : "org.apache.eagle.security.auditlog.HdfsAuditLogKafkaDeserializer",
+    "transactionZKServers" : "localhost",
+    "transactionZKPort" : 2181,
+    "transactionZKRoot" : "/brokers/topics",
+    "transactionStateUpdateMS" : 2000
+  },
+  "alertExecutorConfigs" : {
+     "hdfsAuditLogAlertExecutor" : {
+       "parallelism" : 2,
+       "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+       "needValidation" : "true"
+     }
+  },
+  "eagleProps" : {
+    "site": "site1",
+    "application": "hdfsAuditLog",
+    "eagleService": {
+       "host": "localhost",
+       "port": 38080
+    },
+  	"dataJoinPollIntervalSec" : 30,
+    "env"       : "test",
+    "mail.host" : "mailHost.com",
+	  "mail.smtp.port":"25",
+    "mail.debug" : "true"
+  },
+  "dynamicConfigSource" : {
+  	"enabled" : false,
+  	"initDelayMillis" : 0,
+  	"delayMillis" : 1000
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml
new file mode 100644
index 0000000..54226aa
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.eagle</groupId>
+		<artifactId>eagle-alert-parent</artifactId>
+		<version>0.4.0-incubating-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+	</parent>
+
+	<artifactId>eagle-alert-service</artifactId>
+	<packaging>jar</packaging>
+	<name>eagle-alert-service</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.eagle</groupId>
+			<artifactId>eagle-alert-base</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.eagle</groupId>
+			<artifactId>eagle-alert-process</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	  	<dependency>
+  			<groupId>org.apache.eagle</groupId>
+  			<artifactId>eagle-service-base</artifactId>
+			<version>${project.version}</version>
+  		</dependency>
+	  	<dependency>
+  			<groupId>org.wso2.siddhi</groupId>
+  			<artifactId>siddhi-core</artifactId>
+  		</dependency>
+		<dependency>
+  			<groupId>org.wso2.siddhi</groupId>
+  			<artifactId>siddhi-extension-string</artifactId>
+  		</dependency>
+	</dependencies>
+</project>
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/AlertPolicyValidateProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/AlertPolicyValidateProvider.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/AlertPolicyValidateProvider.java
new file mode 100644
index 0000000..47ddcb9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/AlertPolicyValidateProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.alert;
+
+import java.util.List;
+
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.Module;
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible=true)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public abstract class AlertPolicyValidateProvider {
+	public String type;
+
+	public abstract GenericServiceAPIResponseEntity<String> validate();
+	
+	public abstract String PolicyType();
+	
+	public abstract List<Module> BindingModules();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/PolicyValidateResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/PolicyValidateResource.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/PolicyValidateResource.java
new file mode 100644
index 0000000..e4c19d6
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/PolicyValidateResource.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.alert;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import com.fasterxml.jackson.databind.Module;
+
+@Path("/policy/validate")
+public class PolicyValidateResource {
+		
+	public static Logger LOG = LoggerFactory.getLogger(PolicyValidateResource.class);
+	
+	@SuppressWarnings({"rawtypes"})
+	@POST
+	@Consumes(MediaType.APPLICATION_JSON)
+	@Produces(MediaType.APPLICATION_JSON)
+	public GenericServiceAPIResponseEntity validatePolicy(String policyToValidate) {
+        ServiceLoader<AlertPolicyValidateProvider> loader = ServiceLoader.load(AlertPolicyValidateProvider.class);
+        Iterator<AlertPolicyValidateProvider> iter = loader.iterator();
+        List<Module> modules = new ArrayList<Module>();
+        while(iter.hasNext()) {
+        	AlertPolicyValidateProvider factory = iter.next();
+            LOG.info("Supported policy type : " + factory.PolicyType());
+            modules.addAll(factory.BindingModules());
+        }
+        AlertPolicyValidateProvider policyValidate = null;
+        try {
+        	policyValidate = JsonSerDeserUtils.deserialize(policyToValidate, AlertPolicyValidateProvider.class, modules);    		
+        }
+        catch (Exception ex) {
+        	LOG.error("Fail consutructing AlertPolicyValidateProvider ", ex);
+        }
+        return policyValidate.validate();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java
new file mode 100644
index 0000000..7f5bddd
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.alert;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.service.generic.GenericEntityServiceResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.apache.eagle.policy.siddhi.AttributeType;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.common.DateTimeUtil;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
+public class SiddhiAlertPolicyValidateProvider extends AlertPolicyValidateProvider{
+
+	public String type;
+	public List<String> streamNames;
+	public String policyDefinition;	
+	public static Logger LOG = LoggerFactory.getLogger(PolicyValidateResource.class);
+	public static final String EXECUTION_PLAN_NAME = "query";
+	
+	@SuppressWarnings({"unchecked"})
+	public String getStreamDef(String streamName) {
+		GenericEntityServiceResource resource = new GenericEntityServiceResource();
+		String startTime = "1969-01-01 00:00:00";
+		String endTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(Long.MAX_VALUE);
+		int pageSize = 1000;
+		String query = Constants.ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME + "[@streamName=\"" + streamName + "\"]{*}";
+		GenericServiceAPIResponseEntity<AlertStreamSchemaEntity> streamResponse = resource.search(query, startTime, endTime, pageSize, null, false, false, 0L, 0, true, 0, null, false);
+		List<AlertStreamSchemaEntity> list = streamResponse.getObj();
+		
+		Map<String, String> map = new HashMap<String, String>();
+		for(AlertStreamSchemaEntity entity : list){
+			map.put(entity.getTags().get("attrName"), entity.getAttrType());
+		}
+		StringBuilder sb = new StringBuilder();
+		sb.append("dataobj object,");
+		for(Map.Entry<String, String> entry : map.entrySet()){
+			String attrName = entry.getKey();
+			sb.append(attrName);
+			sb.append(" ");
+			String attrType = entry.getValue();
+			if(attrType.equalsIgnoreCase(AttributeType.STRING.name())){
+				sb.append("string");
+			}else if(attrType.equalsIgnoreCase(AttributeType.INTEGER.name())){
+				sb.append("int");
+			}else if(attrType.equalsIgnoreCase(AttributeType.LONG.name())){
+				sb.append("long");
+			}else if(attrType.equalsIgnoreCase(AttributeType.BOOL.name())){
+				sb.append("bool");
+			}else{
+				LOG.error("AttrType is not recognized, ignore : " + attrType);
+			}
+			sb.append(",");
+		}
+		if(sb.length() > 0){
+			sb.deleteCharAt(sb.length()-1);
+		}
+		
+		String siddhiStreamDefFormat = "define stream " + streamName + " (" + "%s" + ");";
+		String streamDef = String.format(siddhiStreamDefFormat, sb.toString());
+		return streamDef;
+	}
+	
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public GenericServiceAPIResponseEntity validate() {
+		GenericServiceAPIResponseEntity result = new GenericServiceAPIResponseEntity();
+		SiddhiManager siddhiManager = new SiddhiManager();
+		ExecutionPlanRuntime executionPlanRuntime = null;
+		try {				
+			String streamDefs = new String();
+			for(String streamName : streamNames){
+				//String streamDef = SiddhiStreamMetadataUtils.convertToStreamDef(streamName);
+				//We don't use SiddhiStreamMetadataUtils, for it only consume one dataSource
+				String streamDef = getStreamDef(streamName);
+				LOG.info("Siddhi stream definition : " + streamDef);
+				streamDefs += streamDef;
+			}		
+			
+			String executionPlan = streamDefs + " @info(name = '" + EXECUTION_PLAN_NAME + "') " +  policyDefinition;
+			executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
+		}
+		catch (Exception ex) {
+			result.setSuccess(false);
+			result.setException(ex);
+			return result;
+		}
+		finally {
+			if (executionPlanRuntime != null) {
+				executionPlanRuntime.shutdown();
+			}
+		}
+		result.setSuccess(true);
+		return result;
+	}
+	
+	@Override
+	public String PolicyType() {
+		return Constants.policyType.siddhiCEPEngine.name();
+	}
+	
+	@Override
+	public List<Module> BindingModules() {
+		Module module = new SimpleModule("policyValidate").registerSubtypes(new NamedType(SiddhiAlertPolicyValidateProvider.class, PolicyType()));
+		return Arrays.asList(module);	
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationObject.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationObject.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationObject.java
new file mode 100644
index 0000000..41ece1d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationObject.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.alert;
+
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.alert.entity.SiteApplicationServiceEntity;
+
+import java.util.List;
+import java.util.Map;
+
+public class SiteApplicationObject extends TaggedLogAPIEntity {
+
+    public Boolean getEnabled() {
+        return enabled;
+    }
+
+    public void setEnabled(Boolean enabled) {
+        this.enabled = enabled;
+        valueChanged("enabled");
+    }
+
+    public List<SiteApplicationServiceEntity> getApplications() {
+        return applications;
+    }
+
+    public void setApplications(List<SiteApplicationServiceEntity> applications) {
+        this.applications = applications;
+        valueChanged("applicationList");
+    }
+
+    @Override
+    public Map<String, String> getTags() {
+        return tags;
+    }
+
+    @Override
+    public void setTags(Map<String, String> tags) {
+        this.tags = tags;
+        valueChanged("tags");
+    }
+
+    Map<String, String> tags;
+    Boolean enabled;
+    List<SiteApplicationServiceEntity> applications;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationResource.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationResource.java
new file mode 100644
index 0000000..e399189
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationResource.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.alert;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.SiteDescServiceEntity;
+import org.apache.eagle.service.generic.GenericEntityServiceResource;
+import org.apache.eagle.alert.entity.ApplicationDescServiceEntity;
+import org.apache.eagle.alert.entity.SiteApplicationServiceEntity;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.type.TypeFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+
+@Path(SiteApplicationResource.ROOT_PATH)
+public class SiteApplicationResource {
+    private final static Logger LOG = LoggerFactory.getLogger(SiteApplicationResource.class);
+    private final static GenericEntityServiceResource resource = new GenericEntityServiceResource();
+    public final static String ROOT_PATH = "/module";
+
+    @Path("site")
+    @DELETE
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity deleteSite(@QueryParam("site") String site) {
+        String siteQuery = Constants.SITE_DESCRIPTION_SERVICE_ENDPOINT_NAME+ "[@site=\"" + site + "\"]{*}";
+        String siteApplicationQuery = Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME + "[@site=\"" + site + "\"]{*}";
+        int pageSize = Integer.MAX_VALUE;
+
+        GenericServiceAPIResponseEntity response = resource.deleteByQuery(siteQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
+        if(response.isSuccess()) {
+            response = resource.deleteByQuery(siteApplicationQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
+            if(!response.isSuccess()) {
+                LOG.error(response.getException());
+            }
+        } else {
+            LOG.error(response.getException());
+        }
+        return response;
+    }
+
+    @Path("application")
+    @DELETE
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity deleteApplication(@QueryParam("application") String application) {
+        String applicationQuery = Constants.APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME+ "[@application=\"" + application + "\"]{*}";
+        String siteApplicationQuery = Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME + "[@application=\"" + application + "\"]{*}";
+        int pageSize = Integer.MAX_VALUE;
+
+        GenericServiceAPIResponseEntity response = resource.deleteByQuery(applicationQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
+        if(response.isSuccess()) {
+            response = resource.deleteByQuery(siteApplicationQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
+            if(!response.isSuccess()) {
+                LOG.error(response.getException());
+            }
+        } else {
+            LOG.error(response.getException());
+        }
+        return response;
+    }
+
+    @Path("feature")
+    @DELETE
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity deleteFeature(@QueryParam("feature") String feature) {
+        String featureQuery = Constants.FEATURE_DESCRIPTION_SERVICE_ENDPOINT_NAME+ "[@feature=\"" + feature + "\"]{*}";
+        String applicationQuery = Constants.APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME + "[]{*}";
+        int pageSize = Integer.MAX_VALUE;
+
+        GenericServiceAPIResponseEntity response = resource.deleteByQuery(featureQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
+        if(response.isSuccess()) {
+            response = resource.search(applicationQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
+            if(response.isSuccess()) {
+                List<ApplicationDescServiceEntity> entityList = response.getObj();
+                Boolean isModified = false;
+                for(ApplicationDescServiceEntity entity : entityList) {
+                    if(entity.getFeatures().contains(feature)) {
+                        List<String> features = entity.getFeatures();
+                        features.remove(feature);
+                        entity.setFeatures(features);
+                        isModified = true;
+                    }
+                }
+                if(isModified) {
+                    response = resource.updateEntities(entityList, Constants.APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME);
+                }
+            }
+        }
+        if(!response.isSuccess()) {
+            LOG.error(response.getException());
+        }
+        return response;
+    }
+
+    @Path("siteApplication")
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity createSiteApplications(InputStream inputStream) {
+        GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity<>();
+        int pageSize = Integer.MAX_VALUE;
+        try {
+            List<SiteApplicationObject> entities = (List<SiteApplicationObject>) unmarshalSiteApplicationEntities(inputStream);
+            if(entities == null) {
+                throw new IllegalArgumentException("cannot convert to SiteApplicationObject");
+            }
+            List<SiteDescServiceEntity> siteEntities = new LinkedList<>();
+            List<SiteApplicationServiceEntity> applicationEntities = new LinkedList<>();
+            Set<String> sites = new HashSet<>();
+            for(SiteApplicationObject e : entities) {
+                sites.add(e.getTags().get("site"));
+                SiteDescServiceEntity entity = new SiteDescServiceEntity();
+                entity.setEnabled(e.getEnabled());
+                entity.setTags(e.getTags());
+                siteEntities.add(entity);
+                applicationEntities.addAll(e.getApplications());
+            }
+            response = resource.updateEntities(siteEntities, Constants.SITE_DESCRIPTION_SERVICE_ENDPOINT_NAME);
+            if(response.isSuccess()) {
+                String query = buildQueryWithAttributeList(Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME, "site", sites);
+                LOG.info("query=" + query);
+                response = resource.search(query, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
+                if(response.isSuccess()) {
+                    List<SiteApplicationServiceEntity> applications = response.getObj();
+                    for(SiteApplicationServiceEntity app : applications) {
+                        app.setEnabled(false);
+                    }
+                    response = resource.updateEntities(applications, Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME);
+                    if(response.isSuccess()) {
+                        response = resource.updateEntities(applicationEntities, Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME);
+                    }
+                }
+            }
+            if(!response.isSuccess()) {
+                LOG.error(response.getException());
+            }
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage(), ex);
+            response.setException(ex);
+        }
+        return response;
+    }
+
+    private String buildQueryWithAttributeList(String serviceName, String attr, Set<String> sets) {
+        StringBuilder builder = new StringBuilder(serviceName + "[");
+        String attribute = "@" + attr + "=";
+        String condition = " OR ";
+        for(String s : sets) {
+            String value = String.format("\"%s\"", s);
+            builder.append(attribute + value);
+            builder.append(condition);
+        }
+        String result = builder.substring(0, builder.length()-condition.length());
+        result = result + "]{*}";
+        return result;
+    }
+
+    private List<? extends TaggedLogAPIEntity> unmarshalSiteApplicationEntities(InputStream inputStream) throws IllegalAccessException, InstantiationException, IOException {
+        ObjectMapper objectMapper = new ObjectMapper();
+        return objectMapper.readValue(inputStream, TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, SiteApplicationObject.class));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolvable.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolvable.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolvable.java
new file mode 100644
index 0000000..ecfd47f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolvable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.alert.resolver;
+
+import java.util.List;
+
+/**
+ * @param <R>
+ * @param <V>
+ * @since 6/16/15
+ */
+public interface AttributeResolvable<R extends GenericAttributeResolveRequest, V> {
+    /**
+     * @param request request type
+     * @return List&lt;V&gt;
+     * @throws AttributeResolveException
+     */
+    List<V> resolve(R request) throws AttributeResolveException;
+
+    /**
+     * validate request
+     * @throws BadAttributeResolveRequestException
+     */
+    void validateRequest(R request) throws BadAttributeResolveRequestException;
+
+    /**
+     * @return Class&lt;R&gt;
+     */
+    Class<R> getRequestClass();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveException.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveException.java
new file mode 100644
index 0000000..56cd942
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.alert.resolver;
+
+/**
+ * @since 6/16/15
+ */
+public class AttributeResolveException extends Exception {
+    public AttributeResolveException(String s, Exception e) { super(s,e); }
+    public AttributeResolveException(Exception e) { super(e); }
+    public AttributeResolveException(String s) { super(s); }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java
new file mode 100644
index 0000000..68995d1
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.alert.resolver;
+
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.common.EagleExceptionWrapper;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import javax.ws.rs.*;
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * @since 6/17/15
+ */
+@Path("/stream")
+public class AttributeResolveResource {
+    @POST
+    @Path("attributeresolve")
+    @Consumes({"application/json"})
+    @Produces({"application/json"})
+    public GenericServiceAPIResponseEntity attributeResolve(InputStream request,
+                                                            @QueryParam("resolver") String resolver){
+        GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity();
+        try {
+            if(resolver == null) throw new AttributeResolveException("resolver is null");
+            AttributeResolvable resolvable = AttributeResolverFactory.getAttributeResolver(resolver);
+            ObjectMapper objectMapper = new ObjectMapper();
+            Class<?> resolveRequestClass = resolvable.getRequestClass();
+            if(resolveRequestClass == null) throw new AttributeResolveException("Request class is null for resolver "+resolver);
+            GenericAttributeResolveRequest resolveRequest = (GenericAttributeResolveRequest) objectMapper.readValue(request, resolvable.getRequestClass());
+            resolvable.validateRequest(resolveRequest);
+            List result = resolvable.resolve(resolveRequest);
+            response.setSuccess(true);
+            response.setObj(result);
+        } catch (Exception e) {
+            response.setSuccess(false);
+            response.setException(EagleExceptionWrapper.wrap(e));
+            return response;
+        }
+        return response;
+    }
+
+    @GET
+    @Path("attributeresolve")
+    @Produces({"application/json"})
+    public GenericServiceAPIResponseEntity attributeResolver(
+            @QueryParam("resolver") String resolver, @QueryParam("site") String site, @QueryParam("query") String query){
+        GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity();
+        try {
+            if(resolver == null) throw new AttributeResolveException("resolver is null");
+            AttributeResolvable resolvable = AttributeResolverFactory.getAttributeResolver(resolver);
+            Class<?> resolveRequestClass = resolvable.getRequestClass();
+            if(resolveRequestClass == null) throw new AttributeResolveException("Request class is null for resolver "+resolver);
+            GenericAttributeResolveRequest resolveRequest = new GenericAttributeResolveRequest(query,site);
+            resolvable.validateRequest(resolveRequest);
+            List result = resolvable.resolve(resolveRequest);
+            response.setSuccess(true);
+            response.setObj(result);
+        } catch (Exception e) {
+            response.setSuccess(false);
+            response.setException(EagleExceptionWrapper.wrap(e));
+            return response;
+        }
+        return response;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResponse.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResponse.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResponse.java
new file mode 100644
index 0000000..0ec9cf4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResponse.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.alert.resolver;
+
+import java.util.List;
+
+public class AttributeResolveResponse<V> {
+    private String exception;
+    private List<V> values;
+
+    public List<V> getValues() {
+        return values;
+    }
+
+    public void setValues(List<V> values) {
+        this.values = values;
+    }
+
+    public String getException() {
+        return exception;
+    }
+
+    public void setException(String exception) {
+        this.exception = exception;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolverFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolverFactory.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolverFactory.java
new file mode 100644
index 0000000..4015fb7
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolverFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.alert.resolver;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+* @since 6/16/15
+*/
+public final class AttributeResolverFactory {
+    private final static Map<String,AttributeResolvable> fieldResolvableCache = Collections.synchronizedMap(new HashMap<String,AttributeResolvable>());
+    public static AttributeResolvable getAttributeResolver(String fieldResolverName) throws AttributeResolveException {
+        AttributeResolvable instance;
+        if(fieldResolvableCache.containsKey(fieldResolverName)){
+            instance = fieldResolvableCache.get(fieldResolverName);
+        } else {
+            try {
+                instance = (AttributeResolvable) Class.forName(fieldResolverName).newInstance();
+                fieldResolvableCache.put(fieldResolverName, instance);
+            } catch (ClassNotFoundException e) {
+                throw new AttributeResolveException("Attribute Resolver in type of "+fieldResolverName+" is not found",e);
+            } catch (InstantiationException | IllegalAccessException e) {
+                throw new AttributeResolveException(e);
+            }
+        }
+        return instance;
+    }
+
+    public static List resolve(String resolver, GenericAttributeResolveRequest request) throws AttributeResolveException {
+        AttributeResolvable fieldResolver = getAttributeResolver(resolver);
+        return fieldResolver.resolve(request);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/BadAttributeResolveRequestException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/BadAttributeResolveRequestException.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/BadAttributeResolveRequestException.java
new file mode 100644
index 0000000..5a2d7e6
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/BadAttributeResolveRequestException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.alert.resolver;
+
+public class BadAttributeResolveRequestException extends Exception {
+    public BadAttributeResolveRequestException(Exception ex){
+        super(ex);
+    }
+
+    public BadAttributeResolveRequestException(String msg){
+        super(msg);
+    }
+
+    public BadAttributeResolveRequestException(String msg, Exception ex){
+        super(msg, ex);
+    }
+}



Mime
View raw message