eagle-user mailing list archives

From "Zhang, Edward (GDI Hadoop)" <yonzh...@ebay.com>
Subject Code Review - Re: [Discuss] Eagle Policy State Management
Date Wed, 06 Jan 2016 07:08:47 GMT
Basic code is ready for review; please leave comments.
https://github.com/apache/incubator-eagle/pull/55

Thanks
Edward

On 12/11/15, 12:56, "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com> wrote:

>That’s cool, I will look into those new features. But do they provide
>automatic snapshot and restore?
>
>Thanks
>Edward
>
>On 12/11/15, 12:18, "P. Taylor Goetz" <ptgoetz@gmail.com> wrote:
>
>>Storm 1.0 (which we hope to release in the next month or so) adds
>>distributed cache/blobstore functionality that could be leveraged to
>>solve a lot of the problems described in this thread. Another relevant
>>feature is native windowing with persistent state (currently under
>>development).
>>
>>Documentation of these features is a little light, but I’ll try to
>>forward it on to this list when it’s more fully baked.
>>
>>-Taylor
>>
>>> On Dec 11, 2015, at 2:27 PM, Julian Hyde <jhyde@apache.org> wrote:
>>> 
>>> State management of streams (including what I’d call “derived streams”)
>>>is a hard distributed systems problem. Ideally it would be solved by the
>>>stream provider, not by the Eagle project. I think you should talk to
>>>the various streaming projects ― Storm, Samza, Kafka, Flink ― and find
>>>out whether they can solve this, or whether it is on their roadmap.
>>> 
>>> I can make introductions to the leaders of those projects if needed.
>>> 
>>> If the problem is solved at source, Eagle can focus on the actual
>>>problem rather than infrastructure.
>>> 
>>> Julian
>>> 
>>> 
>>>> On Dec 10, 2015, at 7:48 PM, Liangfei.Su <suliangfei@gmail.com> wrote:
>>>> 
>>>> Great proposal. This is important and could serve generally for both
>>>> policy capability and analytic features.
>>>> 
>>>> Periodically taking a snapshot independently on each bolt could make
>>>> state recoverable from recent history, but from the point of view of
>>>> the whole topology, this would not handle state dependencies between
>>>> bolts exactly.
>>>> 
>>>> Another point: should state restore be triggered not only when the
>>>> topology restarts but also on
>>>> a. topology rebalance
>>>> b. movement of a single bolt from one executor to another by the
>>>> underlying stream framework?
>>>> 
>>>> Thanks,
>>>> Ralph
>>>> 
>>>> 
>>>> On Fri, Dec 11, 2015 at 9:49 AM, Zhang, Edward (GDI Hadoop) <
>>>> yonzhang@ebay.com> wrote:
>>>> 
>>>>> This topic has been discussed offline for a while and it is time we
>>>>> documented problems and solutions. With clear problem statements and
>>>>> proposed solutions, I believe we can do better before we start
>>>>> implementing.
>>>>> 
>>>>> [Problem Statement] For Eagle as a real-time big data monitoring
>>>>> framework, evaluating policies efficiently is the core functionality.
>>>>> Most meaningful policies are stateful in that policy evaluation is
>>>>> based not on a single event but on both previous events and the
>>>>> current event. This brings 2 fundamental problems: one is policy state
>>>>> loss upon machine failure or topology restart, the other is missing
>>>>> history data upon a fresh start. One simple example is a policy like
>>>>> “from userActivity[cmd==‘delete’]time.window(1 month) select user,
>>>>> count() as cnt group by user having cnt > 1000”: if the task is
>>>>> restarted, the accumulated user/count map is lost. Also, when the
>>>>> topology is started for the first time, the window is empty even if we
>>>>> have historical data in the database.
>>>>> 
>>>>> [Proposed Solutions] The natural way of solving the above 2 problems
>>>>> is:
>>>>> 1) persist policy state periodically and restore it after a task is
>>>>> restarted
>>>>> The underlying policy engine should support snapshot and restore
>>>>> operations. Siddhi 3.x already supports snapshot and restore, though I
>>>>> found some bugs in its state management.
>>>>> https://wso2.org/jira/browse/CEP-1433
>>>>> Restore is not that straightforward unless all input events to the
>>>>> policy evaluator are backed by reliable and rewindable storage like
>>>>> Kafka.
>>>>> If input events to the policy evaluator are backed by Kafka, then each
>>>>> time Eagle takes a snapshot, we record the current offset and persist
>>>>> both of them to deep storage.
>>>>> If input events to the policy evaluator are not backed by Kafka, then
>>>>> we need to record every event since the last snapshot. That looks very
>>>>> expensive. Apache Flink uses an efficient algorithm called stream
>>>>> barriers to do group acknowledgement, but Storm does not have this
>>>>> feature. Remember, though, that Apache Flink requires every task to
>>>>> take a snapshot, not only the policy evaluator.
>>>>> 
>>>>> 2) transparently load historical data when the topology is started for
>>>>> the first time
>>>>> If the policy evaluator accepts data which is already persisted in a
>>>>> database or Kafka, we can provide an API to retrieve data from the
>>>>> database or Kafka. This loading is transparent to the developer, but
>>>>> the developer/user needs to specify the deep storage that holds the
>>>>> historical data.
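Solution 1 above pairs each snapshot with the Kafka offset it corresponds to, so that a restored task knows where to resume reading. The following is a minimal sketch of such a combined record, not Eagle code: the class names `CheckpointSketch` and `Checkpoint` and the `[8-byte offset][state bytes]` layout are assumptions made for illustration, and the state is treated as opaque bytes from whatever policy engine produced it.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch: none of these names come from Eagle, Storm, or Kafka.
final class CheckpointSketch {

    /** Policy state plus the input offset at which that state was captured. */
    static final class Checkpoint {
        final long offset;   // next input offset to read after a restore
        final byte[] state;  // opaque bytes returned by the policy engine's snapshot

        Checkpoint(long offset, byte[] state) {
            this.offset = offset;
            this.state = state.clone();
        }

        /** Encodes as [8-byte offset][state bytes] so both are persisted as one record. */
        byte[] toBytes() {
            return ByteBuffer.allocate(Long.BYTES + state.length)
                    .putLong(offset)
                    .put(state)
                    .array();
        }

        /** Decodes a record produced by toBytes(). */
        static Checkpoint fromBytes(byte[] bytes) {
            ByteBuffer buf = ByteBuffer.wrap(bytes);
            long offset = buf.getLong();
            byte[] state = new byte[buf.remaining()];
            buf.get(state);
            return new Checkpoint(offset, state);
        }
    }

    public static void main(String[] args) {
        Checkpoint saved = new Checkpoint(42L, new byte[] {1, 2, 3});
        Checkpoint loaded = Checkpoint.fromBytes(saved.toBytes());
        System.out.println("resume from offset " + loaded.offset
                + ", state " + Arrays.toString(loaded.state));
    }
}
```

Writing the offset and the state as a single record means a reader can never see one without the other, which is the property the proposal relies on when it says both are persisted together.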
>>>>> 
>>>>> Also be aware that if we have the right solution for the policy
>>>>> evaluator, the same solution should also be applied to event
>>>>> aggregation.
>>>>> https://cwiki.apache.org/confluence/display/EAG/Stream+Analyze
>>>>> 
>>>>> Another, more aggressive way is to use a solution similar to Flink
>>>>> stream barriers
>>>>> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
>>>>> to take snapshots of all Eagle tasks (typically spouts and bolts) but
>>>>> turn off the Storm ACK mechanism.
>>>>> 
>>>>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>>>>>   def prepareConfig(config : Config)
>>>>>   def init
>>>>>   def fields : Array[String]
>>>>> }
>>>>> 
>>>>> ==>
>>>>> 
>>>>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>>>>>   def prepareConfig(config : Config)
>>>>>   def init
>>>>>   def fields : Array[String]
>>>>> 
>>>>>   def snapshot : Array[Byte]
>>>>> 
>>>>>   def restore(state: Array[Byte])
>>>>> }
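To make the proposed `snapshot`/`restore` pair concrete, here is a rough Java analogue for the user/count example from the problem statement. It is only a sketch under assumptions: `CountingExecutorSketch` is a hypothetical class, it does not extend any Eagle or Storm type, it ignores window expiry, and the byte layout (entry count, then user/count pairs) is an arbitrary choice.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a stateful executor that can serialize its own state.
final class CountingExecutorSketch {
    private final Map<String, Long> countsByUser = new HashMap<>();

    /** Counts one event for the given user. */
    void onEvent(String user) {
        countsByUser.merge(user, 1L, Long::sum);
    }

    long count(String user) {
        return countsByUser.getOrDefault(user, 0L);
    }

    /** Serializes the map as: entry count, then (user, count) pairs. */
    byte[] snapshot() {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(countsByUser.size());
            for (Map.Entry<String, Long> entry : countsByUser.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeLong(entry.getValue());
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    /** Replaces the current state with the contents of a snapshot. */
    void restore(byte[] state) {
        countsByUser.clear();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(state))) {
            int entries = in.readInt();
            for (int i = 0; i < entries; i++) {
                String user = in.readUTF();   // fields are read in the order written
                long count = in.readLong();
                countsByUser.put(user, count);
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        CountingExecutorSketch before = new CountingExecutorSketch();
        before.onEvent("alice");
        before.onEvent("alice");
        before.onEvent("bob");
        byte[] state = before.snapshot();

        CountingExecutorSketch after = new CountingExecutorSketch(); // simulated restart
        after.restore(state);
        System.out.println("alice=" + after.count("alice") + ", bob=" + after.count("bob"));
    }
}
```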
>>>>> 
>>>>> This is very important to Eagle if we want Eagle to be a monitoring
>>>>> framework with fault tolerance.
>>>>> 
>>>>> Thanks
>>>>> Edward
>>>>> From: Sriskandarajah Suhothayan <suho@wso2.com>
>>>>> Date: Thursday, December 10, 2015 at 9:30
>>>>> To: "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com>
>>>>> Cc: "dev@eagle.incubator.apache.org" <dev@eagle.incubator.apache.org>,
>>>>> Edward Zhang <yonzhang2012@apache.org>,
>>>>> Srinath Perera <srinath@wso2.com>,
>>>>> WSO2 Developers' List <dev@wso2.org>
>>>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>>>> 
>>>>> Thanks for pointing it out,
>>>>> 
>>>>> We are looking into this.
>>>>> Will update you ASAP
>>>>> 
>>>>> Suho
>>>>> 
>>>>> On Thu, Dec 10, 2015 at 12:58 AM, Zhang, Edward (GDI Hadoop)
>>>>> <yonzhang@ebay.com> wrote:
>>>>> By the way, we use Siddhi version 3.0.2.
>>>>> 
>>>>> Also, to track this issue, I created a JIRA:
>>>>> https://wso2.org/jira/browse/CEP-1433 (snapshot/restore does not work
>>>>> for aggregation on time based window)
>>>>> 
>>>>> Thanks
>>>>> Edward
>>>>> 
>>>>> On 12/8/15, 17:57, "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com>
>>>>> wrote:
>>>>> 
>>>>>> Thanks for this suggestion, Suho.
>>>>>> 
>>>>>> I did some testing on state persist and restore; most of the use
>>>>>> cases seem to work, except group by. I am not sure if the Siddhi team
>>>>>> is aware of this.
>>>>>> 
>>>>>> Please look at my test case: testTimeSlideWindowWithGroupby
>>>>>> 
>>>>> 
>>>>>> https://github.com/yonzhang/incubator-eagle/commit/606b65705ea20ce1592a20df9a1f85758168efcb
>>>>>> 
>>>>>> The query is like the following:
>>>>>> String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
>>>>>> String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
>>>>>>     + " select user, timeStamp, count() as cnt"
>>>>>>     + " group by user"
>>>>>>     + " having cnt > 2"
>>>>>>     + " insert all events into outputStream;";
>>>>>> 
>>>>>> The basic issue could be the following:
>>>>>> 1) When taking a snapshot, it persists all per-key Count executors.
>>>>>> But it looks like Siddhi adds the same Count executor into the
>>>>>> snapshot list multiple times (the count aggregator elementId is
>>>>>> $planName+keyname).
>>>>>> 2) When restoring a snapshot, it does not restore the Count executor
>>>>>> for a key because snapshotableList does not have that key. The key is
>>>>>> only generated when an event comes in. When we restore, we don't know
>>>>>> the previous events.
>>>>>> 
>>>>>> for (Snapshotable snapshotable : snapshotableList) {
>>>>>>     snapshotable.restoreState(snapshots.get(snapshotable.getElementId()));
>>>>>> }
>>>>>> 
>>>>>> Please let me know if there is some issue with my test program or
>>>>>> if something is wrong with Siddhi group by/aggregator snapshot.
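The restore problem described in point 2 can be reproduced with a toy model. The sketch below is not Siddhi code: `Engine` and `Counter` are hypothetical stand-ins, and the only behaviour borrowed from the message is that per-key counters are registered lazily when the first event for a key arrives, while restore iterates over the elements that are already registered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of lazily registered per-key aggregators; not Siddhi's classes.
final class RestoreGapSketch {

    /** Stand-in for a per-key count aggregator. */
    static final class Counter {
        final String elementId;
        long value;

        Counter(String elementId) {
            this.elementId = elementId;
        }
    }

    static final class Engine {
        final List<Counter> registered = new ArrayList<>();
        final Map<String, Counter> byKey = new HashMap<>();

        /** A counter is created and registered only when its key is first seen. */
        void onEvent(String key) {
            Counter counter = byKey.computeIfAbsent(key, k -> {
                Counter created = new Counter("plan-" + k);
                registered.add(created);
                return created;
            });
            counter.value++;
        }

        Map<String, Long> snapshot() {
            Map<String, Long> snap = new HashMap<>();
            for (Counter counter : registered) {
                snap.put(counter.elementId, counter.value);
            }
            return snap;
        }

        /** Restores only elements that are already registered; returns how many. */
        int restore(Map<String, Long> snap) {
            int restored = 0;
            for (Counter counter : registered) {
                Long value = snap.get(counter.elementId);
                if (value != null) {
                    counter.value = value;
                    restored++;
                }
            }
            return restored;
        }
    }

    public static void main(String[] args) {
        Engine before = new Engine();
        before.onEvent("alice");
        before.onEvent("alice");
        before.onEvent("bob");
        Map<String, Long> snap = before.snapshot();

        Engine after = new Engine(); // fresh start: no keys registered yet
        System.out.println("snapshot entries: " + snap.size());
        System.out.println("restored entries: " + after.restore(snap));
    }
}
```

In this model the fresh engine restores nothing even though the snapshot holds two entries, because its registration list is still empty. One way out, in the toy model at least, would be to drive restore from the snapshot's keys rather than from the registered list; whether that fits Siddhi's design is a question for its maintainers.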
>>>>>> 
>>>>>> Thanks
>>>>>> Edward
>>>>>> 
>>>>>> From: Sriskandarajah Suhothayan <suho@wso2.com>
>>>>>> Date: Wednesday, November 25, 2015 at 21:07
>>>>>> To: Edward Zhang <yonzhang2012@apache.org>
>>>>>> Cc: Srinath Perera <srinath@wso2.com>, WSO2 Developers' List
>>>>>> <dev@wso2.org>
>>>>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>>>>> 
>>>>>> Hi
>>>>>> 
>>>>>> Currently the concept of current events and expired events lives
>>>>>> within the query, and all events going out to a stream are converted
>>>>>> back to current events. So it's hard for the application to keep
>>>>>> track of the window and aggregation states like count, avg, std, etc.
>>>>>> Further, window implementations can vary, hence in some cases knowing
>>>>>> what events entered and exited will not be enough to recreate the
>>>>>> window after a failure.
>>>>>> 
>>>>>> The recommended way to keep track of state in Siddhi is via
>>>>>> snapshots: take snapshots of the Siddhi runtime at a reasonable
>>>>>> interval, and also buffer a copy of the events sent to Siddhi after
>>>>>> each snapshot. With this method, when there is a failure we restore
>>>>>> the latest snapshot and replay the events which were sent after it.
>>>>>> The tricky part is how you buffer events after the snapshot;
>>>>>> using Kafka and replaying is one option.
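The snapshot-plus-replay recovery described above can be sketched as follows. This is an illustration under assumptions, not Siddhi's API: `Processor` is a toy running-sum operator standing in for a Siddhi runtime, the buffer is an in-memory list where a real deployment might rewind Kafka instead, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "restore the latest snapshot, then replay buffered events".
final class ReplaySketch {

    /** Toy stateful operator (a running sum); stands in for the real engine. */
    static final class Processor {
        long sum;

        void send(long event) {
            sum += event;
        }

        long snapshot() {
            return sum;
        }

        void restore(long state) {
            sum = state;
        }
    }

    /** Wraps a processor with the last snapshot and the events sent since then. */
    static final class Recoverable {
        private Processor processor = new Processor();
        private long lastSnapshot = 0L;
        private final List<Long> sinceSnapshot = new ArrayList<>();

        void send(long event) {
            sinceSnapshot.add(event); // buffer first so the event survives a crash
            processor.send(event);
        }

        void takeSnapshot() {
            lastSnapshot = processor.snapshot();
            sinceSnapshot.clear();    // older events are now covered by the snapshot
        }

        /** Simulates losing the processor, then rebuilds its state. */
        void recover() {
            processor = new Processor();
            processor.restore(lastSnapshot);
            for (long event : sinceSnapshot) {
                processor.send(event);
            }
        }

        long current() {
            return processor.sum;
        }
    }

    public static void main(String[] args) {
        Recoverable task = new Recoverable();
        task.send(1);
        task.send(2);
        task.takeSnapshot();
        task.send(3);
        task.send(4);
        task.recover();
        System.out.println("sum after recovery: " + task.current());
    }
}
```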
>>>>>> 
>>>>>> Regards
>>>>>> Suho
>>>>>> 
>>>>>> On Thu, Nov 26, 2015 at 10:01 AM, Edward Zhang
>>>>>> <yonzhang2012@apache.org> wrote:
>>>>>> I tried expired events before; it only works for queries without
>>>>>> group by. If the query contains group by and a having clause, then it
>>>>>> only emits an expired event when the having condition is satisfied.
>>>>>> 
>>>>>> For example
>>>>>> 
>>>>>> String cseEventStream = "define stream TempStream (user string, cmd string);";
>>>>>> String query = "@info(name = 'query1') from TempStream#window.length(4) "
>>>>>>     + " select user, cmd, count(user) as cnt "
>>>>>>     + " group by user "
>>>>>>     + "having cnt > 3 "
>>>>>>     + " insert all events into DelayedTempStream";
>>>>>> 
>>>>>> If we send events as follows, it will not generate expired events at
>>>>>> all.
>>>>>> 
>>>>>> inputHandler.send(new Object[]{"user", "open1"});
>>>>>> inputHandler.send(new Object[]{"user", "open2"});
>>>>>> inputHandler.send(new Object[]{"user", "open3"});
>>>>>> inputHandler.send(new Object[]{"user", "open4"});
>>>>>> inputHandler.send(new Object[]{"user", "open5"});
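The behaviour reported above can be traced with a small stand-alone simulation. It is a toy model, not Siddhi: it assumes that when the length window is full the expired event is processed (and the count decremented) before the arriving event, and that the having filter is applied to the count after each update. `HavingFilterSketch` and `run` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of: window.length(n), count() group by user, having cnt > threshold.
final class HavingFilterSketch {

    /** Each event is {user, cmd}. Returns the events that pass the having filter. */
    static List<String> run(List<String[]> events, int windowSize, long threshold) {
        Deque<String[]> window = new ArrayDeque<>();
        Map<String, Long> countByUser = new HashMap<>();
        List<String> output = new ArrayList<>();

        for (String[] event : events) {
            if (window.size() == windowSize) {
                // Assumption: the oldest event expires before the new one is counted.
                String[] expired = window.removeFirst();
                long afterExpiry = countByUser.merge(expired[0], -1L, Long::sum);
                if (afterExpiry > threshold) {
                    output.add("EXPIRED " + expired[1] + " cnt=" + afterExpiry);
                }
            }
            window.addLast(event);
            long afterArrival = countByUser.merge(event[0], 1L, Long::sum);
            if (afterArrival > threshold) {
                output.add("CURRENT " + event[1] + " cnt=" + afterArrival);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String[]> events = Arrays.asList(
                new String[] {"user", "open1"},
                new String[] {"user", "open2"},
                new String[] {"user", "open3"},
                new String[] {"user", "open4"},
                new String[] {"user", "open5"});
        for (String line : run(events, 4, 3L)) {
            System.out.println(line);
        }
    }
}
```

Under these assumptions the fifth event pushes open1 out of the window, the count drops to 3, and the expired event fails `cnt > 3`, so only current events (open4 and open5, each with cnt=4) reach the output. That would be consistent with the observation that no expired events are generated for this input.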
>>>>>> 
>>>>>> 
>>>>>> Thanks
>>>>>> Edward Zhang
>>>>>> 
>>>>>> On Wed, Nov 25, 2015 at 6:50 PM, Srinath Perera <srinath@wso2.com>
>>>>>> wrote:
>>>>>> Adding Suho
>>>>>> 
>>>>>> Hi Edward,
>>>>>> 
>>>>>> Each window gives you a stream of expired events as well. Would that
>>>>>> work?
>>>>>> 
>>>>>> https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Window
>>>>>> 
>>>>>> Thanks
>>>>>> Srinath
>>>>>> 
>>>>>> On Thu, Nov 19, 2015 at 5:37 AM, Edward Zhang
>>>>>> <yonzhang2012@apache.org> wrote:
>>>>>> Hi Siddhi team,
>>>>>> 
>>>>>> Do we have any way of tracking what events are removed from any type
>>>>>> of window, length(batch) or time(batch)? I investigated, and
>>>>>> removeEvents may not be the right solution.
>>>>>> 
>>>>>> We have a requirement to migrate a policy from one machine to another
>>>>>> while keeping its internal state.
>>>>>> 
>>>>>> Eagle runs policies on Storm infrastructure, but if the machine which
>>>>>> holds a policy fails, then the events already populated in the window
>>>>>> are gone too. Sometimes that is bad, especially when we have built up
>>>>>> a long window such as a month of data.
>>>>>> 
>>>>>> One possible way is to snapshot all events in the Siddhi window into
>>>>>> the application domain. Another way is to keep track of what events
>>>>>> are coming in and out, so the application can track what is left in
>>>>>> the Siddhi window.
>>>>>> 
>>>>>> Here is the ticket for Eagle
>>>>>> https://issues.apache.org/jira/browse/EAGLE-39
>>>>>> 
>>>>>> Have you had a similar request before? Or what do you suggest?
>>>>>> 
>>>>>> Thanks
>>>>>> Edward Zhang
>>>>>> 
>>>>>> _______________________________________________
>>>>>> Dev mailing list
>>>>>> 
>>>>>> Dev@wso2.org
>>>>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> ============================
>>>>>> Srinath Perera, Ph.D.
>>>>>> http://people.apache.org/~hemapani/
>>>>>> http://srinathsview.blogspot.com/
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> S. Suhothayan
>>>>>> Technical Lead & Team Lead of WSO2 Complex Event Processor
>>>>>> WSO2 Inc. http://wso2.com
>>>>>> lean . enterprise . middleware
>>>>>> 
>>>>>> cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
>>>>>> twitter: http://twitter.com/suhothayan | linked-in:
>>>>>> http://lk.linkedin.com/in/suhothayan
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>> 
>>
>
