Return-Path: X-Original-To: apmail-eagle-commits-archive@minotaur.apache.org Delivered-To: apmail-eagle-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3683D18AEC for ; Wed, 30 Dec 2015 04:54:50 +0000 (UTC) Received: (qmail 8658 invoked by uid 500); 30 Dec 2015 04:54:50 -0000 Delivered-To: apmail-eagle-commits-archive@eagle.apache.org Received: (qmail 8623 invoked by uid 500); 30 Dec 2015 04:54:49 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 8614 invoked by uid 99); 30 Dec 2015 04:54:49 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Dec 2015 04:54:49 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 525FCC0AA8 for ; Wed, 30 Dec 2015 04:54:49 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.779 X-Spam-Level: * X-Spam-Status: No, score=1.779 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id nv_pjUMoTimL for ; Wed, 30 Dec 2015 04:54:48 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id B86CB20CB8 for ; Wed, 30 Dec 2015 04:54:46 +0000 (UTC) Received: (qmail 8608 invoked by uid 99); 30 Dec 2015 04:54:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Dec 2015 04:54:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C573EE04D7; Wed, 30 Dec 2015 04:54:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yonzhang2012@apache.org To: commits@eagle.incubator.apache.org Date: Wed, 30 Dec 2015 04:54:44 -0000 Message-Id: <7ceb1c3ef87d4666b21d00656e7d4ef9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-eagle git commit: EAGLE-99 policy distribution reporter report policy distribution stats into local log or eagle service https://issues.apache.org/jira/browse/EAGLE-99 Author: @yonzhang yonzhang2012@apache.org Reviewer: @ralphsu suliangfe Repository: incubator-eagle Updated Branches: refs/heads/master 89f788a9b -> feaeabc18 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/feaeabc1/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java index 7c01a90..3dd43ac 100644 --- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java +++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java @@ -21,6 +21,7 @@ package org.apache.eagle.alert.state; import org.apache.eagle.common.DateTimeUtil; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.wso2.siddhi.core.ExecutionPlanRuntime; import org.wso2.siddhi.core.SiddhiManager; @@ -28,6 +29,8 @@ import org.wso2.siddhi.core.event.Event; import org.wso2.siddhi.core.query.output.callback.QueryCallback; import org.wso2.siddhi.core.stream.input.InputHandler; import org.wso2.siddhi.core.util.EventPrinter; +import org.wso2.siddhi.core.util.persistence.InMemoryPersistenceStore; +import org.wso2.siddhi.core.util.persistence.PersistenceStore; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -241,10 +244,10 @@ public class TestSiddhiStateSnapshotAndRestore { private ExecutionPlanRuntime setupRuntimeForExternalTimeSlideWindowWithGroupby(){ SiddhiManager siddhiManager = new SiddhiManager(); String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);"; - String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)" + String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,30 sec)" + " select user, timeStamp, count(user) as cnt" -// + " group by user" -// + " having cnt > 2" + + " group by user" + + " having cnt > 2" + " insert all events into outputStream;"; ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query); @@ -295,14 +298,61 @@ public class TestSiddhiStateSnapshotAndRestore { restoredRuntime.shutdown(); } + private ExecutionPlanRuntime setupRuntimeForExternalTimeSlideWindowWithGroupby_2(SiddhiManager siddhiManager){ + String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);"; + String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,300 sec)" + + " select user, timeStamp, count(user) as cnt" + + " group by user" + + " insert all events into outputStream;"; + + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query); + + executionPlanRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + } + }); + return executionPlanRuntime; + } + + @Test + public void testExternalTimeSlideWindowWithGroupby_2() throws Exception{ + SiddhiManager siddhiManager = new SiddhiManager(); + PersistenceStore persistenceStore = new InMemoryPersistenceStore(); + siddhiManager.setPersistenceStore(persistenceStore); + + ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby_2(siddhiManager); + InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream"); + executionPlanRuntime.start(); + long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000"); + inputHandler.send(new Object[]{curTime, "user", "open"}); + inputHandler.send(new Object[]{curTime + 1000, "user", "open"}); + inputHandler.send(new Object[]{curTime + 2000, "user", "open"}); + inputHandler.send(new Object[]{curTime + 3000, "user", "open"}); + Thread.sleep(100); + executionPlanRuntime.persist(); + executionPlanRuntime.shutdown(); + ExecutionPlanRuntime restoredRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby_2(siddhiManager); + inputHandler = restoredRuntime.getInputHandler("testStream"); + restoredRuntime.start(); + restoredRuntime.restoreLastRevision(); + inputHandler.send(new Object[]{curTime + 4000, "user", "open"}); + inputHandler.send(new Object[]{curTime + 5000, "user", "open"}); + inputHandler.send(new Object[]{curTime + 6000, "user", "open"}); + inputHandler.send(new Object[]{curTime + 7000, "user", "open"}); + Thread.sleep(1000); + restoredRuntime.shutdown(); + } + private ExecutionPlanRuntime setupRuntimeForInternalTimeSlideWindowWithGroupby(){ SiddhiManager siddhiManager = new SiddhiManager(); String cseEventStream = "define stream testStream (user string, cmd string);"; - String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.time(100 sec)" + String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.time(5 sec)" + " select user, count(user) as cnt" + " group by user" + " having cnt > 2" - + " insert all events into outputStream;"; + + " insert events into outputStream;"; ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query); @@ -356,4 +406,96 @@ public class TestSiddhiStateSnapshotAndRestore { input.close(); restoredRuntime.shutdown(); } + + private int count; + private boolean eventArrived; + + @Before + public void init() { + count = 0; + eventArrived = false; + } + + @Test + public void persistenceTest7() throws InterruptedException { + PersistenceStore persistenceStore = new InMemoryPersistenceStore(); + + SiddhiManager siddhiManager = new SiddhiManager(); + siddhiManager.setPersistenceStore(persistenceStore); + + String executionPlan = "" + + "@plan:name('Test') " + + "" + + "define stream StockStream (symbol string, price float, volume int, timestamp long);" + + "" + + "@info(name = 'query1')" + + "from StockStream#window.externalTime(timestamp,30 sec) " + + "select symbol, price, sum(volume) as totalVol, count(symbol) as cnt " + + "group by symbol " + + "insert into OutStream "; + + QueryCallback queryCallback = new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + eventArrived = true; + for (Event inEvent : inEvents) { + count++; + Assert.assertTrue("IBM".equals(inEvent.getData(0)) || "WSO2".equals(inEvent.getData(0))); + if (count == 5) { + Assert.assertEquals(400l, inEvent.getData(2)); + } + if (count == 6) { + Assert.assertEquals(200l, inEvent.getData(2)); + } + } + } + }; + + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan); + executionPlanRuntime.addCallback("query1", queryCallback); + + InputHandler inputHandler = executionPlanRuntime.getInputHandler("StockStream"); + executionPlanRuntime.start(); + long currentTime = 0; + + inputHandler.send(new Object[]{"IBM", 75.1f, 100, currentTime + 1000}); + Thread.sleep(100); + inputHandler.send(new Object[]{"WSO2", 75.2f, 100, currentTime + 2000}); + Thread.sleep(100); + inputHandler.send(new Object[]{"IBM", 75.3f, 100, currentTime + 3000}); + + Thread.sleep(500); + Assert.assertTrue(eventArrived); + Assert.assertEquals(3, count); + + //persisting + Thread.sleep(500); + executionPlanRuntime.persist(); + + //restarting execution plan + Thread.sleep(500); + executionPlanRuntime.shutdown(); + executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan); + executionPlanRuntime.addCallback("query1", queryCallback); + inputHandler = executionPlanRuntime.getInputHandler("StockStream"); + executionPlanRuntime.start(); + + //loading + executionPlanRuntime.restoreLastRevision(); + + inputHandler.send(new Object[]{"IBM", 75.4f, 100, currentTime + 4000}); + Thread.sleep(100); + inputHandler.send(new Object[]{"IBM", 75.5f, 100, currentTime + 5000}); + Thread.sleep(100); + inputHandler.send(new Object[]{"WSO2", 75.6f, 100, currentTime + 6000}); + + //shutdown execution plan + Thread.sleep(500); + executionPlanRuntime.shutdown(); + + Assert.assertEquals(count, 6); + Assert.assertEquals(true, eventArrived); + + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/feaeabc1/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java index a31e27e..f8941bc 100644 --- a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java +++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java @@ -40,6 +40,7 @@ public class Tables { // for security tables.add("hiveResourceSensitivity"); tables.add("fileSensitivity"); + tables.add("ipzone"); tables.add("mlmodel"); tables.add("userprofile"); }