eagle-user mailing list archives

From "Liangfei.Su" <suliang...@gmail.com>
Subject Re: [Discuss] Eagle Policy State Management
Date Fri, 11 Dec 2015 03:48:23 GMT
Great proposal. This is important and could serve generally for both the
policy capability and the analytics feature.

Periodically taking snapshots independently on each bolt makes state
recoverable from recent history, but from the point of view of the whole
topology's state, this cannot handle inter-bolt state dependencies exactly.

Another point: should the state restore be triggered not only when the
topology restarts, but also when
a. the topology is rebalanced?
b. a single bolt is moved by the underlying streaming framework from one
executor to another?

Thanks,
Ralph


On Fri, Dec 11, 2015 at 9:49 AM, Zhang, Edward (GDI Hadoop) <
yonzhang@ebay.com> wrote:

> This topic has been discussed offline for a while and it is time we
> document problems and solutions. With clear problem statements and proposed
> solutions, I believe we can do better before we start implementing.
>
> [Problem Statement] For Eagle as a real-time big data monitoring framework,
> evaluating policies efficiently is the core functionality. Most meaningful
> policies are stateful in that policy evaluation is based not on a single
> event but on both previous events and the current event. This potentially
> brings two fundamental problems: one is policy state loss upon machine
> failures or topology restart, the other is the lack of history data upon a
> fresh start. One simple example is a policy like “from
> userActivity[cmd==‘delete’]#window.time(1 month) select user, count() as cnt
> group by user having cnt > 1000”; if the task is restarted, the state of the
> accumulated user/count map is lost. Also, when the topology is started for
> the first time, the window is empty even if we have historic data in the
> database.
>
> [Proposed Solutions] The natural way of solving the above 2 problems is:
> 1) persist policy state periodically and restore policy state after the
> task is restarted
> The underlying policy engine should support snapshot and restore operations.
> Siddhi 3.x already supports snapshot and restore, though I found some bugs
> with its state management.
> https://wso2.org/jira/browse/CEP-1433
> For restore, it is not that straightforward unless all input events to the
> policy evaluator are backed by reliable and rewindable storage like Kafka.
> If the input events to the policy evaluator are backed by Kafka, then each
> time EAGLE takes a snapshot, we record the current offset together with it
> and persist both to deep storage, as sketched below.
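>
> As a minimal Scala sketch of that idea (PolicySnapshot, SnapshotStore and
> the method names are hypothetical, for illustration only, not existing
> Eagle APIs):
>
> object PolicyCheckpointing {
>   case class PolicySnapshot(state: Array[Byte], kafkaOffset: Long, timestamp: Long)
>
>   // assumed deep-storage abstraction, e.g. backed by HDFS or HBase
>   trait SnapshotStore {
>     def save(policyId: String, snapshot: PolicySnapshot): Unit
>     def loadLatest(policyId: String): Option[PolicySnapshot]
>   }
>
>   // persist the engine state together with the Kafka offset it corresponds to
>   def takeSnapshot(policyId: String, engineState: Array[Byte],
>                    currentOffset: Long, store: SnapshotStore): Unit =
>     store.save(policyId, PolicySnapshot(engineState, currentOffset, System.currentTimeMillis()))
> }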
> If the input events to the policy evaluator are not backed by Kafka, then we
> need to record every event since the last snapshot. That looks very
> expensive. Apache Flink uses an efficient algorithm called stream barriers
> to do group acknowledgement, but in Storm we don’t have this feature. But
> remember that Apache Flink requires every task to take snapshots, not only
> the policy evaluator.
>
> 2) transparently load historical data when the topology is started for the
> first time
> If the policy evaluator accepts data which is already persisted in a
> database or Kafka, we can provide an API to retrieve it from there. This
> loading is transparent to the developer, but the developer/user needs to
> specify the deep storage that holds the historical data; a rough sketch
> follows.
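>
> A rough sketch of what such an API might look like (HistoricalStore and the
> method names are assumptions for illustration, not an existing Eagle
> interface):
>
> object HistoryLoader {
>   // assumed interface over the user-specified deep storage of historical events
>   trait HistoricalStore {
>     def fetch(streamName: String, fromTime: Long, toTime: Long): Iterator[Seq[AnyRef]]
>   }
>
>   // on first start, replay history into the evaluator before live events arrive
>   def warmUp(store: HistoricalStore, streamName: String,
>              windowLengthMs: Long, evaluate: Seq[AnyRef] => Unit): Unit = {
>     val now = System.currentTimeMillis()
>     store.fetch(streamName, now - windowLengthMs, now).foreach(evaluate)
>   }
> }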
>
> Also be aware that if we have the right solution for the policy evaluator,
> the same solution should also be applied to event aggregation.
> https://cwiki.apache.org/confluence/display/EAG/Stream+Analyze
>
> Another, more aggressive way is to use a solution similar to Flink's stream barriers
> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
> to take snapshots across all Eagle tasks (typically spouts and bolts) while
> turning off the Storm ACK mechanism; see the sketch below.
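>
> For illustration only, a barrier-alignment sketch of the kind of logic each
> task would need (the Barrier marker tuple and BarrierAligner are
> assumptions, not existing Storm or Eagle functionality):
>
> import scala.collection.mutable
>
> // hypothetical barrier marker broadcast downstream at every checkpoint
> case class Barrier(checkpointId: Long)
>
> class BarrierAligner(upstreamTasks: Int,
>                      snapshot: () => Array[Byte],
>                      persist: (Long, Array[Byte]) => Unit) {
>   private val seen = mutable.Set[Int]()
>
>   // take one snapshot once barriers from all upstream tasks have arrived
>   def onBarrier(checkpointId: Long, fromTask: Int): Unit = {
>     seen += fromTask
>     if (seen.size == upstreamTasks) {
>       persist(checkpointId, snapshot())
>       seen.clear()
>     }
>   }
> }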
>
> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef],
> R] {
>   def prepareConfig(config : Config)
>   def init
>   def fields : Array[String]
> }
>
>
> ==>
>
>
> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef],
> R] {
>   def prepareConfig(config : Config)
>   def init
>   def fields : Array[String]
>
>   // take a point-in-time snapshot of the executor's internal state
>   def snapshot : Array[Byte]
>
>   // rebuild internal state from a previously taken snapshot
>   def restore(state: Array[Byte])
> }
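>
> For example, the enclosing Storm bolt could drive these two methods roughly
> as below (reusing the hypothetical PolicySnapshot/SnapshotStore sketch from
> above; the tick-tuple trigger is an assumption, not a final design):
>
> object ExecutorCheckpointing {
>   import PolicyCheckpointing._  // hypothetical types from the sketch above
>
>   // e.g. triggered by a Storm tick tuple every N seconds
>   def checkpoint(executor: StormStreamExecutor[_ <: EagleTuple], policyId: String,
>                  currentOffset: Long, store: SnapshotStore): Unit =
>     store.save(policyId, PolicySnapshot(executor.snapshot, currentOffset, System.currentTimeMillis()))
>
>   // in prepare()/after a restart, restore the most recent state if one exists
>   def recover(executor: StormStreamExecutor[_ <: EagleTuple], policyId: String,
>               store: SnapshotStore): Unit =
>     store.loadLatest(policyId).foreach(s => executor.restore(s.state))
> }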
>
> This is very important if we want Eagle to be a monitoring framework with
> fault tolerance.
>
> Thanks
> Edward
> From: Sriskandarajah Suhothayan <suho@wso2.com>
> Date: Thursday, December 10, 2015 at 9:30
> To: "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com<mailto:
> yonzhang@ebay.com>>
> Cc: "dev@eagle.incubator.apache.org<mailto:dev@eagle.incubator.apache.org>"
> <dev@eagle.incubator.apache.org<mailto:dev@eagle.incubator.apache.org>>,
> Edward Zhang <yonzhang2012@apache.org<mailto:yonzhang2012@apache.org>>,
> Srinath Perera <srinath@wso2.com<mailto:srinath@wso2.com>>, WSO2
> Developers' List <dev@wso2.org<mailto:dev@wso2.org>>
> Subject: Re: [Dev] [Siddhi] what events is left in the window
>
> Thanks for pointing it out,
>
> We are looking into this.
> Will update you ASAP
>
> Suho
>
> On Thu, Dec 10, 2015 at 12:58 AM, Zhang, Edward (GDI Hadoop) <
> yonzhang@ebay.com> wrote:
> By the way, we use siddhi version 3.0.2.
>
> Also, for tracking this issue, I created JIRA
> https://wso2.org/jira/browse/CEP-1433 (snapshot/restore does not work for
> aggregation on a time-based window).
>
> Thanks
> Edward
>
> On 12/8/15, 17:57, "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com> wrote:
>
> >Thanks for this suggestion, Suho.
> >
> >I did some testing on state persist and restore; it looks like most use
> >cases are working except group by. I am not sure if the Siddhi team is
> >aware of this.
> >
> >Please look at my test case: testTimeSlideWindowWithGroupby
> >
> >https://github.com/yonzhang/incubator-eagle/commit/606b65705ea20ce1592a20df9a1f85758168efcb
> >
> >The query is like the following:
> >
> >String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
> >String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
> >        + " select user, timeStamp, count() as cnt"
> >        + " group by user"
> >        + " having cnt > 2"
> >        + " insert all events into outputStream;";
> >
> >The basic issue could be the following:
> >1) when taking a snapshot, it persists one Count executor per key, but it
> >looks like Siddhi adds the same Count executor into the snapshot list
> >multiple times (the count aggregator elementId is $planName+keyname)
> >2) when restoring a snapshot, it will not restore the Count executor for a
> >key because snapshotableList does not contain that key. The key is only
> >generated when an event comes in, and at restoration time we don't know
> >the previous events.
> >
> >for (Snapshotable snapshotable : snapshotableList) {
> >    snapshotable.restoreState(snapshots.get(snapshotable.getElementId()));
> >}
> >
> >Please let me know if there is some issue with my test program or
> >something is wrong with Siddhi group by/aggregator snapshot
> >
> >Thanks
> >Edward
> >
> >From: Sriskandarajah Suhothayan <suho@wso2.com>
> >Date: Wednesday, November 25, 2015 at 21:07
> >To: Edward Zhang <yonzhang2012@apache.org>
> >Cc: Srinath Perera <srinath@wso2.com>, WSO2 Developers' List <dev@wso2.org>
> >Subject: Re: [Dev] [Siddhi] what events is left in the window
> >
> >Hi
> >
> >Currently the concept of current events & expired events lives within the
> >query, and all events going out to a stream are converted back to current
> >events. So it's hard for the application to keep track of the window and
> >aggregation states like count, avg, std, etc.
> >Further, window behaviour can vary based on its implementation, hence in
> >some cases knowing what events entered and exited will not be enough to
> >recreate the window during a failure.
> >
> >The recommended way to keep track of state in Siddhi is via snapshots:
> >you can take snapshots of the Siddhi runtime at a reasonable time
> >interval and also buffer a copy of the events sent to Siddhi after that
> >snapshot. With this method, when there is a failure you should restore
> >the latest snapshot and replay the events which were sent after it. The
> >tricky part is the way you buffer events after the snapshot; using Kafka
> >and replaying is one option.
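> >
> >For reference, a rough Scala sketch of that pattern (the Siddhi 3.x calls
> >are written from memory and should be double-checked; persisting the
> >returned bytes and buffering/replaying events are left to the application,
> >and the stream/query strings reuse the test case quoted above):
> >
> >import org.wso2.siddhi.core.SiddhiManager
> >
> >object SnapshotReplaySketch {
> >  def main(args: Array[String]): Unit = {
> >    val executionPlan =
> >      "define stream testStream (timeStamp long, user string, cmd string); " +
> >      "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec) " +
> >      "select user, timeStamp, count() as cnt group by user having cnt > 2 " +
> >      "insert all events into outputStream;"
> >
> >    val manager = new SiddhiManager()
> >    val runtime = manager.createExecutionPlanRuntime(executionPlan)
> >    runtime.start()
> >
> >    val state: Array[Byte] = runtime.snapshot()  // persist these bytes periodically
> >    // ...after a failure: recreate the runtime, then
> >    runtime.restore(state)                       // and replay the events buffered since that snapshot
> >  }
> >}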
> >
> >Regards
> >Suho
> >
> >On Thu, Nov 26, 2015 at 10:01 AM, Edward Zhang <yonzhang2012@apache.org>
> >wrote:
> >I tried expired events before; they only work for queries without group
> >by. If the query contains group by and a having clause, then it only
> >emits the expired event when the having condition is satisfied.
> >
> >For example
> >
> >String cseEventStream = "define stream TempStream (user string, cmd
> >string);";
> >String query = "@info(name = 'query1') from TempStream#window.length(4) "
> >        + " select user, cmd, count(user) as cnt " +
> >        " group by user " +
> >        "having cnt > 3 "
> >        + " insert all events into DelayedTempStream";
> >
> >If we send events as follows, it will not generate expired events at all.
> >
> >inputHandler.send(new Object[]{"user", "open1"});
> >inputHandler.send(new Object[]{"user", "open2"});
> >inputHandler.send(new Object[]{"user", "open3"});
> >inputHandler.send(new Object[]{"user", "open4"});
> >inputHandler.send(new Object[]{"user", "open5"});
> >
> >
> >Thanks
> >Edward Zhang
> >
> >On Wed, Nov 25, 2015 at 6:50 PM, Srinath Perera <srinath@wso2.com> wrote:
> >Adding Suho
> >
> >Hi Edward,
> >
> >Each window gives you a stream of expired events as well. Would that work?
> >
> >
> >https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Window
> >
> >Thanks
> >Srinath
> >
> >On Thu, Nov 19, 2015 at 5:37 AM, Edward Zhang <yonzhang2012@apache.org>
> >wrote:
> >Hi Siddhi team,
> >
> >Do we have any way of tracking what events are removed from any type of
> >window, length(batch) or time(batch)? I investigated removeEvents, and it
> >may not be the right solution.
> >
> >We have a requirement to migrate a policy from one machine to another
> >while keeping its internal state.
> >
> >Eagle runs policies on Storm infrastructure, but if the machine which
> >holds the policy fails, the events already populated into the window are
> >also gone. That is bad, especially when we have built up a long window
> >such as a month of data.
> >
> >One possible way is to have all events in the Siddhi window snapshotted
> >into the application domain. Another way is to keep tracking which events
> >are coming in and out, so the application can track what is left in the
> >Siddhi window.
> >
> >Here is the ticket for Eagle
> >https://issues.apache.org/jira/browse/EAGLE-39
> >
> >Have you had similar requests before? Or what do you suggest?
> >
> >Thanks
> >Edward Zhang
> >
> >_______________________________________________
> >Dev mailing list
> >Dev@wso2.org
> >http://wso2.org/cgi-bin/mailman/listinfo/dev
> >
> >
> >
> >
> >--
> >============================
> >Srinath Perera, Ph.D.
> >   http://people.apache.org/~hemapani/
> >   http://srinathsview.blogspot.com/
> >
> >
> >
> >
> >--
> >S. Suhothayan
> >Technical Lead & Team Lead of WSO2 Complex Event Processor
> >WSO2 Inc. http://wso2.com
> >lean . enterprise . middleware
> >
> >cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
> >twitter: http://twitter.com/suhothayan | linked-in:
> >http://lk.linkedin.com/in/suhothayan
>
>
>
>
> --
> S. Suhothayan
> Technical Lead & Team Lead of WSO2 Complex Event Processor
> WSO2 Inc. http://wso2.com
> lean . enterprise . middleware
>
> cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
> twitter: http://twitter.com/suhothayan | linked-in:
> http://lk.linkedin.com/in/suhothayan
>
