eagle-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Zhang, Edward (GDI Hadoop)" <yonzh...@ebay.com>
Subject [Discuss] Eagle Policy State Management
Date Fri, 11 Dec 2015 01:49:01 GMT
This topic has been discussed offline for a while and it is time we document problems and solutions.
With clear problem statements and proposed solutions, I believe we can do better before we
start implementing.

[Problem Statement] For Eagle as real-time big data monitoring framework evaluating policies
efficiently is the core functionality. Most of meaningful polices are stateful in that policy
evaluation is not based on a single event but on both previous events and current event. This
potentially brings 2 fundamental problems, one is policy state loss upon machine failures
or topology restart, the other is lacking history data upon fresh start. One simple example
is for a policy like “from userActivity[cmd==‘delete’]time.window(1 month) select user,
count() as cnt group by user having cnt > 1000”, if the task is restarted, the state
of accumulated user/count map is missing. Also when the topology is started at the first time,
the window is empty even if we have historic data in database.

[Proposed Solutions] The natural way of solving the above 2 problems is
1) do policy state persist periodically and restore policy state after task is restarted
Underlying policy engine should support snapshot and restore operations. In Siddhi 3.x, it
already supports snapshot and restore, though I found some bugs with their state management.
https://wso2.org/jira/browse/CEP-1433
For restore, it is not that straight-forward unless all input events to policy evaluator are
backed by a reliable and rewindable storage like Kafka.
If input events to policy evaluator is backed by Kafka, then each time when EAGLE takes snapshot,
we records the current offset together and persist both of them to deep storage.
If input events to policy evaluator is not backed by Kafka, then we need record every event
since last snapshot. That looks very expensive. Apache Flink uses efficient algorithm called
stream barrier to do group acknowledgement, but in Storm we don’t have this feature. But
remember Apache Flink requires that each task do snapshot not only for policy evaluator.

2) transparently load historical data when topology is started at the first time
If policy evaluator accepts data which is already persisted in database or Kafka, we can provide
API to retrieve data from database or Kafka. This loading is transparent to developer, but
developer/user needs to specify the deep storage for storing historical data.

Also be aware that if we have the right solution for policy evaluator, the solution should
be also applied to event aggregation.https://cwiki.apache.org/confluence/display/EAG/Stream+Analyze

Another aggressive way is to use Flink stream barrier similar solution http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
to take snapshot to all eagle tasks(typically spout and bolt) but turn off storm ACK mechanism.

trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
  def prepareConfig(config : Config)
  def init
  def fields : Array[String]
}


==>


trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
  def prepareConfig(config : Config)
  def init
  def fields : Array[String]

  def snapshot : Array[Byte]

  def restore(state: Array[Byte])
}

This is pretty much important to Eagle if we want Eagle to be a monitoring framework with
fault-tolerance.

Thanks
Edward
From: Sriskandarajah Suhothayan <suho@wso2.com<mailto:suho@wso2.com>>
Date: Thursday, December 10, 2015 at 9:30
To: "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com<mailto:yonzhang@ebay.com>>
Cc: "dev@eagle.incubator.apache.org<mailto:dev@eagle.incubator.apache.org>" <dev@eagle.incubator.apache.org<mailto:dev@eagle.incubator.apache.org>>,
Edward Zhang <yonzhang2012@apache.org<mailto:yonzhang2012@apache.org>>, Srinath
Perera <srinath@wso2.com<mailto:srinath@wso2.com>>, WSO2 Developers' List <dev@wso2.org<mailto:dev@wso2.org>>
Subject: Re: [Dev] [Siddhi] what events is left in the window

Thanks for pointing it out,

We are looking into this.
Will update you ASAP

Suho

On Thu, Dec 10, 2015 at 12:58 AM, Zhang, Edward (GDI Hadoop) <yonzhang@ebay.com<mailto:yonzhang@ebay.com>>
wrote:
By the way, we use siddhi version 3.0.2.

Also for tracking this issue, I created jira
https://wso2.org/jira/browse/CEP-1433 snapshot/restore does not work for
aggregation on time based window

Thanks
Edward

On 12/8/15, 17:57, "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com<mailto:yonzhang@ebay.com>>
wrote:

