flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly
Date Tue, 31 Oct 2017 04:07:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16226215#comment-16226215
] 

ASF GitHub Bot commented on FLINK-7153:
---------------------------------------

Github user summerleafs commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r147891494
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
     	//  Miscellaneous
     	// --------------------------------------------------------------------------------------------
     
    +	/**
    +	 * Calculates the preferred locations based on the location preference constraint.
    +	 *
    +	 * @param locationPreferenceConstraint constraint for the location preference
    +	 * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
    +	 * 		have been a resource assigned.
    +	 */
    +	@VisibleForTesting
    +	public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
    +		final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
    +		final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
    --- End diff --
    
    Hi Till, is `getPreferredLocations()` not invoked here because Flink doesn't yet support
reading checkpoint data locally? I have created an issue for reading checkpoints locally
[here](https://issues.apache.org/jira/browse/FLINK-7873?filter=-1). Once it is complete, I wonder
whether we can invoke `getPreferredLocations()` instead of `getPreferredLocationsBasedOnInputs()`.
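
The fallback being asked about could be pictured roughly as follows. This is a toy sketch, not Flink's implementation: locations are plain strings, and `PreferredLocationSketch`, `preferredLocations`, and the `previousStateLocation` parameter are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy model (not Flink code): prefer the state location, else the input locations. */
class PreferredLocationSketch {

    /**
     * Returns the location of previously written state when it is known,
     * and falls back to the input locations otherwise.
     *
     * @param previousStateLocation hypothetical: where local state lives, or null if unknown
     * @param inputLocations        locations of the producers this task reads from
     */
    static List<String> preferredLocations(String previousStateLocation, List<String> inputLocations) {
        if (previousStateLocation != null) {
            return Collections.singletonList(previousStateLocation);
        }
        return inputLocations;
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("tm-1", "tm-2");
        // No local state known: the input-based preference is used.
        System.out.println(preferredLocations(null, inputs));
        // Local state known: the state location takes precedence.
        System.out.println(preferredLocations("tm-3", inputs));
    }
}
```

Until local checkpoint reads are supported, the state-based branch would never help, which is consistent with calling only the input-based method for now.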


> Eager Scheduling can't allocate source for ExecutionGraph correctly
> -------------------------------------------------------------------
>
>                 Key: FLINK-7153
>                 URL: https://issues.apache.org/jira/browse/FLINK-7153
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.3.1
>            Reporter: Sihua Zhou
>            Assignee: Till Rohrmann
>            Priority: Critical
>             Fix For: 1.4.0, 1.3.3
>
>
> The ExecutionGraph.scheduleEager() function allocates resources for each ExecutionJobVertex one by one
by calling ExecutionJobVertex.allocateResourcesForAll(). There are two problems with this:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs() always returns an empty collection, because
`sourceSlot` is always null until the `ExecutionVertex` has been deployed via `Execution.deployToSlot()`.
As a result, allocating resources based on preferred locations can't work correctly. Should we set the slot
info on the `Execution` as soon as Execution.allocateSlotForExecution() completes successfully?
> 2. The current allocation strategy can't allocate slots optimally. Here is a test case:
> {code}
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating for v1 and v2, we get one local partition and three remote partitions.
It should actually be two local partitions and two remote partitions.
> The cause of the above problem is that the current strategy allocates the resource for each
execution one by one (if the execution can get a slot from the SlotGroup, it takes it; otherwise
it asks for a new one).
> Changing the allocation strategy to two steps would solve this problem. Below is the
pseudocode:
> {code}
> for (ExecutionJobVertex ejv : getVerticesTopologically()) {
>     // Step 1: try to allocate from the SlotGroup based on inputs, one by one
>     //         (allocating only where the input location matches).
>     // Step 2: allocate for the remaining executions.
> }
> {code}
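
To make the difference between the two strategies concrete, here is a minimal self-contained simulation of the v1 (parallelism 2) / v2 (parallelism 4) case described above. It is a toy model, not Flink code. It assumes that producer subtask `i` already sits in shared slot `i`, that a shared slot holds at most one subtask per vertex, that the consumer parallelism is a multiple of the producer parallelism, and that the pointwise pattern maps consumer `c` to producer `c / fanOut`. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the slot-sharing scenario from the issue; not Flink code. */
class TwoStepAllocationSketch {

    /** Counts consumer subtasks placed in the same shared slot as their producer. */
    static int countLocalInputs(int producerParallelism, int consumerParallelism, boolean twoStep) {
        // consumerInSlot.get(s) becomes true once a consumer subtask occupies slot s.
        // Slots 0..producerParallelism-1 already hold the producer subtasks.
        List<Boolean> consumerInSlot = new ArrayList<>();
        for (int i = 0; i < producerParallelism; i++) {
            consumerInSlot.add(Boolean.FALSE);
        }

        int[] assigned = new int[consumerParallelism];
        Arrays.fill(assigned, -1);
        // Assumption: consumerParallelism is a multiple of producerParallelism.
        int fanOut = consumerParallelism / producerParallelism;

        if (twoStep) {
            // Step 1: place a consumer only if its producer's slot is still free for it.
            for (int c = 0; c < consumerParallelism; c++) {
                int preferred = c / fanOut;
                if (!consumerInSlot.get(preferred)) {
                    consumerInSlot.set(preferred, Boolean.TRUE);
                    assigned[c] = preferred;
                }
            }
        }

        // One-by-one pass: take the first free shared slot, otherwise request a new one.
        // This is the whole strategy when twoStep is false, and step 2 otherwise.
        for (int c = 0; c < consumerParallelism; c++) {
            if (assigned[c] != -1) {
                continue;
            }
            int slot = consumerInSlot.indexOf(Boolean.FALSE);
            if (slot == -1) {
                consumerInSlot.add(Boolean.TRUE);
                slot = consumerInSlot.size() - 1;
            } else {
                consumerInSlot.set(slot, Boolean.TRUE);
            }
            assigned[c] = slot;
        }

        int local = 0;
        for (int c = 0; c < consumerParallelism; c++) {
            if (assigned[c] == c / fanOut) {
                local++;
            }
        }
        return local;
    }

    public static void main(String[] args) {
        System.out.println("one-by-one local inputs: " + countLocalInputs(2, 4, false));
        System.out.println("two-step local inputs:   " + countLocalInputs(2, 4, true));
    }
}
```

Under these assumptions the one-by-one pass yields one local input and the two-step variant yields two, matching the counts given in the description.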



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
