flink-commits mailing list archives

From se...@apache.org
Subject [2/2] flink git commit: [FLINK-1478] [jobmanager] Add deterministic strictly local split assignment (part 2)
Date Mon, 09 Feb 2015 14:55:20 GMT
[FLINK-1478] [jobmanager] Add deterministic strictly local split assignment (part 2)

This closes #375


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4386620c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4386620c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4386620c

Branch: refs/heads/master
Commit: 4386620c06e94c9f4e3030ea7ae0f480845e2969
Parents: 6fcef7d
Author: Stephan Ewen <sewen@apache.org>
Authored: Sun Feb 8 19:17:44 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Feb 9 15:45:36 2015 +0100

----------------------------------------------------------------------
 .../executiongraph/ExecutionJobVertex.java      | 293 ++++++++-----
 .../runtime/jobmanager/scheduler/Scheduler.java |  86 +++-
 .../executiongraph/LocalInputSplitsTest.java    | 437 +++++++++++++++++++
 3 files changed, 677 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
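
The heart of the change is a deterministic distribution of splits over subtasks: hosts are processed in sorted order, each host receives a near-equal share of the subtask indices, and each host's splits are carved into contiguous subranges. As orientation before reading the diff below, here is a minimal, self-contained sketch of that arithmetic (the class and method names are illustrative, not part of the commit; plain strings stand in for input splits):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;

    public class SplitDistributionSketch {

        // Deterministically maps each subtask index to a list of splits,
        // mirroring the structure of computeLocalInputSplitsPerTask below.
        static Map<Integer, List<String>> distribute(
                SortedMap<String, List<String>> splitsByHost, int numSubTasks) {
            int numHosts = splitsByHost.size();
            int subtasksPerHost = numSubTasks / numHosts;
            int hostsWithOneMore = numSubTasks % numHosts;

            Map<Integer, List<String>> assignment = new HashMap<Integer, List<String>>();
            int subtaskNum = 0;
            int hostNum = 0;

            for (List<String> splitsOnHost : splitsByHost.values()) {
                // a host never uses more subtasks than it has splits
                int subtasks = Math.min(splitsOnHost.size(),
                        hostNum < hostsWithOneMore ? subtasksPerHost + 1 : subtasksPerHost);
                int splitsPerSubtask = splitsOnHost.size() / subtasks;
                int subtasksWithOneMore = splitsOnHost.size() % subtasks;

                int splitnum = 0;
                for (int i = 0; i < subtasks; i++) {
                    int n = i < subtasksWithOneMore ? splitsPerSubtask + 1 : splitsPerSubtask;
                    assignment.put(subtaskNum++,
                            new ArrayList<String>(splitsOnHost.subList(splitnum, splitnum + n)));
                    splitnum += n;
                }
                hostNum++;
            }
            return assignment;
        }

        public static void main(String[] args) {
            SortedMap<String, List<String>> byHost = new TreeMap<String, List<String>>();
            byHost.put("host1", Arrays.asList("s1", "s2", "s3"));
            byHost.put("host2", Arrays.asList("s4"));
            // prints something like {0=[s1, s2], 1=[s3], 2=[s4]}
            System.out.println(distribute(byHost, 3));
        }
    }

Because hosts are sorted and the per-host subranges are contiguous, the same splits and parallelism always yield the same assignment, which is what the new LocalInputSplitsTest below asserts.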


http://git-wip-us.apache.org/repos/asf/flink/blob/4386620c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 235fd1f..45e3245 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -77,7 +78,7 @@ public class ExecutionJobVertex implements Serializable {
 	
 	private final InputSplit[] inputSplits;
 	
-	private List<InputSplit>[] inputSplitsPerSubtask;
+	private List<LocatableInputSplit>[] inputSplitsPerSubtask;
 	
 	private InputSplitAssigner splitAssigner;
 	
@@ -139,134 +140,48 @@ public class ExecutionJobVertex implements Serializable {
 		try {
 			@SuppressWarnings("unchecked")
 			InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
+			
 			if (splitSource != null) {
-				this.inputSplits = splitSource.createInputSplits(numTaskVertices);
+				inputSplits = splitSource.createInputSplits(numTaskVertices);
 				
-				if (splitSource instanceof StrictlyLocalAssignment) {
-					
-					// group the splits by host while preserving order per host
-					Map<String, List<LocatableInputSplit>> splitsByHost = new HashMap<String, List<LocatableInputSplit>>();
-					for(int i=0; i<this.inputSplits.length; i++) {
-
-						// check that split has exactly one local host
-						LocatableInputSplit lis;
-						InputSplit is = this.inputSplits[i];
-						if(!(is instanceof LocatableInputSplit)) {
-							new JobException("Invalid InputSplit type "+is.getClass().getCanonicalName()+". " +
-									"Strictly local assignment requires LocatableInputSplit");
-						}
-						lis = (LocatableInputSplit) is;
-
-						if(lis.getHostnames() == null) {
-							throw new JobException("LocatableInputSplit has no host information. " +
-									"Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
-						} else if (lis.getHostnames().length != 1) {
-							throw new JobException("Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
-						}
-						String hostName = lis.getHostnames()[0];
-
-						List<LocatableInputSplit> hostSplits = splitsByHost.get(hostName);
-						if(hostSplits == null) {
-							hostSplits = new ArrayList<LocatableInputSplit>();
-							splitsByHost.put(hostName, hostSplits);
-						}
-						hostSplits.add(lis);
-					}
-
-					// assign subtasks to hosts
-					// get list of hosts in deterministic order
-					List<String> hosts = new ArrayList<String>(splitsByHost.keySet());
-					Collections.sort(hosts);
-					int numSubTasks = this.getParallelism();
-					int numHosts = hosts.size();
-					if(numSubTasks < numHosts) {
-						throw new JobException("Strictly local split assignment requires at least as " +
-								"many parallel subtasks as distinct split hosts. Please increase the parallelism " +
-								"of DataSource "+this.getJobVertex().getName()+" to at least "+numHosts+".");
-					}
-
-					int numSubTasksPerHost = numSubTasks / numHosts;
-					int numHostWithOneMore = numSubTasks % numHosts;
-
-					Map<String, int[]> subTaskHostAssignment = new HashMap<String, int[]>(numHosts);
-					int assignedHostsCnt = 0;
-					int assignedTasksCnt = 0;
-					for(String host : hosts) {
-						int numTasksToAssign = assignedHostsCnt < numHostWithOneMore ? numSubTasksPerHost + 1 : numSubTasksPerHost;
-						int[] subTasks = new int[numTasksToAssign];
-						for(int i=0; i<numTasksToAssign; i++) {
-							subTasks[i] = assignedTasksCnt++;
-						}
-						subTaskHostAssignment.put(host, subTasks);
-						assignedHostsCnt++;
+				if (inputSplits != null) {
+					if (splitSource instanceof StrictlyLocalAssignment) {
+						inputSplitsPerSubtask = computeLocalInputSplitsPerTask(inputSplits);
+						splitAssigner = new PredeterminedInputSplitAssigner(inputSplitsPerSubtask);
+					} else {
+						splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
 					}
-
-					// attach locality constraint to subtask
-					for(String host : hosts) {
-						int[] subTasks = subTaskHostAssignment.get(host);
-
-						for(int taskId : subTasks) {
-							this.getTaskVertices()[taskId].setTargetHostConstraint(host);
-						}
-					}
-
-					// assign splits to subtasks
-					this.inputSplitsPerSubtask = (List<InputSplit>[])new List[numSubTasks];
-					for(String host : hosts) {
-						List<LocatableInputSplit> localSplits = splitsByHost.get(host);
-						int[] localSubTasks = subTaskHostAssignment.get(host);
-
-						// init lists
-						for(int i=0; i<localSubTasks.length; i++) {
-							this.inputSplitsPerSubtask[localSubTasks[i]] = new ArrayList<InputSplit>();
-						}
-
-						int subTaskIdx = 0;
-						while(!localSplits.isEmpty()) {
-							int subTask = localSubTasks[subTaskIdx++];
-							this.inputSplitsPerSubtask[subTask].add(localSplits.remove(localSplits.size() - 1));
-							if(subTaskIdx == localSubTasks.length) {
-								subTaskIdx = 0;
-							}
-						}
-					}
-
-					// create predetermined split assigner
-					this.splitAssigner = new PredeterminedInputSplitAssigner(this.inputSplitsPerSubtask);
-					
-				} else {
-					this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
 				}
-			} else {
-				this.inputSplits = null;
-				this.splitAssigner = null;
+			}
+			else {
+				inputSplits = null;
 			}
 		}
 		catch (Throwable t) {
 			throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
 		}
 		
-		this.finishedSubtasks = new boolean[parallelism];
+		finishedSubtasks = new boolean[parallelism];
 	}
 
 	public ExecutionGraph getGraph() {
-		return this.graph;
+		return graph;
 	}
 	
 	public AbstractJobVertex getJobVertex() {
-		return this.jobVertex;
+		return jobVertex;
 	}
 
 	public int getParallelism() {
-		return this.parallelism;
+		return parallelism;
 	}
 	
 	public JobID getJobId() {
-		return this.graph.getJobID();
+		return graph.getJobID();
 	}
 	
 	public JobVertexID getJobVertexId() {
-		return this.jobVertex.getID();
+		return jobVertex.getID();
 	}
 	
 	public ExecutionVertex[] getTaskVertices() {
@@ -345,15 +260,49 @@ public class ExecutionJobVertex implements Serializable {
 	
 	public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
 		
-//		ExecutionVertex[] vertices = this.taskVertices;
-//		
-//		for (int i = 0; i < vertices.length; i++) {
-//			ExecutionVertex v = vertices[i];
-//			
-//			if (v.get 
-//		}
+		ExecutionVertex[] vertices = this.taskVertices;
 		
-		for (ExecutionVertex ev : getTaskVertices()) {
+		// check if we need to do pre-assignment of tasks
+		if (inputSplitsPerSubtask != null) {
+		
+			final Map<String, List<Instance>> instances = scheduler.getInstancesByHost();
+			final Map<String, Integer> assignments = new HashMap<String, Integer>();
+			
+			for (int i = 0; i < vertices.length; i++) {
+				List<LocatableInputSplit> splitsForHost = inputSplitsPerSubtask[i];
+				if (splitsForHost == null || splitsForHost.isEmpty()) {
+					continue;
+				}
+				
+				String[] hostNames = splitsForHost.get(0).getHostnames();
+				if (hostNames == null || hostNames.length == 0 || hostNames[0] == null) {
+					continue;
+				}
+				
+				String host = hostNames[0];
+				ExecutionVertex v = vertices[i];
+				
+				List<Instance> instancesOnHost = instances.get(host);
+				
+				if (instancesOnHost == null || instancesOnHost.isEmpty()) {
+					throw new NoResourceAvailableException("Cannot schedule a strictly local task to host " + host
+							+ ". No TaskManager available on that host.");
+				}
+				
+				Integer pos = assignments.get(host);
+				if (pos == null) {
+					pos = 0;
+				} else {
+					pos = (pos + 1) % instancesOnHost.size();
+				}
+				assignments.put(host, pos);
+				
+				v.setLocationConstraintHosts(Collections.singletonList(instancesOnHost.get(pos)));
+			}
+		}
+		
+		// kick off the tasks
+		for (ExecutionVertex ev : vertices) {
 			ev.scheduleForExecution(scheduler, queued);
 		}
 	}
@@ -504,30 +453,132 @@ public class ExecutionJobVertex implements Serializable {
 		}
 	}
 
+	private List<LocatableInputSplit>[] computeLocalInputSplitsPerTask(InputSplit[] splits) throws JobException {
+		
+		final int numSubTasks = getParallelism();
+		
+		// sanity check
+		if (numSubTasks > splits.length) {
+			throw new JobException("Strictly local assignment requires at least as many splits as subtasks.");
+		}
+		
+		// group the splits by host while preserving order per host
+		Map<String, List<LocatableInputSplit>> splitsByHost = new HashMap<String, List<LocatableInputSplit>>();
+		
+		for (InputSplit split : splits) {
+			// check that split has exactly one local host
+			if (!(split instanceof LocatableInputSplit)) {
+				throw new JobException("Invalid InputSplit type " + split.getClass().getCanonicalName() + ". " +
+						"Strictly local assignment requires LocatableInputSplit");
+			}
+			LocatableInputSplit lis = (LocatableInputSplit) split;
+
+			if (lis.getHostnames() == null) {
+				throw new JobException("LocatableInputSplit has no host information. " +
+						"Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
+			}
+			else if (lis.getHostnames().length != 1) {
+				throw new JobException("Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
+			}
+			String hostName = lis.getHostnames()[0];
+
+			List<LocatableInputSplit> hostSplits = splitsByHost.get(hostName);
+			if (hostSplits == null) {
+				hostSplits = new ArrayList<LocatableInputSplit>();
+				splitsByHost.put(hostName, hostSplits);
+			}
+			hostSplits.add(lis);
+		}
+		
+		
+		int numHosts = splitsByHost.size();
+		
+		if (numSubTasks < numHosts) {
+			throw new JobException("Strictly local split assignment requires at least as " +
+					"many parallel subtasks as distinct split hosts. Please increase the parallelism " +
+					"of DataSource "+this.getJobVertex().getName()+" to at least "+numHosts+".");
+		}
+
+		// get list of hosts in deterministic order
+		List<String> hosts = new ArrayList<String>(splitsByHost.keySet());
+		Collections.sort(hosts);
+		
+		@SuppressWarnings("unchecked")
+		List<LocatableInputSplit>[] subTaskSplitAssignment = (List<LocatableInputSplit>[]) new List<?>[numSubTasks];
+		
+		final int subtasksPerHost = numSubTasks / numHosts;
+		final int hostsWithOneMore = numSubTasks % numHosts;
+		
+		int subtaskNum = 0;
+		
+		// we go over all hosts and distribute the hosts' input splits
+		// over the subtasks
+		for (int hostNum = 0; hostNum < numHosts; hostNum++) {
+			String host = hosts.get(hostNum);
+			List<LocatableInputSplit> splitsOnHost = splitsByHost.get(host);
+			
+			int numSplitsOnHost = splitsOnHost.size();
+			
+			// the number of subtasks to split this over.
+			// NOTE: if the host has few splits, some subtasks will not get anything.
+			int subtasks = Math.min(numSplitsOnHost, 
+							hostNum < hostsWithOneMore ? subtasksPerHost + 1 : subtasksPerHost);
+			
+			int splitsPerSubtask = numSplitsOnHost / subtasks;
+			int subtasksWithOneMore = numSplitsOnHost % subtasks;
+			
+			int splitnum = 0;
+			
+			// go over the subtasks and grab a subrange of the input splits
+			for (int i = 0; i < subtasks; i++) {
+				int numSplitsForSubtask = (i < subtasksWithOneMore ? splitsPerSubtask + 1 : splitsPerSubtask);
+				
+				List<LocatableInputSplit> splitList;
+				
+				if (numSplitsForSubtask == numSplitsOnHost) {
+					splitList = splitsOnHost;
+				}
+				else {
+					splitList = new ArrayList<LocatableInputSplit>(numSplitsForSubtask);
+					for (int k = 0; k < numSplitsForSubtask; k++) {
+						splitList.add(splitsOnHost.get(splitnum++));
+					}
+				}
+				
+				subTaskSplitAssignment[subtaskNum++] = splitList;
+			}
+		}
+		
+		return subTaskSplitAssignment;
+	}
+	
 	//---------------------------------------------------------------------------------------------
 	//  Predetermined InputSplitAssigner
 	//---------------------------------------------------------------------------------------------
 
 	public static class PredeterminedInputSplitAssigner implements InputSplitAssigner {
 
-		private List<InputSplit>[] inputSplitsPerSubtask;
+		private List<LocatableInputSplit>[] inputSplitsPerSubtask;
 
-		public PredeterminedInputSplitAssigner(List<InputSplit>[] inputSplitsPerSubtask) {
+		@SuppressWarnings("unchecked")
+		public PredeterminedInputSplitAssigner(List<LocatableInputSplit>[] inputSplitsPerSubtask) {
 			// copy input split assignment
-			this.inputSplitsPerSubtask = (List<InputSplit>[])new List[inputSplitsPerSubtask.length];
-			for(int i=0; i<inputSplitsPerSubtask.length; i++) {
-				this.inputSplitsPerSubtask[i] = new ArrayList<InputSplit>(inputSplitsPerSubtask[i].size());
-				this.inputSplitsPerSubtask[i].addAll(inputSplitsPerSubtask[i]);
+			this.inputSplitsPerSubtask = (List<LocatableInputSplit>[]) new List[inputSplitsPerSubtask.length];
+			for (int i = 0; i < inputSplitsPerSubtask.length; i++) {
+				List<LocatableInputSplit> next = inputSplitsPerSubtask[i];
+				
+				this.inputSplitsPerSubtask[i] = next == null || next.isEmpty() ?
+						Collections.<LocatableInputSplit>emptyList() : 
+						new ArrayList<LocatableInputSplit>(inputSplitsPerSubtask[i]);
 			}
 		}
 
 		@Override
 		public InputSplit getNextInputSplit(String host, int taskId) {
-			if(inputSplitsPerSubtask[taskId].isEmpty()) {
+			if (inputSplitsPerSubtask[taskId].isEmpty()) {
 				return null;
 			} else {
-				InputSplit is = inputSplitsPerSubtask[taskId].remove(inputSplitsPerSubtask[taskId].size() - 1);
-				return is;
+				return inputSplitsPerSubtask[taskId].remove(inputSplitsPerSubtask[taskId].size() - 1);
 			}
 		}
 	}
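
For completeness, the new PredeterminedInputSplitAssigner simply drains the per-subtask list that was fixed ahead of time; its host argument is ignored because locality was already decided during pre-assignment. A hedged usage sketch (the driver class and its contents are hypothetical; the Flink types are the ones from the diff above and must be on the classpath):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.core.io.InputSplit;
    import org.apache.flink.core.io.InputSplitAssigner;
    import org.apache.flink.core.io.LocatableInputSplit;
    import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;

    public class AssignerUsageSketch {
        public static void main(String[] args) {
            @SuppressWarnings("unchecked")
            List<LocatableInputSplit>[] perSubtask = (List<LocatableInputSplit>[]) new List<?>[1];
            perSubtask[0] = new ArrayList<LocatableInputSplit>();
            perSubtask[0].add(new LocatableInputSplit(0, "host1"));
            perSubtask[0].add(new LocatableInputSplit(1, "host1"));

            InputSplitAssigner assigner =
                    new ExecutionJobVertex.PredeterminedInputSplitAssigner(perSubtask);

            // splits come back from the tail of the list, then null when drained
            InputSplit next;
            while ((next = assigner.getNextInputSplit("any-host", 0)) != null) {
                System.out.println("subtask 0 got split " + next.getSplitNumber());
            }
        }
    }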

http://git-wip-us.apache.org/repos/asf/flink/blob/4386620c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 466e0e9..4c7ada5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -19,9 +19,13 @@
 package org.apache.flink.runtime.jobmanager.scheduler;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
@@ -58,6 +62,9 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	/** All instances that the scheduler can deploy to */
 	private final Set<Instance> allInstances = new HashSet<Instance>();
 	
+	/** All instances by hostname */
+	private final HashMap<String, Set<Instance>> allInstancesByHost = new HashMap<String, Set<Instance>>();
+	
 	/** All instances that still have available resources */
 	private final Queue<Instance> instancesWithAvailableResources = new SetQueue<Instance>();
 	
@@ -89,6 +96,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				i.cancelAndReleaseAllSlots();
 			}
 			allInstances.clear();
+			allInstancesByHost.clear();
 			instancesWithAvailableResources.clear();
 			taskQueue.clear();
 		}
@@ -384,8 +392,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 			catch (InstanceDiedException e) {
 				// the instance died it has not yet been propagated to this scheduler
 				// remove the instance from the set of available instances
-				this.allInstances.remove(instanceToUse);
-				this.instancesWithAvailableResources.remove(instanceToUse);
+				removeInstance(instanceToUse);
 			}
 			
 			// if we failed to get a slot, fall through the loop
@@ -441,8 +448,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 			catch (InstanceDiedException e) {
 				// the instance died it has not yet been propagated to this scheduler
 				// remove the instance from the set of available instances
-				this.allInstances.remove(instanceToUse);
-				this.instancesWithAvailableResources.remove(instanceToUse);
+				removeInstance(instanceToUse);
 			}
 
 			// if we failed to get a slot, fall through the loop
@@ -563,10 +569,11 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 					}
 				}
 				catch (InstanceDiedException e) {
-					this.allInstances.remove(instance);
 					if (LOG.isDebugEnabled()) {
 						LOG.debug("Instance " + instance + " was marked dead asynchronously.");
 					}
+					
+					removeInstance(instance);
 				}
 			}
 			else {
@@ -591,6 +598,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		}
 	}
 	
+	
+	
+	
+	
 	// --------------------------------------------------------------------------------------------
 	//  Instance Availability
 	// --------------------------------------------------------------------------------------------
@@ -616,19 +627,30 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 			}
 			
 			try {
+				// make sure we get notifications about slots becoming available
 				instance.setSlotAvailabilityListener(this);
+				
+				// store the instance in the by-host-lookup
+				String instanceHostName = instance.getInstanceConnectionInfo().getHostname();
+				Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);
+				if (instanceSet == null) {
+					instanceSet = new HashSet<Instance>();
+					allInstancesByHost.put(instanceHostName, instanceSet);
+				}
+				instanceSet.add(instance);
+				
+					
+				// add it to the available resources and let potential waiters know
+				this.instancesWithAvailableResources.add(instance);
+	
+				// add all slots as available
+				for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
+					newSlotAvailable(instance);
+				}
 			}
-			catch (IllegalStateException e) {
-				this.allInstances.remove(instance);
-				LOG.error("Scheduler could not attach to the instance as a listener.");
-			}
-			
-			
-			// add it to the available resources and let potential waiters know
-			this.instancesWithAvailableResources.add(instance);
-			
-			for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
-				newSlotAvailable(instance);
+			catch (Throwable t) {
+				LOG.error("Scheduler could not add new instance " + instance, t);
+				removeInstance(instance);
 			}
 		}
 	}
@@ -644,8 +666,25 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		// we only remove the instance from the pools, we do not care about the 
 		synchronized (this.globalLock) {
 			// the instance must not be available anywhere any more
-			this.allInstances.remove(instance);
-			this.instancesWithAvailableResources.remove(instance);
+			removeInstance(instance);
+		}
+	}
+	
+	private void removeInstance(Instance instance) {
+		if (instance == null) {
+			throw new NullPointerException();
+		}
+		
+		allInstances.remove(instance);
+		instancesWithAvailableResources.remove(instance);
+		
+		String instanceHostName = instance.getInstanceConnectionInfo().getHostname();
+		Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);
+		if (instanceSet != null) {
+			instanceSet.remove(instance);
+			if (instanceSet.isEmpty()) {
+				allInstancesByHost.remove(instanceHostName);
+			}
 		}
 	}
 	
@@ -670,6 +709,17 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		return instancesWithAvailableResources.size();
 	}
 	
+	public Map<String, List<Instance>> getInstancesByHost() {
+		synchronized (globalLock) {
+			HashMap<String, List<Instance>> copy = new HashMap<String, List<Instance>>();
+			
+			for (Map.Entry<String, Set<Instance>> entry : allInstancesByHost.entrySet()) {
+				copy.put(entry.getKey(), new ArrayList<Instance>(entry.getValue()));
+			}
+			return copy;
+		}
+	}
+	
 	public int getNumberOfUnconstrainedAssignments() {
 		return unconstrainedAssignments;
 	}
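
The pre-assignment in ExecutionJobVertex.scheduleAll uses this new getInstancesByHost() view to round-robin consecutive subtasks pinned to the same host across that host's TaskManagers. A self-contained sketch of just that rotation (plain strings stand in for Instance objects; the class is illustrative, not part of the commit):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class HostRoundRobinSketch {
        public static void main(String[] args) {
            List<String> instancesOnHost = Arrays.asList("taskmanager-1", "taskmanager-2");
            Map<String, Integer> assignments = new HashMap<String, Integer>();

            for (int subtask = 0; subtask < 5; subtask++) {
                Integer pos = assignments.get("host1");
                // the first subtask starts at 0, later ones rotate with a modulo
                pos = (pos == null) ? 0 : (pos + 1) % instancesOnHost.size();
                assignments.put("host1", pos);
                System.out.println("subtask " + subtask + " -> " + instancesOnHost.get(pos));
            }
            // prints taskmanager-1, taskmanager-2, taskmanager-1, taskmanager-2, taskmanager-1
        }
    }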

http://git-wip-us.apache.org/repos/asf/flink/blob/4386620c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
new file mode 100644
index 0000000..dfe6b50
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.io.StrictlyLocalAssignment;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.io.InputSplitSource;
+import org.apache.flink.core.io.LocatableInputSplit;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import scala.concurrent.duration.FiniteDuration;
+import akka.actor.Actor;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+
+public class LocalInputSplitsTest {
+	
+	private static final FiniteDuration TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);
+	
+	private static ActorSystem system;
+	
+	private static TestActorRef<? extends Actor> taskManager;
+	
+	
+	@BeforeClass
+	public static void setup() {
+		system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
+		taskManager = TestActorRef.create(system,
+				Props.create(ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class));
+	}
+
+	@AfterClass
+	public static void teardown() {
+		JavaTestKit.shutdownActorSystem(system);
+		system = null;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Test
+	public void testNotEnoughSubtasks() {
+		int numHosts = 3;
+		int slotsPerHost = 1;
+		int parallelism = 2;
+		
+		TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
+				new TestLocatableInputSplit(1, "host1"),
+				new TestLocatableInputSplit(2, "host2"),
+				new TestLocatableInputSplit(3, "host3")
+		};
+		
+		// This should fail with an exception, since the DOP of 2 does not
+		// support strictly local assignment onto 3 hosts
+		try {
+			runTests(numHosts, slotsPerHost, parallelism, splits);
+			fail("should throw an exception");
+		}
+		catch (JobException e) {
+			// what a great day!
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testDisallowMultipleLocations() {
+		int numHosts = 2;
+		int slotsPerHost = 1;
+		int parallelism = 2;
+		
+		TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
+				new TestLocatableInputSplit(1, new String[] { "host1", "host2" } ),
+				new TestLocatableInputSplit(2, new String[] { "host1", "host2" } )
+		};
+		
+		// This should fail with an exception, since strictly local assignment
+		// currently supports only one choice of host
+		try {
+			runTests(numHosts, slotsPerHost, parallelism, splits);
+			fail("should throw an exception");
+		}
+		catch (JobException e) {
+			// dandy!
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testNonExistentHost() {
+		int numHosts = 2;
+		int slotsPerHost = 1;
+		int parallelism = 2;
+		
+		TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
+				new TestLocatableInputSplit(1, "host1"),
+				new TestLocatableInputSplit(2, "bogus_host" )
+		};
+		
+		// This should fail with an exception, since one of the hosts does not exist
+		try {
+			runTests(numHosts, slotsPerHost, parallelism, splits);
+			fail("should throw an exception");
+		}
+		catch (JobException e) {
+			// dandy!
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testEqualSplitsPerHostAndSubtask() {
+		int numHosts = 5;
+		int slotsPerHost = 2;
+		int parallelism = 10;
+		
+		TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
+				new TestLocatableInputSplit(7, "host4"),
+				new TestLocatableInputSplit(8, "host4"),
+				new TestLocatableInputSplit(1, "host1"),
+				new TestLocatableInputSplit(2, "host1"),
+				new TestLocatableInputSplit(3, "host2"),
+				new TestLocatableInputSplit(4, "host2"),
+				new TestLocatableInputSplit(5, "host3"),
+				new TestLocatableInputSplit(6, "host3"),
+				new TestLocatableInputSplit(9, "host5"),
+				new TestLocatableInputSplit(10, "host5")
+		};
+		
+		try {
+			String[] hostsForTasks = runTests(numHosts, slotsPerHost, parallelism, splits);
+			
+			assertEquals("host1", hostsForTasks[0]);
+			assertEquals("host1", hostsForTasks[1]);
+			assertEquals("host2", hostsForTasks[2]);
+			assertEquals("host2", hostsForTasks[3]);
+			assertEquals("host3", hostsForTasks[4]);
+			assertEquals("host3", hostsForTasks[5]);
+			assertEquals("host4", hostsForTasks[6]);
+			assertEquals("host4", hostsForTasks[7]);
+			assertEquals("host5", hostsForTasks[8]);
+			assertEquals("host5", hostsForTasks[9]);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testNonEqualSplitsPerhost() {
+		int numHosts = 3;
+		int slotsPerHost = 2;
+		int parallelism = 5;
+		
+		TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
+				new TestLocatableInputSplit(1, "host3"),
+				new TestLocatableInputSplit(2, "host1"),
+				new TestLocatableInputSplit(3, "host1"),
+				new TestLocatableInputSplit(4, "host1"),
+				new TestLocatableInputSplit(5, "host1"),
+				new TestLocatableInputSplit(6, "host1"),
+				new TestLocatableInputSplit(7, "host2"),
+				new TestLocatableInputSplit(8, "host2")
+		};
+		
+		try {
+			String[] hostsForTasks = runTests(numHosts, slotsPerHost, parallelism, splits);
+			
+			assertEquals("host1", hostsForTasks[0]);
+			assertEquals("host1", hostsForTasks[1]);
+			assertEquals("host2", hostsForTasks[2]);
+			assertEquals("host2", hostsForTasks[3]);
+			assertEquals("host3", hostsForTasks[4]);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testWithSubtasksEmpty() {
+		int numHosts = 3;
+		int slotsPerHost = 5;
+		int parallelism = 7;
+		
+		// host one gets three subtasks (but two remain empty)
+		// host two gets two subtasks where one gets two splits, the other one split
+		// host three gets two subtasks where one gets five splits, the other gets four splits
+		
+		TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
+				new TestLocatableInputSplit(1, "host1"),
+				new TestLocatableInputSplit(2, "host2"),
+				new TestLocatableInputSplit(3, "host2"),
+				new TestLocatableInputSplit(4, "host2"),
+				new TestLocatableInputSplit(5, "host3"),
+				new TestLocatableInputSplit(6, "host3"),
+				new TestLocatableInputSplit(7, "host3"),
+				new TestLocatableInputSplit(8, "host3"),
+				new TestLocatableInputSplit(9, "host3"),
+				new TestLocatableInputSplit(10, "host3"),
+				new TestLocatableInputSplit(11, "host3"),
+				new TestLocatableInputSplit(12, "host3"),
+				new TestLocatableInputSplit(13, "host3")
+		};
+		
+		try {
+			String[] hostsForTasks = runTests(numHosts, slotsPerHost, parallelism, splits);
+			
+			assertEquals("host1", hostsForTasks[0]);
+			
+			assertEquals("host2", hostsForTasks[1]);
+			assertEquals("host2", hostsForTasks[2]);
+
+			assertEquals("host3", hostsForTasks[3]);
+			assertEquals("host3", hostsForTasks[4]);
+			
+			// the current assignment leaves those with empty constraints
+			assertTrue(hostsForTasks[5].equals("host1") || hostsForTasks[5].equals("host2") || hostsForTasks[5].equals("host3"));
+			assertTrue(hostsForTasks[6].equals("host1") || hostsForTasks[6].equals("host2") || hostsForTasks[6].equals("host3"));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testMultipleInstancesPerHost() {
+
+		TestLocatableInputSplit[] splits = new TestLocatableInputSplit[] {
+				new TestLocatableInputSplit(1, "host1"),
+				new TestLocatableInputSplit(2, "host1"),
+				new TestLocatableInputSplit(3, "host2"),
+				new TestLocatableInputSplit(4, "host2"),
+				new TestLocatableInputSplit(5, "host3"),
+				new TestLocatableInputSplit(6, "host3")
+		};
+		
+		try {
+			AbstractJobVertex vertex = new AbstractJobVertex("test vertex");
+			vertex.setParallelism(6);
+			vertex.setInvokableClass(DummyInvokable.class);
+			vertex.setInputSplitSource(new TestInputSplitSource(splits));
+			
+			JobGraph jobGraph = new JobGraph("test job", vertex);
+			
+			ExecutionGraph eg = new ExecutionGraph(jobGraph.getJobID(),
+					jobGraph.getName(), jobGraph.getJobConfiguration(), TIMEOUT);
+			
+			eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+			eg.setQueuedSchedulingAllowed(false);
+			
+			// create a scheduler with 6 instances where always two are on the same host
+			Scheduler scheduler = new Scheduler();
+			Instance i1 = getInstance(new byte[] {10,0,1,1}, 12345, "host1", 1);
+			Instance i2 = getInstance(new byte[] {10,0,1,1}, 12346, "host1", 1);
+			Instance i3 = getInstance(new byte[] {10,0,1,2}, 12345, "host2", 1);
+			Instance i4 = getInstance(new byte[] {10,0,1,2}, 12346, "host2", 1);
+			Instance i5 = getInstance(new byte[] {10,0,1,3}, 12345, "host3", 1);
+			Instance i6 = getInstance(new byte[] {10,0,1,3}, 12346, "host3", 1);
+			scheduler.newInstanceAvailable(i1);
+			scheduler.newInstanceAvailable(i2);
+			scheduler.newInstanceAvailable(i3);
+			scheduler.newInstanceAvailable(i4);
+			scheduler.newInstanceAvailable(i5);
+			scheduler.newInstanceAvailable(i6);
+			
+			eg.scheduleForExecution(scheduler);
+			
+			ExecutionVertex[] tasks = eg.getVerticesTopologically().iterator().next().getTaskVertices();
+			assertEquals(6, tasks.length);
+			
+			Instance taskInstance1 = tasks[0].getCurrentAssignedResource().getInstance();
+			Instance taskInstance2 = tasks[1].getCurrentAssignedResource().getInstance();
+			Instance taskInstance3 = tasks[2].getCurrentAssignedResource().getInstance();
+			Instance taskInstance4 = tasks[3].getCurrentAssignedResource().getInstance();
+			Instance taskInstance5 = tasks[4].getCurrentAssignedResource().getInstance();
+			Instance taskInstance6 = tasks[5].getCurrentAssignedResource().getInstance();
+			
+			assertTrue (taskInstance1 == i1 || taskInstance1 == i2);
+			assertTrue (taskInstance2 == i1 || taskInstance2 == i2);
+			assertTrue (taskInstance3 == i3 || taskInstance3 == i4);
+			assertTrue (taskInstance4 == i3 || taskInstance4 == i4);
+			assertTrue (taskInstance5 == i5 || taskInstance5 == i6);
+			assertTrue (taskInstance6 == i5 || taskInstance6 == i6);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static String[] runTests(int numHosts, int slotsPerHost, int parallelism, 
+			TestLocatableInputSplit[] splits)
+		throws Exception
+	{
+		AbstractJobVertex vertex = new AbstractJobVertex("test vertex");
+		vertex.setParallelism(parallelism);
+		vertex.setInvokableClass(DummyInvokable.class);
+		vertex.setInputSplitSource(new TestInputSplitSource(splits));
+		
+		JobGraph jobGraph = new JobGraph("test job", vertex);
+		
+		ExecutionGraph eg = new ExecutionGraph(jobGraph.getJobID(),
+				jobGraph.getName(), jobGraph.getJobConfiguration(), TIMEOUT);
+		eg.setQueuedSchedulingAllowed(false);
+		
+		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+		
+		Scheduler scheduler = getScheduler(numHosts, slotsPerHost);
+		eg.scheduleForExecution(scheduler);
+		
+		ExecutionVertex[] tasks = eg.getVerticesTopologically().iterator().next().getTaskVertices();
+		assertEquals(parallelism, tasks.length);
+		
+		String[] hostsForTasks = new String[parallelism];
+		for (int i = 0; i < parallelism; i++) {
+			hostsForTasks[i] = tasks[i].getCurrentAssignedResourceLocation().getHostname();
+		}
+		
+		return hostsForTasks;
+	}
+	
+	private static Scheduler getScheduler(int numInstances, int numSlotsPerInstance) throws Exception {
+		Scheduler scheduler = new Scheduler();
+		
+		for (int i = 0; i < numInstances; i++) {
+			byte[] ipAddress = new byte[] { 10, 0, 1, (byte) (1 + i) };
+			int dataPort = 12001 + i;
+			String host = "host" + (i+1);
+			
+			Instance instance = getInstance(ipAddress, dataPort, host, numSlotsPerInstance);
+			scheduler.newInstanceAvailable(instance);
+		}
+		return scheduler;
+	}
+	
+	private static Instance getInstance(byte[] ipAddress, int dataPort, String hostname, int slots) throws Exception {
+		HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
+		
+		InstanceConnectionInfo connection = mock(InstanceConnectionInfo.class);
+		when(connection.address()).thenReturn(InetAddress.getByAddress(ipAddress));
+		when(connection.dataPort()).thenReturn(dataPort);
+		when(connection.getInetAdress()).thenReturn(InetAddress.getByAddress(ipAddress).toString());
+		when(connection.getHostname()).thenReturn(hostname);
+		when(connection.getFQDNHostname()).thenReturn(hostname);
+		
+		return new Instance(taskManager, connection, new InstanceID(), hardwareDescription, slots);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	// custom class to ensure behavior works for subclasses of LocatableInputSplit
+	private static class TestLocatableInputSplit extends LocatableInputSplit {
+		
+		private static final long serialVersionUID = 1L;
+
+		public TestLocatableInputSplit(int splitNumber, String hostname) {
+			super(splitNumber, hostname);
+		}
+		
+		public TestLocatableInputSplit(int splitNumber, String[] hostnames) {
+			super(splitNumber, hostnames);
+		}
+	}
+	
+	private static class TestInputSplitSource implements InputSplitSource<TestLocatableInputSplit>,
+		StrictlyLocalAssignment
+	{
+		private static final long serialVersionUID = 1L;
+		
+		private final TestLocatableInputSplit[] splits;
+		
+		public TestInputSplitSource(TestLocatableInputSplit[] splits) {
+			this.splits = splits;
+		}
+
+		@Override
+		public TestLocatableInputSplit[] createInputSplits(int minNumSplits) {
+			return splits;
+		}
+		
+		@Override
+		public InputSplitAssigner getInputSplitAssigner(TestLocatableInputSplit[] inputSplits) {
+			fail("This method should not be called on StrictlyLocalAssignment splits.");
+			return null; // silence the compiler
+		}
+	}
+}
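
As TestInputSplitSource above illustrates, an input source opts into this code path by implementing the StrictlyLocalAssignment marker interface and producing LocatableInputSplits that each name exactly one host. A minimal sketch of such a source (the class itself is hypothetical; the interfaces and constructors are the ones used in the test above):

    import org.apache.flink.api.common.io.StrictlyLocalAssignment;
    import org.apache.flink.core.io.InputSplitAssigner;
    import org.apache.flink.core.io.InputSplitSource;
    import org.apache.flink.core.io.LocatableInputSplit;

    public class LocalSplitSourceSketch
            implements InputSplitSource<LocatableInputSplit>, StrictlyLocalAssignment {

        private static final long serialVersionUID = 1L;

        @Override
        public LocatableInputSplit[] createInputSplits(int minNumSplits) {
            // every split names exactly one host, as strictly local assignment requires
            LocatableInputSplit[] splits = new LocatableInputSplit[minNumSplits];
            for (int i = 0; i < minNumSplits; i++) {
                splits[i] = new LocatableInputSplit(i, "host" + (i % 2 + 1));
            }
            return splits;
        }

        @Override
        public InputSplitAssigner getInputSplitAssigner(LocatableInputSplit[] splits) {
            // not called for strictly local sources: the JobManager builds a
            // PredeterminedInputSplitAssigner instead (see the ExecutionJobVertex diff)
            throw new UnsupportedOperationException();
        }
    }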

