Subject: Re: [Discuss] Eagle Policy State Management
From: Julian Hyde
Date: Fri, 11 Dec 2015 11:27:58 -0800
To: dev@eagle.incubator.apache.org
Cc: user@eagle.incubator.apache.org
Message-Id: <90295931-C5CA-40C2-82AF-ADCC5EBB0F76@apache.org>

State management of streams (including what I'd call "derived streams") is a hard distributed-systems problem. Ideally it would be solved by the stream provider, not by the Eagle project. I think you should talk to the various streaming projects (Storm, Samza, Kafka, Flink) and find out whether they can solve this, or whether it is on their roadmap.

I can make introductions to the leaders of those projects if needed.

If the problem is solved at source, Eagle can focus on the actual problem rather than infrastructure.
Julian

> On Dec 10, 2015, at 7:48 PM, Liangfei.Su wrote:
>
> Great proposal; this is important and could serve generally for policy
> capability and the analytics feature.
>
> Periodically taking a snapshot independently on each bolt could make
> state recoverable from recent history, but from the whole-topology point
> of view this cannot handle inter-bolt state dependencies exactly.
>
> Another point: should the state restore be triggered not only when the
> topology restarts but also when
> a. the topology is rebalanced?
> b. a single bolt is moved by the underlying stream framework from one
> executor to another?
>
> Thanks,
> Ralph
>
>
> On Fri, Dec 11, 2015 at 9:49 AM, Zhang, Edward (GDI Hadoop) <
> yonzhang@ebay.com> wrote:
>
>> This topic has been discussed offline for a while, and it is time we
>> document problems and solutions. With clear problem statements and proposed
>> solutions, I believe we can do better before we start implementing.
>>
>> [Problem Statement] For Eagle as a real-time big data monitoring framework,
>> evaluating policies efficiently is the core functionality. Most
>> meaningful policies are stateful, in that policy evaluation is based not on a
>> single event but on both previous events and the current event. This
>> potentially brings two fundamental problems: one is policy state loss upon
>> machine failure or topology restart; the other is the lack of historical data
>> upon a fresh start. A simple example is a policy like "from
>> userActivity[cmd == 'delete']#window.time(1 month) select user, count() as cnt
>> group by user having cnt > 1000": if the task is restarted, the state of the
>> accumulated user/count map is lost. Also, when the topology is started for
>> the first time, the window is empty even if we have historical data in the
>> database.
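[Archive editor's note] The state-loss problem described above can be made concrete with a small, self-contained sketch. This is illustrative stdlib-only Java, not Eagle or Siddhi code: a per-user count map stands in for the policy evaluator's window state, and the snapshot/restore helpers show the minimal persist-and-recover cycle the proposal calls for. All class and method names are hypothetical.

```java
import java.io.*;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the per-user count map that a policy like
// "group by user having cnt > 1000" accumulates, with naive
// snapshot/restore via Java serialization.
public class PolicyStateSketch {
    static byte[] snapshot(Map<String, Long> counts) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new HashMap<>(counts)); // HashMap is Serializable
        }
        return bos.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static Map<String, Long> restore(byte[] state) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(state))) {
            return (Map<String, Long>) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Long> counts = new HashMap<>();
        counts.merge("alice", 1L, Long::sum);   // events accumulate per user
        counts.merge("alice", 1L, Long::sum);
        byte[] state = snapshot(counts);        // persist before a crash...
        Map<String, Long> recovered = restore(state); // ...and restore after restart
        System.out.println(recovered.get("alice")); // prints 2
    }
}
```

Real policy state is richer than one map (window buffers, timestamps, per-key executors), which is why the proposal below leans on the policy engine's own snapshot support rather than hand-rolled serialization.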
>>
>> [Proposed Solutions] The natural way of solving the above two problems is:
>> 1) Persist policy state periodically and restore it after the
>> task is restarted.
>> The underlying policy engine should support snapshot and restore operations.
>> Siddhi 3.x already supports snapshot and restore, though I found
>> some bugs in its state management:
>> https://wso2.org/jira/browse/CEP-1433
>> Restore is not straightforward unless all input events to the
>> policy evaluator are backed by a reliable and rewindable store like Kafka.
>> If the input events are backed by Kafka, then each time
>> Eagle takes a snapshot, we record the current offset together with it and
>> persist both to deep storage.
>> If the input events are not backed by Kafka, then we need to
>> record every event since the last snapshot, which looks very expensive. Apache
>> Flink uses an efficient algorithm called stream barriers to do group
>> acknowledgement, but Storm does not have this feature. Remember also that
>> Apache Flink requires every task to take snapshots, not only the policy
>> evaluator.
>>
>> 2) Transparently load historical data when the topology is started for the
>> first time.
>> If the policy evaluator accepts data that is already persisted in a database or
>> Kafka, we can provide an API to retrieve it from there. This
>> loading is transparent to the developer, but the developer/user needs to specify
>> the deep storage that holds the historical data.
>>
>> Also be aware that if we have the right solution for the policy evaluator, the
>> solution should also apply to event aggregation.
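[Archive editor's note] The snapshot-plus-offset idea in 1) can be sketched as a single checkpoint record written atomically: the engine snapshot bytes together with the Kafka offset they correspond to, so recovery can restore the snapshot and replay the stream from that offset. The following stdlib-only Java is a hedged illustration; the record layout, the names, and the local file standing in for deep storage are assumptions, not Eagle's actual mechanism.

```java
import java.io.*;
import java.nio.file.*;

// Hypothetical sketch of a checkpoint record: Kafka offset + engine snapshot,
// written to a temp file and renamed so readers never see a partial record.
public class CheckpointSketch {
    static void writeCheckpoint(Path path, long kafkaOffset, byte[] snapshot) throws IOException {
        Path tmp = path.resolveSibling(path.getFileName() + ".tmp");
        try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(tmp))) {
            out.writeLong(kafkaOffset);      // offset the snapshot corresponds to
            out.writeInt(snapshot.length);   // length-prefixed snapshot bytes
            out.write(snapshot);
        }
        // Atomic rename: the checkpoint either fully exists or not at all.
        Files.move(tmp, path, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws Exception {
        Path p = Files.createTempFile("ckpt", ".bin");
        writeCheckpoint(p, 42L, "engine-state".getBytes());

        // Recovery path: read the offset, restore the snapshot, then replay
        // the Kafka partition from that offset (replay omitted here).
        try (DataInputStream in = new DataInputStream(Files.newInputStream(p))) {
            long offset = in.readLong();
            byte[] snap = new byte[in.readInt()];
            in.readFully(snap);
            System.out.println(offset + " " + new String(snap)); // prints: 42 engine-state
        }
    }
}
```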
>> https://cwiki.apache.org/confluence/display/EAG/Stream+Analyze
>>
>> Another, more aggressive way is to use a solution similar to Flink's stream barriers
>> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
>> to snapshot all Eagle tasks (typically spouts and bolts) but turn off
>> Storm's ACK mechanism.
>>
>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>>   def prepareConfig(config : Config)
>>   def init
>>   def fields : Array[String]
>> }
>>
>> ==>
>>
>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>>   def prepareConfig(config : Config)
>>   def init
>>   def fields : Array[String]
>>   def snapshot : Array[Byte]
>>   def restore(state: Array[Byte])
>> }
>>
>> This is very important for Eagle if we want it to be a monitoring
>> framework with fault tolerance.
>>
>> Thanks
>> Edward
>>
>> From: Sriskandarajah Suhothayan
>> Date: Thursday, December 10, 2015 at 9:30
>> To: "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com>
>> Cc: dev@eagle.incubator.apache.org, Edward Zhang, Srinath Perera, WSO2 Developers' List
>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>
>> Thanks for pointing it out.
>>
>> We are looking into this.
>> Will update you ASAP.
>>
>> Suho
>>
>> On Thu, Dec 10, 2015 at 12:58 AM, Zhang, Edward (GDI Hadoop) <
>> yonzhang@ebay.com> wrote:
>> By the way, we use Siddhi version 3.0.2.
>>
>> Also, for tracking this issue I created the JIRA
>> https://wso2.org/jira/browse/CEP-1433 (snapshot/restore does not work for
>> aggregation on a time-based window).
>>
>> Thanks
>> Edward
>>
>> On 12/8/15, 17:57, "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com> wrote:
>>
>>> Thanks for this suggestion, Suho.
>>>
>>> I did some testing on state persist and restore; most use cases
>>> work except group by. I am not sure if the Siddhi team is aware of this.
>>>
>>> Please look at my test case: testTimeSlideWindowWithGroupby
>>> https://github.com/yonzhang/incubator-eagle/commit/606b65705ea20ce1592a20df9a1f85758168efcb
>>>
>>> The query is like the following:
>>> String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
>>> String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp, 3 sec)"
>>>     + " select user, timeStamp, count() as cnt"
>>>     + " group by user"
>>>     + " having cnt > 2"
>>>     + " insert all events into outputStream;";
>>>
>>> The basic issue could be the following:
>>> 1) When taking a snapshot, Siddhi persists a Count executor per key, but it
>>> looks like Siddhi adds the same Count executor to the snapshot list multiple
>>> times (the count aggregator elementId is $planName + keyname).
>>> 2) When restoring a snapshot, it does not restore the Count executor for a
>>> key because snapshotableList does not contain that key; the key is only
>>> generated when an event comes in, and at restore time we don't know the
>>> previous events.
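[Archive editor's note] The failure mode in point 2), where per-key aggregators are created lazily on the first event so a restore pass skips keys unseen since restart, can be illustrated with a small Java sketch. This mimics the shape of the problem only; it is not Siddhi's internal code, and all names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: keyed aggregators are registered lazily on first
// event, so a restore that only walks the currently-registered list
// silently skips keys that have not been seen since restart.
public class LazyKeyRestoreSketch {
    final Map<String, long[]> registered = new HashMap<>(); // key -> mutable count

    void onEvent(String key) { // aggregator created only when an event arrives
        registered.computeIfAbsent(key, k -> new long[1])[0]++;
    }

    void restore(Map<String, Long> snapshot) {
        // Mirrors the restore loop quoted below: only already-registered
        // elements can receive their saved state.
        for (Map.Entry<String, long[]> e : registered.entrySet()) {
            Long saved = snapshot.get(e.getKey());
            if (saved != null) e.getValue()[0] = saved;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> snapshot = Map.of("user-a", 5L); // state saved before crash
        LazyKeyRestoreSketch fresh = new LazyKeyRestoreSketch();
        fresh.restore(snapshot); // "user-a" was never registered after restart
        System.out.println(fresh.registered.containsKey("user-a")); // prints false
    }
}
```

With the snapshot holding state for "user-a" but no registered aggregator to attach it to after restart, the saved count is silently dropped, which matches the behavior Edward observed.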
>>>
>>> for (Snapshotable snapshotable : snapshotableList) {
>>>     snapshotable.restoreState(snapshots.get(snapshotable.getElementId()));
>>> }
>>>
>>> Please let me know if there is an issue with my test program or whether
>>> something is wrong with Siddhi's group by/aggregator snapshot.
>>>
>>> Thanks
>>> Edward
>>>
>>> From: Sriskandarajah Suhothayan
>>> Date: Wednesday, November 25, 2015 at 21:07
>>> To: Edward Zhang
>>> Cc: Srinath Perera <srinath@wso2.com>, WSO2 Developers' List
>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>>
>>> Hi
>>>
>>> Currently the concept of current events and expired events lives within the
>>> query, and all events going out to a stream are converted back to current
>>> events, so it is hard for the application to keep track of the window and
>>> aggregation states like count, avg, std, etc.
>>> Further, window implementations vary, so in some cases knowing which
>>> events entered and exited will not be enough to recreate the window after a
>>> failure.
>>>
>>> The recommended way to keep track of state in Siddhi is via snapshots:
>>> take snapshots of the Siddhi runtime at a reasonable interval,
>>> and also buffer a copy of the events sent to Siddhi after each
>>> snapshot. With this method, when there is a failure we restore the
>>> latest snapshot and replay the events sent after it. The tricky part is
>>> how you buffer events after the snapshot; using Kafka and replaying is one
>>> option.
>>>
>>> Regards
>>> Suho
>>>
>>> On Thu, Nov 26, 2015 at 10:01 AM, Edward Zhang
>>> <yonzhang2012@apache.org> wrote:
>>> I tried expired events before; it only works for queries without
>>> group by. If the query contains group by and having clauses, then it only
>>> emits expired events when the having condition is satisfied.
>>>
>>> For example:
>>>
>>> String cseEventStream = "define stream TempStream (user string, cmd string);";
>>> String query = "@info(name = 'query1') from TempStream#window.length(4) "
>>>     + " select user, cmd, count(user) as cnt "
>>>     + " group by user "
>>>     + " having cnt > 3 "
>>>     + " insert all events into DelayedTempStream";
>>>
>>> If we send events as follows, it will not generate expired events at all:
>>>
>>> inputHandler.send(new Object[]{"user", "open1"});
>>> inputHandler.send(new Object[]{"user", "open2"});
>>> inputHandler.send(new Object[]{"user", "open3"});
>>> inputHandler.send(new Object[]{"user", "open4"});
>>> inputHandler.send(new Object[]{"user", "open5"});
>>>
>>> Thanks
>>> Edward Zhang
>>>
>>> On Wed, Nov 25, 2015 at 6:50 PM, Srinath Perera <srinath@wso2.com> wrote:
>>> Adding Suho.
>>>
>>> Hi Edward,
>>>
>>> Each window gives you a stream of expired events as well. Would that work?
>>>
>>> https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Window
>>>
>>> Thanks
>>> Srinath
>>>
>>> On Thu, Nov 19, 2015 at 5:37 AM, Edward Zhang
>>> <yonzhang2012@apache.org> wrote:
>>> Hi Siddhi team,
>>>
>>> Do we have any way of tracking which events are removed from any type of
>>> window, length(batch) or time(batch)? I investigated removeEvents and it
>>> may not be the right solution.
>>>
>>> We have a requirement of migrating a policy from one machine to another
>>> while keeping its internal state.
>>>
>>> Eagle runs policies on Storm infrastructure; if the machine which holds
>>> a policy fails, the already-populated events in its window are also
>>> gone. This is especially bad when we have built up a long window,
>>> such as a month of data.
>>>
>>> One possible way is to snapshot all events in the Siddhi window into the
>>> application domain.
Another way is to keep track of which
>>> events are coming in and out, so the application can know what is left in
>>> the Siddhi window.
>>>
>>> Here is the ticket for Eagle:
>>> https://issues.apache.org/jira/browse/EAGLE-39
>>>
>>> Have you had similar requests before? What do you suggest?
>>>
>>> Thanks
>>> Edward Zhang
>>>
>>> _______________________________________________
>>> Dev mailing list
>>> Dev@wso2.org
>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>>
>>> --
>>> Srinath Perera, Ph.D.
>>> http://people.apache.org/~hemapani/
>>> http://srinathsview.blogspot.com/
>>>
>>> --
>>> S. Suhothayan
>>> Technical Lead & Team Lead of WSO2 Complex Event Processor
>>> WSO2 Inc. http://wso2.com
>>> lean . enterprise . middleware
>>>
>>> cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
>>> twitter: http://twitter.com/suhothayan | linked-in: http://lk.linkedin.com/in/suhothayan