flink-user mailing list archives

From Maximilian Michels <...@apache.org>
Subject Re: HBase reads and back pressure
Date Mon, 13 Jun 2016 10:25:28 GMT
Hi Christophe,

A fold function has two inputs: The state and a record to update the
state with. So you can update the SummaryStatistics (state) with each
Put (input).
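
A minimal sketch of that idea, written in plain Java rather than against the
Flink API so that it runs without dependencies. RunningStats is a hypothetical
stand-in for SummaryStatistics, and fold/foldAll are illustrative names that do
not come from this thread; only the shape (state, record) -> state is the point.

```java
import java.util.List;

public class FoldSketch {

    /** Hypothetical stand-in for SummaryStatistics: keeps only count and sum. */
    static final class RunningStats {
        private long count;
        private double sum;

        void addValue(double value) {
            count++;
            sum += value;
        }

        long getN() {
            return count;
        }

        double getMean() {
            return count == 0 ? Double.NaN : sum / count;
        }
    }

    /** Same shape as a fold function: takes the state and one record, returns the updated state. */
    static RunningStats fold(RunningStats accumulator, double record) {
        accumulator.addValue(record);
        return accumulator;
    }

    /** Folds records one at a time, so only the accumulator is retained, never the records. */
    static RunningStats foldAll(List<Double> records) {
        RunningStats state = new RunningStats();
        for (double record : records) {
            state = fold(state, record);
        }
        return state;
    }

    public static void main(String[] args) {
        RunningStats stats = foldAll(List.of(1.0, 2.0, 3.0));
        System.out.println(stats.getN() + " values, mean " + stats.getMean());
    }
}
```

In a real job the record type would be whatever the windowed stream carries,
and the conversion of the final state into an HBase Put would happen in a
separate step once the window fires.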

Cheers,
Max

On Mon, Jun 13, 2016 at 11:04 AM, Christophe Salperwyck
<christophe.salperwyck@gmail.com> wrote:
> Thanks for the feedback and sorry that I can't try all this straight away.
>
> Is there a way to have a different function than:
> WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()
>
> I would like to return a HBase Put and not a SummaryStatistics. So something
> like this:
> WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()
>
> Christophe
>
> 2016-06-09 17:47 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>>
>> OK, this indicates that the operator following the source is a bottleneck.
>>
>> If that's the WindowOperator, it makes sense to try the refactoring of the
>> WindowFunction.
>> Alternatively, you can try to run that operator with a higher parallelism.
>>
>> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck
>> <christophe.salperwyck@gmail.com>:
>>>
>>> Hi Fabian,
>>>
>>> Thanks for the help, I will try that. The backpressure was on the source
>>> (HBase).
>>>
>>> Christophe
>>>
>>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>>>>
>>>> Hi Christophe,
>>>>
>>>> where does the backpressure appear? In front of the sink operator or
>>>> before the window operator?
>>>>
>>>> In any case, I think you can improve your WindowFunction if you convert
>>>> parts of it into a FoldFunction<ANA, SummaryStatistics>.
>>>> The FoldFunction would take care of the statistics computation and the
>>>> WindowFunction would only assemble the result record including extracting
>>>> the start time of the window.
>>>>
>>>> Then you could do:
>>>>
>>>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
>>>> YourWindowFunction());
>>>>
>>>> This is more efficient because the FoldFunction is eagerly applied whenever
>>>> a new element is added to a window. Hence, the window only holds a single
>>>> value (SummaryStatistics) instead of all elements added to the window.
>>>> In contrast, the WindowFunction is called when the window is finally
>>>> evaluated.
>>>>
>>>> Hope this helps,
>>>> Fabian
>>>>
>>>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck
>>>> <christophe.salperwyck@gmail.com>:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am writing a program to read timeseries from HBase and do some daily
>>>>> aggregations (Flink streaming). For now I am just computing some averages,
>>>>> so it is not very demanding, but my HBase reads get slower and slower (I
>>>>> have a few billion points to read). The back pressure is close to 1 almost
>>>>> all the time.
>>>>>
>>>>> I use custom timestamp:
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>
>>>>> so I implemented a custom extractor based on:
>>>>> AscendingTimestampExtractor
>>>>>
>>>>> At the beginning I have 5M reads/s and after 15 min I have just 1M
>>>>> reads/s, then it gets worse and worse. Even when I cancel the job, data are
>>>>> still being written to HBase (I wrote a sink similar to the example, with a
>>>>> cache of 100s of HBase Puts to be a bit more efficient).
>>>>>
>>>>> When I don't put a sink it seems to stay on 1M reads/s.
>>>>>
>>>>> Do you have an idea why?
>>>>>
>>>>> Here is a bit of code if needed:
>>>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>>>>>     .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>>>>>     .keyBy(0)
>>>>>     .timeWindow(Time.days(1));
>>>>>
>>>>> final SingleOutputStreamOperator<Put> puts = ws.apply(
>>>>>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>>>>>
>>>>>         @Override
>>>>>         public void apply(final Tuple key, final TimeWindow window,
>>>>>                 final Iterable<ANA> input,
>>>>>                 final Collector<Put> out) throws Exception {
>>>>>
>>>>>             final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>>>>             for (final ANA ana : input) {
>>>>>                 summaryStatistics.addValue(ana.getValue());
>>>>>             }
>>>>>             final Put put = buildPut((String) key.getField(0), window.getStart(),
>>>>>                 summaryStatistics);
>>>>>             out.collect(put);
>>>>>         }
>>>>>     });
>>>>>
>>>>> And how I started Flink on YARN :
>>>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>>>>> -Dtaskmanager.network.numberOfBuffers=4096
>>>>>
>>>>> Thanks for any feedback!
>>>>>
>>>>> Christophe
>>>>
>>>>
>>>
>>
>
