flink-user mailing list archives

From Stefan Richter <s.rich...@data-artisans.com>
Subject Re: large sliding window perf question
Date Wed, 24 May 2017 15:24:13 GMT
Yes Carst, I noticed your version is already 1.2.1, which is why I contacted Aljoscha to take
a look here, because he knows best about the expected scalability of the sliding window implementation.
 
> On 24.05.2017 at 16:49, Carst Tankink <ctankink@bol.com> wrote:
> 
> Hi,
>  
> Thanks Aljoscha!
> To complete my understanding: the problem here is that each element in the sliding window(s)
basically triggers 240 get+put calls instead of just 1, right? I can see how that blows up
:-) 
> I have a good idea of how to proceed, so I will try writing the custom ProcessFunction next (week).
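> Roughly the kind of thing I have in mind (a sketch only, applied after a keyBy on the id; class, state and bucket names are placeholders, the aggregate is a plain count, and the exact ProcessFunction/state API differs slightly between Flink versions): keep one map of per-minute counts per key and evaluate it with a timer once per slide, so each element costs a single state read and write instead of ~240.
> 
>     import java.util.HashMap;
>     import java.util.Map;
> 
>     import org.apache.flink.api.common.state.ValueState;
>     import org.apache.flink.api.common.state.ValueStateDescriptor;
>     import org.apache.flink.api.common.typeinfo.TypeHint;
>     import org.apache.flink.api.common.typeinfo.TypeInformation;
>     import org.apache.flink.configuration.Configuration;
>     import org.apache.flink.streaming.api.functions.ProcessFunction;
>     import org.apache.flink.util.Collector;
> 
>     // Emulates a 4 h window sliding by 1 min on a keyed stream of ids: per key, one map of
>     // per-minute counts in a single state entry, evaluated by a timer once per slide interval.
>     public class SlidingCountFunction extends ProcessFunction<String, Long> {
> 
>         private static final long SLIDE_MS = 60_000L;   // 1 minute slide
>         private static final long WINDOW_SLIDES = 240;  // 4 h window / 1 min slide
> 
>         private transient ValueState<HashMap<Long, Long>> minuteCounts;
> 
>         @Override
>         public void open(Configuration parameters) {
>             minuteCounts = getRuntimeContext().getState(new ValueStateDescriptor<>(
>                     "minuteCounts", TypeInformation.of(new TypeHint<HashMap<Long, Long>>() {})));
>         }
> 
>         @Override
>         public void processElement(String id, Context ctx, Collector<Long> out) throws Exception {
>             HashMap<Long, Long> counts = minuteCounts.value();
>             if (counts == null) {
>                 counts = new HashMap<>();
>                 // evaluate once per slide, starting at the next minute boundary
>                 ctx.timerService().registerEventTimeTimer((ctx.timestamp() / SLIDE_MS + 1) * SLIDE_MS);
>             }
>             counts.merge(ctx.timestamp() / SLIDE_MS, 1L, Long::sum);  // one read + one write per element
>             minuteCounts.update(counts);
>         }
> 
>         @Override
>         public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
>             HashMap<Long, Long> counts = minuteCounts.value();
>             long windowEndMinute = timestamp / SLIDE_MS;
>             long sum = 0;
>             for (Map.Entry<Long, Long> e : counts.entrySet()) {
>                 if (e.getKey() >= windowEndMinute - WINDOW_SLIDES && e.getKey() < windowEndMinute) {
>                     sum += e.getValue();
>                 }
>             }
>             out.collect(sum);  // the 4 h count for this key
>             counts.keySet().removeIf(minute -> minute < windowEndMinute - WINDOW_SLIDES);
>             minuteCounts.update(counts);
>             // a real implementation would stop re-registering the timer for idle keys
>             ctx.timerService().registerEventTimeTimer(timestamp + SLIDE_MS);
>         }
>     }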
>  
> Stefan, in our case we are already on Flink 1.2.1, which should have the patched version
of RocksDB, right? That patch did solve an issue we had in a different Flink job (a
Kafka Source -> HDFS/Bucketing Sink which was stalling quite often under Flink 1.2.0), but it
did not solve this case, which fits the “way too much RocksDB access” explanation better.
>  
>  
> Thanks again,
> Carst
>  
> From: Aljoscha Krettek <aljoscha@apache.org>
> Date: Wednesday, May 24, 2017 at 16:13
> To: Stefan Richter <s.richter@data-artisans.com>
> Cc: Carst Tankink <ctankink@bol.com>, "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: large sliding window perf question
>  
> Hi, 
>  
> I’m afraid you’re running into a general shortcoming of the current sliding windows
implementation: every sliding window is treated as its own window that has window contents
and trigger state/timers. For example, if you have a sliding window of size 4 hours with a
1-minute slide, each element is in 240 windows and you basically amplify writing to
RocksDB by 240. This gets out of hand very quickly with larger differences between window
size and slide interval.
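> To spell the arithmetic out (an illustration only, nothing beyond the numbers above):
> 
>     import org.apache.flink.streaming.api.windowing.time.Time;
> 
>     public class SlideAmplification {
>         public static void main(String[] args) {
>             // A 4 hour window sliding by 1 minute, i.e.
>             // .window(SlidingEventTimeWindows.of(Time.hours(4), Time.minutes(1)))
>             long size = Time.hours(4).toMilliseconds();
>             long slide = Time.minutes(1).toMilliseconds();
> 
>             // Each element is assigned to size / slide overlapping windows, and each of
>             // those windows keeps its own contents and trigger state in the state backend.
>             System.out.println("windows per element: " + (size / slide));  // prints 240
>         }
>     }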
>  
> I’m also afraid there is no solution for this right now, so the workaround Chen mentioned
is the way to go.
>  
> Best,
> Aljoscha
> On 24. May 2017, at 14:07, Stefan Richter <s.richter@data-artisans.com> wrote:
>  
> Hi, 
>  
> both issues sound like the known problem with RocksDB merging state. Please take a look
here
>  
> https://issues.apache.org/jira/browse/FLINK-5756
>  
> and here
>  
> https://github.com/facebook/rocksdb/issues/1988
>  
> Best,
> Stefan
>  
>  
> On 24.05.2017 at 14:33, Carst Tankink <ctankink@bol.com> wrote:
>  
> Hi,
>  
> We are seeing a similar behaviour for large sliding windows. Let me put some details
here and see if they match up enough with Chen’s:
>  
> Technical specs:
> - Flink 1.2.1 on YARN
> - RocksDB backend, on HDFS. I’ve set the backend to PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop cluster runs on spinning disks, but that doesn’t seem to help.
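> Roughly how that is set up in the job (the checkpoint path below is just a placeholder):
> 
>     import org.apache.flink.contrib.streaming.state.PredefinedOptions;
>     import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
>     public class BackendSetup {
>         public static void main(String[] args) throws Exception {
>             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
>             RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
>             backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
>             env.setStateBackend(backend);
>         }
>     }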
>  
> Pipeline:
> - Read from Kafka, extract ids.
> - KeyBy id, count occurrences of each id using a fold. The window size of this operator is 10 minutes with a slide of 1 minute.
> - KeyBy id (again), compute mean and standard deviation using a fold. The window size of this operator is 4 hours with a slide of 1 minute.
> - Post-process data, sink.
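> In code the shape is roughly this (a sketch; the source is omitted, the folds are simplified, and the real second-stage fold tracks whatever it needs for the mean and standard deviation):
> 
>     import org.apache.flink.api.common.functions.FoldFunction;
>     import org.apache.flink.api.java.functions.KeySelector;
>     import org.apache.flink.api.java.tuple.Tuple2;
>     import org.apache.flink.api.java.tuple.Tuple3;
>     import org.apache.flink.streaming.api.datastream.DataStream;
>     import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
>     import org.apache.flink.streaming.api.windowing.time.Time;
> 
>     public class PipelineShape {
> 
>         // `ids` stands in for the stream of ids extracted from the Kafka source.
>         public static void build(DataStream<String> ids) {
> 
>             // Stage 1: count occurrences per id, 10 min window sliding by 1 min.
>             DataStream<Tuple2<String, Long>> counts = ids
>                     .keyBy(new KeySelector<String, String>() {
>                         @Override
>                         public String getKey(String id) {
>                             return id;
>                         }
>                     })
>                     .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
>                     .fold(Tuple2.of("", 0L), new FoldFunction<String, Tuple2<String, Long>>() {
>                         @Override
>                         public Tuple2<String, Long> fold(Tuple2<String, Long> acc, String id) {
>                             return Tuple2.of(id, acc.f1 + 1);
>                         }
>                     });
> 
>             // Stage 2: aggregate per id again, 4 h window sliding by 1 min
>             // (the stage where every element lands in 240 overlapping window panes).
>             counts
>                     .keyBy(0)
>                     .window(SlidingEventTimeWindows.of(Time.hours(4), Time.minutes(1)))
>                     .fold(Tuple3.of("", 0L, 0L),
>                             new FoldFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
>                                 @Override
>                                 public Tuple3<String, Long, Long> fold(
>                                         Tuple3<String, Long, Long> acc, Tuple2<String, Long> count) {
>                                     // element count and running sum; mean = f2 / f1, std dev needs
>                                     // an extra sum of squares in the real job
>                                     return Tuple3.of(count.f0, acc.f1 + 1, acc.f2 + count.f1);
>                                 }
>                             })
>                     .print();  // stand-in for the post-processing and sink
>         }
>     }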
>  
> What I observe is:
> - With a heap-based backend, the job runs really quickly (a couple of minutes to process 7 days of Kafka data) but eventually goes OOM with a “GC overhead limit exceeded” error.
> - With the RocksDB backend, checkpoints get stuck most of the time, and the “count occurrences” step gets a lot of back pressure from the next operator (on the large window).
>   - In the cases where the checkpoint does succeed, the state for the large window is around 500-700 MB; the other states are within the KBs.
>   - Also in those cases, all time seems to be spent in the ‘alignment’ phase for a single subtask of the count operator, with the other operators aligning within milliseconds. The checkpoint duration itself is no more than 2 seconds, even for the larger states.
>  
>  
> At this point, I’m a bit at a loss to figure out what’s going on. My best guess is that
it has to do with the state access to the RocksDBFoldingState, but why this is so slow is beyond
me.
>  
> Hope this info helps in figuring out what is going on, and hopefully it is actually related
to Chen’s case :)
>  
>  
> Thanks,
> Carst
>  
> From: Stefan Richter <s.richter@data-artisans.com>
> Date: Tuesday, May 23, 2017 at 21:35
> To: "user@flink.apache.org <mailto:user@flink.apache.org>" <user@flink.apache.org
<mailto:user@flink.apache.org>>
> Subject: Re: large sliding window perf question
>  
> Hi,
>  
> Which state backend and Flink version are you using? There was a problem with large merging
states on RocksDB, caused by some inefficiencies in the merge operator of RocksDB. We provide
a custom patch for this with all newer versions of Flink. 
>  
> Best,
> Stefan
>  
> On 23.05.2017 at 21:24, Chen Qin <qinnchen@gmail.com> wrote:
>  
> Hi there,
>  
> I have seen a weird perf issue while running an event-time-based job with a large sliding
window (24 hours, sliding every 10 s).
>  
> The pipeline looks simple: tail a Kafka topic, assign timestamps and watermarks, forward to
a large sliding window (30 days) firing every 10 seconds, and print the output.
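> In code that is roughly the following shape (a sketch; the Kafka source and timestamp parsing are placeholders, the window is non-keyed just to keep it short, and each element is assumed to carry its event time in milliseconds):
> 
>     import org.apache.flink.api.common.functions.FoldFunction;
>     import org.apache.flink.streaming.api.datastream.DataStream;
>     import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>     import org.apache.flink.streaming.api.windowing.time.Time;
> 
>     public class LargeSlidingWindowShape {
> 
>         // `events` stands in for the Kafka-sourced stream.
>         public static void build(DataStream<Long> events) {
>             events
>                     .assignTimestampsAndWatermarks(
>                             new BoundedOutOfOrdernessTimestampExtractor<Long>(Time.seconds(10)) {
>                                 @Override
>                                 public long extractTimestamp(Long eventTimeMillis) {
>                                     return eventTimeMillis;
>                                 }
>                             })
>                     // a 30 day window firing every 10 s puts each element into
>                     // 30 * 24 * 3600 / 10 = 259,200 overlapping window panes
>                     .timeWindowAll(Time.days(30), Time.seconds(10))
>                     .fold(0L, new FoldFunction<Long, Long>() {
>                         @Override
>                         public Long fold(Long count, Long event) {
>                             return count + 1;
>                         }
>                     })
>                     .print();
>         }
>     }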
>  
> What I have seen first-hand is checkpointing getting stuck and taking longer than the timeout,
despite the traffic volume being low (~300 TPS). Looking deeper, it seems back pressure kicks in
and the window operator consumes messages really slowly, throttling the sources.
>  
> I also tried limiting the window time to minutes, and all the issues went away.
>  
> Any suggestions on this? My workaround is that I implemented a ProcessFunction that keeps a big
value state, periodically evaluates it, and emits downstream (emulating what the sliding window does).
>  
> Thanks,
> Chen
>  

