flink-user mailing list archives

From Biao Liu <mmyy1...@gmail.com>
Subject Re: What is the right way to use the physical partitioning strategy in Data Streams?
Date Mon, 23 Sep 2019 12:37:11 GMT
Hi Felipe,

If I understand correctly, you want to solve data skew caused by imbalanced
keys?

There is a common strategy for this kind of problem: pre-aggregation, like
the combiner in MapReduce.
Sadly, AFAIK Flink does not support pre-aggregation out of the box at the
moment, so I'm afraid you have to implement it yourself.

For example, you could introduce a function that caches some data (time- or
count-based) before the "keyBy". It runs on the non-keyed stream and does the
same pre-aggregation as the aggregation after the "keyBy". In this way, the
amount of skewed keyed data sent downstream would be reduced a lot.

I also found a suggestion [1] from Fabian, although it's from a long time ago.

Hope it helps.

1.
https://stackoverflow.com/questions/47825565/apache-flink-how-can-i-compute-windows-with-local-pre-aggregation

Thanks,
Biao /'bɪ.aʊ/



On Mon, 23 Sep 2019 at 19:51, Felipe Gutierrez <felipe.o.gutierrez@gmail.com>
wrote:

> thanks Biao,
>
> I see. To achieve what I want to do, I need to work with KeyedStream. I
> downloaded the Flink source code to study it and to alter the KeyedStream to
> my needs. I am not sure, but it looks like a lot of work, because as far as I
> understand the key-groups have to be predictable [1], and altering this
> touches a lot of other parts of the source code.
>
> However, if I guarantee that they (the key-groups) are predictable, I will be
> able to rebalance, rescale, ... the keys to other worker nodes.
>
> [1]
> https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
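>
> For reference, my rough understanding of how a key ends up on an operator
> instance (simplified from what KeyGroupRangeAssignment does, so treat it as a
> sketch rather than the exact code; key, parallelism and maxParallelism are
> placeholders):
>
> // hypothetical sketch, not the exact Flink source:
> int keyGroup = MathUtils.murmurHash(key.hashCode()) % maxParallelism;
> int operatorIndex = keyGroup * parallelism / maxParallelism;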
>
> Thanks,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
>
> On Mon, Sep 23, 2019 at 9:51 AM Biao Liu <mmyy1110@gmail.com> wrote:
>
>> Hi Felipe,
>>
>> The Flink job graph is a DAG. It seems that you set an "edge property"
>> (partitioner) several times.
>> Flink does not support multiple partitioners on one edge; the later one
>> overrides the earlier ones. That means the "keyBy" overrides the "rebalance"
>> and "partitionByPartial".
>>
>> You could insert some nodes between these partitioners to satisfy your
>> requirement. For example:
>> `sourceDataStream.rebalance().map(...).keyBy(0).sum(1).print();`
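>>
>> To make this concrete with the tokenizer stream from your snippet below (just
>> a sketch; the identity map is only there to create a second edge so that
>> rebalance() and keyBy() are applied to different edges):
>>
>> // sketch, reusing the tokenizer stream from your snippet below:
>> DataStream<Tuple2<String, Integer>> rebalanced = tokenizer
>>         .rebalance()          // edge 1: round-robin distribution
>>         .map(value -> value)  // inserted node between the two partitioners
>>         .returns(Types.TUPLE(Types.STRING, Types.INT)); // needed because of the lambda's type erasure
>> rebalanced.keyBy(0).sum(1).print();  // edge 2: hash partitioning by the word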
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez <
>> felipe.o.gutierrez@gmail.com> wrote:
>>
>>> I am executing a data stream application which uses rebalance. Basically
>>> I am counting words using "src -> split -> physicalPartitionStrategy ->
>>> keyBy -> sum -> print". I am running 3 examples: one without a physical
>>> partition strategy, one with the rebalance strategy [1], and one with the
>>> partial partition strategy from [2].
>>> I know that the keyBy operator effectively undoes what rebalance is doing,
>>> because it splits the stream by key, and if I have a stream with a skewed
>>> key, one parallel instance of the operator after the keyBy will be overloaded.
>>> However, I was expecting that *before the keyBy* I would have a
>>> balanced stream, which is not happening.
>>>
>>> Basically, I want to see the difference in records/sec between operators
>>> when I use rebalance or any other physical partition strategy. However,
>>> I found no difference in the records/sec metrics of any operator when
>>> running the 3 different physical partition strategies. Screenshots of
>>> Prometheus+Grafana are attached.
>>>
>>> Maybe I am measuring the wrong operator, maybe I am not using
>>> rebalance in the right way, or maybe my use case is not a good one to test
>>> the rebalance transformation.
>>> I am also testing a different physical partition strategy, with the goal of
>>> later implementing the issue "FLINK-1725 New Partitioner for better load
>>> balancing for skewed data" [2]. I am not sure, but I guess that all physical
>>> partition strategies have to be implemented on a KeyedStream.
>>>
>>> DataStream<String> text = env.addSource(new WordSource());
>>> // split lines into word tuples
>>> DataStream<Tuple2<String, Integer>> tokenizer = text.flatMap(new Tokenizer());
>>> // choose a partitioning strategy (one of the three variants per run)
>>> DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer;                          // 1) no physical partitioning
>>> // DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.rebalance();           // 2) rebalance
>>> // DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.partitionByPartial(0); // 3) partial partitioner
>>> // count
>>> partitionedStream.keyBy(0).sum(1).print();
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning
>>> [2] https://issues.apache.org/jira/browse/FLINK-1725
>>>
>>> thanks,
>>> Felipe
>>>
>>> --
>>> -- Felipe Gutierrez
>>> -- skype: felipe.o.gutierrez
>>> -- https://felipeogutierrez.blogspot.com
>>>
>>
