flink-dev mailing list archives

From "Stefan Richter (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-5052) Changing the maximum parallelism (number of key groups) of a job
Date Fri, 11 Nov 2016 10:12:58 GMT
Stefan Richter created FLINK-5052:

             Summary: Changing the maximum parallelism (number of key groups) of a job
                 Key: FLINK-5052
                 URL: https://issues.apache.org/jira/browse/FLINK-5052
             Project: Flink
          Issue Type: Improvement
          Components: State Backends, Checkpointing
            Reporter: Stefan Richter

Through dynamic rescaling, Flink jobs can already adjust their parallelism, and each operator
only has to read its assigned key-groups.

However, the maximum parallelism is determined by the number of key-groups (aka maxParallelism),
which is currently fixed forever once the job is first started. We could consider relaxing
this limitation so that users can modify the number of key-groups after the fact. This is
useful in particular for upscaling jobs from older Flink versions (< 1.2), which must
be converted with {{maxParallelism == parallelism}}.

In the general case, changing the maxParallelism can shuffle keys between key-groups. We
would then have to read the complete state on each operator instance, filtering for those
keys that actually fall into the key-groups assigned to that instance. While it is
certainly possible to support this, it is obviously a very expensive operation.

Fortunately, the assignment of keys to operators is currently determined as follows:

{{operatorInstance = computeKeyGroup(key) * parallelism / maxParallelism}}

This means that we can provide more efficient support for upscaling maxParallelism if
{{newMaxParallelism == n * oldMaxParallelism}}. In this case, keys are not reshuffled between
key-groups; instead, each old key-group is split by a factor n. Restoring operator instances
for the new maxParallelism then only needs to read a subset of the old key-groups, which
significantly reduces the amount of unnecessary data transfer, e.g. by ~1/2 when increasing
maxParallelism by a factor of 2, by ~2/3 when increasing by a factor of 3, etc.
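The split property above can be illustrated with a minimal Java sketch. The names are hypothetical, not Flink's actual API, and it assumes the simplified scheme that a key's key-group is {{hash % maxParallelism}} together with the operator-assignment formula from the text:

```java
// Illustrative sketch (not Flink's API): why scaling maxParallelism by an
// integer factor n splits key-groups instead of reshuffling keys, assuming
// key-group assignment is hash % maxParallelism.
public class KeyGroupSplitSketch {

    // Key-group for a key hash, under the assumed modulo scheme.
    static int keyGroup(int keyHash, int maxParallelism) {
        return Math.floorMod(keyHash, maxParallelism);
    }

    // Operator instance, per the formula in the issue text.
    static int operatorInstance(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int oldMax = 4;
        int n = 2;
        int newMax = n * oldMax; // newMaxParallelism == n * oldMaxParallelism

        for (int keyHash = 0; keyHash < 1000; keyHash++) {
            int oldKg = keyGroup(keyHash, oldMax);
            int newKg = keyGroup(keyHash, newMax);
            // Since oldMax divides newMax, hash % newMax % oldMax == hash % oldMax.
            // So every key in new key-group newKg came from exactly one old
            // key-group (newKg % oldMax): each old key-group is split into n
            // new ones, and no key jumps to an unrelated key-group.
            if (newKg % oldMax != oldKg) {
                throw new AssertionError("key moved between unrelated key-groups");
            }
        }
        System.out.println("each old key-group splits into " + n + " new key-groups");
    }
}
```

Because each new key-group maps back to a single old key-group, a restoring instance knows exactly which old state to read.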

Implementing this feature would require the following steps:
	- Introduce/modify state handles with the capability to summarize multiple logical key-groups
into one mixed physical entity.
	- Enhance StateAssignmentOperation so that it can deal with and correctly assign the new/modified
keyed state handles to subtasks when restoring a checkpoint. We also need to implement how to
compute the correct super-key-group, but this is rather simple.
	- Filter out key clippings when restoring in the backends.
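The last two steps could be sketched roughly as follows. This is a hypothetical Java illustration (names and state representation are made up, not Flink's API), again assuming key-groups are assigned as {{hash % maxParallelism}} and {{newMaxParallelism}} is an integer multiple of {{oldMaxParallelism}}:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch (not Flink's API): restoring one new key-group after
// maxParallelism was upscaled by an integer factor. Only the single old
// key-group newKeyGroup % oldMaxParallelism must be read; keys that now fall
// into sibling key-groups are filtered out ("key clippings").
public class RestoreFilterSketch {

    // Which old key-group holds the state for a given new key-group.
    static int sourceKeyGroup(int newKeyGroup, int oldMaxParallelism) {
        return newKeyGroup % oldMaxParallelism;
    }

    // Keep only the entries of an old key-group's state (modeled here as a
    // keyHash -> value map) that belong to the new key-group being restored.
    static Map<Integer, String> filterForNewKeyGroup(
            Map<Integer, String> oldKeyGroupState, int newKeyGroup, int newMaxParallelism) {
        Map<Integer, String> kept = new HashMap<>();
        for (Map.Entry<Integer, String> e : oldKeyGroupState.entrySet()) {
            if (Math.floorMod(e.getKey(), newMaxParallelism) == newKeyGroup) {
                kept.put(e.getKey(), e.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        int oldMax = 4, newMax = 8; // upscaled by factor n = 2
        // Old key-group 1 held keys with hash % 4 == 1, e.g. 1, 5, 9, 13.
        Map<Integer, String> oldKg1 = new HashMap<>();
        oldKg1.put(1, "a"); oldKg1.put(5, "b"); oldKg1.put(9, "c"); oldKg1.put(13, "d");

        // New key-groups 1 and 5 both read old key-group 1, but each keeps
        // only its own half of the keys.
        Map<Integer, String> newKg1 = filterForNewKeyGroup(oldKg1, 1, newMax);
        System.out.println(newKg1.keySet()); // contains the keys 1 and 9
    }
}
```

The filtering in the backends would follow the same predicate: drop any restored key whose recomputed key-group no longer matches the key-group being populated.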

