flink-user mailing list archives

From Dominik Bruhn <domi...@dbruhn.de>
Subject Set Parallelism and keyBy
Date Mon, 26 Dec 2016 18:52:34 GMT
Hey,
I have a Flink job with a default parallelism of 2. I want to
key the stream and then apply a flatMap on the keyed stream. The
flatMap operation is quite costly, so I want a much higher
parallelism there (let's say 16). Additionally, it is important that
the flatMap operation for a given key is always executed in the same
process or in the same task.
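For context on why I expect that: as far as I understand, keyed routing is deterministic for a fixed parallelism — the key's hash decides the target subtask. A minimal plain-Scala sketch of that idea (hash modulo parallelism is only an illustration here, not Flink's actual key-group scheme):

```scala
// Toy model of hash-based key routing (NOT Flink's actual scheme,
// which murmur-hashes keys into key groups): it only illustrates
// that, for a fixed parallelism, the same key always resolves to
// the same subtask index.
object KeyRouting {
  def subtaskFor(key: String, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  def main(args: Array[String]): Unit = {
    val p = 16
    // Same key, same parallelism -> same subtask, every time.
    assert(subtaskFor("eventTypeA", p) == subtaskFor("eventTypeA", p))
    println(s"eventTypeA -> subtask ${subtaskFor("eventTypeA", p)}")
  }
}
```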

I have the following code:

----
env.setParallelism(2)
val input: DataStream[Event] = /* from somewhere */
input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).print()
----

This works fine, and the "ExpensiveOperation" is always executed on
the same tasks for the same keys.

Now I tried two things:

1.
----
env.setParallelism(2)
val input: DataStream[Event] = /* from somewhere */
input.keyBy(_.eventType).setParallelism(16).flatMap(new ExpensiveOperation()).print()
----
This fails with an exception because I can't set the parallelism on the 
keyBy operator.

2.
-----
env.setParallelism(2)
val input: DataStream[Event] = /* from somewhere */
input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).setParallelism(16).print()
-----
While this executes, it breaks the assignment of keys to tasks:
the "ExpensiveOperation" is no longer always executed on the same
tasks for the same keys (visible from the prefixes in the print()
output).
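This would at least be consistent with the subtask mapping simply changing with the parallelism: the same key can land on a different subtask index once the operator runs with 16 subtasks instead of 2, even though the mapping stays stable within each parallelism. A toy illustration (again hash modulo parallelism, an assumed stand-in for Flink's real key-group assignment):

```scala
// Toy illustration: changing the operator parallelism changes which
// subtask a key maps to, while the mapping stays stable within each
// parallelism. (Flink's real key-group assignment differs in detail
// but shares this property.)
object MappingAcrossParallelism {
  def subtaskFor(key: String, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  def main(args: Array[String]): Unit = {
    for (k <- Seq("login", "click", "purchase")) {
      // Stable per key at a fixed parallelism...
      assert(subtaskFor(k, 2) == subtaskFor(k, 2))
      // ...but the index itself may differ between p=2 and p=16.
      println(s"$k: p=2 -> ${subtaskFor(k, 2)}, p=16 -> ${subtaskFor(k, 16)}")
    }
  }
}
```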

What am I doing wrong? Is my only option to set the parallelism of
the whole Flink job to 16?

Thanks, have nice holidays,
Dominik
