eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [1/2] incubator-eagle git commit: EAGLE-99 policy distribution reporter report policy distribution stats into local log or eagle service https://issues.apache.org/jira/browse/EAGLE-99 Author: @yonzhang yonzhang2012@apache.org Reviewer: @ralphsu suliangfe
Date Wed, 30 Dec 2015 04:54:44 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master 89f788a9b -> feaeabc18


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/feaeabc1/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
index 7c01a90..3dd43ac 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
@@ -21,6 +21,7 @@ package org.apache.eagle.alert.state;
 
 import org.apache.eagle.common.DateTimeUtil;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.wso2.siddhi.core.ExecutionPlanRuntime;
 import org.wso2.siddhi.core.SiddhiManager;
@@ -28,6 +29,8 @@ import org.wso2.siddhi.core.event.Event;
 import org.wso2.siddhi.core.query.output.callback.QueryCallback;
 import org.wso2.siddhi.core.stream.input.InputHandler;
 import org.wso2.siddhi.core.util.EventPrinter;
+import org.wso2.siddhi.core.util.persistence.InMemoryPersistenceStore;
+import org.wso2.siddhi.core.util.persistence.PersistenceStore;
 
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -241,10 +244,10 @@ public class TestSiddhiStateSnapshotAndRestore {
     private ExecutionPlanRuntime setupRuntimeForExternalTimeSlideWindowWithGroupby(){
         SiddhiManager siddhiManager = new SiddhiManager();
         String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
-        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
+        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,30 sec)"
                 + " select user, timeStamp, count(user) as cnt"
-//                + " group by user"
-//                + " having cnt > 2"
+                + " group by user"
+                + " having cnt > 2"
                + " insert all events into outputStream;";
 
         ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
@@ -295,14 +298,61 @@ public class TestSiddhiStateSnapshotAndRestore {
         restoredRuntime.shutdown();
     }
 
+    private ExecutionPlanRuntime setupRuntimeForExternalTimeSlideWindowWithGroupby_2(SiddhiManager siddhiManager){
+        String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
+        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,300 sec)"
+                + " select user, timeStamp, count(user) as cnt"
+                + " group by user"
+                + " insert all events into outputStream;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
+
+        executionPlanRuntime.addCallback("query1", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        });
+        return executionPlanRuntime;
+    }
+
+    @Test
+    public void testExternalTimeSlideWindowWithGroupby_2() throws Exception{
+        SiddhiManager siddhiManager = new SiddhiManager();
+        PersistenceStore persistenceStore = new InMemoryPersistenceStore();
+        siddhiManager.setPersistenceStore(persistenceStore);
+
+        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby_2(siddhiManager);
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
+        executionPlanRuntime.start();
+        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
+        inputHandler.send(new Object[]{curTime, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 1000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 2000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 3000, "user", "open"});
+        Thread.sleep(100);
+        executionPlanRuntime.persist();
+        executionPlanRuntime.shutdown();
+        ExecutionPlanRuntime restoredRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby_2(siddhiManager);
+        inputHandler = restoredRuntime.getInputHandler("testStream");
+        restoredRuntime.start();
+        restoredRuntime.restoreLastRevision();
+        inputHandler.send(new Object[]{curTime + 4000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 5000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 6000, "user", "open"});
+        inputHandler.send(new Object[]{curTime + 7000, "user", "open"});
+        Thread.sleep(1000);
+        restoredRuntime.shutdown();
+    }
+
     private ExecutionPlanRuntime setupRuntimeForInternalTimeSlideWindowWithGroupby(){
         SiddhiManager siddhiManager = new SiddhiManager();
         String cseEventStream = "define stream testStream (user string, cmd string);";
-        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.time(100 sec)"
+        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.time(5 sec)"
                 + " select user, count(user) as cnt"
                 + " group by user"
                 + " having cnt > 2"
-                + " insert all events into outputStream;";
+                + " insert events into outputStream;";
 
         ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
 
@@ -356,4 +406,96 @@ public class TestSiddhiStateSnapshotAndRestore {
         input.close();
         restoredRuntime.shutdown();
     }
+
+    private int count;
+    private boolean eventArrived;
+
+    @Before
+    public void init() {
+        count = 0;
+        eventArrived = false;
+    }
+
+    @Test
+    public void persistenceTest7() throws InterruptedException {
+        PersistenceStore persistenceStore = new InMemoryPersistenceStore();
+
+        SiddhiManager siddhiManager = new SiddhiManager();
+        siddhiManager.setPersistenceStore(persistenceStore);
+
+        String executionPlan = "" +
+                "@plan:name('Test') " +
+                "" +
+                "define stream StockStream (symbol string, price float, volume int, timestamp long);" +
+                "" +
+                "@info(name = 'query1')" +
+                "from StockStream#window.externalTime(timestamp,30 sec) " +
+                "select symbol, price, sum(volume) as totalVol, count(symbol) as cnt " +
+                "group by symbol " +
+                "insert into OutStream ";
+
+        QueryCallback queryCallback = new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+                eventArrived = true;
+                for (Event inEvent : inEvents) {
+                    count++;
+                    Assert.assertTrue("IBM".equals(inEvent.getData(0)) || "WSO2".equals(inEvent.getData(0)));
+                    if (count == 5) {
+                        Assert.assertEquals(400l, inEvent.getData(2));
+                    }
+                    if (count == 6) {
+                        Assert.assertEquals(200l, inEvent.getData(2));
+                    }
+                }
+            }
+        };
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
+        executionPlanRuntime.addCallback("query1", queryCallback);
+
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("StockStream");
+        executionPlanRuntime.start();
+        long currentTime = 0;
+
+        inputHandler.send(new Object[]{"IBM", 75.1f, 100, currentTime + 1000});
+        Thread.sleep(100);
+        inputHandler.send(new Object[]{"WSO2", 75.2f, 100, currentTime + 2000});
+        Thread.sleep(100);
+        inputHandler.send(new Object[]{"IBM", 75.3f, 100, currentTime + 3000});
+
+        Thread.sleep(500);
+        Assert.assertTrue(eventArrived);
+        Assert.assertEquals(3, count);
+
+        //persisting
+        Thread.sleep(500);
+        executionPlanRuntime.persist();
+
+        //restarting execution plan
+        Thread.sleep(500);
+        executionPlanRuntime.shutdown();
+        executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
+        executionPlanRuntime.addCallback("query1", queryCallback);
+        inputHandler = executionPlanRuntime.getInputHandler("StockStream");
+        executionPlanRuntime.start();
+
+        //loading
+        executionPlanRuntime.restoreLastRevision();
+
+        inputHandler.send(new Object[]{"IBM", 75.4f, 100, currentTime + 4000});
+        Thread.sleep(100);
+        inputHandler.send(new Object[]{"IBM", 75.5f, 100, currentTime + 5000});
+        Thread.sleep(100);
+        inputHandler.send(new Object[]{"WSO2", 75.6f, 100, currentTime + 6000});
+
+        //shutdown execution plan
+        Thread.sleep(500);
+        executionPlanRuntime.shutdown();
+
+        Assert.assertEquals(count, 6);
+        Assert.assertEquals(true, eventArrived);
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/feaeabc1/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java
index a31e27e..f8941bc 100644
--- a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java
+++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java
@@ -40,6 +40,7 @@ public class Tables {
         // for security
         tables.add("hiveResourceSensitivity");
         tables.add("fileSensitivity");
+        tables.add("ipzone");
         tables.add("mlmodel");
         tables.add("userprofile");
     }


Mime
View raw message