flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators
Date Mon, 23 Jan 2017 12:25:27 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834377#comment-15834377
] 

ASF GitHub Bot commented on FLINK-5473:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3182#discussion_r97282598
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
---
    @@ -57,47 +57,51 @@
     
     	/** Use the same log for all ExecutionGraph classes */
     	private static final Logger LOG = ExecutionGraph.LOG;
    -	
    -	private final SerializableObject stateMonitor = new SerializableObject();
    +
    +	public static final int VALUE_NOT_SET = -1;
    +
    +	private final Object stateMonitor = new Object();
     	
     	private final ExecutionGraph graph;
     	
     	private final JobVertex jobVertex;
     	
     	private final ExecutionVertex[] taskVertices;
     
    -	private IntermediateResult[] producedDataSets;
    +	private final IntermediateResult[] producedDataSets;
     	
     	private final List<IntermediateResult> inputs;
     	
     	private final int parallelism;
     
    -	private final int maxParallelism;
    -	
     	private final boolean[] finishedSubtasks;
    -			
    -	private volatile int numSubtasksInFinalState;
    -	
    +
     	private final SlotSharingGroup slotSharingGroup;
    -	
    +
     	private final CoLocationGroup coLocationGroup;
    -	
    +
     	private final InputSplit[] inputSplits;
     
    +	private final int maxParallelismConfigured;
    +
    +	private int maxParallelismDerived;
    +
    +	private volatile int numSubtasksInFinalState;
    +
     	/**
     	 * Serialized task information which is for all sub tasks the same. Thus, it avoids
to
     	 * serialize the same information multiple times in order to create the
     	 * TaskDeploymentDescriptors.
     	 */
    -	private final SerializedValue<TaskInformation> serializedTaskInformation;
    +	private SerializedValue<TaskInformation> serializedTaskInformation;
     
     	private InputSplitAssigner splitAssigner;
     	
     	public ExecutionJobVertex(
     		ExecutionGraph graph,
     		JobVertex jobVertex,
     		int defaultParallelism,
    -		Time timeout) throws JobException, IOException {
    +		Time timeout) throws JobException {
    --- End diff --
    
    Method declaration parameters which are broken into multiple lines are usually indented
twice.


> setMaxParallelism() higher than 1 is possible on non-parallel operators
> -----------------------------------------------------------------------
>
>                 Key: FLINK-5473
>                 URL: https://issues.apache.org/jira/browse/FLINK-5473
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0
>            Reporter: Robert Metzger
>            Assignee: Stefan Richter
>
> While trying out Flink 1.2, I found out that you can set a maxParallelism higher than
1 on a non-parallel operator.
> I think we should have the same semantics as the setParallelism() method.
> Also, when setting a global maxParallelism in the execution environment, it will be set
as a default value for the non-parallel operator.
> When restoring a savepoint from 1.1, you have to set the maxParallelism to the parallelism
of the 1.1 job. Non-parallel operators will then also get the maxPar set to this value, leading
to an error on restore.
> So currently, users restoring from 1.1 to 1.2 have to manually set the maxParallelism
to 1 for all non-parallel operators.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message