flink-user mailing list archives

From Gábor Gévay <gga...@gmail.com>
Subject Re: Flink strange stream join behavior
Date Sun, 16 Oct 2016 09:27:06 GMT

For your first question:

> the number of tuples are same in both cases

I guess you mean the total number of tuples here, right? So this means
that you have fewer, but larger windows. Suppose that you have W
windows, each with S tuples. Then your total input has W * S tuples,
and your total output has W * S * S tuples (assuming that all pairs of
tuples from the two matching windows are matched in the join, since
then the join of the matching windows squares the number of tuples in
the window).

Now if you multiply the window size by r, but also divide the number
of windows by r (so that the total number of input tuples stays the
same), then you have (W/r) * (S*r) * (S*r) output tuples, which
simplifies to W*S*S*r. So the total number of output tuples gets
multiplied by r, and this is indeed close to the numbers that you
reported: in your second case, you have about 40 times as much data in
a window (I rounded your 37), and 40 times the output (r = 40).
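
As a rough sketch of this arithmetic in Python (the function names and
the values of W and S are made up for illustration, they are not taken
from your job; it also assumes that every pair of tuples in two
matching windows joins, and that r divides W evenly):

```python
def join_output_size(num_windows: int, tuples_per_window: int) -> int:
    """Total join output when all pairs in matching windows are matched."""
    # Each pair of matching windows contributes S * S output tuples.
    return num_windows * tuples_per_window * tuples_per_window


def scaled_output_size(num_windows: int, tuples_per_window: int, r: int) -> int:
    """Same total input, but chunked into W/r windows of S*r tuples each."""
    return join_output_size(num_windows // r, tuples_per_window * r)


# Hypothetical numbers, chosen only so that r = 40 divides W evenly.
W, S, r = 400, 1000, 40

total_input_before = W * S               # 400 windows of 1000 tuples
total_input_after = (W // r) * (S * r)   # 10 windows of 40000 tuples

base = join_output_size(W, S)
scaled = scaled_output_size(W, S, r)
ratio = scaled // base

print(total_input_before == total_input_after)  # the input size is unchanged
print(ratio)                                    # the output grows by a factor of r
```

The input size is the same in both configurations, while the output
grows by the factor r.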

Another way to imagine the situation is that when you chunk your data
into larger windows, more tuples will "meet" in the joins, since
only those tuples that are in the same window are matched in the join.
I hope this helps, and I didn't misunderstand your situation.


2016-10-15 23:28 GMT+02:00 Davood Rafiei <rafieidavood9@gmail.com>:
> Hi,
> I am experiencing strange flink stream windowed join behavior.
> I want to do windowed (processing time) join between two partitioned
> streams. I read data from socket.
> I have two cases: 1. the data rate on the socket is relatively slow (say
> 1K ps); 2. the data rate on the socket is high (say 37K).
> The number of tuples read from the socket is the same in both cases.
> Firstly, the size of output (of join operation) is much higher in case 2
> although the number of tuples are same in both cases. For example, in
> case-1, the overall output size is 500M and in case 2 it is 20G. I couldn't
> get the logic behind this.
> Secondly, in both cases, flink ingests all data from socket (more or less)
> as soon as it is available. So, it has high throughput. However, especially
> in case 2, I have to wait long time after data is ingested from source
> operator. So, the data from socket is acquired and socket gets idle, and
> then I have to wait long time to get actual output to sink. My question is
> that, if this behavior is normal, and all the data acquired stays somewhere
> inside flink, why backpressure is not applied to source operators? I mean if
> the system cannot compute all inputs with high speed, then it should lower
> the reading speed from socket.
> Thanks
> Davood
