Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E5A6A200C10 for ; Fri, 3 Feb 2017 13:37:34 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id E12FC160B48; Fri, 3 Feb 2017 12:37:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8B70B160B62 for ; Fri, 3 Feb 2017 13:37:33 +0100 (CET) Received: (qmail 57079 invoked by uid 500); 3 Feb 2017 12:37:27 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 56860 invoked by uid 99); 3 Feb 2017 12:37:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Feb 2017 12:37:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 605BBE0A6B; Fri, 3 Feb 2017 12:37:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Fri, 03 Feb 2017 12:37:31 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [5/6] flink git commit: [FLINK-5499] [JobManager] Make the location preferences combined by state and inputs. archived-at: Fri, 03 Feb 2017 12:37:35 -0000 [FLINK-5499] [JobManager] Make the location preferences combined by state and inputs. Reusing the prior location (for state locality) takes precedence over input locality. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9ed4ff1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9ed4ff1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9ed4ff1 Branch: refs/heads/master Commit: b9ed4ff151c5d3a64be395c660160b5619e32c7f Parents: fe4fe58 Author: Stephan Ewen Authored: Tue Jan 31 20:34:33 2017 +0100 Committer: Stephan Ewen Committed: Fri Feb 3 12:46:14 2017 +0100 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionVertex.java | 83 +++++-- .../apache/flink/runtime/instance/SlotPool.java | 6 +- .../instance/SlotSharingGroupAssignment.java | 6 +- .../runtime/jobmanager/scheduler/Scheduler.java | 4 +- .../runtime/jobmanager/slots/AllocatedSlot.java | 2 +- .../ExecutionVertexLocalityTest.java | 244 +++++++++++++++++++ .../scheduler/SchedulerTestUtils.java | 6 +- 7 files changed, 323 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 0bb3514..cb2e177 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.util.EvictingBoundedList; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; + import org.slf4j.Logger; import java.io.IOException; @@ -264,20 +265,21 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable getPriorAssignedResourceLocations() { - List list = new ArrayList<>(); - for (int i = priorExecutions.size() - 1 ; i >= 0 ; i--) { - Execution prior = priorExecutions.get(i) ; - if (prior.getAssignedResourceLocation() != null) { - list.add(prior.getAssignedResourceLocation()); - break; + public TaskManagerLocation getLatestPriorLocation() { + synchronized (priorExecutions) { + final int size = priorExecutions.size(); + if (size > 0) { + return priorExecutions.get(size - 1).getAssignedResourceLocation(); + } + else { + return null; } } - return list; } EvictingBoundedList getCopyOfPriorExecutionsList() { @@ -398,14 +400,61 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable + *
  • If the task execution has state to load (from a checkpoint), then the location preference + * is the location of the previous execution (if there is a previous execution attempt). + *
  • If the task execution has no state or no previous location, then the location preference + * is based on the task's inputs. + * + * + * These rules should result in the following behavior: + * + *
      + *
    • Stateless tasks are always scheduled based on co-location with inputs. + *
    • Stateful tasks are on their initial attempt executed based on co-location with inputs. + *
    • Repeated executions of stateful tasks try to co-locate the execution with its state. + *
    + * + * @return The preferred excution locations for the execution attempt. + * + * @see #getPreferredLocationsBasedOnState() + * @see #getPreferredLocationsBasedOnInputs() + */ + public Iterable getPreferredLocations() { + Iterable basedOnState = getPreferredLocationsBasedOnState(); + return basedOnState != null ? basedOnState : getPreferredLocationsBasedOnInputs(); + } + + /** + * Gets the preferred location to execute the current task execution attempt, based on the state + * that the execution attempt will resume. + * + * @return A size-one iterable with the location preference, or null, if there is no + * location preference based on the state. + */ + public Iterable getPreferredLocationsBasedOnState() { + TaskManagerLocation priorLocation; + if (currentExecution.getTaskStateHandles() != null && (priorLocation = getLatestPriorLocation()) != null) { + return Collections.singleton(priorLocation); + } + else { + return null; + } + } + + /** + * Gets the location preferences of the vertex's current task execution, as determined by the locations + * of the predecessors from which it receives input data. * If there are more than MAX_DISTINCT_LOCATIONS_TO_CONSIDER different locations of source data, this * method returns {@code null} to indicate no location preference. * - * @return The preferred locations for this vertex execution, or null, if there is no preference. + * @return The preferred locations based in input streams, or an empty iterable, + * if there is no input-based preference. */ - public Iterable getPreferredLocations() { + public Iterable getPreferredLocationsBasedOnInputs() { // otherwise, base the preferred locations on the input connections if (inputEdges == null) { return Collections.emptySet(); @@ -435,7 +484,7 @@ public class ExecutionVertex implements AccessExecutionVertex, ArchiveableemptyList() : locations; } } http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index 6fac3c8..672431e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -984,8 +984,10 @@ public class SlotPool extends RpcEndpoint { @Override public Future allocateSlot(ScheduledUnit task, boolean allowQueued) { - return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, - task.getTaskToExecute().getVertex().getPriorAssignedResourceLocations(), timeout); + Iterable locationPreferences = + task.getTaskToExecute().getVertex().getPreferredLocations(); + + return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java index 346cc77..88fbc10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java @@ -262,7 +262,7 @@ public class SlotSharingGroupAssignment { /** * Gets a slot suitable for the given task vertex. This method will prefer slots that are local - * (with respect to {@link ExecutionVertex#getPreferredLocations()}), but will return non local + * (with respect to {@link ExecutionVertex#getPreferredLocationsBasedOnInputs()}), but will return non local * slots if no local slot is available. The method returns null, when this sharing group has * no slot is available for the given JobVertexID. * @@ -271,7 +271,7 @@ public class SlotSharingGroupAssignment { * @return A slot to execute the given ExecutionVertex in, or null, if none is available. */ public SimpleSlot getSlotForTask(ExecutionVertex vertex) { - return getSlotForTask(vertex.getJobvertexId(), vertex.getPreferredLocations()); + return getSlotForTask(vertex.getJobvertexId(), vertex.getPreferredLocationsBasedOnInputs()); } /** @@ -313,7 +313,7 @@ public class SlotSharingGroupAssignment { * shared slot is available. */ public SimpleSlot getSlotForTask(ExecutionVertex vertex, CoLocationConstraint constraint) { - return getSlotForTask(constraint, vertex.getPreferredLocations()); + return getSlotForTask(constraint, vertex.getPreferredLocationsBasedOnInputs()); } SimpleSlot getSlotForTask(CoLocationConstraint constraint, Iterable locationPreferences) { http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index 466a148..dc82440 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -174,7 +174,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl final ExecutionVertex vertex = task.getTaskToExecute().getVertex(); - final Iterable preferredLocations = vertex.getPreferredLocations(); + final Iterable preferredLocations = vertex.getPreferredLocationsBasedOnInputs(); final boolean forceExternalLocation = false && preferredLocations != null && preferredLocations.iterator().hasNext(); @@ -240,7 +240,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl localOnly = true; } else { - locations = vertex.getPreferredLocations(); + locations = vertex.getPreferredLocationsBasedOnInputs(); localOnly = forceExternalLocation; } http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java index 269a8f3..4910862 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java @@ -65,7 +65,7 @@ public class AllocatedSlot { JobID jobID, TaskManagerLocation location, int slotNumber, - ResourceProfile resourceProfile, + ResourceProfile resourceProfile, TaskManagerGateway taskManagerGateway) { this.slotAllocationId = checkNotNull(slotAllocationId); this.jobID = checkNotNull(jobID); http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java new file mode 100644 index 0000000..36b7575 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.jobmanager.slots.SlotOwner; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.state.TaskStateHandles; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.lang.reflect.Field; +import java.net.InetAddress; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.*; +import static org.junit.Assert.*; + +/** + * Tests that the execution vertex handles locality preferences well. + */ +public class ExecutionVertexLocalityTest extends TestLogger { + + private final JobID jobId = new JobID(); + + private final JobVertexID sourceVertexId = new JobVertexID(); + private final JobVertexID targetVertexId = new JobVertexID(); + + /** + * This test validates that vertices that have only one input stream try to + * co-locate their tasks with the producer. + */ + @Test + public void testLocalityInputBasedForward() throws Exception { + final int parallelism = 10; + final TaskManagerLocation[] locations = new TaskManagerLocation[parallelism]; + + final ExecutionGraph graph = createTestGraph(parallelism, false); + + // set the location for all sources to a distinct location + for (int i = 0; i < parallelism; i++) { + ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i]; + + TaskManagerLocation location = new TaskManagerLocation( + ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i); + + locations[i] = location; + initializeLocation(source, location); + } + + // validate that the target vertices have no location preference + for (int i = 0; i < parallelism; i++) { + ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i]; + Iterator preference = target.getPreferredLocations().iterator(); + + assertTrue(preference.hasNext()); + assertEquals(locations[i], preference.next()); + assertFalse(preference.hasNext()); + } + } + + /** + * This test validates that vertices with too many input streams do not have a location + * preference any more. + */ + @Test + public void testNoLocalityInputLargeAllToAll() throws Exception { + final int parallelism = 100; + + final ExecutionGraph graph = createTestGraph(parallelism, true); + + // set the location for all sources to a distinct location + for (int i = 0; i < parallelism; i++) { + ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i]; + TaskManagerLocation location = new TaskManagerLocation( + ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i); + initializeLocation(source, location); + } + + // validate that the target vertices have no location preference + for (int i = 0; i < parallelism; i++) { + ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i]; + + Iterator preference = target.getPreferredLocations().iterator(); + assertFalse(preference.hasNext()); + } + } + + /** + * This test validates that stateful vertices schedule based in the state's location + * (which is the prior execution's location). + */ + @Test + public void testLocalityBasedOnState() throws Exception { + final int parallelism = 10; + final TaskManagerLocation[] locations = new TaskManagerLocation[parallelism]; + + final ExecutionGraph graph = createTestGraph(parallelism, false); + + // set the location for all sources and targets + for (int i = 0; i < parallelism; i++) { + ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i]; + ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i]; + + TaskManagerLocation randomLocation = new TaskManagerLocation( + ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i); + + TaskManagerLocation location = new TaskManagerLocation( + ResourceID.generate(), InetAddress.getLoopbackAddress(), 20000 + i); + + locations[i] = location; + initializeLocation(source, randomLocation); + initializeLocation(target, location); + + setState(source.getCurrentExecutionAttempt(), ExecutionState.CANCELED); + setState(target.getCurrentExecutionAttempt(), ExecutionState.CANCELED); + } + + // mimic a restart: all vertices get re-initialized without actually being executed + for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) { + ejv.resetForNewExecution(); + } + + // set new location for the sources and some state for the targets + for (int i = 0; i < parallelism; i++) { + // source location + ExecutionVertex source = graph.getAllVertices().get(sourceVertexId).getTaskVertices()[i]; + TaskManagerLocation randomLocation = new TaskManagerLocation( + ResourceID.generate(), InetAddress.getLoopbackAddress(), 30000 + i); + initializeLocation(source, randomLocation); + + // target state + ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i]; + target.getCurrentExecutionAttempt().setInitialState(mock(TaskStateHandles.class)); + } + + // validate that the target vertices have the state's location as the location preference + for (int i = 0; i < parallelism; i++) { + ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i]; + Iterator preference = target.getPreferredLocations().iterator(); + + assertTrue(preference.hasNext()); + assertEquals(locations[i], preference.next()); + assertFalse(preference.hasNext()); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Creates a simple 2 vertex graph with a parallel source and a parallel target. + */ + private ExecutionGraph createTestGraph(int parallelism, boolean allToAll) throws Exception { + + JobVertex source = new JobVertex("source", sourceVertexId); + source.setParallelism(parallelism); + source.setInvokableClass(NoOpInvokable.class); + + JobVertex target = new JobVertex("source", targetVertexId); + target.setParallelism(parallelism); + target.setInvokableClass(NoOpInvokable.class); + + DistributionPattern connectionPattern = allToAll ? DistributionPattern.ALL_TO_ALL : DistributionPattern.POINTWISE; + target.connectNewDataSetAsInput(source, connectionPattern); + + JobGraph testJob = new JobGraph(jobId, "test job", source, target); + + return ExecutionGraphBuilder.buildGraph( + null, + testJob, + new Configuration(), + Executors.directExecutor(), + Executors.directExecutor(), + getClass().getClassLoader(), + new StandaloneCheckpointRecoveryFactory(), + Time.of(10, TimeUnit.SECONDS), + new FixedDelayRestartStrategy(10, 0L), + new UnregisteredMetricsGroup(), + 1, + log); + } + + private void initializeLocation(ExecutionVertex vertex, TaskManagerLocation location) throws Exception { + // we need a bit of reflection magic to initialize the location without going through + // scheduling paths. we choose to do that, rather than the alternatives: + // - mocking the scheduler created fragile tests that break whenever the scheduler is adjusted + // - exposing test methods in the ExecutionVertex leads to undesirable setters + + AllocatedSlot slot = new AllocatedSlot( + new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class)); + + SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0); + + final Field locationField = Execution.class.getDeclaredField("assignedResource"); + locationField.setAccessible(true); + + locationField.set(vertex.getCurrentExecutionAttempt(), simpleSlot); + } + + private void setState(Execution execution, ExecutionState state) throws Exception { + final Field stateField = Execution.class.getDeclaredField("state"); + stateField.setAccessible(true); + + stateField.set(execution, state); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b9ed4ff1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java index b36de77..9e692ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java @@ -106,7 +106,7 @@ public class SchedulerTestUtils { public static Execution getTestVertex(Iterable preferredLocations) { ExecutionVertex vertex = mock(ExecutionVertex.class); - when(vertex.getPreferredLocations()).thenReturn(preferredLocations); + when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocations); when(vertex.getJobId()).thenReturn(new JobID()); when(vertex.toString()).thenReturn("TEST-VERTEX"); @@ -119,7 +119,7 @@ public class SchedulerTestUtils { public static Execution getTestVertex(JobVertexID jid, int taskIndex, int numTasks) { ExecutionVertex vertex = mock(ExecutionVertex.class); - when(vertex.getPreferredLocations()).thenReturn(null); + when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(null); when(vertex.getJobId()).thenReturn(new JobID()); when(vertex.getJobvertexId()).thenReturn(jid); when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex); @@ -139,7 +139,7 @@ public class SchedulerTestUtils { ExecutionVertex vertex = mock(ExecutionVertex.class); - when(vertex.getPreferredLocations()).thenReturn(Arrays.asList(locations)); + when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(Arrays.asList(locations)); when(vertex.getJobId()).thenReturn(new JobID()); when(vertex.getJobvertexId()).thenReturn(jid); when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);