flink-dev mailing list archives

From Gábor Hermann <m...@gaborhermann.com>
Subject Re: [DISCUSS] Per-key event time
Date Thu, 23 Feb 2017 10:14:03 GMT
Hey all,

Let me share some ideas about this.

@Paris: The local-only progress tracking indeed seems easier: we do not 
need to broadcast anything. Implementation-wise it is easier, but 
performance-wise probably not. If one key can come from multiple 
sources, there could be a lot more network overhead with per-key 
tracking than with broadcasting, somewhat paradoxically. Say source 
instance S1 sends messages and watermarks to operator instances O1 and O2. 
In the broadcasting case, S1 sends one message to O1 and one to O2 per 
watermark (depending, of course, on how fast the watermarks arrive), 
for a total of 2. However, if we keep track of per-key watermarks, S1 would 
need to send watermarks for every key directed to O1, and likewise for O2. 
So if 10 keys go from S1 to O1, and 10 keys from S1 to O2, then (assuming 
watermarks arrive at the same rate per key as they did per source in the 
previous case) S1 would send a total of 20 watermarks.
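
Just to make those counts concrete, a tiny back-of-the-envelope sketch 
(purely illustrative, the numbers are the ones from the example above, 
nothing measured):

// Illustrative only: message counts per "watermark interval" for the
// S1 -> {O1, O2} example above.
public class WatermarkMessageCount {
    public static void main(String[] args) {
        int downstreamInstances = 2;        // O1 and O2
        int keysPerDownstreamInstance = 10; // keys routed from S1 to each of them

        // Broadcast / per-subtask watermarks: one message per downstream instance.
        int broadcastMessages = downstreamInstances;                          // 2

        // Per-key watermarks at the same rate: one message per key per instance.
        int perKeyMessages = downstreamInstances * keysPerDownstreamInstance; // 20

        System.out.println(broadcastMessages + " vs " + perKeyMessages);
    }
}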

Another question is how large the state per key is. If it's 
really small (an integer maybe, or the state of a small state machine), then 
the overhead of keeping track of a (Long) watermark is large 
memory-wise. E.g. an Int state plus a Long watermark results in 3x as large 
state (4 bytes vs. 4 + 8 = 12 bytes). Checkpointing would also be ~3x as slow. 
Of course, for large states a Long watermark would not mean much overhead.

We could resolve the memory issue by using some kind of sketch-like data 
structure. Right now the granularity of watermark handling is 
per operator instance, while per-key granularity might be 
costly. What if we increased the granularity of watermarks inside an 
operator by keeping more than one watermark tracker per operator? 
This could be done quite simply with a hash table. With a hash table of 
size 1 we would get the current semantics (per-operator-instance 
granularity). With a hash table large enough to have at most one key per 
bucket we would get per-key watermark tracking. In between lies the 
trade-off between handling time skew and paying a lot of memory overhead. 
This does not seem hard to implement.
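
To make the idea a bit more concrete, here is a rough sketch of such a 
tracker (all names are made up, this is not existing Flink API; it also 
assumes a single input channel for simplicity, with several inputs you 
would take a min per bucket across them, like the per-subtask watermark 
today):

import java.util.Arrays;

// One watermark slot per hash bucket inside an operator instance.
// numBuckets = 1 degenerates to today's per-subtask watermark; a table
// large enough for one key per bucket approaches per-key tracking.
public class BucketedWatermarkTracker<K> {

    private final long[] bucketWatermarks;

    public BucketedWatermarkTracker(int numBuckets) {
        this.bucketWatermarks = new long[numBuckets];
        Arrays.fill(bucketWatermarks, Long.MIN_VALUE);
    }

    /** Key-to-bucket mapping; sender and receiver must agree on it. */
    public int bucketOf(K key) {
        return Math.floorMod(key.hashCode(), bucketWatermarks.length);
    }

    /** Advance one bucket's watermark monotonically when a per-bucket watermark arrives. */
    public void onBucketWatermark(int bucket, long timestamp) {
        bucketWatermarks[bucket] = Math.max(bucketWatermarks[bucket], timestamp);
    }

    /** The event time that timers/windows for this key would fire against. */
    public long currentWatermark(K key) {
        return bucketWatermarks[bucketOf(key)];
    }
}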

Of course, at some point we would still need to take care of watermarks 
per key. Imagine that keys A and B go to the same bucket of the 
hash table, and watermarks are coming in like this: (B,20), (A,10), 
(A,15), (A,40). Then the watermark of the bucket should be the minimum 
of the per-key watermarks as time passes, i.e. 0, 10, 15, 20 (with keys 
starting from watermark 0). For this we need to keep track of 
the watermarks of A and B separately. But once we have a correct 
watermark for the bucket, all we need to care about downstream is the bucket 
watermark. So somewhere (most probably at the source) we would have to 
pay the memory overhead of tracking every key, but nowhere else in the topology.
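
A rough sketch of what that source-side tracking could look like for a 
single bucket (again hypothetical names, not Flink API). Replaying the 
(B,20), (A,10), (A,15), (A,40) sequence from above gives exactly 
0, 10, 15, 20:

import java.util.HashMap;
import java.util.Map;

// Models one bucket of the source-side table: per-key watermarks live only
// here, and only the per-bucket minimum is forwarded downstream.
public class SourceSideBucketTracker<K> {

    // Per-key watermarks; keys start at 0, matching the example above.
    private final Map<K, Long> keyWatermarks = new HashMap<>();

    public void registerKey(K key) {
        keyWatermarks.putIfAbsent(key, 0L);
    }

    /** Advance one key's watermark and return the new bucket watermark (the min over keys). */
    public long onKeyWatermark(K key, long timestamp) {
        keyWatermarks.merge(key, timestamp, Math::max);
        return keyWatermarks.values().stream().mapToLong(Long::longValue).min().orElse(0L);
    }

    public static void main(String[] args) {
        SourceSideBucketTracker<String> bucket = new SourceSideBucketTracker<>();
        bucket.registerKey("A");
        bucket.registerKey("B");
        System.out.println(bucket.onKeyWatermark("B", 20)); // 0
        System.out.println(bucket.onKeyWatermark("A", 10)); // 10
        System.out.println(bucket.onKeyWatermark("A", 15)); // 15
        System.out.println(bucket.onKeyWatermark("A", 40)); // 20
    }
}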

Regarding the potentially large network overhead, the same kind of 
compression could help: we would not send watermarks from one operator 
per key, but rather per hash bucket. Again, the trade-off between time skew 
and overhead is configurable via the size of the hash table used.

Cheers,
Gabor

On 2017-02-23 08:57, Paris Carbone wrote:

> Hey Jamie!
>
> Key-based progress tracking sounds like local-only progress tracking to me, there is
> no need to use a low watermarking mechanism at all since all streams of a key are handled
> by a single partition at a time (per operator).
> Thus, this could be much easier to implement and support (i.e., no need to broadcast
> the progress state of each partition all the time).
> State-wise it should be fine too if it is backed by rocksdb, especially if we have MapState
> in the future.
>
> Just my quick thoughts on this, to get the discussion going :)
>
> cheers
> Paris
>
>> On 23 Feb 2017, at 01:01, Jamie Grier <jamie@data-artisans.com> wrote:
>>
>> Hi Flink Devs,
>>
>> Use cases that I see quite frequently in the real world would benefit from
>> a different watermarking / event time model than the one currently
>> implemented in Flink.
>>
>> I would call Flink's current approach partition-based watermarking or maybe
>> subtask-based watermarking.  In this model the current "event time" is a
>> property local to each subtask instance in a dataflow graph.  The event
>> time at any subtask is the minimum of the watermarks it has received on
>> each of its input streams.
>>
>> There are a couple of issues with this model that are not optimal for some
>> (maybe many) use cases.
>>
>> 1) A single slow subtask (or say source partition) anywhere in the dataflow
>> can mean no progress can be made on the computation at all.
>>
>> 2) In many real world scenarios the time skew across keys can be *many*
>> times greater than the time skew within the data with the same key.
>>
>> In this discussion I'll use "time skew" to refer to the out-of-orderness
>> with respect to timestamp of the data.  Out-of-orderness is a mouthful ;)
>>
>> Anyway, let me provide an example or two.
>>
>> In IoT applications the source of events is a particular device out in the
>> world, let's say a device in a connected car application.  The data for
>> some particular device may be very bursty and we will certainly get events
>> from these devices in Flink out-of-order just because of things like
>> partitions in Kafka, shuffles in Flink, etc.  However, the time skew in the
>> data for a single device should likely be very small (milliseconds or maybe
>> seconds)..
>>
>> However, in the same application the time skew across different devices can
>> be huge (hours or even days).  An obvious example of this, again using
>> connected cars as a representative example is the following:  Car A is
>> recording data locally at 12:00 pm on Saturday but doesn't currently have a
>> network connection.  Car B is doing the same thing but does have a network
>> connection.  Car A will transmit its data when the network comes back on
>> line.  Let's say this is at 4pm.  Car B was transmitting its data
>> immediately.  This creates a huge time skew (4 hours) in the observed
>> datastream when looked at as a whole.  However, the time skew in that data
>> for Car A or Car B alone could be tiny.  It will be out of order of course
>> but maybe by only milliseconds or seconds.
>>
>> What the above means in the end for Flink is that the watermarks must be
>> delayed by up to 4 hours or more because we're looking at the data stream
>> as a whole -- otherwise the data for Car A will be considered late.  The
>> time skew in the data stream when looked at as a whole is large even though
>> the time skew for any key may be tiny.
>>
>> This is the problem I would like to see a solution for.  The basic idea of
>> keeping track of watermarks and event time "per-key" rather than per
>> partition or subtask would solve I think both of these problems stated
>> above and both of these are real issues for production applications.
>>
>> The obvious downside of trying to do this per-key is that the amount of
>> state you have to track is much larger and potentially unbounded.  However,
>> I could see this approach working if the keyspace isn't growing rapidly but
>> is stable or grows slowly.  The saving grace here is that this may actually
>> be true of the types of applications where this would be especially
>> useful.  Think IoT use cases.  Another approach to keeping state size in
>> check would be a configurable TTL for a key.
>>
>> Anyway, I'm throwing this out here on the mailing list in case anyone is
>> interested in this discussion, has thought about the problem deeply
>> already, has use cases of their own they've run into or has ideas for a
>> solution to this problem.
>>
>> Thanks for reading..
>>
>> -Jamie
>>
>>
>> -- 
>>
>> Jamie Grier
>> data Artisans, Director of Applications Engineering
>> @jamiegrier <https://twitter.com/jamiegrier>
>> jamie@data-artisans.com
>

