flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@gmail.com>
Subject Re: HBase reads and back pressure
Date Thu, 09 Jun 2016 14:38:43 GMT
Hi Christophe,

where does the backpressure appear? In front of the sink operator or before
the window operator?

In any case, I think you can improve your WindowFunction if you convert
parts of it into a FoldFunction<ANA, SummaryStatistics>.
The FoldFunction would take care of the statistics computation and the
WindowFunction would only assemble the result record including extracting
the start time of the window.

Then you could do:

ws.apply(new SummaryStatistics(), new YourFoldFunction(), new

This is more efficient because the FoldFunction is eagerly applied when
ever a new element is added to a window. Hence, the window does only hold a
single value (SummaryStatistics) instead of all element added to the
window. In contrast the WindowFunction is called when the window is finally

Hope this helps,

2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <

> Hi,
> I am writing a program to read timeseries from HBase and do some daily
> aggregations (Flink streaming). For now I am just computing some average so
> not very consuming but my HBase read get slower and slower (I have few
> billions of points to read). The back pressure is almost all the time close
> to 1.
> I use custom timestamp:
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> so I implemented a custom extractor based on:
> AscendingTimestampExtractor
> At the beginning I have 5M reads/s and after 15 min I have just 1M read/s
> then it get worse and worse. Even when I cancel the job, data are still
> being written in HBase (I did a sink similar to the example - with a cache
> of 100s of HBase Puts to be a bit more efficient).
> When I don't put a sink it seems to stay on 1M reads/s.
> Do you have an idea why ?
> Here is a bit of code if needed:
> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
> .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
> .keyBy(0)
> .timeWindow(Time.days(1));
> final SingleOutputStreamOperator<Put> puts = ws.apply(new
> WindowFunction<ANA, Put, Tuple, TimeWindow>() {
> @Override
> public void apply(final Tuple key, final TimeWindow window, final
> Iterable<ANA> input,
> final Collector<Put> out) throws Exception {
> final SummaryStatistics summaryStatistics = new SummaryStatistics();
> for (final ANA ana : input) {
> summaryStatistics.addValue(ana.getValue());
> }
> final Put put = buildPut((String) key.getField(0), window.getStart(),
> summaryStatistics);
> out.collect(put);
> }
> });
> And how I started Flink on YARN :
> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
> -Dtaskmanager.network.numberOfBuffers=4096
> Thanks for any feedback!
> Christophe

View raw message