flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Fault tolerance & idempotency on window functions
Date Thu, 04 May 2017 08:13:07 GMT
When keying, keep in mind that Kafka and Flink might use a different scheme for hashing. For
example, Flink also applies a murmur hash on the hash code retrieved from the key and then
has some internal logic for assigning that hash to a key group (the internal unit of key partitioning).
I don’t know what Kafka does internally for hashing.
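To illustrate why the two sides need not line up, here is a minimal sketch of the chain described above: hash code, a mixing step, key group, then subtask. The class name, the `mix` function, and the naive producer-side scheme are assumptions made for illustration; they are not Flink's or Kafka's actual code.

```java
// Sketch only: models hashCode() -> mixing step -> key group -> subtask.
// The mixing function is a stand-in, NOT Flink's or Kafka's real implementation.
final class KeyGroupSketch {

    // Hypothetical stand-in for the murmur-style mixing applied to hashCode().
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE; // force a non-negative value
    }

    // Key group: the internal unit of key partitioning, in [0, maxParallelism).
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    // Maps a key group onto one of `parallelism` subtasks in contiguous ranges.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // A naive "hashCode modulo partitions" scheme a producer might use (assumption).
    static int naivePartitionFor(Object key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        String key = "tenant-1|user-42"; // hypothetical (tenant, user) key
        int keyGroup = keyGroupFor(key, 128);
        System.out.println("key group:       " + keyGroup);
        System.out.println("subtask:         " + subtaskFor(keyGroup, 128, 4));
        System.out.println("naive partition: " + naivePartitionFor(key, 4));
    }
}
```

Even with four Kafka partitions and four subtasks, the naive partition and the subtask index are computed differently, so a key's partition number should not be assumed to match the subtask that processes it.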

Also keep in mind that even with event time, the events are not ordered by event time. So
the event that arrives first does not necessarily have the lowest timestamp. Using event-time
just means that we wait for the watermark to trigger window computation.
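A small simulation may make this concrete. It is a sketch in plain Java, not Flink code: the class and method names are hypothetical, timestamps are assumed non-negative, and late elements are not handled. Elements are buffered in arrival order, and a tumbling window is emitted only once the watermark covers its last timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch only: tumbling event-time windows driven by a watermark.
final class EventTimeWindowSketch {
    private final long windowSize;
    // Window start -> timestamps buffered for that window, in arrival order.
    private final TreeMap<Long, List<Long>> buffered = new TreeMap<>();

    EventTimeWindowSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    // Start of the tumbling window that contains the given timestamp.
    long windowStart(long timestamp) {
        return timestamp - (timestamp % windowSize);
    }

    // Elements are stored as they arrive; arrival order is not timestamp order.
    void onElement(long timestamp) {
        buffered.computeIfAbsent(windowStart(timestamp), k -> new ArrayList<>()).add(timestamp);
    }

    // Fires every window whose last timestamp (end - 1) is covered by the watermark.
    List<List<Long>> onWatermark(long watermark) {
        List<List<Long>> fired = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.firstKey() + windowSize - 1 <= watermark) {
            fired.add(buffered.pollFirstEntry().getValue());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeWindowSketch windows = new EventTimeWindowSketch(10);
        for (long ts : new long[] {7, 3, 12, 5}) {
            windows.onElement(ts); // 7 arrives before 3 and 5
        }
        System.out.println("fired at watermark 8: " + windows.onWatermark(8));
        System.out.println("fired at watermark 9: " + windows.onWatermark(9));
    }
}
```

The window contents keep their arrival order, which is why the first element of a window is not a stable choice across replays.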

Regarding state size, if you don’t use merging windows (for example, session windows) then
the only state that is kept for a purged window is a cleanup timer that is set for “end
of window + allowed lateness”. That is, the state size does not increase with increasing
allowed lateness if you purge. This could still fit into the heap state backend and you don’t
necessarily need to consider RocksDB.
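As a rough model of that claim, the sketch below keeps a window's buffered contents and its cleanup time separately. The class and its methods are hypothetical and only mirror the description above (cleanup at "end of window + allowed lateness"); this is not how Flink's window operator is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: what remains in state for a non-merging window after fire & purge.
final class PurgedWindowSketch {
    private final long windowEnd;        // end of the window, in milliseconds
    private final long allowedLateness;  // in milliseconds
    private final List<Long> contents = new ArrayList<>();

    PurgedWindowSketch(long windowEnd, long allowedLateness) {
        this.windowEnd = windowEnd;
        this.allowedLateness = allowedLateness;
    }

    // The single timer that stays registered until the window is finally dropped.
    long cleanupTime() {
        return windowEnd + allowedLateness;
    }

    void add(long value) {
        contents.add(value);
    }

    // Fire and purge: emit the buffered contents, then drop them from state.
    List<Long> fireAndPurge() {
        List<Long> emitted = new ArrayList<>(contents);
        contents.clear();
        return emitted;
    }

    int bufferedElements() {
        return contents.size();
    }

    public static void main(String[] args) {
        long sixtyDays = 60L * 24 * 60 * 60 * 1000;
        PurgedWindowSketch window = new PurgedWindowSketch(10_000L, sixtyDays);
        window.add(1);
        window.add(2);
        System.out.println("emitted: " + window.fireAndPurge());
        System.out.println("buffered after purge: " + window.bufferedElements());
        System.out.println("cleanup timer at: " + window.cleanupTime());
    }
}
```

In this model the allowed lateness only moves the cleanup time further out; it does not change how much is buffered after a purge. A late element added after the purge is emitted on its own by the next firing.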

> On 29. Apr 2017, at 10:19, Kamil Dziublinski <kamil.dziublinski@gmail.com> wrote:
> Big thanks for replying, Aljoscha. I spent quite some time thinking about how to solve this
problem and came to some conclusions. It would be great if you could verify that my logic is correct.
> I concluded that if I partition the data in Kafka the same way I partition my window
with keyBy, that is, by the (tenant, user) combination (I would still hash it in the Kafka producer),
and switch processing to event time (currently it is processing time), then during
replay I could be 100% sure that the first element will always be first and that the watermark
triggering the window will arrive at the same moment. This would give me idempotent writes of the
batched object to HBase.
> For late events (handled by configuring lateness on the window itself) I would configure
the trigger to fire & purge, so that it doesn't hold on to fired data. This way, if a late event
arrives, I could fire it with a different timestamp and treat it in HBase as a totally
separate increment that does not override my previous data.
> The reason I want to purge data on firing is that I would need an allowed
lateness on the window of at least 2 months. Holding all data for 2 months after firing would
be too costly.
> An additional question: is there any cost to a very high allowed lateness (like
2 months) if we configure the trigger to fire & purge? For example, is there any additional state
or metadata that Flink needs to maintain that would take a lot of memory from the cluster? Would I
have to consider RocksDB for state here, or could FS state still work?
> On Fri, Apr 28, 2017 at 5:54 PM Aljoscha Krettek <aljoscha@apache.org> wrote:
> Hi,
> Yes, your analysis is correct: Flink will not retry for individual elements but will
restore from the latest consistent checkpoint in case of failure. This also means that you
can get different window results based on which element arrives first, i.e. you have a different
timestamp on your output in that case.
> One simple mitigation for the timestamp problem is to use the largest timestamp of elements
within a window instead of the first timestamp. This will be stable across restores even if
the order of arrival of elements changes. You can still get problems when it comes to late
data and window triggering, if you cannot guarantee that your watermark is 100 % correct,
though. I.e. it might be that, upon restore, an element with an even larger timestamp arrives
late that was not considered when doing the first processing that failed.
> Best,
> Aljoscha
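The mitigation suggested above can be sketched as follows. This is plain Java with hypothetical names, not Flink API: it compares the timestamp of the first-arriving element with the largest timestamp when the same window contents arrive in a different order after a restore.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: choosing an output timestamp that does not depend on arrival order.
final class StableWindowTimestamp {

    // Timestamp of whichever element happened to arrive first; depends on arrival order.
    static long firstArrived(List<Long> timestampsInArrivalOrder) {
        return timestampsInArrivalOrder.get(0);
    }

    // Largest timestamp in the window; the same for any arrival order of the same elements.
    static long largest(List<Long> timestamps) {
        long max = Long.MIN_VALUE;
        for (long t : timestamps) {
            max = Math.max(max, t);
        }
        return max;
    }

    public static void main(String[] args) {
        List<Long> firstRun = Arrays.asList(100L, 105L, 103L);
        List<Long> replay = Arrays.asList(103L, 100L, 105L); // same elements, new order
        System.out.println("first arrived: " + firstArrived(firstRun) + " vs " + firstArrived(replay));
        System.out.println("largest:       " + largest(firstRun) + " vs " + largest(replay));
    }
}
```

This only helps when the replayed window contains the same elements. As noted above, a late element with an even larger timestamp would still change the result.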
> > On 25. Apr 2017, at 19:54, Kamil Dziublinski <kamil.dziublinski@gmail.com> wrote:
> >
> > Hi guys,
> >
> > I have a Flink streaming job that reads from Kafka, creates some statistics increments
and stores them in HBase (using normal puts).
> > I'm using a fold function here with a window of a few seconds.
> >
> > My tests showed me that restoring state with window functions does not work quite
how I expected.
> > I thought that if my window function emits an aggregated object to a sink, and
that object fails in the sink, this write to HBase would be replayed. So even if it actually
got written to HBase but Flink thought it didn't (for instance during a network problem), I could
be sure of idempotent writes. I wanted to enforce that by using the timestamp of the first
event used in that window for aggregation.
> >
> > Now correct me if I'm wrong, but it seems that in the case of a failure (even if it's
in the sink) the whole flow is replayed from the last checkpoint, which means that my window function
might emit the aggregated object in a different form, for instance containing not only the tuples that
failed but also other ones. That would break my idempotency here, and I might end up with
higher counters than I should have.
> >
> > Do you have any suggestions on how to solve or work around such a problem in Flink?
> >
> > Thanks,
> > Kamil.
> >
> >
