flink-dev mailing list archives

From Paris Carbone <par...@kth.se>
Subject Re: Window's Checkpoint problem
Date Mon, 28 Nov 2016 12:15:01 GMT
Syinchwun,

Can you maybe share more technical details such as design docs/papers about this approach,
or is it confidential? 
It sounds interesting but the details make a difference (i.e. partial boundaries - bookkeeping).

Paris

> On 28 Nov 2016, at 13:01, liuxinchun <liuxinchun@huawei.com> wrote:
> 
> Dear Aljoscha:
> 
> I am a colleague of 时某人 and took part in designing the checkpoint mechanism of a
> stream processing system that is modeled on Flink's checkpoint mechanism. In my opinion,
> the hardest part of checkpointing a stream is backing up large windows. In many
> applications, a window may contain hundreds of GB of data. We designed a mechanism that
> backs up windows incrementally: a complete window may be spread across several successive
> checkpointed states. Each checkpointed state keeps part of the window's data and records
> the boundary of that partial window together with the boundary of the complete window at
> that moment. When restoring state, the thread scans the successive checkpointed states and
> reassembles the whole window from the recorded partial-window and whole-window boundaries.
> 
> I think this could be a candidate approach for backing up windows. In addition, I think
> window checkpointing should support user-defined state through the Checkpointed interface.
> In many applications, users compute important state by accumulating over the history of
> the stream. Once lost, such state cannot be recovered with the window checkpoint mechanism
> in current Flink (version 1.1.3).
> 
> Syinchwun Leo
> 
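The incremental window backup described in the message above can be sketched in plain Java. This is only an illustration under stated assumptions, not the design of the system mentioned in the message and not Flink code: the class names (PartialWindowSnapshot, IncrementalWindow), the use of element sequence numbers as boundaries, and the in-memory list of checkpoints are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** One checkpointed state: a slice of the window plus both boundaries (hypothetical layout). */
final class PartialWindowSnapshot {
    final long partialStart;   // sequence number of the first element in this slice
    final List<String> slice;  // only the elements added since the previous checkpoint
    final long windowStart;    // whole window at checkpoint time: [windowStart, windowEnd)
    final long windowEnd;

    PartialWindowSnapshot(long partialStart, List<String> slice, long windowStart, long windowEnd) {
        this.partialStart = partialStart;
        this.slice = slice;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }
}

final class IncrementalWindow {
    private final List<String> elements = new ArrayList<>(); // live window contents
    private long windowStart = 0;      // sequence number of elements.get(0)
    private long lastCheckpointed = 0; // elements below this sequence number are already backed up

    void add(String element) {
        elements.add(element);
    }

    /** Slides the window forward by dropping its n oldest elements. */
    void evictOldest(int n) {
        elements.subList(0, n).clear();
        windowStart += n;
    }

    /** Backs up only what is new since the last checkpoint, plus the boundaries. */
    PartialWindowSnapshot checkpoint() {
        long windowEnd = windowStart + elements.size();
        long from = Math.max(lastCheckpointed, windowStart);
        List<String> slice =
            new ArrayList<>(elements.subList((int) (from - windowStart), elements.size()));
        lastCheckpointed = windowEnd;
        return new PartialWindowSnapshot(from, slice, windowStart, windowEnd);
    }

    /** Scans successive snapshots (oldest first) and keeps what the latest window still covers. */
    static List<String> restore(List<PartialWindowSnapshot> history) {
        PartialWindowSnapshot latest = history.get(history.size() - 1);
        List<String> window = new ArrayList<>();
        for (PartialWindowSnapshot snapshot : history) {
            for (int i = 0; i < snapshot.slice.size(); i++) {
                long seq = snapshot.partialStart + i;
                if (seq >= latest.windowStart && seq < latest.windowEnd) {
                    window.add(snapshot.slice.get(i));
                }
            }
        }
        return window;
    }
}
```

In this sketch each checkpoint costs only as much as the data added since the previous one, while a restore has to read every snapshot that still overlaps the latest window boundary. That bookkeeping is where a real design would need the most care.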
> -----Original Message-----
> From: Aljoscha Krettek [mailto:aljoscha@apache.org] 
> Sent: 28 November 2016 18:58
> To: dev@flink.apache.org
> Subject: Re: Window's Checkpoint problem
> 
> Hi,
> this is indeed a bug (though I would see it more as a feature since I think using the
> Checkpointed interface there can indeed be problematic, as Till pointed out). The problem
> is that the Scala wrapper functions have to implement all kinds of interfaces so that they
> can forward to the wrapped function. Or we would have to have a wrapper function for each
> combination of interfaces that a user function can implement.
> 
> In the long run, our use of interfaces for user functions does not seem to scale well
> in the Scala API.
> 
> Cheers,
> Aljoscha
> 
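The wrapper problem described in the message above can be reproduced without Flink. The following is a minimal sketch in plain Java; WindowFn, Checkpointed, PlainWrapper, ForwardingWrapper and SketchOperator are hypothetical stand-ins named for this illustration and are not Flink's real classes or signatures.

```java
/** Stand-in for a window function interface (hypothetical, not Flink's). */
interface WindowFn {
    int apply(int[] input);
}

/** Stand-in for a function-level checkpointing interface (hypothetical, not Flink's). */
interface Checkpointed<T> {
    T snapshotState();
    void restoreState(T state);
}

/** User function that keeps its own state, like WindowStatistics in the original post. */
final class CountingWindowFn implements WindowFn, Checkpointed<Integer> {
    private int count = 0;

    public int apply(int[] input) {
        count += input.length;
        return count;
    }

    public Integer snapshotState() {
        return count;
    }

    public void restoreState(Integer state) {
        count = state;
    }
}

/** Wrapper that forwards only WindowFn, so the Checkpointed side of the user function is hidden. */
final class PlainWrapper implements WindowFn {
    private final WindowFn wrapped;

    PlainWrapper(WindowFn wrapped) {
        this.wrapped = wrapped;
    }

    public int apply(int[] input) {
        return wrapped.apply(input);
    }
}

/** Wrapper that also forwards Checkpointed; one such class is needed per interface combination. */
final class ForwardingWrapper<F extends WindowFn & Checkpointed<Integer>>
        implements WindowFn, Checkpointed<Integer> {
    private final F wrapped;

    ForwardingWrapper(F wrapped) {
        this.wrapped = wrapped;
    }

    public int apply(int[] input) {
        return wrapped.apply(input);
    }

    public Integer snapshotState() {
        return wrapped.snapshotState();
    }

    public void restoreState(Integer state) {
        wrapped.restoreState(state);
    }
}

/** Mimics an operator that decides via instanceof whether to snapshot the user function. */
final class SketchOperator {
    static boolean wouldSnapshot(Object userFunction) {
        return userFunction instanceof Checkpointed;
    }
}
```

Here SketchOperator.wouldSnapshot returns true for a bare CountingWindowFn and for a ForwardingWrapper around it, but false for a PlainWrapper around the same function. The operator sees only the wrapper's type, which is why the user's state is silently skipped.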
> On Mon, 28 Nov 2016 at 10:49 Till Rohrmann <trohrmann@apache.org> wrote:
> 
>> Hi 时某人,
>> 
>> I think you've found an inconsistency in Flink's windowing API (but 
>> it's the same in the Java API). Handling operator state in the context 
>> of windows is a little bit delicate, though, because you could have multiple 
>> windows in flight. I've pulled Aljoscha, who is more familiar with the 
>> windowing API, into this thread; he can probably give you a better explanation.
>> 
>> I think either we allow it or we check that a window function does not 
>> implement the Checkpointed interface and if it does, then notify the 
>> user about it. Furthermore, I think we should document these subtle 
>> behaviour differences better.
>> 
>> Cheers,
>> Till
>> 
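The second option in the message above (reject a window function that implements the checkpointing interface and tell the user) could look roughly like the following. This is a hedged sketch in plain Java, not code from Flink: WindowLogic, StatefulMarker and WindowFunctionValidator are hypothetical names that stand in for the window function interface, the Checkpointed interface, and wherever such a check would live.

```java
/** Plays the role of the window function interface (hypothetical). */
interface WindowLogic {}

/** Plays the role of the Checkpointed interface (hypothetical). */
interface StatefulMarker {}

final class WindowFunctionValidator {
    /**
     * Fails fast when a window function declares function-level state that the
     * window operator in this sketch would not include in its checkpoints.
     */
    static void validate(WindowLogic fn) {
        if (fn instanceof StatefulMarker) {
            throw new IllegalArgumentException(
                fn.getClass().getSimpleName()
                    + " implements a checkpointing interface, but state kept in a"
                    + " window function is not included in checkpoints");
        }
    }
}

/** A window function without its own state: accepted. */
final class PureLogic implements WindowLogic {}

/** A window function that declares its own state: rejected with a clear message. */
final class StatefulLogic implements WindowLogic, StatefulMarker {}
```

Failing when the job is built, rather than at restore time, turns a silent loss of state into an error the user sees immediately.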
>> On Sat, Nov 26, 2016 at 4:47 AM, 时某人 <shijinkui666@163.com> wrote:
>> 
>>> Is there a design or architecture document for the state backends and checkpointing?
>>> 
>>> 
>>> ChengXiang Li gave a talk on checkpointing at the Hangzhou Flink Meetup:
>>> https://github.com/pusuo/streaming-resource/blob/master/flink-meetup
>>> -hz- 20161105/Flink%E5%88%86%E5%B8%83%E5%BC%8F%E5%BF%AB%E7%85%A7%
>>> E6%B7%B1%E5%85%A5%E5%88%86%E6%9E%90_%E6%9D%8E%E5%91%88%E7%A5%A5.pdf
>>> 
>>> 
>>> Thanks
>>> 
>>> At 2016-11-26 10:30:52, "liuxinchun" <liuxinchun@huawei.com> wrote:
>>> 
>>> 
>>> Hi all:
>>> 
>>> 
>>> 
>>> I have been following Flink and ran into a problem with checkpointing a user-defined
>>> window function. My code looks like this:
>>> 
>>> 
>>> 
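>>> import org.apache.flink.api.java.tuple.Tuple
>>> import org.apache.flink.streaming.api.checkpoint.Checkpointed
>>> import org.apache.flink.streaming.api.scala.function.WindowFunction
>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>> import org.apache.flink.util.Collector
>>> 
>>> class WindowStatistics extends WindowFunction[Event, Int, Tuple, TimeWindow]
>>>     with Checkpointed[Option[Int]] {
>>> 
>>>   // User-defined state that should survive a restore.
>>>   private var count = 0
>>> 
>>>   override def apply(key: Tuple, window: TimeWindow, input: Iterable[Event],
>>>                      out: Collector[Int]): Unit = {
>>>     count = XXXX
>>>     XXXXXXXX
>>>     out.collect(count)
>>>   }
>>> 
>>>   // Snapshot the user-defined state.
>>>   override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Int] = {
>>>     Some(count)
>>>   }
>>> 
>>>   // Restore the user-defined state, falling back to 0 when none was saved.
>>>   override def restoreState(state: Option[Int]): Unit = {
>>>     state match {
>>>       case Some(c) => count = c
>>>       case None => count = 0
>>>     }
>>>   }
>>> }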
>>> 
>>> and
>>> 
>>> env.enableCheckpointing(5000)
>>> 
>>> env.setStateBackend(new RocksDBStateBackend("file:///data/"))
>>> 
>>> When a checkpoint is taken, only the data in the window (the panes) is checkpointed; the
>>> user-defined state (count) is not included. While debugging, I found that the check at
>>> 
>>> line 123 of AbstractUdfStreamOperator.java
>>> 
>>> if (userFunction instanceof Checkpointed) {
>>> 
>>>     XXXXXX
>>> 
>>> }
>>> 
>>> evaluates to false (for other operators, such as map and filter, it is true). The
>>> userFunction is actually a ScalaWindowFunctionWrapper object.
>>> 
>>> So my question is: is this a bug? If not, what is the design philosophy behind window
>>> checkpointing? In many scenarios users want to checkpoint their own state, but the design
>>> does not seem to support that. Or am I using window checkpointing incorrectly?
>>> 
>>> Thank you!
>> 