>Thanks for this suggestion, Suho.
>
>I did some testing on state persist and restore, looks most of use cases
>are working except group by. I am not if Siddhi team has known this.
>
>Please look at my test cases : testTimeSlideWindowWithGroupby
>https://github.com/yonzhang/incubator-eagle/commit/606b65705ea20ce1592a20d
>f9a1f85758168efcb
>
>The query is like the following
>String cseEventStream = "define stream testStream (timeStamp long, user
>string, cmd string);";
>                + String query = "@info(name = 'query1') from
>testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
>                + + " select user, timeStamp, count() as cnt"
>                + + " group by user"
>                + + " having cnt > 2"
>                + + " insert all events into outputStream;";
>
>The basic issue could be the following:
>1) when taking snapshot, it will persist all Count executor per key. But
>looks Siddhi adds same Count executor into snapshot list multiple
>times(The count aggregator elementId is $planName+keyname)
>2) when restoring snapshot, it will not restore the Count executor for
>key because snopshotableList does not have the above key.  That key only
>is generated when event comes in. When do restoration, we don¹t know
>previous events.
>
>for (Snapshotable snapshotable : snapshotableList) {
>    snapshotable.restoreState(snapshots.get(snapshotable.getElementId()));
>}
>
>Please let me know if there is some issue with my test program or
>something is wrong with Siddhi group by/aggregator snapshot
>
>Thanks
>Edward
>
>From: Sriskandarajah Suhothayan <suho@wso2.com<mailto:suho@wso2.com><mailto:suho@wso2.com<mailto:suho@wso2.com>>>
>Date: Wednesday, November 25, 2015 at 21:07
>To: Edward Zhang <yonzhang2012@apache.org<mailto:yonzhang2012@apache.org><mailto:yonzhang2012@apache.org<mailto:yonzhang2012@apache.org>>>
>Cc: Srinath Perera <srinath@wso2.com<mailto:srinath@wso2.com><mailto:srinath@wso2.com<mailto:srinath@wso2.com>>>,
WSO2
>Developers' List <dev@wso2.org<mailto:dev@wso2.org><mailto:dev@wso2.org<mailto:dev@wso2.org>>>
>Subject: Re: [Dev] [Siddhi] what events is left in the window
>
>Hi
>
>Currently the concept of current event & expired events live within the
>query and all events going out to a stream are converted back to current
>events. So its hard for the application to keep track of the window and
>aggregation states like count, avg, std, etc...
>Further window implementations can very based on its implementations
>hence in some cases knowing what events entered and existed will not be
>enough to recreate the window during failure.
>
>The recommended way to keep track of state in Siddhi is via snapshots,
>you can take snapshots of the siddhi Runtime with a reasonable time
>frame. and also buffer a copy of the events sent to siddhi after that
>snapshot, with this method when there is a failure we should restore the
>latest snapshot and replay the events which are sent after the last
>snapshot. The tricky part is the way you buffer events after snapshot,
>using Kafka and replaying is one option.
>
>Regards
>Suho
>
>On Thu, Nov 26, 2015 at 10:01 AM, Edward Zhang
><yonzhang2012@apache.org<mailto:yonzhang2012@apache.org><mailto:yonzhang2012@apache.org<mailto:yonzhang2012@apache.org>>>
wrote:
>I tried expired events before, it only works for the query without
>groupby. If the query contains groupby and having clause, then it only
>emit just expired event when having conditions is satisfied.
>
>For example
>
>String cseEventStream = "define stream TempStream (user string, cmd
>string);";
>String query = "@info(name = 'query1') from TempStream#window.length(4) "
>        + " select user, cmd, count(user) as cnt " +
>        " group by user " +
>        "having cnt > 3 "
>        + " insert all events into DelayedTempStream";
>
>If we send events as follows, it will not generate expired events at all.
>
>inputHandler.send(new Object[]{"user", "open1"});
>inputHandler.send(new Object[]{"user", "open2"});
>inputHandler.send(new Object[]{"user", "open3"});
>inputHandler.send(new Object[]{"user", "open4"});
>inputHandler.send(new Object[]{"user", "open5"});
>
>
>Thanks
>Edward Zhang
>
>On Wed, Nov 25, 2015 at 6:50 PM, Srinath Perera
><srinath@wso2.com<mailto:srinath@wso2.com><mailto:srinath@wso2.com<mailto:srinath@wso2.com>>>
wrote:
>Adding Suho
>
>Hi Edward,
>
>Each window give you a stream of expired events as well. Would that work?
>
>https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-W
>indow
>
>Thank
>Srinath
>
>On Thu, Nov 19, 2015 at 5:37 AM, Edward Zhang
><yonzhang2012@apache.org<mailto:yonzhang2012@apache.org><mailto:yonzhang2012@apache.org<mailto:yonzhang2012@apache.org>>>
wrote:
>Hi Siddhi team,
>
>Do we have anyway of tracking what events are removed from any type of
>windows, length(batch), or time(batch)? I investigated that removeEvents
>may not be the right solution.
>
>We have one requirement of migrating policy from one machine to another
>machine but keeping internal state there.
>
>Eagle uses policy in storm infrastructure, but one machine which holds
>the policy fails, then already-populated events in the window also are
>gone. Sometimes it is bad especially when we have built up a long window
>like monthly data.
>
>One possible way is to keep all events in the siddhi window to be
>snapshotted into application domain. Another way is to keep tracking what
>events are coming in and out, so application can track what are left in
>siddhi window.
>
>Here is the ticket for Eagle
>https://issues.apache.org/jira/browse/EAGLE-39
>
>Do you have similar request before? Or what do you suggest?
>
>Thanks
>Edward Zhang
>
>_______________________________________________
>Dev mailing list
>Dev@wso2.org<mailto:Dev@wso2.org><mailto:Dev@wso2.org<mailto:Dev@wso2.org>>
>http://wso2.org/cgi-bin/mailman/listinfo/dev
>
>
>
>
>--
>============================
>Srinath Perera, Ph.D.
>   http://people.apache.org/~hemapani/
>   http://srinathsview.blogspot.com/
>
>
>
>
>--
>S. Suhothayan
>Technical Lead & Team Lead of WSO2 Complex Event Processor
>WSO2 Inc. http://wso2.com<http://wso2.com/>
><http://wso2.com/>
>lean . enterprise . middleware
>
>cell: (+94) 779 756 757<tel:%28%2B94%29%20779%20756%20757> | blog: http://suhothayan.blogspot.com/
>twitter: http://twitter.com/suhothayan | linked-in:
>http://lk.linkedin.com/in/suhothayan




--
S. Suhothayan
Technical Lead & Team Lead of WSO2 Complex Event Processor
WSO2 Inc. http://wso2.com<http://wso2.com/>
<http://wso2.com/>
lean . enterprise . middleware

cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
twitter: http://twitter.com/suhothayan | linked-in: http://lk.linkedin.com/in/suhothayan
Mime
View raw message