flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/4] flink git commit: [FLINK-1478] [jobmanager] Scheduler support for external location constraints
Date Thu, 05 Feb 2015 18:27:33 GMT
[FLINK-1478] [jobmanager] Scheduler support for external location constraints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/970b2b7a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/970b2b7a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/970b2b7a

Branch: refs/heads/master
Commit: 970b2b7af85f62f525e697dfa1520e88c8fc5e51
Parents: a9ac7aa
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Feb 5 18:28:12 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Feb 5 18:50:40 2015 +0100

----------------------------------------------------------------------
 .../executiongraph/ExecutionJobVertex.java      |   9 +
 .../runtime/executiongraph/ExecutionVertex.java |  57 ++-
 .../runtime/jobmanager/scheduler/Scheduler.java | 119 +++--
 .../scheduler/SlotSharingGroupAssignment.java   |   7 +-
 .../executiongraph/ExecutionGraphTestUtils.java |   5 +-
 .../ExecutionVertexDeploymentTest.java          |  12 +-
 .../VertexLocationConstraintTest.java           | 436 +++++++++++++++++++
 7 files changed, 586 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 6676e85..0439c08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -259,6 +259,15 @@ public class ExecutionJobVertex implements Serializable {
 	//---------------------------------------------------------------------------------------------
 	
 	public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
+		
+//		ExecutionVertex[] vertices = this.taskVertices;
+//		
+//		for (int i = 0; i < vertices.length; i++) {
+//			ExecutionVertex v = vertices[i];
+//			
+//			if (v.get 
+//		}
+		
 		for (ExecutionVertex ev : getTaskVertices()) {
 			ev.scheduleForExecution(scheduler, queued);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 05cea73..86173da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -43,6 +43,7 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -82,6 +83,11 @@ public class ExecutionVertex implements Serializable {
 	
 	private volatile Execution currentExecution;	// this field must never be null
 	
+	
+	private volatile List<Instance> locationConstraintInstances;
+	
+	private volatile boolean scheduleLocalOnly;
+	
 	// --------------------------------------------------------------------------------------------
 
 	public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,
@@ -294,10 +300,22 @@ public class ExecutionVertex implements Serializable {
 		}
 	}
 	
-	public void setTargetHostConstraint(String hostname) {
+	public void setLocationConstraintHosts(List<Instance> instances) {
+		this.locationConstraintInstances = instances;
+	}
+	
+	public void setScheduleLocalOnly(boolean scheduleLocalOnly) {
+		if (scheduleLocalOnly && inputEdges != null && inputEdges.length > 0) {
+			throw new IllegalArgumentException("Strictly local scheduling is only supported for sources.");
+		}
 		
+		this.scheduleLocalOnly = scheduleLocalOnly;
 	}
 
+	public boolean isScheduleLocalOnly() {
+		return scheduleLocalOnly;
+	}
+	
 	/**
 	 * Gets the location preferences of this task, determined by the locations of the predecessors from which
 	 * it receives input data.
@@ -307,23 +325,37 @@ public class ExecutionVertex implements Serializable {
 	 * @return The preferred locations for this vertex execution, or null, if there is no preference.
 	 */
 	public Iterable<Instance> getPreferredLocations() {
-		HashSet<Instance> locations = new HashSet<Instance>();
+		// if we have hard location constraints, use those
+		{
+			List<Instance> constraintInstances = this.locationConstraintInstances;
+			if (constraintInstances != null && !constraintInstances.isEmpty()) {
+				return constraintInstances;
+			}
+		}
+		
+		// otherwise, base the preferred locations on the input connections
+		if (inputEdges == null) {
+			return Collections.emptySet();
+		}
+		else {
+			HashSet<Instance> locations = new HashSet<Instance>();
 		
-		for (int i = 0; i < inputEdges.length; i++) {
-			ExecutionEdge[] sources = inputEdges[i];
-			if (sources != null) {
-				for (int k = 0; k < sources.length; k++) {
-					SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();
-					if (sourceSlot != null) {
-						locations.add(sourceSlot.getInstance());
-						if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
-							return null;
+			for (int i = 0; i < inputEdges.length; i++) {
+				ExecutionEdge[] sources = inputEdges[i];
+				if (sources != null) {
+					for (int k = 0; k < sources.length; k++) {
+						SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();
+						if (sourceSlot != null) {
+							locations.add(sourceSlot.getInstance());
+							if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
+								return null;
+							}
 						}
 					}
 				}
 			}
+			return locations;
 		}
-		return locations;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -407,6 +439,7 @@ public class ExecutionVertex implements Serializable {
 		// clear the unnecessary fields in this class
 		this.resultPartitions = null;
 		this.inputEdges = null;
+		this.locationConstraintInstances = null;
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 1ad7a50..466e0e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import akka.dispatch.Futures;
+
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -72,6 +73,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	
 	private int nonLocalizedAssignments;
 	
+	
 	public Scheduler() {
 		this.newlyAvailableInstances = new LinkedBlockingQueue<Instance>();
 	}
@@ -164,6 +166,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		}
 
 		final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
+		
+		final Iterable<Instance> preferredLocations = vertex.getPreferredLocations();
+		final boolean forceExternalLocation = vertex.isScheduleLocalOnly() &&
+									preferredLocations != null && preferredLocations.iterator().hasNext();
 	
 		synchronized (globalLock) {
 		
@@ -179,6 +185,12 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
 				final CoLocationConstraint constraint = task.getLocationConstraint();
 				
+				// sanity check that we do not use an externally forced location and a co-location constraint together
+				if (constraint != null && forceExternalLocation) {
+					throw new IllegalArgumentException("The scheduling cannot be contrained simultaneously by a "
+							+ "co-location constriaint and an external location constraint.");
+				}
+				
 				// get a slot from the group, if the group has one for us (and can fulfill the constraint)
 				SimpleSlot slotFromGroup;
 				if (constraint == null) {
@@ -206,7 +218,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 							vertex.getPreferredLocations() : Collections.singleton(constraint.getLocation());
 					
 					// get a new slot, since we could not place it into the group, or we could not place it locally
-					newSlot = getFreeSubSlotForTask(vertex, locations, assignment, constraint);
+					newSlot = getFreeSubSlotForTask(vertex, locations, assignment, constraint, forceExternalLocation);
 
 					SimpleSlot toUse;
 					
@@ -214,8 +226,20 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 						if (slotFromGroup == null) {
 							// both null
 							if (constraint == null || constraint.isUnassigned()) {
-								throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots());
-							} else {
+								if (forceExternalLocation) {
+									// could not satisfy the external location constraint
+									String hosts = getHostnamesFromInstances(preferredLocations);
+									throw new NoResourceAvailableException("Could not schedule task " + vertex
+											+ " to any of the required hosts: " + hosts);
+								}
+								else {
+									// simply nothing is available
+									throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(),
+											getTotalNumberOfSlots(), getNumberOfAvailableSlots());
+								}
+							}
+							else {
+								// nothing is available on the node where the co-location constraint pushes us
 								throw new NoResourceAvailableException("Could not allocate a slot on instance " +
 											constraint.getLocation() + ", as required by the co-location constraint.");
 							}
@@ -269,26 +293,49 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 			}
 		
 			// 2) === schedule without hints and sharing ===
-			
-			SimpleSlot slot = getFreeSlotForTask(vertex, vertex.getPreferredLocations());
-			if (slot != null) {
-				updateLocalityCounters(slot.getLocality());
-				return slot;
-			}
-			else {
-				// no resource available now, so queue the request
-				if (queueIfNoResource) {
-					SlotAllocationFuture future = new SlotAllocationFuture();
-					this.taskQueue.add(new QueuedTask(task, future));
-					return future;
+			{
+				SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation);
+				if (slot != null) {
+					updateLocalityCounters(slot.getLocality());
+					return slot;
 				}
 				else {
-					throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots());
+					// no resource available now, so queue the request
+					if (queueIfNoResource) {
+						SlotAllocationFuture future = new SlotAllocationFuture();
+						this.taskQueue.add(new QueuedTask(task, future));
+						return future;
+					}
+					else if (forceExternalLocation) {
+						String hosts = getHostnamesFromInstances(preferredLocations);
+						throw new NoResourceAvailableException("Could not schedule task " + vertex
+								+ " to any of the required hosts: " + hosts);
+					}
+					else {
+						throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots());
+					}
 				}
 			}
 		}
 	}
+	
+	private String getHostnamesFromInstances(Iterable<Instance> instances) {
+		StringBuilder bld = new StringBuilder();
+		
+		for (Instance i : instances) {
+			bld.append(i.getInstanceConnectionInfo().getHostname());
+			bld.append(", ");
+		}
 		
+		if (bld.length() == 0) {
+			return "";
+		}
+		else {
+			bld.setLength(bld.length() - 2);
+			return bld.toString();
+		}
+	}
+	
 	/**
 	 * Gets a suitable instance to schedule the vertex execution to.
 	 * <p>
@@ -297,21 +344,21 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	 * @param vertex The task to run. 
 	 * @return The instance to run the vertex on, it {@code null}, if no instance is available.
 	 */
-	protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<Instance> requestedLocations) {
+	protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<Instance> requestedLocations, boolean localOnly) {
 		
 		// we need potentially to loop multiple times, because there may be false positives
 		// in the set-with-available-instances
 		while (true) {
-			Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations);
+			Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly);
 
-			if(instanceLocalityPair == null){
+			if (instanceLocalityPair == null){
 				return null;
 			}
 
 			Instance instanceToUse = instanceLocalityPair.getLeft();
 			Locality locality = instanceLocalityPair.getRight();
 
-			if(LOG.isDebugEnabled()){
+			if (LOG.isDebugEnabled()){
 				if(locality == Locality.LOCAL){
 					LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
 				}else if(locality == Locality.NON_LOCAL){
@@ -348,25 +395,26 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	protected SimpleSlot getFreeSubSlotForTask(ExecutionVertex vertex,
 											Iterable<Instance> requestedLocations,
 											SlotSharingGroupAssignment groupAssignment,
-											CoLocationConstraint constraint) {
+											CoLocationConstraint constraint,
+											boolean localOnly) {
 		// we need potentially to loop multiple times, because there may be false positives
 		// in the set-with-available-instances
 		while (true) {
-			Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations);
+			Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly);
 
-			if(instanceLocalityPair == null){
+			if (instanceLocalityPair == null) {
 				return null;
 			}
 
 			Instance instanceToUse = instanceLocalityPair.getLeft();
 			Locality locality = instanceLocalityPair.getRight();
 
-			if(LOG.isDebugEnabled()){
-				if(locality == Locality.LOCAL){
+			if (LOG.isDebugEnabled()) {
+				if (locality == Locality.LOCAL) {
 					LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
-				}else if(locality == Locality.NON_LOCAL){
+				} else if(locality == Locality.NON_LOCAL) {
 					LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
-				}else if(locality == Locality.UNCONSTRAINED) {
+				} else if(locality == Locality.UNCONSTRAINED) {
 					LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
 				}
 			}
@@ -409,7 +457,8 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	 *
 	 * @param requestedLocations
 	 */
-	private Pair<Instance, Locality> findInstance(Iterable<Instance> requestedLocations){
+	private Pair<Instance, Locality> findInstance(Iterable<Instance> requestedLocations, boolean localOnly){
+		
 		if (this.instancesWithAvailableResources.isEmpty()) {
 			// check if the asynchronous calls did not yet return the queues
 			Instance queuedInstance = this.newlyAvailableInstances.poll();
@@ -434,14 +483,18 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				if (location != null && this.instancesWithAvailableResources.remove(location)) {
 					instanceToUse = location;
 					locality = Locality.LOCAL;
-
 					break;
 				}
 			}
 
 			if (instanceToUse == null) {
-				instanceToUse = this.instancesWithAvailableResources.poll();
-				locality = Locality.NON_LOCAL;
+				if (localOnly) {
+					return null;
+				}
+				else {
+					instanceToUse = this.instancesWithAvailableResources.poll();
+					locality = Locality.NON_LOCAL;
+				}
 			}
 		}
 		else {
@@ -603,8 +656,8 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	public int getNumberOfAvailableInstances() {
 		int numberAvailableInstances = 0;
 		synchronized (this.globalLock) {
-			for(Instance instance: allInstances){
-				if(instance.isAlive()){
+			for (Instance instance: allInstances ){
+				if (instance.isAlive()){
 					numberAvailableInstances++;
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
index 70d4510..4e0349a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -41,13 +40,11 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.slf4j.Logger;
 
 
-public class SlotSharingGroupAssignment implements Serializable {
-
-	static final long serialVersionUID = 42L;
+public class SlotSharingGroupAssignment {
 
 	private static final Logger LOG = Scheduler.LOG;
 
-	private transient final Object lock = new Object();
+	private final Object lock = new Object();
 
 	/** All slots currently allocated to this sharing group */
 	private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();

http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 7d8229c..d136d6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -101,9 +101,8 @@ public class ExecutionGraphTestUtils {
 		return getInstance(taskManager, 1);
 	}
 
-	public static Instance getInstance(final ActorRef taskManager, final int numberOfSlots) throws
-			Exception {
-				HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
+	public static Instance getInstance(final ActorRef taskManager, final int numberOfSlots) throws Exception {
+		HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
 		InetAddress address = InetAddress.getByName("127.0.0.1");
 		InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index d8a7db3..f5089c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -102,7 +102,7 @@ public class ExecutionVertexDeploymentTest {
 
 			final JobVertexID jid = new JobVertexID();
 
-			final TestActorRef simpleTaskManager = TestActorRef.create(system,
+			final TestActorRef<?> simpleTaskManager = TestActorRef.create(system,
 					Props.create(SimpleAcknowledgingTaskManager.class));
 			
 			final Instance instance = getInstance(simpleTaskManager);
@@ -146,7 +146,7 @@ public class ExecutionVertexDeploymentTest {
 		try {
 			final JobVertexID jid = new JobVertexID();
 
-			final TestActorRef simpleTaskManager = TestActorRef.create(system,
+			final TestActorRef<?> simpleTaskManager = TestActorRef.create(system,
 					Props.create(SimpleAcknowledgingTaskManager.class));
 			
 			final Instance instance = getInstance(simpleTaskManager);
@@ -202,7 +202,7 @@ public class ExecutionVertexDeploymentTest {
 
 			final JobVertexID jid = new JobVertexID();
 
-			final TestActorRef simpleTaskManager = TestActorRef.create(system,
+			final TestActorRef<?> simpleTaskManager = TestActorRef.create(system,
 					Props.create(SimpleFailingTaskManager.class));
 			
 			final Instance instance = getInstance(simpleTaskManager);
@@ -238,7 +238,7 @@ public class ExecutionVertexDeploymentTest {
 		try {
 			final JobVertexID jid = new JobVertexID();
 
-			final TestActorRef simpleTaskManager = TestActorRef.create(system,
+			final TestActorRef<?> simpleTaskManager = TestActorRef.create(system,
 					Props.create(SimpleFailingTaskManager.class));
 			
 			final Instance instance = getInstance(simpleTaskManager);
@@ -287,7 +287,7 @@ public class ExecutionVertexDeploymentTest {
 
 			TestingUtils.setExecutionContext(ec);
 
-			final TestActorRef simpleTaskManager = TestActorRef.create(system,
+			final TestActorRef<?> simpleTaskManager = TestActorRef.create(system,
 					Props.create(SimpleAcknowledgingTaskManager.class));
 			final Instance instance = getInstance(simpleTaskManager);
 			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
@@ -337,7 +337,7 @@ public class ExecutionVertexDeploymentTest {
 					AkkaUtils.getDefaultTimeout());
 			final ExecutionAttemptID eid = vertex.getCurrentExecutionAttempt().getAttemptId();
 
-			final TestActorRef simpleTaskManager = TestActorRef.create(system, Props.create(new
+			final TestActorRef<?> simpleTaskManager = TestActorRef.create(system, Props.create(new
 					ExecutionVertexCancelTest.CancelSequenceTaskManagerCreator(new
 					TaskOperationResult(eid, false), new TaskOperationResult(eid, true))));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/970b2b7a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
new file mode 100644
index 0000000..cae10f4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import scala.concurrent.duration.FiniteDuration;
+import akka.actor.Actor;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+
+public class VertexLocationConstraintTest {
+
+	private static final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS);
+	
+	private static ActorSystem system;
+	
+	private static TestActorRef<? extends Actor> taskManager;
+	
+	
+	@BeforeClass
+	public static void setup() {
+		system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
+		
+		taskManager = TestActorRef.create(system,
+				Props.create(ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class));
+	}
+
+	@AfterClass
+	public static void teardown() {
+		JavaTestKit.shutdownActorSystem(system);
+		system = null;
+	}
+	
+	
+	@Test
+	public void testScheduleWithConstraint1() {
+		try {
+			final byte[] address1 = { 10, 0, 1, 4 };
+			final byte[] address2 = { 10, 0, 1, 5 };
+			final byte[] address3 = { 10, 0, 1, 6 };
+			
+			final String hostname1 = "host1";
+			final String hostname2 = "host2";
+			final String hostname3 = "host3";
+			
+			// prepare the scheduler
+			Instance instance1 = getInstance(address1, 6789, hostname1);
+			Instance instance2 = getInstance(address2, 6789, hostname2);
+			Instance instance3 = getInstance(address3, 6789, hostname3);
+			
+			Scheduler scheduler = new Scheduler();
+			scheduler.newInstanceAvailable(instance1);
+			scheduler.newInstanceAvailable(instance2);
+			scheduler.newInstanceAvailable(instance3);
+			
+			// prepare the execution graph
+			AbstractJobVertex jobVertex = new AbstractJobVertex("test vertex", new JobVertexID());
+			jobVertex.setInvokableClass(DummyInvokable.class);
+			jobVertex.setParallelism(2);
+			JobGraph jg = new JobGraph("test job", jobVertex);
+			
+			ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout);
+			eg.attachJobGraph(Collections.singletonList(jobVertex));
+			
+			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
+			ExecutionVertex[] vertices = ejv.getTaskVertices();
+			
+			vertices[0].setLocationConstraintHosts(Arrays.asList(instance1, instance2));
+			vertices[1].setLocationConstraintHosts(Collections.singletonList(instance3));
+			
+			vertices[0].setScheduleLocalOnly(true);
+			vertices[1].setScheduleLocalOnly(true);
+			
+			ejv.scheduleAll(scheduler, false);
+			
+			SimpleSlot slot1 = vertices[0].getCurrentAssignedResource();
+			SimpleSlot slot2 = vertices[1].getCurrentAssignedResource();
+			
+			assertNotNull(slot1);
+			assertNotNull(slot2);
+			
+			Instance target1 = slot1.getInstance();
+			Instance target2 = slot2.getInstance();
+			
+			assertNotNull(target1);
+			assertNotNull(target2);
+			
+			assertTrue(target1 == instance1 || target1 == instance2);
+			assertTrue(target2 == instance3);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testScheduleWithConstraint2() {
+		
+		// same test as above, which swapped host names to guard against "accidentally worked" because of
+		// the order in which requests are handles by internal data structures
+		
+		try {
+			final byte[] address1 = { 10, 0, 1, 4 };
+			final byte[] address2 = { 10, 0, 1, 5 };
+			final byte[] address3 = { 10, 0, 1, 6 };
+			
+			final String hostname1 = "host1";
+			final String hostname2 = "host2";
+			final String hostname3 = "host3";
+			
+			// prepare the scheduler
+			Instance instance1 = getInstance(address1, 6789, hostname1);
+			Instance instance2 = getInstance(address2, 6789, hostname2);
+			Instance instance3 = getInstance(address3, 6789, hostname3);
+			
+			Scheduler scheduler = new Scheduler();
+			scheduler.newInstanceAvailable(instance1);
+			scheduler.newInstanceAvailable(instance2);
+			scheduler.newInstanceAvailable(instance3);
+			
+			// prepare the execution graph
+			AbstractJobVertex jobVertex = new AbstractJobVertex("test vertex", new JobVertexID());
+			jobVertex.setInvokableClass(DummyInvokable.class);
+			jobVertex.setParallelism(2);
+			JobGraph jg = new JobGraph("test job", jobVertex);
+			
+			ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout);
+			eg.attachJobGraph(Collections.singletonList(jobVertex));
+			
+			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
+			ExecutionVertex[] vertices = ejv.getTaskVertices();
+			
+			vertices[0].setLocationConstraintHosts(Collections.singletonList(instance3));
+			vertices[1].setLocationConstraintHosts(Arrays.asList(instance1, instance2));
+			
+			vertices[0].setScheduleLocalOnly(true);
+			vertices[1].setScheduleLocalOnly(true);
+			
+			ejv.scheduleAll(scheduler, false);
+			
+			SimpleSlot slot1 = vertices[0].getCurrentAssignedResource();
+			SimpleSlot slot2 = vertices[1].getCurrentAssignedResource();
+			
+			assertNotNull(slot1);
+			assertNotNull(slot2);
+			
+			Instance target1 = slot1.getInstance();
+			Instance target2 = slot2.getInstance();
+			
+			assertNotNull(target1);
+			assertNotNull(target2);
+			
+			assertTrue(target1 == instance3);
+			assertTrue(target2 == instance1 || target2 == instance2);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test // each constrained subtask of v1 must land on an allowed instance while v1 and v2 share slots
+	public void testScheduleWithConstraintAndSlotSharing() {
+		try {
+			final byte[] address1 = { 10, 0, 1, 4 };
+			final byte[] address2 = { 10, 0, 1, 5 };
+			final byte[] address3 = { 10, 0, 1, 6 };
+			
+			final String hostname1 = "host1";
+			final String hostname2 = "host2";
+			final String hostname3 = "host3";
+			
+			// prepare the scheduler
+			Instance instance1 = getInstance(address1, 6789, hostname1);
+			Instance instance2 = getInstance(address2, 6789, hostname2);
+			Instance instance3 = getInstance(address3, 6789, hostname3);
+			
+			Scheduler scheduler = new Scheduler();
+			scheduler.newInstanceAvailable(instance1);
+			scheduler.newInstanceAvailable(instance2);
+			scheduler.newInstanceAvailable(instance3);
+			
+			// prepare the execution graph
+			AbstractJobVertex jobVertex1 = new AbstractJobVertex("v1", new JobVertexID());
+			AbstractJobVertex jobVertex2 = new AbstractJobVertex("v2", new JobVertexID());
+			jobVertex1.setInvokableClass(DummyInvokable.class);
+			jobVertex2.setInvokableClass(DummyInvokable.class);
+			jobVertex1.setParallelism(2);
+			jobVertex2.setParallelism(3);
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup();
+			jobVertex1.setSlotSharingGroup(sharingGroup);
+			jobVertex2.setSlotSharingGroup(sharingGroup);
+			
+			JobGraph jg = new JobGraph("test job", jobVertex1, jobVertex2);
+			
+			ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(),
timeout);
+			eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2));
+			
+			// only v1 is constrained and scheduled below; v2 just belongs to the same sharing group
+			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID());
+			ExecutionVertex[] vertices = ejv.getTaskVertices();
+			
+			// subtask 0 of v1 may use instance1 or instance2, subtask 1 may use instance3 only
+			vertices[0].setLocationConstraintHosts(Arrays.asList(instance1, instance2));
+			vertices[1].setLocationConstraintHosts(Collections.singletonList(instance3));
+			
+			// NOTE(review): presumably forbids falling back to hosts outside the constraint - confirm
+			vertices[0].setScheduleLocalOnly(true);
+			vertices[1].setScheduleLocalOnly(true);
+			
+			ejv.scheduleAll(scheduler, false);
+			
+			// both subtasks must have been given a slot that is backed by an instance
+			SimpleSlot slot1 = vertices[0].getCurrentAssignedResource();
+			SimpleSlot slot2 = vertices[1].getCurrentAssignedResource();
+			
+			assertNotNull(slot1);
+			assertNotNull(slot2);
+			
+			Instance target1 = slot1.getInstance();
+			Instance target2 = slot2.getInstance();
+			
+			assertNotNull(target1);
+			assertNotNull(target2);
+			
+			// identity comparison: the slot must sit on exactly one of the constrained Instance objects
+			assertTrue(target1 == instance1 || target1 == instance2);
+			assertTrue(target2 == instance3);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testScheduleWithUnfulfillableConstraint() {
+		
+		// the vertex may only run on instance2, which is never registered with the scheduler; scheduling must fail
because of
+		// the unavailable host, with a NoResourceAvailableException whose message names that host
+		
+		try {
+			final byte[] address1 = { 10, 0, 1, 4 };
+			final byte[] address2 = { 10, 0, 1, 5 };
+			
+			final String hostname1 = "host1";
+			final String hostname2 = "host2";
+			
+			// prepare the scheduler
+			Instance instance1 = getInstance(address1, 6789, hostname1);
+			Instance instance2 = getInstance(address2, 6789, hostname2);
+			
+			// only instance1 is made available; instance2 stays unknown to the scheduler
+			Scheduler scheduler = new Scheduler();
+			scheduler.newInstanceAvailable(instance1);
+			
+			// prepare the execution graph
+			AbstractJobVertex jobVertex = new AbstractJobVertex("test vertex", new JobVertexID());
+			jobVertex.setInvokableClass(DummyInvokable.class);
+			jobVertex.setParallelism(1);
+			JobGraph jg = new JobGraph("test job", jobVertex);
+			
+			ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(),
timeout);
+			eg.attachJobGraph(Collections.singletonList(jobVertex));
+			
+			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
+			ExecutionVertex[] vertices = ejv.getTaskVertices();
+			
+			// constrain the single subtask to the instance that was not registered above
+			vertices[0].setLocationConstraintHosts(Collections.singletonList(instance2));
+			vertices[0].setScheduleLocalOnly(true);
+			
+			try {
+				ejv.scheduleAll(scheduler, false);
+				fail("This should fail with a NoResourceAvailableException");
+			}
+			catch (NoResourceAvailableException e) {
+				// bam! we are good...
+				assertTrue(e.getMessage().contains(hostname2)); // the message must name the unavailable host
+			}
+			catch (Exception e) {
+				fail("Wrong exception type");
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testScheduleWithUnfulfillableConstraintInSharingGroup() {
+		
+		// like the unfulfillable-constraint test, but v1 and v2 share one SlotSharingGroup; scheduling v1 must fail
because of
+		// the unavailable host (instance2 is never registered), and the exception message must name that host
+		
+		try {
+			final byte[] address1 = { 10, 0, 1, 4 };
+			final byte[] address2 = { 10, 0, 1, 5 };
+			
+			final String hostname1 = "host1";
+			final String hostname2 = "host2";
+			
+			// prepare the scheduler
+			Instance instance1 = getInstance(address1, 6789, hostname1);
+			Instance instance2 = getInstance(address2, 6789, hostname2);
+			
+			// only instance1 is made available; instance2 stays unknown to the scheduler
+			Scheduler scheduler = new Scheduler();
+			scheduler.newInstanceAvailable(instance1);
+			
+			// prepare the execution graph
+			AbstractJobVertex jobVertex1 = new AbstractJobVertex("v1", new JobVertexID());
+			AbstractJobVertex jobVertex2 = new AbstractJobVertex("v2", new JobVertexID());
+			
+			jobVertex1.setInvokableClass(DummyInvokable.class);
+			jobVertex2.setInvokableClass(DummyInvokable.class);
+			
+			jobVertex1.setParallelism(1);
+			jobVertex2.setParallelism(1);
+			
+			JobGraph jg = new JobGraph("test job", jobVertex1, jobVertex2);
+			
+			// put both job vertices into the same slot sharing group
+			SlotSharingGroup sharingGroup = new SlotSharingGroup();
+			jobVertex1.setSlotSharingGroup(sharingGroup);
+			jobVertex2.setSlotSharingGroup(sharingGroup);
+			
+			ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(),
timeout);
+			eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2));
+			
+			// only v1 is constrained and scheduled below
+			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID());
+			ExecutionVertex[] vertices = ejv.getTaskVertices();
+			
+			// constrain the single subtask of v1 to the instance that was not registered above
+			vertices[0].setLocationConstraintHosts(Collections.singletonList(instance2));
+			vertices[0].setScheduleLocalOnly(true);
+			
+			try {
+				ejv.scheduleAll(scheduler, false);
+				fail("This should fail with a NoResourceAvailableException");
+			}
+			catch (NoResourceAvailableException e) {
+				// bam! we are good...
+				assertTrue(e.getMessage().contains(hostname2)); // the message must name the unavailable host
+			}
+			catch (Exception e) {
+				fail("Wrong exception type");
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test // after prepareForArchiving() the vertex must no longer report any preferred locations
+	public void testArchivingClearsFields() {
+		try {
+			AbstractJobVertex vertex = new AbstractJobVertex("test vertex", new JobVertexID());
+			JobGraph jg = new JobGraph("test job", vertex);
+			
+			ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(),
timeout);
+			eg.attachJobGraph(Collections.singletonList(vertex));
+			
+			ExecutionVertex ev = eg.getAllVertices().get(vertex.getID()).getTaskVertices()[0];
+			
+			// set a location constraint and check that it shows up as the preferred location
+			Instance instance = ExecutionGraphTestUtils.getInstance(ActorRef.noSender());
+			ev.setLocationConstraintHosts(Collections.singletonList(instance));
+			
+			assertNotNull(ev.getPreferredLocations());
+			assertEquals(instance, ev.getPreferredLocations().iterator().next());
+			
+			// transition to a final state
+			eg.fail(new Exception());
+			
+			eg.prepareForArchiving();
+			
+			// once archived, the preferred locations must be either null or empty
+			assertTrue(ev.getPreferredLocations() == null || !ev.getPreferredLocations().iterator().hasNext());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static Instance getInstance(byte[] ipAddress, int dataPort, String hostname) throws
Exception { // test helper: builds an Instance whose mocked connection info reports the given values
+		HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024,
1024*1024*1024, 512*1024*1024);
+		// mocked connection info that answers with the given IP address, data port and host name
+		InstanceConnectionInfo connection = mock(InstanceConnectionInfo.class);
+		when(connection.address()).thenReturn(InetAddress.getByAddress(ipAddress));
+		when(connection.dataPort()).thenReturn(dataPort);
+		when(connection.getInetAdress()).thenReturn(InetAddress.getByAddress(ipAddress).toString());
+		when(connection.getHostname()).thenReturn(hostname);
+		when(connection.getFQDNHostname()).thenReturn(hostname);
+		// NOTE(review): taskManager is declared outside this view; the trailing 1 is presumably the slot count - confirm
+		return new Instance(taskManager, connection, new InstanceID(), hardwareDescription, 1);
+	}
+}


Mime
View raw message