eagle-user mailing list archives

From "Zhang, Edward (GDI Hadoop)" <yonzh...@ebay.com>
Subject Re: [Discuss] Eagle Policy State Management
Date Fri, 11 Dec 2015 20:56:08 GMT
That’s cool, I will look into those new features. But does that provide
automatic snapshot and restore?

Thanks
Edward

On 12/11/15, 12:18, "P. Taylor Goetz" <ptgoetz@gmail.com> wrote:

>Storm 1.0 (which we hope to release in the next month or so) adds
>distributed cache/blobstore functionality that could be leveraged to
>solve a lot of the problems described in this thread. Another relevant
>feature is native windowing with persistent state (currently under
>development).
>
>Documentation of these features is a little light, but I’ll try to
>forward it on to this list when it’s more fully baked.
>
>-Taylor
>
>> On Dec 11, 2015, at 2:27 PM, Julian Hyde <jhyde@apache.org> wrote:
>> 
>> State management of streams (including what I’d call “derived streams”)
>> is a hard distributed systems problem. Ideally it would be solved by the
>> stream provider, not by the Eagle project. I think you should talk to
>> the various streaming projects (Storm, Samza, Kafka, Flink) and find
>> out whether they can solve this, or whether it is on their roadmap.
>> 
>> I can make introductions to the leaders of those projects if needed.
>> 
>> If the problem is solved at source, Eagle can focus on the actual
>> problem rather than infrastructure.
>> 
>> Julian
>> 
>> 
>>> On Dec 10, 2015, at 7:48 PM, Liangfei.Su <suliangfei@gmail.com> wrote:
>>> 
>>> Great proposal. This is important and could generally serve both
>>> policy capability and analytic features.
>>> 
>>> Periodically taking a snapshot independently on each bolt could make
>>> state recoverable from recent history, but from the point of view of
>>> the whole topology's state, this could not handle dependencies between
>>> bolt states exactly.
>>> 
>>> Another point: should the state restore be triggered not only when
>>> the topology restarts but also on
>>> a. topology re-balance
>>> b. movement of a single bolt by the underlying stream framework from
>>> one executor to another?
>>> 
>>> Thanks,
>>> Ralph
>>> 
>>> 
>>> On Fri, Dec 11, 2015 at 9:49 AM, Zhang, Edward (GDI Hadoop) <
>>> yonzhang@ebay.com> wrote:
>>> 
>>>> This topic has been discussed offline for a while and it is time we
>>>> document problems and solutions. With clear problem statements and
>>>> proposed solutions, I believe we can do better before we start
>>>> implementing.
>>>> 
>>>> [Problem Statement] For Eagle as a real-time big data monitoring
>>>> framework, evaluating policies efficiently is the core functionality.
>>>> Most meaningful policies are stateful, in that policy evaluation is
>>>> based not on a single event but on both previous events and the current
>>>> event. This brings 2 fundamental problems: one is policy state loss
>>>> upon machine failure or topology restart, the other is missing history
>>>> data upon a fresh start. One simple example is a policy like “from
>>>> userActivity[cmd==‘delete’]time.window(1 month) select user, count()
>>>> as cnt group by user having cnt > 1000”: if the task is restarted, the
>>>> accumulated user/count map is lost. Also, when the topology is started
>>>> for the first time, the window is empty even if we have historical data
>>>> in the database.
>>>> 
>>>> [Proposed Solutions] The natural way of solving the above 2 problems
>>>> is:
>>>> 1) persist policy state periodically and restore policy state after
>>>> the task is restarted
>>>> The underlying policy engine should support snapshot and restore
>>>> operations. Siddhi 3.x already supports snapshot and restore, though I
>>>> found some bugs in its state management.
>>>> https://wso2.org/jira/browse/CEP-1433
>>>> Restore is not that straightforward unless all input events to the
>>>> policy evaluator are backed by reliable and rewindable storage like
>>>> Kafka.
>>>> If input events to the policy evaluator are backed by Kafka, then each
>>>> time Eagle takes a snapshot, we record the current offset together with
>>>> it and persist both of them to deep storage.
>>>> If input events to the policy evaluator are not backed by Kafka, then
>>>> we need to record every event since the last snapshot. That looks very
>>>> expensive. Apache Flink uses an efficient algorithm called stream
>>>> barrier to do group acknowledgement, but Storm does not have this
>>>> feature. But remember that Apache Flink requires every task to take a
>>>> snapshot, not only the policy evaluator.
>>>> 
>>>> 2) transparently load historical data when the topology is started for
>>>> the first time
>>>> If the policy evaluator accepts data which is already persisted in a
>>>> database or Kafka, we can provide an API to retrieve data from the
>>>> database or Kafka. This loading is transparent to the developer, but
>>>> the developer/user needs to specify the deep storage that holds the
>>>> historical data.
>>>> 
>>>> Also be aware that if we have the right solution for the policy
>>>> evaluator, the same solution should also be applied to event aggregation.
>>>> https://cwiki.apache.org/confluence/display/EAG/Stream+Analyze
>>>> 
>>>> Another, more aggressive way is to use a solution similar to Flink's
>>>> stream barrier
>>>> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
>>>> to take snapshots of all Eagle tasks (typically spouts and bolts) but
>>>> turn off the Storm ACK mechanism.
>>>> 
>>>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>>>>   def prepareConfig(config: Config)
>>>>   def init
>>>>   def fields: Array[String]
>>>> }
>>>> 
>>>> 
>>>> ==>
>>>> 
>>>> 
>>>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>>>>   def prepareConfig(config: Config)
>>>>   def init
>>>>   def fields: Array[String]
>>>> 
>>>>   // serialize the executor's current state
>>>>   def snapshot: Array[Byte]
>>>> 
>>>>   // rebuild the executor's state from a previously taken snapshot
>>>>   def restore(state: Array[Byte])
>>>> }
>>>> 
>>>> This is very important to Eagle if we want it to be a monitoring
>>>> framework with fault tolerance.
>>>> 
>>>> Thanks
>>>> Edward
>>>> From: Sriskandarajah Suhothayan <suho@wso2.com>
>>>> Date: Thursday, December 10, 2015 at 9:30
>>>> To: "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com>
>>>> Cc: "dev@eagle.incubator.apache.org" <dev@eagle.incubator.apache.org>,
>>>> Edward Zhang <yonzhang2012@apache.org>,
>>>> Srinath Perera <srinath@wso2.com>,
>>>> WSO2 Developers' List <dev@wso2.org>
>>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>>> 
>>>> Thanks for pointing it out,
>>>> 
>>>> We are looking into this.
>>>> Will update you ASAP
>>>> 
>>>> Suho
>>>> 
>>>> On Thu, Dec 10, 2015 at 12:58 AM, Zhang, Edward (GDI Hadoop)
>>>> <yonzhang@ebay.com> wrote:
>>>> By the way, we use Siddhi version 3.0.2.
>>>> 
>>>> Also, to track this issue, I created a JIRA:
>>>> https://wso2.org/jira/browse/CEP-1433 snapshot/restore does not work
>>>> for aggregation on time based window
>>>> 
>>>> Thanks
>>>> Edward
>>>> 
>>>> On 12/8/15, 17:57, "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com>
>>>> wrote:
>>>> 
>>>>> Thanks for this suggestion, Suho.
>>>>> 
>>>>> I did some testing on state persist and restore, and it looks like
>>>>> most use cases work except group by. I am not sure whether the Siddhi
>>>>> team is aware of this.
>>>>> 
>>>>> Please look at my test case, testTimeSlideWindowWithGroupby:
>>>>> https://github.com/yonzhang/incubator-eagle/commit/606b65705ea20ce1592a20df9a1f85758168efcb
>>>>> 
>>>>> The query is like the following:
>>>>> String cseEventStream = "define stream testStream (timeStamp long, "
>>>>>     + "user string, cmd string);";
>>>>> String query = "@info(name = 'query1') from "
>>>>>     + "testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
>>>>>     + " select user, timeStamp, count() as cnt"
>>>>>     + " group by user"
>>>>>     + " having cnt > 2"
>>>>>     + " insert all events into outputStream;";
>>>>> 
>>>>> The basic issue could be the following:
>>>>> 1) When taking a snapshot, it persists the Count executor for each key.
>>>>> But it looks like Siddhi adds the same Count executor to the snapshot
>>>>> list multiple times (the count aggregator elementId is
>>>>> $planName+keyname).
>>>>> 2) When restoring a snapshot, it does not restore the Count executor
>>>>> for a key, because snapshotableList does not contain that key. The key
>>>>> is only generated when an event comes in, and at restoration time we
>>>>> don't know the previous events.
>>>>> 
>>>>> for (Snapshotable snapshotable : snapshotableList) {
>>>>>   snapshotable.restoreState(snapshots.get(snapshotable.getElementId()));
>>>>> }
>>>>> 
>>>>> Please let me know if there is some issue with my test program or
>>>>> something is wrong with Siddhi group by/aggregator snapshot
>>>>> 
>>>>> Thanks
>>>>> Edward
>>>>> 
>>>>> From: Sriskandarajah Suhothayan <suho@wso2.com>
>>>>> Date: Wednesday, November 25, 2015 at 21:07
>>>>> To: Edward Zhang <yonzhang2012@apache.org>
>>>>> Cc: Srinath Perera <srinath@wso2.com>,
>>>>> WSO2 Developers' List <dev@wso2.org>
>>>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>>>> 
>>>>> Hi
>>>>> 
>>>>> Currently the concept of current events & expired events lives within
>>>>> the query, and all events going out to a stream are converted back to
>>>>> current events. So it's hard for the application to keep track of the
>>>>> window and aggregation states like count, avg, std, etc.
>>>>> Further, window behavior can vary based on its implementation, hence
>>>>> in some cases knowing what events entered and exited will not be
>>>>> enough to recreate the window after a failure.
>>>>> 
>>>>> The recommended way to keep track of state in Siddhi is via snapshots.
>>>>> You can take snapshots of the Siddhi runtime at a reasonable interval
>>>>> and also buffer a copy of the events sent to Siddhi after each
>>>>> snapshot. With this method, when there is a failure we restore the
>>>>> latest snapshot and replay the events which were sent after it. The
>>>>> tricky part is how you buffer events after the snapshot; using Kafka
>>>>> and replaying is one option.
>>>>> 
>>>>> Regards
>>>>> Suho
>>>>> 
>>>>> On Thu, Nov 26, 2015 at 10:01 AM, Edward Zhang
>>>>> <yonzhang2012@apache.org> wrote:
>>>>> I tried expired events before, but it only works for queries without
>>>>> group by. If the query contains group by and a having clause, then it
>>>>> only emits expired events when the having condition is satisfied.
>>>>> 
>>>>> For example
>>>>> 
>>>>> String cseEventStream = "define stream TempStream (user string, "
>>>>>     + "cmd string);";
>>>>> String query = "@info(name = 'query1') from TempStream#window.length(4) "
>>>>>     + " select user, cmd, count(user) as cnt "
>>>>>     + " group by user "
>>>>>     + "having cnt > 3 "
>>>>>     + " insert all events into DelayedTempStream";
>>>>> 
>>>>> If we send events as follows, it will not generate expired events at
>>>>> all.
>>>>> 
>>>>> inputHandler.send(new Object[]{"user", "open1"});
>>>>> inputHandler.send(new Object[]{"user", "open2"});
>>>>> inputHandler.send(new Object[]{"user", "open3"});
>>>>> inputHandler.send(new Object[]{"user", "open4"});
>>>>> inputHandler.send(new Object[]{"user", "open5"});
>>>>> 
>>>>> 
>>>>> Thanks
>>>>> Edward Zhang
>>>>> 
>>>>> On Wed, Nov 25, 2015 at 6:50 PM, Srinath Perera
>>>>> <srinath@wso2.com> wrote:
>>>>> Adding Suho
>>>>> 
>>>>> Hi Edward,
>>>>> 
>>>>> Each window gives you a stream of expired events as well. Would that
>>>>> work?
>>>>> 
>>>>> https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Window
>>>>> 
>>>>> Thanks
>>>>> Srinath
>>>>> 
>>>>> On Thu, Nov 19, 2015 at 5:37 AM, Edward Zhang
>>>>> <yonzhang2012@apache.org> wrote:
>>>>> Hi Siddhi team,
>>>>> 
>>>>> Do we have any way of tracking what events are removed from any type
>>>>> of window, length(batch) or time(batch)? From my investigation,
>>>>> removeEvents may not be the right solution.
>>>>> 
>>>>> We have a requirement to migrate a policy from one machine to another
>>>>> while keeping its internal state.
>>>>> 
>>>>> Eagle runs policies on Storm infrastructure, but if a machine which
>>>>> holds a policy fails, then the events already populated in the window
>>>>> are gone too. That is especially bad when we have built up a long
>>>>> window, such as monthly data.
>>>>> 
>>>>> One possible way is to snapshot all events in the Siddhi window into
>>>>> the application domain. Another way is to keep tracking what events
>>>>> are coming in and out, so the application can track what is left in
>>>>> the Siddhi window.
>>>>> 
>>>>> Here is the ticket for Eagle
>>>>> https://issues.apache.org/jira/browse/EAGLE-39
>>>>> 
>>>>> Have you had a similar request before? Or what do you suggest?
>>>>> 
>>>>> Thanks
>>>>> Edward Zhang
>>>>> 
>>>>> _______________________________________________
>>>>> Dev mailing list
>>>>> 
>>>>> Dev@wso2.org
>>>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> ============================
>>>>> Srinath Perera, Ph.D.
>>>>> http://people.apache.org/~hemapani/
>>>>> http://srinathsview.blogspot.com/
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> S. Suhothayan
>>>>> Technical Lead & Team Lead of WSO2 Complex Event Processor
>>>>> WSO2 Inc. http://wso2.com
>>>>> lean . enterprise . middleware
>>>>> 
>>>>> cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
>>>>> twitter: http://twitter.com/suhothayan | linked-in:
>>>>> http://lk.linkedin.com/in/suhothayan
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>> 
>
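
The snapshot-plus-offset idea from the quoted proposal (persist the policy
state together with the input offset, then on restart restore the state and
replay from that offset) can be sketched roughly as below. This is only an
illustration under stated assumptions: SnapshotReplaySketch, Checkpoint,
CountingEvaluator and recover are hypothetical names, not Eagle, Storm,
Siddhi or Kafka APIs, and a plain in-memory list stands in for a rewindable
log such as a Kafka partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these types exist in Eagle, Storm, Siddhi or Kafka.
final class SnapshotReplaySketch {

    /** Evaluator state plus the input offset that state corresponds to. */
    static final class Checkpoint {
        final byte[] state;
        final long nextOffset; // index of the first event NOT yet reflected in state

        Checkpoint(byte[] state, long nextOffset) {
            this.state = state;
            this.nextOffset = nextOffset;
        }
    }

    /** Stand-in for a stateful policy evaluator: counts events per user. */
    static final class CountingEvaluator {
        private final Map<String, Long> counts = new HashMap<>();

        void onEvent(String user) {
            counts.merge(user, 1L, Long::sum);
        }

        long count(String user) {
            return counts.getOrDefault(user, 0L);
        }

        /** Serializes the counts as "user=count" lines (assumes no newline in user). */
        byte[] snapshot() {
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, Long> e : counts.entrySet()) {
                sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
            }
            return sb.toString().getBytes(StandardCharsets.UTF_8);
        }

        /** Replaces the current counts with those stored in a snapshot. */
        void restore(byte[] state) {
            counts.clear();
            String text = new String(state, StandardCharsets.UTF_8);
            for (String line : text.split("\n")) {
                if (line.isEmpty()) {
                    continue;
                }
                int eq = line.lastIndexOf('=');
                counts.put(line.substring(0, eq), Long.parseLong(line.substring(eq + 1)));
            }
        }
    }

    /** Restores a fresh evaluator from the checkpoint, then replays the log from its offset. */
    static CountingEvaluator recover(Checkpoint checkpoint, List<String> log) {
        CountingEvaluator evaluator = new CountingEvaluator();
        evaluator.restore(checkpoint.state);
        for (int i = (int) checkpoint.nextOffset; i < log.size(); i++) {
            evaluator.onEvent(log.get(i));
        }
        return evaluator;
    }

    public static void main(String[] args) {
        // The list plays the role of the rewindable input log.
        List<String> log = new ArrayList<>(Arrays.asList("alice", "bob", "alice"));
        CountingEvaluator live = new CountingEvaluator();
        for (String user : log) {
            live.onEvent(user);
        }
        // Snapshot and offset are captured together, as in the proposal.
        Checkpoint checkpoint = new Checkpoint(live.snapshot(), log.size());

        // One more event arrives after the checkpoint, then the task "fails".
        log.add("alice");
        live.onEvent("alice");

        CountingEvaluator recovered = recover(checkpoint, log);
        System.out.println(recovered.count("alice") == live.count("alice"));
    }
}
```

Storing the offset inside the same checkpoint as the state is what keeps
replay from double counting or skipping events; without a rewindable input,
every event since the last snapshot would have to be buffered instead, which
is the expensive case described in the proposal.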
