eagle-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "P. Taylor Goetz" <ptgo...@gmail.com>
Subject Re: [Discuss] Eagle Policy State Management
Date Fri, 11 Dec 2015 20:18:08 GMT
In Storm 1.0 (which we hope to release in the next month or so) adds distributed cache/blobstore
functionality that could be leveraged to solve a lot of the problems described in this thread.
Another relevant feature is native windowing with persistent state (currently under development).

Documentation of these features is a little light, but I’ll try to forward it on to this
list when it’s more fully baked.

-Taylor

> On Dec 11, 2015, at 2:27 PM, Julian Hyde <jhyde@apache.org> wrote:
> 
> State management of streams (including what I’d call “derived streams”) is a hard
distributed systems problem. Ideally it would be solved by the stream provider, not by the
Eagle project. I think you should talk to the various streaming projects — Storm, Samza,
Kafka, Flink — and find out whether they can solve this, or whether it is on their roadmap.
> 
> I can make introductions to the leaders of those projects if needed.
> 
> If the problem is solved at source, Eagle can focus on the actual problem rather than
infrastructure.
> 
> Julian
> 
> 
>> On Dec 10, 2015, at 7:48 PM, Liangfei.Su <suliangfei@gmail.com> wrote:
>> 
>> Great proposal, this is important and could be general served for policy
>> capability and analytic feature.
>> 
>> Periodically taken the snapshot independently on each bolt could make
>> status recoverable from recent history, but from whole topology store point
>> of view, this could not hand bolt status dependency exactly.
>> 
>> Another point is should the state restore be triggered not only when
>> topology restarts but also when
>> a. topology re-balance
>> b. single bolt movement by underling stream framework for one executor to
>> another?
>> 
>> Thanks,
>> Ralph
>> 
>> 
>> On Fri, Dec 11, 2015 at 9:49 AM, Zhang, Edward (GDI Hadoop) <
>> yonzhang@ebay.com> wrote:
>> 
>>> This topic has been discussed offline for a while and it is time we
>>> document problems and solutions. With clear problem statements and proposed
>>> solutions, I believe we can do better before we start implementing.
>>> 
>>> [Problem Statement] For Eagle as real-time big data monitoring framework
>>> evaluating policies efficiently is the core functionality. Most of
>>> meaningful polices are stateful in that policy evaluation is not based on a
>>> single event but on both previous events and current event. This
>>> potentially brings 2 fundamental problems, one is policy state loss upon
>>> machine failures or topology restart, the other is lacking history data
>>> upon fresh start. One simple example is for a policy like “from
>>> userActivity[cmd==‘delete’]time.window(1 month) select user, count() as cnt
>>> group by user having cnt > 1000”, if the task is restarted, the state of
>>> accumulated user/count map is missing. Also when the topology is started at
>>> the first time, the window is empty even if we have historic data in
>>> database.
>>> 
>>> [Proposed Solutions] The natural way of solving the above 2 problems is
>>> 1) do policy state persist periodically and restore policy state after
>>> task is restarted
>>> Underlying policy engine should support snapshot and restore operations.
>>> In Siddhi 3.x, it already supports snapshot and restore, though I found
>>> some bugs with their state management.
>>> https://wso2.org/jira/browse/CEP-1433
>>> For restore, it is not that straight-forward unless all input events to
>>> policy evaluator are backed by a reliable and rewindable storage like Kafka.
>>> If input events to policy evaluator is backed by Kafka, then each time
>>> when EAGLE takes snapshot, we records the current offset together and
>>> persist both of them to deep storage.
>>> If input events to policy evaluator is not backed by Kafka, then we need
>>> record every event since last snapshot. That looks very expensive. Apache
>>> Flink uses efficient algorithm called stream barrier to do group
>>> acknowledgement, but in Storm we don’t have this feature. But remember
>>> Apache Flink requires that each task do snapshot not only for policy
>>> evaluator.
>>> 
>>> 2) transparently load historical data when topology is started at the
>>> first time
>>> If policy evaluator accepts data which is already persisted in database or
>>> Kafka, we can provide API to retrieve data from database or Kafka. This
>>> loading is transparent to developer, but developer/user needs to specify
>>> the deep storage for storing historical data.
>>> 
>>> Also be aware that if we have the right solution for policy evaluator, the
>>> solution should be also applied to event aggregation.
>>> https://cwiki.apache.org/confluence/display/EAG/Stream+Analyze
>>> 
>>> Another aggressive way is to use Flink stream barrier similar solution
>>> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
>>> to take snapshot to all eagle tasks(typically spout and bolt) but turn off
>>> storm ACK mechanism.
>>> 
>>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef],
>>> R] {
>>> def prepareConfig(config : Config)
>>> def init
>>> def fields : Array[String]
>>> }
>>> 
>>> 
>>> ==>
>>> 
>>> 
>>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef],
>>> R] {
>>> def prepareConfig(config : Config)
>>> def init
>>> def fields : Array[String]
>>> 
>>> def snapshot : Array[Byte]
>>> 
>>> def restore(state: Array[Byte])
>>> }
>>> 
>>> This is pretty much important to Eagle if we want Eagle to be a monitoring
>>> framework with fault-tolerance.
>>> 
>>> Thanks
>>> Edward
>>> From: Sriskandarajah Suhothayan <suho@wso2.com<mailto:suho@wso2.com>>
>>> Date: Thursday, December 10, 2015 at 9:30
>>> To: "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com<mailto:
>>> yonzhang@ebay.com>>
>>> Cc: "dev@eagle.incubator.apache.org<mailto:dev@eagle.incubator.apache.org>"
>>> <dev@eagle.incubator.apache.org<mailto:dev@eagle.incubator.apache.org>>,
>>> Edward Zhang <yonzhang2012@apache.org<mailto:yonzhang2012@apache.org>>,
>>> Srinath Perera <srinath@wso2.com<mailto:srinath@wso2.com>>, WSO2
>>> Developers' List <dev@wso2.org<mailto:dev@wso2.org>>
>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>> 
>>> Thanks for pointing it out,
>>> 
>>> We are looking into this.
>>> Will update you ASAP
>>> 
>>> Suho
>>> 
>>> On Thu, Dec 10, 2015 at 12:58 AM, Zhang, Edward (GDI Hadoop) <
>>> yonzhang@ebay.com<mailto:yonzhang@ebay.com>> wrote:
>>> By the way, we use siddhi version 3.0.2.
>>> 
>>> Also for tracking this issue, I created jira
>>> https://wso2.org/jira/browse/CEP-1433 snapshot/restore does not work for
>>> aggregation on time based window
>>> 
>>> Thanks
>>> Edward
>>> 
>>> On 12/8/15, 17:57, "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com<mailto:
>>> yonzhang@ebay.com>> wrote:
>>> 
>>>> Thanks for this suggestion, Suho.
>>>> 
>>>> I did some testing on state persist and restore, looks most of use cases
>>>> are working except group by. I am not if Siddhi team has known this.
>>>> 
>>>> Please look at my test cases : testTimeSlideWindowWithGroupby
>>>> 
>>> https://github.com/yonzhang/incubator-eagle/commit/606b65705ea20ce1592a20d
>>>> f9a1f85758168efcb
>>>> 
>>>> The query is like the following
>>>> String cseEventStream = "define stream testStream (timeStamp long, user
>>>> string, cmd string);";
>>>>              + String query = "@info(name = 'query1') from
>>>> testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
>>>>              + + " select user, timeStamp, count() as cnt"
>>>>              + + " group by user"
>>>>              + + " having cnt > 2"
>>>>              + + " insert all events into outputStream;";
>>>> 
>>>> The basic issue could be the following:
>>>> 1) when taking snapshot, it will persist all Count executor per key. But
>>>> looks Siddhi adds same Count executor into snapshot list multiple
>>>> times(The count aggregator elementId is $planName+keyname)
>>>> 2) when restoring snapshot, it will not restore the Count executor for
>>>> key because snopshotableList does not have the above key.  That key only
>>>> is generated when event comes in. When do restoration, we don¹t know
>>>> previous events.
>>>> 
>>>> for (Snapshotable snapshotable : snapshotableList) {
>>>>  snapshotable.restoreState(snapshots.get(snapshotable.getElementId()));
>>>> }
>>>> 
>>>> Please let me know if there is some issue with my test program or
>>>> something is wrong with Siddhi group by/aggregator snapshot
>>>> 
>>>> Thanks
>>>> Edward
>>>> 
>>>> From: Sriskandarajah Suhothayan <suho@wso2.com<mailto:suho@wso2.com
>>>> <mailto:suho@wso2.com<mailto:suho@wso2.com>>>
>>>> Date: Wednesday, November 25, 2015 at 21:07
>>>> To: Edward Zhang <yonzhang2012@apache.org<mailto:yonzhang2012@apache.org
>>>> <mailto:yonzhang2012@apache.org<mailto:yonzhang2012@apache.org>>>
>>>> Cc: Srinath Perera <srinath@wso2.com<mailto:srinath@wso2.com><mailto:
>>> srinath@wso2.com<mailto:srinath@wso2.com>>>, WSO2
>>>> Developers' List <dev@wso2.org<mailto:dev@wso2.org><mailto:dev@wso2.org
>>> <mailto:dev@wso2.org>>>
>>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>>> 
>>>> Hi
>>>> 
>>>> Currently the concept of current event & expired events live within the
>>>> query and all events going out to a stream are converted back to current
>>>> events. So its hard for the application to keep track of the window and
>>>> aggregation states like count, avg, std, etc...
>>>> Further window implementations can very based on its implementations
>>>> hence in some cases knowing what events entered and existed will not be
>>>> enough to recreate the window during failure.
>>>> 
>>>> The recommended way to keep track of state in Siddhi is via snapshots,
>>>> you can take snapshots of the siddhi Runtime with a reasonable time
>>>> frame. and also buffer a copy of the events sent to siddhi after that
>>>> snapshot, with this method when there is a failure we should restore the
>>>> latest snapshot and replay the events which are sent after the last
>>>> snapshot. The tricky part is the way you buffer events after snapshot,
>>>> using Kafka and replaying is one option.
>>>> 
>>>> Regards
>>>> Suho
>>>> 
>>>> On Thu, Nov 26, 2015 at 10:01 AM, Edward Zhang
>>>> <yonzhang2012@apache.org<mailto:yonzhang2012@apache.org><mailto:
>>> yonzhang2012@apache.org<mailto:yonzhang2012@apache.org>>> wrote:
>>>> I tried expired events before, it only works for the query without
>>>> groupby. If the query contains groupby and having clause, then it only
>>>> emit just expired event when having conditions is satisfied.
>>>> 
>>>> For example
>>>> 
>>>> String cseEventStream = "define stream TempStream (user string, cmd
>>>> string);";
>>>> String query = "@info(name = 'query1') from TempStream#window.length(4) "
>>>>      + " select user, cmd, count(user) as cnt " +
>>>>      " group by user " +
>>>>      "having cnt > 3 "
>>>>      + " insert all events into DelayedTempStream";
>>>> 
>>>> If we send events as follows, it will not generate expired events at all.
>>>> 
>>>> inputHandler.send(new Object[]{"user", "open1"});
>>>> inputHandler.send(new Object[]{"user", "open2"});
>>>> inputHandler.send(new Object[]{"user", "open3"});
>>>> inputHandler.send(new Object[]{"user", "open4"});
>>>> inputHandler.send(new Object[]{"user", "open5"});
>>>> 
>>>> 
>>>> Thanks
>>>> Edward Zhang
>>>> 
>>>> On Wed, Nov 25, 2015 at 6:50 PM, Srinath Perera
>>>> <srinath@wso2.com<mailto:srinath@wso2.com><mailto:srinath@wso2.com
>>> <mailto:srinath@wso2.com>>> wrote:
>>>> Adding Suho
>>>> 
>>>> Hi Edward,
>>>> 
>>>> Each window give you a stream of expired events as well. Would that work?
>>>> 
>>>> 
>>> https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-W
>>>> indow
>>>> 
>>>> Thank
>>>> Srinath
>>>> 
>>>> On Thu, Nov 19, 2015 at 5:37 AM, Edward Zhang
>>>> <yonzhang2012@apache.org<mailto:yonzhang2012@apache.org><mailto:
>>> yonzhang2012@apache.org<mailto:yonzhang2012@apache.org>>> wrote:
>>>> Hi Siddhi team,
>>>> 
>>>> Do we have anyway of tracking what events are removed from any type of
>>>> windows, length(batch), or time(batch)? I investigated that removeEvents
>>>> may not be the right solution.
>>>> 
>>>> We have one requirement of migrating policy from one machine to another
>>>> machine but keeping internal state there.
>>>> 
>>>> Eagle uses policy in storm infrastructure, but one machine which holds
>>>> the policy fails, then already-populated events in the window also are
>>>> gone. Sometimes it is bad especially when we have built up a long window
>>>> like monthly data.
>>>> 
>>>> One possible way is to keep all events in the siddhi window to be
>>>> snapshotted into application domain. Another way is to keep tracking what
>>>> events are coming in and out, so application can track what are left in
>>>> siddhi window.
>>>> 
>>>> Here is the ticket for Eagle
>>>> https://issues.apache.org/jira/browse/EAGLE-39
>>>> 
>>>> Do you have similar request before? Or what do you suggest?
>>>> 
>>>> Thanks
>>>> Edward Zhang
>>>> 
>>>> _______________________________________________
>>>> Dev mailing list
>>>> Dev@wso2.org<mailto:Dev@wso2.org><mailto:Dev@wso2.org<mailto:Dev@wso2.org
>>>>> 
>>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> ============================
>>>> Srinath Perera, Ph.D.
>>>> http://people.apache.org/~hemapani/
>>>> http://srinathsview.blogspot.com/
>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> S. Suhothayan
>>>> Technical Lead & Team Lead of WSO2 Complex Event Processor
>>>> WSO2 Inc. http://wso2.com<http://wso2.com/>
>>>> <http://wso2.com/>
>>>> lean . enterprise . middleware
>>>> 
>>>> cell: (+94) 779 756 757<tel:%28%2B94%29%20779%20756%20757> | blog:
>>> http://suhothayan.blogspot.com/
>>>> twitter: http://twitter.com/suhothayan | linked-in:
>>>> http://lk.linkedin.com/in/suhothayan
>>> 
>>> 
>>> 
>>> 
>>> --
>>> S. Suhothayan
>>> Technical Lead & Team Lead of WSO2 Complex Event Processor
>>> WSO2 Inc. http://wso2.com<http://wso2.com/>
>>> <http://wso2.com/>
>>> lean . enterprise . middleware
>>> 
>>> cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
>>> twitter: http://twitter.com/suhothayan | linked-in:
>>> http://lk.linkedin.com/in/suhothayan
>>> 
> 


Mime
View raw message