eagle-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Julian Hyde <jh...@apache.org>
Subject Re: [Discuss] Eagle Policy State Management
Date Fri, 11 Dec 2015 19:27:58 GMT
State management of streams (including what I’d call “derived streams”) is a hard distributed
systems problem. Ideally it would be solved by the stream provider, not by the Eagle project.
I think you should talk to the various streaming projects — Storm, Samza, Kafka, Flink —
and find out whether they can solve this, or whether it is on their roadmap. 

I can make introductions to the leaders of those projects if needed.

If the problem is solved at source, Eagle can focus on the actual problem rather than infrastructure.

Julian


> On Dec 10, 2015, at 7:48 PM, Liangfei.Su <suliangfei@gmail.com> wrote:
> 
> Great proposal, this is important and could be general served for policy
> capability and analytic feature.
> 
> Periodically taken the snapshot independently on each bolt could make
> status recoverable from recent history, but from whole topology store point
> of view, this could not hand bolt status dependency exactly.
> 
> Another point is should the state restore be triggered not only when
> topology restarts but also when
> a. topology re-balance
> b. single bolt movement by underling stream framework for one executor to
> another?
> 
> Thanks,
> Ralph
> 
> 
> On Fri, Dec 11, 2015 at 9:49 AM, Zhang, Edward (GDI Hadoop) <
> yonzhang@ebay.com> wrote:
> 
>> This topic has been discussed offline for a while and it is time we
>> document problems and solutions. With clear problem statements and proposed
>> solutions, I believe we can do better before we start implementing.
>> 
>> [Problem Statement] For Eagle as real-time big data monitoring framework
>> evaluating policies efficiently is the core functionality. Most of
>> meaningful polices are stateful in that policy evaluation is not based on a
>> single event but on both previous events and current event. This
>> potentially brings 2 fundamental problems, one is policy state loss upon
>> machine failures or topology restart, the other is lacking history data
>> upon fresh start. One simple example is for a policy like “from
>> userActivity[cmd==‘delete’]time.window(1 month) select user, count() as cnt
>> group by user having cnt > 1000”, if the task is restarted, the state of
>> accumulated user/count map is missing. Also when the topology is started at
>> the first time, the window is empty even if we have historic data in
>> database.
>> 
>> [Proposed Solutions] The natural way of solving the above 2 problems is
>> 1) do policy state persist periodically and restore policy state after
>> task is restarted
>> Underlying policy engine should support snapshot and restore operations.
>> In Siddhi 3.x, it already supports snapshot and restore, though I found
>> some bugs with their state management.
>> https://wso2.org/jira/browse/CEP-1433
>> For restore, it is not that straight-forward unless all input events to
>> policy evaluator are backed by a reliable and rewindable storage like Kafka.
>> If input events to policy evaluator is backed by Kafka, then each time
>> when EAGLE takes snapshot, we records the current offset together and
>> persist both of them to deep storage.
>> If input events to policy evaluator is not backed by Kafka, then we need
>> record every event since last snapshot. That looks very expensive. Apache
>> Flink uses efficient algorithm called stream barrier to do group
>> acknowledgement, but in Storm we don’t have this feature. But remember
>> Apache Flink requires that each task do snapshot not only for policy
>> evaluator.
>> 
>> 2) transparently load historical data when topology is started at the
>> first time
>> If policy evaluator accepts data which is already persisted in database or
>> Kafka, we can provide API to retrieve data from database or Kafka. This
>> loading is transparent to developer, but developer/user needs to specify
>> the deep storage for storing historical data.
>> 
>> Also be aware that if we have the right solution for policy evaluator, the
>> solution should be also applied to event aggregation.
>> https://cwiki.apache.org/confluence/display/EAG/Stream+Analyze
>> 
>> Another aggressive way is to use Flink stream barrier similar solution
>> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
>> to take snapshot to all eagle tasks(typically spout and bolt) but turn off
>> storm ACK mechanism.
>> 
>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef],
>> R] {
>>  def prepareConfig(config : Config)
>>  def init
>>  def fields : Array[String]
>> }
>> 
>> 
>> ==>
>> 
>> 
>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef],
>> R] {
>>  def prepareConfig(config : Config)
>>  def init
>>  def fields : Array[String]
>> 
>>  def snapshot : Array[Byte]
>> 
>>  def restore(state: Array[Byte])
>> }
>> 
>> This is pretty much important to Eagle if we want Eagle to be a monitoring
>> framework with fault-tolerance.
>> 
>> Thanks
>> Edward
>> From: Sriskandarajah Suhothayan <suho@wso2.com<mailto:suho@wso2.com>>
>> Date: Thursday, December 10, 2015 at 9:30
>> To: "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com<mailto:
>> yonzhang@ebay.com>>
>> Cc: "dev@eagle.incubator.apache.org<mailto:dev@eagle.incubator.apache.org>"
>> <dev@eagle.incubator.apache.org<mailto:dev@eagle.incubator.apache.org>>,
>> Edward Zhang <yonzhang2012@apache.org<mailto:yonzhang2012@apache.org>>,
>> Srinath Perera <srinath@wso2.com<mailto:srinath@wso2.com>>, WSO2
>> Developers' List <dev@wso2.org<mailto:dev@wso2.org>>
>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>> 
>> Thanks for pointing it out,
>> 
>> We are looking into this.
>> Will update you ASAP
>> 
>> Suho
>> 
>> On Thu, Dec 10, 2015 at 12:58 AM, Zhang, Edward (GDI Hadoop) <
>> yonzhang@ebay.com<mailto:yonzhang@ebay.com>> wrote:
>> By the way, we use siddhi version 3.0.2.
>> 
>> Also for tracking this issue, I created jira
>> https://wso2.org/jira/browse/CEP-1433 snapshot/restore does not work for
>> aggregation on time based window
>> 
>> Thanks
>> Edward
>> 
>> On 12/8/15, 17:57, "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com<mailto:
>> yonzhang@ebay.com>> wrote:
>> 
>>> Thanks for this suggestion, Suho.
>>> 
>>> I did some testing on state persist and restore, looks most of use cases
>>> are working except group by. I am not if Siddhi team has known this.
>>> 
>>> Please look at my test cases : testTimeSlideWindowWithGroupby
>>> 
>> https://github.com/yonzhang/incubator-eagle/commit/606b65705ea20ce1592a20d
>>> f9a1f85758168efcb
>>> 
>>> The query is like the following
>>> String cseEventStream = "define stream testStream (timeStamp long, user
>>> string, cmd string);";
>>>               + String query = "@info(name = 'query1') from
>>> testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
>>>               + + " select user, timeStamp, count() as cnt"
>>>               + + " group by user"
>>>               + + " having cnt > 2"
>>>               + + " insert all events into outputStream;";
>>> 
>>> The basic issue could be the following:
>>> 1) when taking snapshot, it will persist all Count executor per key. But
>>> looks Siddhi adds same Count executor into snapshot list multiple
>>> times(The count aggregator elementId is $planName+keyname)
>>> 2) when restoring snapshot, it will not restore the Count executor for
>>> key because snopshotableList does not have the above key.  That key only
>>> is generated when event comes in. When do restoration, we don¹t know
>>> previous events.
>>> 
>>> for (Snapshotable snapshotable : snapshotableList) {
>>>   snapshotable.restoreState(snapshots.get(snapshotable.getElementId()));
>>> }
>>> 
>>> Please let me know if there is some issue with my test program or
>>> something is wrong with Siddhi group by/aggregator snapshot
>>> 
>>> Thanks
>>> Edward
>>> 
>>> From: Sriskandarajah Suhothayan <suho@wso2.com<mailto:suho@wso2.com
>>> <mailto:suho@wso2.com<mailto:suho@wso2.com>>>
>>> Date: Wednesday, November 25, 2015 at 21:07
>>> To: Edward Zhang <yonzhang2012@apache.org<mailto:yonzhang2012@apache.org
>>> <mailto:yonzhang2012@apache.org<mailto:yonzhang2012@apache.org>>>
>>> Cc: Srinath Perera <srinath@wso2.com<mailto:srinath@wso2.com><mailto:
>> srinath@wso2.com<mailto:srinath@wso2.com>>>, WSO2
>>> Developers' List <dev@wso2.org<mailto:dev@wso2.org><mailto:dev@wso2.org
>> <mailto:dev@wso2.org>>>
>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>> 
>>> Hi
>>> 
>>> Currently the concept of current event & expired events live within the
>>> query and all events going out to a stream are converted back to current
>>> events. So its hard for the application to keep track of the window and
>>> aggregation states like count, avg, std, etc...
>>> Further window implementations can very based on its implementations
>>> hence in some cases knowing what events entered and existed will not be
>>> enough to recreate the window during failure.
>>> 
>>> The recommended way to keep track of state in Siddhi is via snapshots,
>>> you can take snapshots of the siddhi Runtime with a reasonable time
>>> frame. and also buffer a copy of the events sent to siddhi after that
>>> snapshot, with this method when there is a failure we should restore the
>>> latest snapshot and replay the events which are sent after the last
>>> snapshot. The tricky part is the way you buffer events after snapshot,
>>> using Kafka and replaying is one option.
>>> 
>>> Regards
>>> Suho
>>> 
>>> On Thu, Nov 26, 2015 at 10:01 AM, Edward Zhang
>>> <yonzhang2012@apache.org<mailto:yonzhang2012@apache.org><mailto:
>> yonzhang2012@apache.org<mailto:yonzhang2012@apache.org>>> wrote:
>>> I tried expired events before, it only works for the query without
>>> groupby. If the query contains groupby and having clause, then it only
>>> emit just expired event when having conditions is satisfied.
>>> 
>>> For example
>>> 
>>> String cseEventStream = "define stream TempStream (user string, cmd
>>> string);";
>>> String query = "@info(name = 'query1') from TempStream#window.length(4) "
>>>       + " select user, cmd, count(user) as cnt " +
>>>       " group by user " +
>>>       "having cnt > 3 "
>>>       + " insert all events into DelayedTempStream";
>>> 
>>> If we send events as follows, it will not generate expired events at all.
>>> 
>>> inputHandler.send(new Object[]{"user", "open1"});
>>> inputHandler.send(new Object[]{"user", "open2"});
>>> inputHandler.send(new Object[]{"user", "open3"});
>>> inputHandler.send(new Object[]{"user", "open4"});
>>> inputHandler.send(new Object[]{"user", "open5"});
>>> 
>>> 
>>> Thanks
>>> Edward Zhang
>>> 
>>> On Wed, Nov 25, 2015 at 6:50 PM, Srinath Perera
>>> <srinath@wso2.com<mailto:srinath@wso2.com><mailto:srinath@wso2.com
>> <mailto:srinath@wso2.com>>> wrote:
>>> Adding Suho
>>> 
>>> Hi Edward,
>>> 
>>> Each window give you a stream of expired events as well. Would that work?
>>> 
>>> 
>> https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-W
>>> indow
>>> 
>>> Thank
>>> Srinath
>>> 
>>> On Thu, Nov 19, 2015 at 5:37 AM, Edward Zhang
>>> <yonzhang2012@apache.org<mailto:yonzhang2012@apache.org><mailto:
>> yonzhang2012@apache.org<mailto:yonzhang2012@apache.org>>> wrote:
>>> Hi Siddhi team,
>>> 
>>> Do we have anyway of tracking what events are removed from any type of
>>> windows, length(batch), or time(batch)? I investigated that removeEvents
>>> may not be the right solution.
>>> 
>>> We have one requirement of migrating policy from one machine to another
>>> machine but keeping internal state there.
>>> 
>>> Eagle uses policy in storm infrastructure, but one machine which holds
>>> the policy fails, then already-populated events in the window also are
>>> gone. Sometimes it is bad especially when we have built up a long window
>>> like monthly data.
>>> 
>>> One possible way is to keep all events in the siddhi window to be
>>> snapshotted into application domain. Another way is to keep tracking what
>>> events are coming in and out, so application can track what are left in
>>> siddhi window.
>>> 
>>> Here is the ticket for Eagle
>>> https://issues.apache.org/jira/browse/EAGLE-39
>>> 
>>> Do you have similar request before? Or what do you suggest?
>>> 
>>> Thanks
>>> Edward Zhang
>>> 
>>> _______________________________________________
>>> Dev mailing list
>>> Dev@wso2.org<mailto:Dev@wso2.org><mailto:Dev@wso2.org<mailto:Dev@wso2.org
>>>> 
>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>> 
>>> 
>>> 
>>> 
>>> --
>>> ============================
>>> Srinath Perera, Ph.D.
>>>  http://people.apache.org/~hemapani/
>>>  http://srinathsview.blogspot.com/
>>> 
>>> 
>>> 
>>> 
>>> --
>>> S. Suhothayan
>>> Technical Lead & Team Lead of WSO2 Complex Event Processor
>>> WSO2 Inc. http://wso2.com<http://wso2.com/>
>>> <http://wso2.com/>
>>> lean . enterprise . middleware
>>> 
>>> cell: (+94) 779 756 757<tel:%28%2B94%29%20779%20756%20757> | blog:
>> http://suhothayan.blogspot.com/
>>> twitter: http://twitter.com/suhothayan | linked-in:
>>> http://lk.linkedin.com/in/suhothayan
>> 
>> 
>> 
>> 
>> --
>> S. Suhothayan
>> Technical Lead & Team Lead of WSO2 Complex Event Processor
>> WSO2 Inc. http://wso2.com<http://wso2.com/>
>> <http://wso2.com/>
>> lean . enterprise . middleware
>> 
>> cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
>> twitter: http://twitter.com/suhothayan | linked-in:
>> http://lk.linkedin.com/in/suhothayan
>> 


Mime
View raw message