flink-user mailing list archives

From Biao Liu <mmyy1...@gmail.com>
Subject Re: What is the right way to use the physical partitioning strategy in Data Streams?
Date Mon, 23 Sep 2019 13:47:04 GMT
Wow, that's really cool! You have indeed done a lot of work. IMO it's
somewhat beyond the scope of the user group.

Just one small concern: I'm not sure I have fully understood your approach
to "tackle data skew by altering the way Flink partitions keys using
KeyedStream".

From my understanding, key groups are used for rescaling jobs, e.g. to
support reusing state after changing the parallelism of an operator.
I'm not sure whether you are heading in the right direction. It seems that
you are implementing something below the user-facing API. The API is
stable, while the internal implementation is not, so it is usually not
recommended to build a feature on top of implementation details.
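To make the key-group idea concrete, here is a minimal plain-Java sketch (no Flink dependency; the class name `KeyGroupSketch` is hypothetical) of how a keyed record is routed. A key is hashed into one of `maxParallelism` key groups, and each parallel instance owns a contiguous range of key groups. Rescaling therefore only moves whole key groups, and their state, between instances. To the best of my knowledge the index and range formulas mirror Flink's `KeyGroupRangeAssignment`. The hash, however, is a stand-in: Flink applies a murmur hash to `key.hashCode()`, and this sketch uses the raw `hashCode()`.

```java
import java.util.List;

public class KeyGroupSketch {

    // Key -> key group. Assumption: Flink murmur-hashes key.hashCode() first;
    // this sketch uses the raw hashCode() as a stand-in.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key group -> index of the parallel operator instance that owns it.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Contiguous, inclusive range of key groups owned by one operator instance.
    static int[] keyGroupRange(int operatorIndex, int maxParallelism, int parallelism) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // number of key groups; must stay the same across rescaling
        for (int p : new int[] {2, 4}) {
            for (int i = 0; i < p; i++) {
                int[] r = keyGroupRange(i, maxParallelism, p);
                System.out.printf("p=%d instance %d owns key groups %d..%d%n", p, i, r[0], r[1]);
            }
        }
        for (String word : List.of("flink", "kafka", "skew")) {
            int kg = keyGroupFor(word, maxParallelism);
            // A key's group never changes; rescaling only changes which instance owns that group.
            System.out.printf("%s -> key group %d -> instance %d (p=2), instance %d (p=4)%n",
                    word, kg, operatorIndexFor(kg, maxParallelism, 2), operatorIndexFor(kg, maxParallelism, 4));
        }
    }
}
```

Because every key in a key group moves together, a skewed key always stays inside one key group. Rescaling alone cannot spread a single hot key over several instances.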

If you have strong reasons to change the implementation, I would suggest
starting a discussion on the dev mailing list. Maybe it could be supported
officially. What do you think?

Thanks,
Biao /'bɪ.aʊ/



On Mon, 23 Sep 2019 at 20:54, Felipe Gutierrez <felipe.o.gutierrez@gmail.com>
wrote:

>
> I've implemented a combiner [1] in Flink by implementing
> OneInputStreamOperator, and I apply my operator using "transform".
> It works well, and I guess it would be useful to add this operator to
> DataStream.java. I just need to check whether I need to touch other parts
> of the source code.
>
> But now I want to tackle data skew by altering the way Flink partitions
> keys using KeyedStream.
>
> [1]
> https://felipeogutierrez.blogspot.com/2019/08/implementing-dynamic-combiner-mini.html
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com*
>
>
> On Mon, Sep 23, 2019 at 2:37 PM Biao Liu <mmyy1110@gmail.com> wrote:
>
>> Hi Felipe,
>>
>> If I understand correctly, you want to solve data skew caused by
>> imbalanced keys?
>>
>> A common strategy for this kind of problem is pre-aggregation, like the
>> combiner in MapReduce.
>> But sadly, AFAIK Flink does not support pre-aggregation currently. I'm
>> afraid you have to implement it yourself.
>>
>> For example, introduce a function that caches some data (time- or
>> count-based). This function should sit before "keyBy", on the non-keyed
>> stream, and do the same aggregation that runs after "keyBy". This way,
>> the volume of skewed keyed data would be reduced a lot.
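Here is a minimal sketch of the count-based variant described above. It is written as plain Java rather than as a Flink function or operator, so it runs on its own. The hypothetical `CountingCombiner` buffers partial word counts and emits one record per distinct word once `maxBuffered` input records have been seen. In a real job this logic would sit before `keyBy`. It would also still need a time-based flush and checkpointed state for fault tolerance, and both are left out here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountingCombiner {
    private final int maxBuffered;                          // flush after this many input records
    private final Map<String, Integer> partialCounts = new HashMap<>();
    private int seen = 0;                                   // input records since the last flush

    CountingCombiner(int maxBuffered) {
        this.maxBuffered = maxBuffered;
    }

    // Accumulate one (word, count) record; returns the pre-aggregated records if this
    // record triggered a flush, otherwise an empty list.
    List<Map.Entry<String, Integer>> add(String word, int count) {
        partialCounts.merge(word, count, Integer::sum);
        seen++;
        return seen >= maxBuffered ? flush() : List.of();
    }

    // Emit one record per distinct key and reset the buffer.
    List<Map.Entry<String, Integer>> flush() {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : partialCounts.entrySet()) {
            out.add(Map.entry(e.getKey(), e.getValue()));
        }
        partialCounts.clear();
        seen = 0;
        return out;
    }

    public static void main(String[] args) {
        CountingCombiner combiner = new CountingCombiner(5);
        String[] words = {"hot", "hot", "cold", "hot", "hot"};
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (String w : words) {
            emitted.addAll(combiner.add(w, 1));
        }
        // 5 input records (4 for the skewed key "hot") become 2 records sent on to keyBy.
        System.out.println(emitted);
    }
}
```

The downstream keyed `sum` stays unchanged, because summing partial counts gives the same total as summing the individual 1s.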
>>
>> I also found a suggestion [1] from Fabian, although it is from a while ago.
>>
>> Hope it helps.
>>
>> 1.
>> https://stackoverflow.com/questions/47825565/apache-flink-how-can-i-compute-windows-with-local-pre-aggregation
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Mon, 23 Sep 2019 at 19:51, Felipe Gutierrez <
>> felipe.o.gutierrez@gmail.com> wrote:
>>
>>> Thanks Biao,
>>>
>>> I see. To achieve what I want, I need to work with KeyedStream. I
>>> downloaded the Flink source code to learn it and adapt KeyedStream to my
>>> needs. I am not sure, but it seems to be a lot of work, because as far as
>>> I understand, the key groups have to be predictable [1], and altering this
>>> touches a lot of other parts of the source code.
>>>
>>> However, if I guarantee that they (key groups) are predictable, I will
>>> be able to rebalance, rescale, etc. the keys to other worker nodes.
>>>
>>> [1]
>>> https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
>>>
>>> Thanks,
>>> Felipe
>>>
>>>
>>> On Mon, Sep 23, 2019 at 9:51 AM Biao Liu <mmyy1110@gmail.com> wrote:
>>>
>>>> Hi Felipe,
>>>>
>>>> A Flink job graph is a DAG. It seems that you set an "edge property"
>>>> (partitioner) several times.
>>>> Flink does not support multiple partitioners on one edge. The later one
>>>> overrides the earlier ones. That means "keyBy" overrides "rebalance" and
>>>> "partitionByPartial".
>>>>
>>>> You could insert some operators between these partitioners to satisfy
>>>> your requirement. For example,
>>>> `sourceDataStream.rebalance().map(...).keyBy(0).sum(1).print();`
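To see why the metrics looked the same, it may help to simulate the two partitioners on a skewed input. This plain-Java sketch has no Flink dependency; the class name and the 4-channel setup are hypothetical. It counts how many records each downstream instance would receive under two schemes. The first is round-robin, which is how the Flink docs describe `rebalance()`. The second is key hashing, which is the effect of `keyBy`. The hash is a stand-in for Flink's key-group routing. If the only edge into the counting operator is the hashed one, the skew is all that its instances see, whatever partitioner was requested before `keyBy`.

```java
import java.util.Arrays;

public class PartitionerSkewDemo {

    // Round-robin: record i goes to channel i % n regardless of its key (like rebalance()).
    static int[] roundRobin(String[] keys, int channels) {
        int[] load = new int[channels];
        for (int i = 0; i < keys.length; i++) {
            load[i % channels]++;
        }
        return load;
    }

    // Hash partitioning: all records with the same key go to the same channel (like keyBy).
    // Stand-in hash; Flink routes through key groups and a murmur hash instead.
    static int[] byKeyHash(String[] keys, int channels) {
        int[] load = new int[channels];
        for (String key : keys) {
            load[Math.floorMod(key.hashCode(), channels)]++;
        }
        return load;
    }

    public static void main(String[] args) {
        // Skewed input: 90 records share one key, 10 records have distinct keys.
        String[] keys = new String[100];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = i < 90 ? "the" : "word" + i;
        }
        System.out.println("round-robin: " + Arrays.toString(roundRobin(keys, 4)));
        System.out.println("by key hash: " + Arrays.toString(byKeyHash(keys, 4)));
    }
}
```

With an operator such as `map` placed between `rebalance()` and `keyBy`, the round-robin edge feeds the `map` instances evenly. Only the edge after `keyBy` carries the skew.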
>>>>
>>>> Thanks,
>>>> Biao /'bɪ.aʊ/
>>>>
>>>>
>>>>
>>>> On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez <
>>>> felipe.o.gutierrez@gmail.com> wrote:
>>>>
>>>>> I am executing a data stream application which uses rebalance.
>>>>> Basically, I am counting words using "src -> split ->
>>>>> physicalPartitionStrategy -> keyBy -> sum -> print". I am running 3
>>>>> examples: one without a physical partition strategy, one with the
>>>>> rebalance strategy [1], and one with the partial partition strategy
>>>>> from [2].
>>>>> I know that the keyBy operator actually undoes what rebalance is doing,
>>>>> because it splits the stream by key, and if I have a stream with skewed
>>>>> keys, one parallel instance of the operator after the keyBy will be
>>>>> overloaded. However, I was expecting that *before the keyBy* I would
>>>>> have a balanced stream, which is not happening.
>>>>>
>>>>> Basically, I want to see the difference in records/sec between
>>>>> operators when I use rebalance or any other physical partition strategy.
>>>>> However, I found no difference in the records/sec metrics of any
>>>>> operator across the 3 different physical partition strategies.
>>>>> Screenshots of Prometheus+Grafana are attached.
>>>>>
>>>>> Maybe I am measuring the wrong operator, or maybe I am not using
>>>>> rebalance in the right way, or my use case is not a good test of the
>>>>> rebalance transformation.
>>>>> I am also testing a different physical partitioner so that I can later
>>>>> try to implement the issue "FLINK-1725 New Partitioner for better load
>>>>> balancing for skewed data" [2]. I am not sure, but I guess that all
>>>>> physical partition strategies have to be implemented on a KeyedStream.
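As far as I understand it, FLINK-1725 [2] proposes partial key grouping. Each key gets two candidate instances, and the sender routes every record to whichever candidate is currently less loaded (the "power of two choices"). The plain-Java sketch below only illustrates that idea. It is not the code attached to the JIRA issue, and the class name, the two hash choices and the per-sender load counters are all assumptions. One deliberate simplification: the second candidate is forced to differ from the first.

```java
import java.util.Arrays;

public class PartialKeyGroupingSketch {
    private final long[] load; // records this sender has routed to each channel (local estimate)

    PartialKeyGroupingSketch(int channels) {
        if (channels < 2) {
            throw new IllegalArgumentException("need at least 2 channels");
        }
        this.load = new long[channels];
    }

    // First candidate: plain hash of the key.
    private int firstChoice(String key) {
        return Math.floorMod(key.hashCode(), load.length);
    }

    // Second candidate: a rehash, offset so it always differs from the first (assumption).
    private int secondChoice(String key, int first) {
        int h = key.hashCode() * 0x9E3779B9;
        int offset = Math.floorMod(h ^ (h >>> 16), load.length - 1);
        return (first + 1 + offset) % load.length;
    }

    // Route the record to whichever of its two candidates has received fewer records so far.
    int selectChannel(String key) {
        int a = firstChoice(key);
        int b = secondChoice(key, a);
        int chosen = load[a] <= load[b] ? a : b;
        load[chosen]++;
        return chosen;
    }

    long[] loads() {
        return load.clone();
    }

    public static void main(String[] args) {
        PartialKeyGroupingSketch pkg = new PartialKeyGroupingSketch(4);
        // Skewed input: 90 records of one hot key, 10 records with distinct keys.
        for (int i = 0; i < 100; i++) {
            pkg.selectChannel(i < 90 ? "the" : "word" + i);
        }
        System.out.println("partial key grouping: " + Arrays.toString(pkg.loads()));
    }
}
```

The trade-off is that a hot key's records now land on two instances. A downstream `sum` therefore produces two partial counts per key, and a later step has to merge them. This is also why such a partitioner does not fit the `KeyedStream` contract, where one key lives in exactly one instance.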
>>>>>
>>>>> DataStream<String> text = env.addSource(new WordSource());
>>>>> // split lines into (word, 1) tuples
>>>>> DataStream<Tuple2<String, Integer>> tokenizer = text.flatMap(new Tokenizer());
>>>>> // choose ONE partitioning strategy per run:
>>>>> // 1) no physical partitioning
>>>>> DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer;
>>>>> // 2) round-robin rebalancing
>>>>> // DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.rebalance();
>>>>> // 3) partial key grouping (not in stock Flink; from the FLINK-1725 work [2])
>>>>> // DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.partitionByPartial(0);
>>>>> // count words; keyBy(0) replaces whichever partitioner was chosen above
>>>>> partitionedStream.keyBy(0).sum(1).print();
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning
>>>>> [2] https://issues.apache.org/jira/browse/FLINK-1725
>>>>>
>>>>> thanks,
>>>>> Felipe
>>>>>
>>>>
