thanks Biao,

I see. To achieve what I want to do I need to work with KeyedStream. I downloaded the Flink source code to learn and alter the KeyedStream to my needs. I am not sure but it is a lot of work because as far as I understood the key-groups have to be predictable [1]. and altering this touches a lot of other parts of the source code.

However, If I guarantee that they (key-groups) are predictable, I will be able to rebalance, rescale, .... the keys to other worker-nodes.

[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez


On Mon, Sep 23, 2019 at 9:51 AM Biao Liu <mmyy1110@gmail.com> wrote:
Hi Felipe,

Flink job graph is DAG based. It seems that you set an "edge property" (partitioner) several times. 
Flink does not support multiple partitioners on one edge. The later one overrides the priors. That means the "keyBy" overrides the "rebalance" and "partitionByPartial".

You could insert some nodes between these partitioners to satisfy your requirement. For example,
`sourceDataStream.rebalance().map(...).keyby(0).sum(1).print();`

Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez <felipe.o.gutierrez@gmail.com> wrote:
I am executing a data stream application which uses rebalance. Basically I am counting words using "src -> split -> physicalPartitionStrategy -> keyBy -> sum -> print". I am running 3 examples, one without physical partition strategy, one with rebalance strategy [1], and one with partial partition strategy from [2]. 
I know that the keyBy operator actually kills what rebalance is doing because it splits the stream by key and if I have a stream with skewed key, one parallel instance of the operator after the keyBy will be overloaded. However, I was expecting that before the keyBy I would have a balanced stream, which is not happening. 

Basically, I want to see the difference in records/sec between operators when I use rebalance or any other physical partition strategy. However, when I found no difference in the records/sec metrics of all operators when I am running 3 different physical partition strategies. Screenshots of Prometheus+Grafana are attached.

Maybe I am measuring the wrong operator, or maybe I am not using the rebalance in the right way, or I am not doing a good use case to test the rebalance transformation.
I am also testing a different physical partition to later try to implement the issue "FLINK-1725 New Partitioner for better load balancing for skewed data" [2]. I am not sure, but I guess that all physical partition strategies have to be implemented on a KeyedStream.

DataStream<String> text = env.addSource(new WordSource());
// split lines in strings
DataStream<Tuple2<String, Integer>> tokenizer = text.flatMap(new Tokenizer());
// choose a partitioning strategy
DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer);
DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.rebalance();
DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.partitionByPartial(0);
// count