flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [10/22] Rework the Taskmanager to a slot based model and remove legacy cloud code
Date Sun, 22 Jun 2014 21:47:31 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
index 2c70794..f425695 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
@@ -155,8 +155,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		UnaryOperatorNode solutionSetDeltaUpdateAux = new UnaryOperatorNode("Solution-Set Delta", getSolutionSetKeyFields(),
 				new SolutionSetDeltaOperator(getSolutionSetKeyFields()));
 		solutionSetDeltaUpdateAux.setDegreeOfParallelism(getDegreeOfParallelism());
-		solutionSetDeltaUpdateAux.setSubtasksPerInstance(getSubtasksPerInstance());
-		
+
 		PactConnection conn = new PactConnection(solutionSetDelta, solutionSetDeltaUpdateAux);
 		solutionSetDeltaUpdateAux.setIncomingConnection(conn);
 		solutionSetDelta.addOutgoingConnection(conn);
@@ -218,11 +217,6 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	public boolean isMemoryConsumer() {
-		return true;
-	}
-	
-	@Override
 	protected List<OperatorDescriptorDual> getPossibleProperties() {
 		return new ArrayList<OperatorDescriptorDual>(1);
 	}
@@ -331,13 +325,12 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 				else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
 					// attach a no-op node through which we create the properties of the original input
 					Channel toNoOp = new Channel(candidate);
-					globPropsReqWorkset.parameterizeChannel(toNoOp, false, false);
+					globPropsReqWorkset.parameterizeChannel(toNoOp, false);
 					locPropsReqWorkset.parameterizeChannel(toNoOp);
 					
 					UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties", FieldList.EMPTY_LIST);
 					
 					rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
-					rebuildWorksetPropertiesNode.setSubtasksPerInstance(candidate.getSubtasksPerInstance());
 					
 					SingleInputPlanNode rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode(rebuildWorksetPropertiesNode, "Rebuild Workset Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
 					rebuildWorksetPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());
@@ -518,7 +511,6 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 			super(new NoOpBinaryUdfOp<Nothing>(new NothingTypeInfo()));
 			
 			setDegreeOfParallelism(1);
-			setSubtasksPerInstance(1);
 		}
 		
 		public void setInputs(PactConnection input1, PactConnection input2) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
index 574922a..e769508 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
@@ -53,8 +53,7 @@ public final class RequestedGlobalProperties implements Cloneable {
 	/**
 	 * Sets the partitioning property for the global properties.
 	 * 
-	 * @param partitioning The new partitioning to set.
-	 * @param partitionedFields 
+	 * @param partitionedFields
 	 */
 	public void setHashPartitioned(FieldSet partitionedFields) {
 		if (partitionedFields == null) {
@@ -218,7 +217,7 @@ public final class RequestedGlobalProperties implements Cloneable {
 	 * @param globalDopChange
 	 * @param localDopChange
 	 */
-	public void parameterizeChannel(Channel channel, boolean globalDopChange, boolean localDopChange) {
+	public void parameterizeChannel(Channel channel, boolean globalDopChange) {
 		// if we request nothing, then we need no special strategy. forward, if the number of instances remains
 		// the same, randomly repartition otherwise
 		if (isTrivial()) {
@@ -228,8 +227,7 @@ public final class RequestedGlobalProperties implements Cloneable {
 		
 		final GlobalProperties inGlobals = channel.getSource().getGlobalProperties();
 		// if we have no global parallelism change, check if we have already compatible global properties
-		if (!globalDopChange && !localDopChange && isMetBy(inGlobals)) {
-			// we meet already everything, so go forward
+		if (!globalDopChange && isMetBy(inGlobals)) {
 			channel.setShipStrategy(ShipStrategyType.FORWARD);
 			return;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java
index b389855..0ef277e 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java
@@ -48,8 +48,7 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip
 			// create an input node for combine with same DOP as input node
 			GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
 			combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
-			combinerNode.setSubtasksPerInstance(in.getSource().getSubtasksPerInstance());
-			
+
 			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_COMBINE);
 			combiner.setCosts(new Costs(0, 0));
 			combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllReduceProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllReduceProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllReduceProperties.java
index be3ed74..867b9d9 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllReduceProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllReduceProperties.java
@@ -48,8 +48,7 @@ public final class AllReduceProperties extends OperatorDescriptorSingle
 			// create an input node for combine with same DOP as input node
 			ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
 			combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
-			combinerNode.setSubtasksPerInstance(in.getSource().getSubtasksPerInstance());
-			
+
 			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_REDUCE);
 			combiner.setCosts(new Costs(0, 0));
 			combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java
index 980cf6d..ec45a53 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java
@@ -85,9 +85,9 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
 			// create an input node for combine with same DOP as input node
 			GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
 			combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
-			combinerNode.setSubtasksPerInstance(in.getSource().getSubtasksPerInstance());
-			
-			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE, this.keyList);
+
+			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract()
+					.getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE, this.keyList);
 			combiner.setCosts(new Costs(0, 0));
 			combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java
index a28feeb..9fb97b5 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java
@@ -44,9 +44,9 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle {
 		// create in input node for combine with same DOP as input node
 		GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getPactContract());
 		combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
-		combinerNode.setSubtasksPerInstance(in.getSource().getSubtasksPerInstance());
-		
-		return new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract().getName()+")", in, DriverStrategy.SORTED_GROUP_COMBINE, this.keyList);
+
+		return new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract().getName()+")", in,
+				DriverStrategy.SORTED_GROUP_COMBINE, this.keyList);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java
index 4539da5..0db3fa5 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java
@@ -56,8 +56,7 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
 			// create an input node for combine with same DOP as input node
 			ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
 			combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
-			combinerNode.setSubtasksPerInstance(in.getSource().getSubtasksPerInstance());
-			
+
 			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList);
 			combiner.setCosts(new Costs(0, 0));
 			combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
index 8fe95c9..6f9418f 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
@@ -68,11 +68,11 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	
 	private TempMode tempMode;
 	
-	private long tempMemory;
+	private double relativeTempMemory;
 	
-	private long memoryGlobalStrategy;
+	private double relativeMemoryGlobalStrategy;
 	
-	private long memoryLocalStrategy;
+	private double relativeMemoryLocalStrategy;
 	
 	private int replicationFactor = 1;
 	
@@ -200,17 +200,17 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	 *
 	 * @return The temp memory.
 	 */
-	public long getTempMemory() {
-		return this.tempMemory;
+	public double getRelativeTempMemory() {
+		return this.relativeTempMemory;
 	}
 	
 	/**
 	 * Sets the memory for materializing the channel's result from this Channel.
 	 *
-	 * @param tempMemory The memory for materialization.
+	 * @param relativeTempMemory The memory for materialization.
 	 */
-	public void setTempMemory(long tempMemory) {
-		this.tempMemory = tempMemory;
+	public void setRelativeTempMemory(double relativeTempMemory) {
+		this.relativeTempMemory = relativeTempMemory;
 	}
 	
 	/**
@@ -286,20 +286,20 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 		this.localStrategyComparator = localStrategyComparator;
 	}
 	
-	public long getMemoryGlobalStrategy() {
-		return memoryGlobalStrategy;
+	public double getRelativeMemoryGlobalStrategy() {
+		return relativeMemoryGlobalStrategy;
 	}
 	
-	public void setMemoryGlobalStrategy(long memoryGlobalStrategy) {
-		this.memoryGlobalStrategy = memoryGlobalStrategy;
+	public void setRelativeMemoryGlobalStrategy(double relativeMemoryGlobalStrategy) {
+		this.relativeMemoryGlobalStrategy = relativeMemoryGlobalStrategy;
 	}
 	
-	public long getMemoryLocalStrategy() {
-		return memoryLocalStrategy;
+	public double getRelativeMemoryLocalStrategy() {
+		return relativeMemoryLocalStrategy;
 	}
 	
-	public void setMemoryLocalStrategy(long memoryLocalStrategy) {
-		this.memoryLocalStrategy = memoryLocalStrategy;
+	public void setRelativeMemoryLocalStrategy(double relativeMemoryLocalStrategy) {
+		this.relativeMemoryLocalStrategy = relativeMemoryLocalStrategy;
 	}
 	
 	public boolean isOnDynamicPath() {
@@ -437,33 +437,6 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 		}
 		throw new CompilerException("Unrecognized Ship Strategy Type: " + this.shipStrategy);
 	}
-	
-	public void adjustGlobalPropertiesForLocalParallelismChange() {
-		if (this.shipStrategy == null || this.shipStrategy == ShipStrategyType.NONE) {
-			throw new IllegalStateException("Cannot adjust channel for degree of parallelism " +
-					"change before the ship strategy is set.");
-		}
-		
-		// make sure the properties are acquired
-		if (this.globalProps == null) {
-			getGlobalProperties();
-		}
-		
-		// some strategies globally reestablish properties
-		switch (this.shipStrategy) {
-		case FORWARD:
-			this.globalProps.reset();
-			return;
-		case NONE: // excluded by sanity check. just here to silence compiler warnings check completion
-		case BROADCAST:
-		case PARTITION_HASH:
-		case PARTITION_RANGE:
-		case PARTITION_RANDOM:
-			return;
-		}
-		
-		throw new CompilerException("Unrecognized Ship Strategy Type: " + this.shipStrategy);
-	}
 
 	// --------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
index 69263bc..539006c 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
@@ -65,12 +65,10 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 
 	protected Costs cumulativeCosts;					// the cumulative costs of all operators in the sub-tree
 	
-	private long memoryPerSubTask;					// the amount of memory dedicated to each task, in bytes
+	private double relativeMemoryPerSubTask;					// the relative amount of memory dedicated to each task
 	
 	private int degreeOfParallelism;
 	
-	private int subtasksPerInstance;
-	
 	private boolean pFlag;							// flag for the internal pruning algorithm
 	
 	// --------------------------------------------------------------------------------------------
@@ -83,8 +81,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 		this.driverStrategy = strategy;
 		
 		this.degreeOfParallelism = template.getDegreeOfParallelism();
-		this.subtasksPerInstance = template.getSubtasksPerInstance();
-		
+
 		// check, if there is branch at this node. if yes, this candidate must be associated with
 		// the branching template node.
 		if (template.isBranching()) {
@@ -166,17 +163,17 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 	 * 
 	 * @return The memory per task, in bytes.
 	 */
-	public long getMemoryPerSubTask() {
-		return this.memoryPerSubTask;
+	public double getRelativeMemoryPerSubTask() {
+		return this.relativeMemoryPerSubTask;
 	}
 
 	/**
 	 * Sets the memory dedicated to each task for this node.
 	 * 
-	 * @param memoryPerTask The memory per sub-task, in bytes.
+	 * @param relativeMemoryPerSubtask The relative memory per sub-task
 	 */
-	public void setMemoryPerSubTask(long memoryPerTask) {
-		this.memoryPerSubTask = memoryPerTask;
+	public void setRelativeMemoryPerSubtask(double relativeMemoryPerSubtask) {
+		this.relativeMemoryPerSubTask = relativeMemoryPerSubtask;
 	}
 	
 	/**
@@ -303,18 +300,10 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 		this.degreeOfParallelism = parallelism;
 	}
 	
-	public void setSubtasksPerInstance(int subTasksPerInstance) {
-		this.subtasksPerInstance = subTasksPerInstance;
-	}
-	
 	public int getDegreeOfParallelism() {
 		return this.degreeOfParallelism;
 	}
 	
-	public int getSubtasksPerInstance() {
-		return this.subtasksPerInstance;
-	}
-	
 	public long getGuaranteedAvailableMemory() {
 		return this.template.getMinimalMemoryAcrossAllSubTasks();
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
index 82d757c..a1baff1 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
@@ -252,9 +252,6 @@ public class PlanJSONDumpGenerator {
 		writer.print(",\n\t\t\"parallelism\": \""
 			+ (n.getDegreeOfParallelism() >= 1 ? n.getDegreeOfParallelism() : "default") + "\"");
 		
-		writer.print(",\n\t\t\"subtasks_per_instance\": \""
-				+ (n.getSubtasksPerInstance() >= 1 ? n.getSubtasksPerInstance() : "default") + "\"");
-
 		// output node predecessors
 		Iterator<? extends DumpableConnection<?>> inConns = node.getDumpableInputs().iterator();
 		String child1name = "", child2name = "";

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
index 5a30fb6..b4c7560 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -22,9 +22,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import eu.stratosphere.api.common.aggregators.AggregatorRegistry;
 import eu.stratosphere.api.common.aggregators.AggregatorWithName;
 import eu.stratosphere.api.common.aggregators.ConvergenceCriterion;
@@ -101,7 +98,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 	
 	private static final boolean mergeIterationAuxTasks = GlobalConfiguration.getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, true);
 	
-	private static final Log LOG = LogFactory.getLog(NepheleJobGraphGenerator.class);
+//	private static final Log LOG = LogFactory.getLog(NepheleJobGraphGenerator.class);
 	
 	private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new TaskInChain(null, null, null);
 	
@@ -186,13 +183,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			t.addChainedTask(tic.getChainedTask(), tic.getTaskConfig(), tic.getTaskName());
 		}
 
-		// now that all have been created, make sure that all share their instances with the one
-		// with the highest degree of parallelism
-		if (program.getInstanceTypeName() != null) {
-			this.maxDegreeVertex.setInstanceType(program.getInstanceTypeName());
-		} else {
-			LOG.warn("No instance type assigned to JobVertex.");
-		}
 		for (AbstractJobVertex vertex : this.vertices.values()) {
 			if (vertex != this.maxDegreeVertex) {
 				vertex.setVertexToShareInstancesWith(this.maxDegreeVertex);
@@ -231,7 +221,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 	 * @param node
 	 *        The node that is currently processed.
 	 * @return True, if the visitor should descend to the node's children, false if not.
-	 * @see eu.stratosphere.util.Visitor#preVisit(eu.stratosphere.pact.common.plan.Visitable)
+	 * @see eu.stratosphere.util.Visitor#preVisit(eu.stratosphere.util.Visitable)
 	 */
 	@Override
 	public boolean preVisit(PlanNode node) {
@@ -260,8 +250,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 				// operator with the tail, if they have the same DOP. not merging is currently not
 				// implemented
 				PlanNode root = iterationNode.getRootOfStepFunction();
-				if (root.getDegreeOfParallelism() != node.getDegreeOfParallelism() || 
-						root.getSubtasksPerInstance() != node.getSubtasksPerInstance()) 
+				if (root.getDegreeOfParallelism() != node.getDegreeOfParallelism())
 				{
 					throw new CompilerException("Error: The final operator of the step " +
 							"function has a different degree of parallelism than the iteration operator itself.");
@@ -278,14 +267,12 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 				PlanNode nextWorkSet = iterationNode.getNextWorkSetPlanNode();
 				PlanNode solutionSetDelta  = iterationNode.getSolutionSetDeltaPlanNode();
 				
-				if (nextWorkSet.getDegreeOfParallelism() != node.getDegreeOfParallelism() || 
-					nextWorkSet.getSubtasksPerInstance() != node.getSubtasksPerInstance())
+				if (nextWorkSet.getDegreeOfParallelism() != node.getDegreeOfParallelism())
 				{
 					throw new CompilerException("It is currently not supported that the final operator of the step " +
 							"function has a different degree of parallelism than the iteration operator itself.");
 				}
-				if (solutionSetDelta.getDegreeOfParallelism() != node.getDegreeOfParallelism() || 
-					solutionSetDelta.getSubtasksPerInstance() != node.getSubtasksPerInstance())
+				if (solutionSetDelta.getDegreeOfParallelism() != node.getDegreeOfParallelism())
 				{
 					throw new CompilerException("It is currently not supported that the final operator of the step " +
 							"function has a different degree of parallelism than the iteration operator itself.");
@@ -364,11 +351,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			if (this.maxDegreeVertex == null || this.maxDegreeVertex.getNumberOfSubtasks() < pd) {
 				this.maxDegreeVertex = vertex;
 			}
-	
-			// set the number of tasks per instance
-			if (node.getSubtasksPerInstance() >= 1) {
-				vertex.setNumberOfSubtasksPerInstance(node.getSubtasksPerInstance());
-			}
 			
 			// check whether this vertex is part of an iteration step function
 			if (this.currentIteration != null) {
@@ -377,10 +359,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 				if (iterationNode.getDegreeOfParallelism() < pd) {
 					throw new CompilerException("Error: All functions that are part of an iteration must have the same, or a lower, degree-of-parallelism than the iteration operator.");
 				}
-				if (iterationNode.getSubtasksPerInstance() < node.getSubtasksPerInstance()) {
-					throw new CompilerException("Error: All functions that are part of an iteration must have the same, or a lower, number of subtasks-per-node than the iteration operator.");
-				}
-				
+
 				// store the id of the iterations the step functions participate in
 				IterationDescriptor descr = this.iterations.get(this.currentIteration);
 				new TaskConfig(vertex.getConfiguration()).setIterationId(descr.getId());
@@ -401,7 +380,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 	 * 
 	 * @param node
 	 *        The node currently processed during the post-visit.
-	 * @see eu.stratosphere.util.Visitor#postVisit(eu.stratosphere.pact.common.plan.Visitable)
+	 * @see eu.stratosphere.util.Visitor#postVisit(eu.stratosphere.util.Visitable)
 	 */
 	@Override
 	public void postVisit(PlanNode node) {
@@ -739,7 +718,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 					inConn.getLocalStrategy() == LocalStrategy.NONE &&
 					pred.getOutgoingChannels().size() == 1 &&
 					node.getDegreeOfParallelism() == pred.getDegreeOfParallelism() && 
-					node.getSubtasksPerInstance() == pred.getSubtasksPerInstance() &&
 					node.getBroadcastInputs().isEmpty();
 			
 			// cannot chain the nodes that produce the next workset or the next solution set, if they are not the
@@ -879,7 +857,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 					c.getLocalStrategy() == LocalStrategy.NONE &&
 					c.getTempMode() == TempMode.NONE &&
 					successor.getDegreeOfParallelism() == pspn.getDegreeOfParallelism() &&
-					successor.getSubtasksPerInstance() == pspn.getSubtasksPerInstance() &&
 					!(successor instanceof NAryUnionPlanNode) &&
 					successor != iteration.getRootOfStepFunction() &&
 					iteration.getInput().getLocalStrategy() == LocalStrategy.NONE;
@@ -948,7 +925,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 					c.getLocalStrategy() == LocalStrategy.NONE &&
 					c.getTempMode() == TempMode.NONE &&
 					successor.getDegreeOfParallelism() == wspn.getDegreeOfParallelism() &&
-					successor.getSubtasksPerInstance() == wspn.getSubtasksPerInstance() &&
 					!(successor instanceof NAryUnionPlanNode) &&
 					successor != iteration.getNextWorkSetPlanNode() &&
 					iteration.getInitialWorksetInput().getLocalStrategy() == LocalStrategy.NONE;
@@ -995,17 +971,17 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 	}
 	
 	private void assignDriverResources(PlanNode node, TaskConfig config) {
-		final long mem = node.getMemoryPerSubTask();
-		if (mem > 0) {
-			config.setMemoryDriver(mem);
+		final double relativeMem = node.getRelativeMemoryPerSubTask();
+		if (relativeMem > 0) {
+			config.setRelativeMemoryDriver(relativeMem);
 			config.setFilehandlesDriver(this.defaultMaxFan);
 			config.setSpillingThresholdDriver(this.defaultSortSpillingThreshold);
 		}
 	}
 	
 	private void assignLocalStrategyResources(Channel c, TaskConfig config, int inputNum) {
-		if (c.getMemoryLocalStrategy() > 0) {
-			config.setMemoryInput(inputNum, c.getMemoryLocalStrategy());
+		if (c.getRelativeMemoryLocalStrategy() > 0) {
+			config.setRelativeMemoryInput(inputNum, c.getRelativeMemoryLocalStrategy());
 			config.setFilehandlesInput(inputNum, this.defaultMaxFan);
 			config.setSpillingThresholdInput(inputNum, this.defaultSortSpillingThreshold);
 		}
@@ -1020,13 +996,13 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 	 * channel is then the channel into the union node, the local strategy channel the one from the union to the
 	 * actual target operator.
 	 *
-	 * @param channelForGlobalStrategy
-	 * @param channelForLocalStrategy
+	 * @param channel
 	 * @param inputNumber
 	 * @param sourceVertex
 	 * @param sourceConfig
 	 * @param targetVertex
 	 * @param targetConfig
+	 * @param isBroadcast
 	 * @throws JobGraphDefinitionException
 	 * @throws CompilerException
 	 */
@@ -1133,10 +1109,10 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			
 			if (needsMemory) {
 				// sanity check
-				if (tm == null || tm == TempMode.NONE || channel.getTempMemory() < 1) {
+				if (tm == null || tm == TempMode.NONE || channel.getRelativeTempMemory() <= 0) {
 					throw new CompilerException("Bug in compiler: Inconsistent description of input materialization.");
 				}
-				config.setInputMaterializationMemory(inputNum, channel.getTempMemory());
+				config.setRelativeInputMaterializationMemory(inputNum, channel.getRelativeTempMemory());
 			}
 		}
 	}
@@ -1153,11 +1129,11 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		final int numFinalOuts = headFinalOutputConfig.getNumOutputs();
 		headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig);
 		headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts);
-		final long memForBackChannel = bulkNode.getMemoryPerSubTask();
-		if (memForBackChannel <= 0) {
+		final double relativeMemForBackChannel = bulkNode.getRelativeMemoryPerSubTask();
+		if (relativeMemForBackChannel <= 0) {
 			throw new CompilerException("Bug: No memory has been assigned to the iteration back channel.");
 		}
-		headConfig.setBackChannelMemory(memForBackChannel);
+		headConfig.setRelativeBackChannelMemory(relativeMemForBackChannel);
 		
 		// --------------------------- create the sync task ---------------------------
 		final JobOutputVertex sync = new JobOutputVertex("Sync(" +
@@ -1219,7 +1195,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
 			fakeTail.setOutputClass(FakeOutputTask.class);
 			fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
-			fakeTail.setNumberOfSubtasksPerInstance(headVertex.getNumberOfSubtasksPerInstance());
 			this.auxVertices.add(fakeTail);
 			
 			// connect the fake tail
@@ -1262,7 +1237,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			JobOutputVertex fakeTailTerminationCriterion = new JobOutputVertex("Fake Tail for Termination Criterion", this.jobGraph);
 			fakeTailTerminationCriterion.setOutputClass(FakeOutputTask.class);
 			fakeTailTerminationCriterion.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
-			fakeTailTerminationCriterion.setNumberOfSubtasksPerInstance(headVertex.getNumberOfSubtasksPerInstance());
 			this.auxVertices.add(fakeTailTerminationCriterion);
 		
 			// connect the fake tail
@@ -1310,14 +1284,14 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			final int numFinalOuts = headFinalOutputConfig.getNumOutputs();
 			headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig);
 			headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts);
-			final long mem = iterNode.getMemoryPerSubTask();
-			if (mem <= 0) {
+			final double relativeMemory = iterNode.getRelativeMemoryPerSubTask();
+			if (relativeMemory <= 0) {
 				throw new CompilerException("Bug: No memory has been assigned to the workset iteration.");
 			}
 			
 			headConfig.setIsWorksetIteration();
-			headConfig.setBackChannelMemory(mem / 2);
-			headConfig.setSolutionSetMemory(mem / 2);
+			headConfig.setRelativeBackChannelMemory(relativeMemory / 2);
+			headConfig.setRelativeSolutionSetMemory(relativeMemory / 2);
 			
 			// set the solution set serializer and comparator
 			headConfig.setSolutionSetSerializer(iterNode.getSolutionSetSerializer());
@@ -1396,7 +1370,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 					JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
 					fakeTail.setOutputClass(FakeOutputTask.class);
 					fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
-					fakeTail.setNumberOfSubtasksPerInstance(headVertex.getNumberOfSubtasksPerInstance());
 					this.auxVertices.add(fakeTail);
 					
 					// connect the fake tail
@@ -1435,7 +1408,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 					JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
 					fakeTail.setOutputClass(FakeOutputTask.class);
 					fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
-					fakeTail.setNumberOfSubtasksPerInstance(headVertex.getNumberOfSubtasksPerInstance());
 					this.auxVertices.add(fakeTail);
 					
 					// connect the fake tail
@@ -1502,9 +1474,9 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		
 		private AbstractJobVertex containingVertex;
 
-		@SuppressWarnings("unchecked")
-		TaskInChain(@SuppressWarnings("rawtypes") Class<? extends ChainedDriver> chainedTask, TaskConfig taskConfig, String taskName) {
-			this.chainedTask = (Class<? extends ChainedDriver<?, ?>>) chainedTask;
+		TaskInChain(Class<? extends ChainedDriver<?, ?>> chainedTask, TaskConfig taskConfig,
+					String taskName) {
+			this.chainedTask = chainedTask;
 			this.taskConfig = taskConfig;
 			this.taskName = taskName;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CompilerTestBase.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CompilerTestBase.java
index e0da85d..f534ad9 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CompilerTestBase.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CompilerTestBase.java
@@ -12,7 +12,6 @@
  **********************************************************************************************************************/
 package eu.stratosphere.pact.compiler;
 
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -37,12 +36,6 @@ import eu.stratosphere.compiler.costs.DefaultCostEstimator;
 import eu.stratosphere.compiler.plan.OptimizedPlan;
 import eu.stratosphere.compiler.plan.PlanNode;
 import eu.stratosphere.compiler.plan.SingleInputPlanNode;
-import eu.stratosphere.nephele.instance.HardwareDescription;
-import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
-import eu.stratosphere.nephele.instance.InstanceTypeDescriptionFactory;
-import eu.stratosphere.nephele.instance.InstanceTypeFactory;
 import eu.stratosphere.util.LogUtils;
 import eu.stratosphere.util.OperatingSystem;
 import eu.stratosphere.util.Visitor;
@@ -72,8 +65,6 @@ public abstract class CompilerTestBase implements java.io.Serializable {
 	
 	protected transient PactCompiler noStatsCompiler;
 	
-	protected transient InstanceTypeDescription instanceType;
-	
 	private transient int statCounter;
 	
 	// ------------------------------------------------------------------------	
@@ -85,29 +76,22 @@ public abstract class CompilerTestBase implements java.io.Serializable {
 	
 	@Before
 	public void setup() {
-		InetSocketAddress dummyAddr = new InetSocketAddress("localhost", 12345);
-		
 		this.dataStats = new DataStatistics();
-		this.withStatsCompiler = new PactCompiler(this.dataStats, new DefaultCostEstimator(), dummyAddr);
+		this.withStatsCompiler = new PactCompiler(this.dataStats, new DefaultCostEstimator());
 		this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
 		
-		this.noStatsCompiler = new PactCompiler(null, new DefaultCostEstimator(), dummyAddr);
+		this.noStatsCompiler = new PactCompiler(null, new DefaultCostEstimator());
 		this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
-		
-		// create the instance type description
-		InstanceType iType = InstanceTypeFactory.construct("standard", 6, 2, 4096, 100, 0);
-		HardwareDescription hDesc = HardwareDescriptionFactory.construct(2, 4096 * 1024 * 1024, 2000 * 1024 * 1024);
-		this.instanceType = InstanceTypeDescriptionFactory.construct(iType, hDesc, DEFAULT_PARALLELISM * 2);
 	}
 	
 	// ------------------------------------------------------------------------
 	
 	public OptimizedPlan compileWithStats(Plan p) {
-		return this.withStatsCompiler.compile(p, this.instanceType);
+		return this.withStatsCompiler.compile(p);
 	}
 	
 	public OptimizedPlan compileNoStats(Plan p) {
-		return this.noStatsCompiler.compile(p, this.instanceType);
+		return this.noStatsCompiler.compile(p);
 	}
 	
 	public void setSourceStatistics(GenericDataSourceBase<?, ?> source, long size, float recordWidth) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java b/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
index 0e4177e..eff48cc 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
@@ -99,6 +99,11 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY = "taskmanager.network.bufferSizeInBytes";
 
 	/**
+	 * The config parameter defining the number of task slots of a task manager.
+	 */
+	public static final String TASK_MANAGER_NUM_TASK_SLOTS = "taskmanager.numberOfTaskSlots";
+
+	/**
 	 * The number of incoming network IO threads (e.g. incoming connection threads used in NettyConnectionManager
 	 * for the ServerBootstrap.)
 	 */
@@ -290,12 +295,7 @@ public final class ConfigConstants {
 	/**
 	 * The default degree of parallelism for operations.
 	 */
-	public static final int DEFAULT_PARALLELIZATION_DEGREE = -1;
-
-	/**
-	 * The default intra-node parallelism.
-	 */
-	public static final int DEFAULT_MAX_INTRA_NODE_PARALLELIZATION_DEGREE = -1;
+	public static final int DEFAULT_PARALLELIZATION_DEGREE = 1;
 	
 	// ------------------------------ Runtime ---------------------------------
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-core/src/main/java/eu/stratosphere/util/ClassUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/util/ClassUtils.java b/stratosphere-core/src/main/java/eu/stratosphere/util/ClassUtils.java
index 96be666..f79dd2a 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/util/ClassUtils.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/util/ClassUtils.java
@@ -40,6 +40,7 @@ public final class ClassUtils {
 			throws ClassNotFoundException {
 
 		if (!className.contains("Protocol")) {
+			System.out.println(className);
 			throw new ClassNotFoundException("Only use this method for protocols!");
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java
index f01e62d..c86c12b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java
@@ -38,11 +38,6 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage
 	private String instanceName;
 
 	/**
-	 * The type of the instance the vertex is now assigned to.
-	 */
-	private String instanceType;
-
-	/**
 	 * Constructs a new event.
 	 * 
 	 * @param timestamp
@@ -51,16 +46,13 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage
 	 *        identifies the vertex this event refers to
 	 * @param instanceName
 	 *        the name of the instance the vertex is now assigned to
-	 * @param instanceType
-	 *        the type of the instance the vertex is now assigned to
 	 */
 	public VertexAssignmentEvent(final long timestamp, final ManagementVertexID managementVertexID,
-			final String instanceName, final String instanceType) {
+			final String instanceName) {
 		super(timestamp);
 
 		this.managementVertexID = managementVertexID;
 		this.instanceName = instanceName;
-		this.instanceType = instanceType;
 	}
 
 	/**
@@ -90,16 +82,6 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage
 		return this.instanceName;
 	}
 
-	/**
-	 * Returns the type of the instance the vertex is now assigned to.
-	 * 
-	 * @return the type of the instance the vertex is now assigned to
-	 */
-	public String getInstanceType() {
-		return this.instanceType;
-	}
-
-
 	@Override
 	public void read(final DataInput in) throws IOException {
 
@@ -107,7 +89,6 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage
 
 		this.managementVertexID.read(in);
 		this.instanceName = StringRecord.readString(in);
-		this.instanceType = StringRecord.readString(in);
 	}
 
 
@@ -118,7 +99,6 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage
 
 		this.managementVertexID.write(out);
 		StringRecord.writeString(out, this.instanceName);
-		StringRecord.writeString(out, this.instanceType);
 	}
 
 
@@ -149,16 +129,6 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage
 			}
 		}
 
-		if (this.instanceType == null) {
-			if (vae.getInstanceType() != null) {
-				return false;
-			}
-		} else {
-			if (!this.instanceType.equals(vae.getInstanceType())) {
-				return false;
-			}
-		}
-
 		return true;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java
index 0106361..920c47e 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java
@@ -19,7 +19,6 @@ import eu.stratosphere.runtime.io.channels.ChannelType;
 /**
  * Objects of this class represent a pair of {@link eu.stratosphere.runtime.io.serialization.io.channels.InputChannel} and {@link AbstractOutputChannel} objects
  * within an {@link ExecutionGraph}, Nephele's internal scheduling representation for jobs.
- * 
  */
 public final class ExecutionEdge {
 
@@ -51,42 +50,34 @@ public final class ExecutionEdge {
 	}
 
 	public ExecutionGate getInputGate() {
-
 		return this.inputGate;
 	}
 
 	public ExecutionGate getOutputGate() {
-
 		return this.outputGate;
 	}
 
 	public ChannelID getOutputChannelID() {
-
 		return this.outputChannelID;
 	}
 
 	public ChannelID getInputChannelID() {
-
 		return this.inputChannelID;
 	}
 
 	public int getOutputGateIndex() {
-
 		return this.outputGateIndex;
 	}
 
 	public int getInputGateIndex() {
-
 		return this.inputGateIndex;
 	}
 	
 	public ChannelType getChannelType() {
-		
 		return this.groupEdge.getChannelType();
 	}
 	
 	public int getConnectionID() {
-		
 		return this.groupEdge.getConnectionID();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
index ca7eddb..c5059f9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
@@ -38,8 +38,6 @@ import eu.stratosphere.nephele.execution.ExecutionListener;
 import eu.stratosphere.nephele.execution.ExecutionState;
 import eu.stratosphere.nephele.instance.AllocatedResource;
 import eu.stratosphere.nephele.instance.DummyInstance;
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.instance.InstanceType;
 import eu.stratosphere.nephele.jobgraph.DistributionPattern;
 import eu.stratosphere.runtime.io.gates.GateID;
 import eu.stratosphere.runtime.io.channels.ChannelID;
@@ -160,18 +158,18 @@ public class ExecutionGraph implements ExecutionListener {
 	 * 
 	 * @param job
 	 *        the user's job graph
-	 * @param instanceManager
-	 *        the instance manager
+	 * @param defaultParallelism
+	 *        the parallelism to use for job vertices that have no parallelism set
 	 * @throws GraphConversionException
 	 *         thrown if the job graph is not valid and no execution graph can be constructed from it
 	 */
-	public ExecutionGraph(final JobGraph job, final InstanceManager instanceManager)
+	public ExecutionGraph(final JobGraph job, final int defaultParallelism)
 																					throws GraphConversionException {
 		this(job.getJobID(), job.getName(), job.getJobConfiguration());
 
 		// Start constructing the new execution graph from given job graph
 		try {
-			constructExecutionGraph(job, instanceManager);
+			constructExecutionGraph(job, defaultParallelism);
 		} catch (GraphConversionException e) {
 			throw e; // forward graph conversion exceptions
 		} catch (Exception e) {
@@ -217,7 +215,6 @@ public class ExecutionGraph implements ExecutionListener {
 			final ExecutionGroupVertex groupVertex = it2.next();
 			if (groupVertex.isNumberOfMembersUserDefined()) {
 				groupVertex.createInitialExecutionVertices(groupVertex.getUserDefinedNumberOfMembers());
-				groupVertex.repairSubtasksPerInstance();
 			}
 		}
 
@@ -253,12 +250,12 @@ public class ExecutionGraph implements ExecutionListener {
 	 * 
 	 * @param jobGraph
 	 *        the job graph to create the execution graph from
-	 * @param instanceManager
-	 *        the instance manager
+	 * @param defaultParallelism
+	 *        the parallelism assigned to job vertices whose number of subtasks is -1 (not set)
 	 * @throws GraphConversionException
 	 *         thrown if the job graph is not valid and no execution graph can be constructed from it
 	 */
-	private void constructExecutionGraph(final JobGraph jobGraph, final InstanceManager instanceManager)
+	private void constructExecutionGraph(final JobGraph jobGraph, final int defaultParallelism)
 			throws GraphConversionException {
 
 		// Clean up temporary data structures
@@ -272,8 +269,11 @@ public class ExecutionGraph implements ExecutionListener {
 		// Convert job vertices to execution vertices and initialize them
 		final AbstractJobVertex[] all = jobGraph.getAllJobVertices();
 		for (int i = 0; i < all.length; i++) {
-			final ExecutionVertex createdVertex = createVertex(all[i], instanceManager, initialExecutionStage,
-				jobGraph.getJobConfiguration());
+			if(all[i].getNumberOfSubtasks() == -1){
+				all[i].setNumberOfSubtasks(defaultParallelism);
+			}
+
+			final ExecutionVertex createdVertex = createVertex(all[i], initialExecutionStage);
 			temporaryVertexMap.put(all[i], createdVertex);
 			temporaryGroupVertexMap.put(all[i], createdVertex.getGroupVertex());
 		}
@@ -444,37 +444,15 @@ public class ExecutionGraph implements ExecutionListener {
 	 * 
 	 * @param jobVertex
 	 *        the job vertex to create the execution vertex from
-	 * @param instanceManager
-	 *        the instanceManager
 	 * @param initialExecutionStage
 	 *        the initial execution stage all group vertices are added to
-	 * @param jobConfiguration
-	 *        the configuration object originally attached to the {@link JobGraph}
 	 * @return the new execution vertex
 	 * @throws GraphConversionException
 	 *         thrown if the job vertex is of an unknown subclass
 	 */
-	private ExecutionVertex createVertex(final AbstractJobVertex jobVertex, final InstanceManager instanceManager,
-			final ExecutionStage initialExecutionStage, final Configuration jobConfiguration)
+	private ExecutionVertex createVertex(final AbstractJobVertex jobVertex, final ExecutionStage initialExecutionStage)
 			throws GraphConversionException {
 
-		// If the user has requested instance type, check if the type is known by the current instance manager
-		InstanceType instanceType = null;
-		boolean userDefinedInstanceType = false;
-		if (jobVertex.getInstanceType() != null) {
-
-			userDefinedInstanceType = true;
-			instanceType = instanceManager.getInstanceTypeByName(jobVertex.getInstanceType());
-			if (instanceType == null) {
-				throw new GraphConversionException("Requested instance type " + jobVertex.getInstanceType()
-					+ " is not known to the instance manager");
-			}
-		}
-
-		if (instanceType == null) {
-			instanceType = instanceManager.getDefaultInstanceType();
-		}
-
 		// Create an initial execution vertex for the job vertex
 		final Class<? extends AbstractInvokable> invokableClass = jobVertex.getInvokableClass();
 		if (invokableClass == null) {
@@ -491,8 +469,7 @@ public class ExecutionGraph implements ExecutionListener {
 		ExecutionGroupVertex groupVertex = null;
 		try {
 			groupVertex = new ExecutionGroupVertex(jobVertex.getName(), jobVertex.getID(), this,
-				jobVertex.getNumberOfSubtasks(), instanceType, userDefinedInstanceType,
-				jobVertex.getNumberOfSubtasksPerInstance(), jobVertex.getVertexToShareInstancesWith() != null ? true
+				jobVertex.getNumberOfSubtasks(), jobVertex.getVertexToShareInstancesWith() != null ? true
 					: false, jobVertex.getNumberOfExecutionRetries(), jobVertex.getConfiguration(), signature,
 				invokableClass);
 		} catch (Throwable t) {
@@ -506,39 +483,6 @@ public class ExecutionGraph implements ExecutionListener {
 			throw new GraphConversionException(StringUtils.stringifyException(e));
 		}
 
-		// Check if the user's specifications for the number of subtasks are valid
-		final int minimumNumberOfSubtasks = jobVertex.getMinimumNumberOfSubtasks(groupVertex.getEnvironment()
-			.getInvokable());
-		final int maximumNumberOfSubtasks = jobVertex.getMaximumNumberOfSubtasks(groupVertex.getEnvironment()
-			.getInvokable());
-		if (jobVertex.getNumberOfSubtasks() != -1) {
-			if (jobVertex.getNumberOfSubtasks() < 1) {
-				throw new GraphConversionException("Cannot split task " + jobVertex.getName() + " into "
-					+ jobVertex.getNumberOfSubtasks() + " subtasks");
-			}
-
-			if (jobVertex.getNumberOfSubtasks() < minimumNumberOfSubtasks) {
-				throw new GraphConversionException("Number of subtasks must be at least " + minimumNumberOfSubtasks);
-			}
-
-			if (maximumNumberOfSubtasks != -1) {
-				if (jobVertex.getNumberOfSubtasks() > maximumNumberOfSubtasks) {
-					throw new GraphConversionException("Number of subtasks for vertex " + jobVertex.getName()
-						+ " can be at most " + maximumNumberOfSubtasks);
-				}
-			}
-		}
-
-		// Check number of subtasks per instance
-		if (jobVertex.getNumberOfSubtasksPerInstance() != -1 && jobVertex.getNumberOfSubtasksPerInstance() < 1) {
-			throw new GraphConversionException("Cannot set number of subtasks per instance to "
-				+ jobVertex.getNumberOfSubtasksPerInstance() + " for vertex " + jobVertex.getName());
-		}
-
-		// Assign min/max to the group vertex (settings are actually applied in applyUserDefinedSettings)
-		groupVertex.setMinMemberSize(minimumNumberOfSubtasks);
-		groupVertex.setMaxMemberSize(maximumNumberOfSubtasks);
-
 		// Register input and output vertices separately
 		if (jobVertex instanceof AbstractJobInputVertex) {
 
@@ -579,8 +523,7 @@ public class ExecutionGraph implements ExecutionListener {
 			jobVertex.getNumberOfBackwardConnections());
 
 		// Assign initial instance to vertex (may be overwritten later on when user settings are applied)
-		ev.setAllocatedResource(new AllocatedResource(DummyInstance.createDummyInstance(instanceType), instanceType,
-			null));
+		ev.setAllocatedResource(new AllocatedResource(DummyInstance.createDummyInstance(), null));
 
 		return ev;
 	}
@@ -853,6 +796,48 @@ public class ExecutionGraph implements ExecutionListener {
 	}
 
 	/**
+	 * Retrieves the maximum parallel degree of the job represented by this execution graph
+	 */
+	public int getMaxNumberSubtasks() {
+		int maxDegree = 0;
+		final Iterator<ExecutionStage> stageIterator = this.stages.iterator();
+
+		while(stageIterator.hasNext()){
+			final ExecutionStage stage = stageIterator.next();
+
+			int maxPerStageDegree = stage.getMaxNumberSubtasks();
+
+			if(maxPerStageDegree > maxDegree){
+				maxDegree = maxPerStageDegree;
+			}
+		}
+
+		return maxDegree;
+	}
+
+	/**
+	 * Retrieves the number of required slots to run this execution graph
+	 * @return the largest slot requirement reported by any execution stage of this graph
+	 */
+	public int getRequiredSlots(){
+		int maxRequiredSlots = 0;
+
+		final Iterator<ExecutionStage> stageIterator = this.stages.iterator();
+
+		while(stageIterator.hasNext()){
+			final ExecutionStage stage = stageIterator.next();
+
+			int requiredSlots = stage.getRequiredSlots();
+
+			if(requiredSlots > maxRequiredSlots){
+				maxRequiredSlots = requiredSlots;
+			}
+		}
+
+		return maxRequiredSlots;
+	}
+
+	/**
 	 * Returns the stage which is currently executed.
 	 * 
 	 * @return the currently executed stage or <code>null</code> if the job execution is already completed
@@ -1318,25 +1303,16 @@ public class ExecutionGraph implements ExecutionListener {
 		return this.jobName;
 	}
 
-
 	@Override
-	public void userThreadStarted(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
-		// TODO Auto-generated method stub
-
-	}
-
+	public void userThreadStarted(JobID jobID, ExecutionVertexID vertexID, Thread userThread) {}
 
 	@Override
-	public void userThreadFinished(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
-		// TODO Auto-generated method stub
-
-	}
+	public void userThreadFinished(JobID jobID, ExecutionVertexID vertexID, Thread userThread) {}
 
 	/**
 	 * Reconstructs the execution pipelines for the entire execution graph.
 	 */
 	private void reconstructExecutionPipelines() {
-
 		final Iterator<ExecutionStage> it = this.stages.iterator();
 		while (it.hasNext()) {
 
@@ -1345,39 +1321,17 @@ public class ExecutionGraph implements ExecutionListener {
 	}
 
 	/**
-	 * Calculates the connection IDs of the graph to avoid deadlocks in the data flow at runtime.
-	 */
-	private void calculateConnectionIDs() {
-
-		final Set<ExecutionGroupVertex> alreadyVisited = new HashSet<ExecutionGroupVertex>();
-		final ExecutionStage lastStage = getStage(getNumberOfStages() - 1);
-
-		for (int i = 0; i < lastStage.getNumberOfStageMembers(); ++i) {
-
-			final ExecutionGroupVertex groupVertex = lastStage.getStageMember(i);
-			
-			int currentConnectionID = 0;
-			
-			if (groupVertex.isOutputVertex()) {
-			currentConnectionID = groupVertex.calculateConnectionID(currentConnectionID, alreadyVisited);
-			}
-		}
-	}
-
-	/**
 	 * Returns an iterator over all execution stages contained in this graph.
 	 * 
 	 * @return an iterator over all execution stages contained in this graph
 	 */
 	public Iterator<ExecutionStage> iterator() {
-
 		return this.stages.iterator();
 	}
 
 
 	@Override
 	public int getPriority() {
-
 		return 1;
 	}
 
@@ -1388,7 +1342,22 @@ public class ExecutionGraph implements ExecutionListener {
 	 *        the update command to be asynchronously executed on this graph
 	 */
 	public void executeCommand(final Runnable command) {
-
 		this.executorService.execute(command);
 	}
+	
+	private void calculateConnectionIDs() {
+		final Set<ExecutionGroupVertex> alreadyVisited = new HashSet<ExecutionGroupVertex>();
+		final ExecutionStage lastStage = getStage(getNumberOfStages() - 1);
+
+		for (int i = 0; i < lastStage.getNumberOfStageMembers(); ++i) {
+
+			final ExecutionGroupVertex groupVertex = lastStage.getStageMember(i);
+
+			int currentConnectionID = 0;
+
+			if (groupVertex.isOutputVertex()) {
+			currentConnectionID = groupVertex.calculateConnectionID(currentConnectionID, alreadyVisited);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
index 89b4b6d..c865609 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
@@ -18,7 +18,6 @@ import eu.stratosphere.core.io.InputSplit;
 import eu.stratosphere.nephele.execution.RuntimeEnvironment;
 import eu.stratosphere.nephele.instance.AllocatedResource;
 import eu.stratosphere.nephele.instance.DummyInstance;
-import eu.stratosphere.nephele.instance.InstanceType;
 import eu.stratosphere.nephele.jobgraph.JobVertexID;
 import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.runtime.io.channels.ChannelType;
@@ -69,41 +68,11 @@ public final class ExecutionGroupVertex {
 	private final CopyOnWriteArrayList<ExecutionVertex> groupMembers = new CopyOnWriteArrayList<ExecutionVertex>();
 
 	/**
-	 * Maximum number of execution vertices this group vertex can manage.
-	 */
-	private volatile int maxMemberSize = 1;
-
-	/**
-	 * Minimum number of execution vertices this group vertex can manage.
-	 */
-	private volatile int minMemberSize = 1;
-
-	/**
 	 * The user defined number of execution vertices, -1 if the user has not specified it.
 	 */
 	private final int userDefinedNumberOfMembers;
 
 	/**
-	 * The instance type to be used for execution vertices this group vertex manages.
-	 */
-	private volatile InstanceType instanceType = null;
-
-	/**
-	 * Stores whether the instance type is user defined.
-	 */
-	private final boolean userDefinedInstanceType;
-
-	/**
-	 * Stores the number of subtasks per instance.
-	 */
-	private volatile int numberOfSubtasksPerInstance = -1;
-
-	/**
-	 * Stores whether the number of subtasks per instance is user defined.
-	 */
-	private final boolean userDefinedNumberOfSubtasksPerInstance;
-
-	/**
 	 * Number of retries in case of an error before the task represented by this vertex is considered as failed.
 	 */
 	private final int numberOfExecutionRetries;
@@ -175,12 +144,6 @@ public final class ExecutionGroupVertex {
 	 *        the execution graph is group vertex belongs to
 	 * @param userDefinedNumberOfMembers
 	 *        the user defined number of subtasks, -1 if the user did not specify the number
-	 * @param instanceType
-	 *        the instance type to be used for execution vertices this group vertex manages.
-	 * @param userDefinedInstanceType
-	 *        <code>true</code> if the instance type is user defined, <code>false</code> otherwise
-	 * @param numberOfSubtasksPerInstance
-	 *        the user defined number of subtasks per instance, -1 if the user did not specify the number
 	 * @param userDefinedVertexToShareInstanceWith
 	 *        <code>true</code> if the user specified another vertex to share instances with, <code>false</code>
 	 *        otherwise
@@ -197,24 +160,13 @@ public final class ExecutionGroupVertex {
 	 *         throws if an error occurs while instantiating the {@link AbstractInvokable}
 	 */
 	public ExecutionGroupVertex(final String name, final JobVertexID jobVertexID, final ExecutionGraph executionGraph,
-			final int userDefinedNumberOfMembers, final InstanceType instanceType,
-			final boolean userDefinedInstanceType, final int numberOfSubtasksPerInstance,
-			final boolean userDefinedVertexToShareInstanceWith, final int numberOfExecutionRetries,
-			final Configuration configuration, final ExecutionSignature signature,
+			final int userDefinedNumberOfMembers, final boolean userDefinedVertexToShareInstanceWith,
+			final int numberOfExecutionRetries, final Configuration configuration, final ExecutionSignature signature,
 			final Class<? extends AbstractInvokable> invokableClass) throws Exception {
 
 		this.name = (name != null) ? name : "";
 		this.jobVertexID = jobVertexID;
 		this.userDefinedNumberOfMembers = userDefinedNumberOfMembers;
-		this.instanceType = instanceType;
-		this.userDefinedInstanceType = userDefinedInstanceType;
-		if (numberOfSubtasksPerInstance != -1) {
-			this.numberOfSubtasksPerInstance = numberOfSubtasksPerInstance;
-			this.userDefinedNumberOfSubtasksPerInstance = true;
-		} else {
-			this.numberOfSubtasksPerInstance = 1;
-			this.userDefinedNumberOfSubtasksPerInstance = false;
-		}
 		if (numberOfExecutionRetries >= 0) {
 			this.numberOfExecutionRetries = numberOfExecutionRetries;
 		} else {
@@ -309,32 +261,6 @@ public final class ExecutionGroupVertex {
 	}
 
 	/**
-	 * Sets the maximum number of members this group vertex can have.
-	 * 
-	 * @param maxSize
-	 *        the maximum number of members this group vertex can have
-	 */
-	void setMaxMemberSize(final int maxSize) {
-
-		// TODO: Add checks here
-
-		this.maxMemberSize = maxSize;
-	}
-
-	/**
-	 * Sets the minimum number of members this group vertex must have.
-	 * 
-	 * @param minSize
-	 *        the minimum number of members this group vertex must have
-	 */
-	void setMinMemberSize(final int minSize) {
-
-		// TODO: Add checks here
-
-		this.minMemberSize = minSize;
-	}
-
-	/**
 	 * Returns the current number of members this group vertex has.
 	 * 
 	 * @return the current number of members this group vertex has
@@ -345,24 +271,6 @@ public final class ExecutionGroupVertex {
 	}
 
 	/**
-	 * Returns the maximum number of members this group vertex can have.
-	 * 
-	 * @return the maximum number of members this group vertex can have
-	 */
-	public int getMaximumNumberOfGroupMembers() {
-		return this.maxMemberSize;
-	}
-
-	/**
-	 * Returns the minimum number of members this group vertex must have.
-	 * 
-	 * @return the minimum number of members this group vertex must have
-	 */
-	public int getMinimumNumberOfGroupMember() {
-		return this.minMemberSize;
-	}
-
-	/**
 	 * Wires this group vertex to the specified group vertex and creates
 	 * a back link.
 	 * 
@@ -376,10 +284,6 @@ public final class ExecutionGroupVertex {
 	 *        the channel type to be used for this edge
 	 * @param userDefinedChannelType
 	 *        <code>true</code> if the channel type is user defined, <code>false</code> otherwise
-	 * @param compressionLevel
-	 *        the compression level to be used for this edge
-	 * @param userDefinedCompressionLevel
-	 *        <code>true</code> if the compression level is user defined, <code>false</code> otherwise
 	 * @param distributionPattern
 	 *        the distribution pattern to create the wiring between the group members
 	 * @param isBroadcast
@@ -480,10 +384,10 @@ public final class ExecutionGroupVertex {
 	 * @throws GraphConversionException
 	 *         thrown if the number of execution vertices for this group vertex cannot be set to the desired value
 	 */
-	void createInitialExecutionVertices(final int initalNumberOfVertices) throws GraphConversionException {
+	void createInitialExecutionVertices(final int initialNumberOfVertices) throws GraphConversionException {
 
 		// If the requested number of group vertices does not change, do nothing
-		if (initalNumberOfVertices == this.getCurrentNumberOfGroupMembers()) {
+		if (initialNumberOfVertices == this.getCurrentNumberOfGroupMembers()) {
 			return;
 		}
 
@@ -517,25 +421,14 @@ public final class ExecutionGroupVertex {
 		 * }
 		 */
 
-		if (initalNumberOfVertices < this.getMinimumNumberOfGroupMember()) {
-			throw new GraphConversionException("Number of members must be at least "
-				+ this.getMinimumNumberOfGroupMember());
-		}
-
-		if ((this.getMaximumNumberOfGroupMembers() != -1)
-			&& (initalNumberOfVertices > this.getMaximumNumberOfGroupMembers())) {
-			throw new GraphConversionException("Number of members cannot exceed "
-				+ this.getMaximumNumberOfGroupMembers());
-		}
-
 		final ExecutionVertex originalVertex = this.getGroupMember(0);
 		int currentNumberOfExecutionVertices = this.getCurrentNumberOfGroupMembers();
 
-		while (currentNumberOfExecutionVertices++ < initalNumberOfVertices) {
+		while (currentNumberOfExecutionVertices++ < initialNumberOfVertices) {
 
 			final ExecutionVertex vertex = originalVertex.splitVertex();
 			vertex.setAllocatedResource(new AllocatedResource(DummyInstance
-				.createDummyInstance(this.instanceType), this.instanceType, null));
+				.createDummyInstance(), null));
 			this.groupMembers.add(vertex);
 		}
 
@@ -645,53 +538,6 @@ public final class ExecutionGroupVertex {
 		return this.userDefinedNumberOfMembers;
 	}
 
-	boolean isInstanceTypeUserDefined() {
-
-		return this.userDefinedInstanceType;
-	}
-
-	void setInstanceType(final InstanceType instanceType) throws GraphConversionException {
-
-		if (instanceType == null) {
-			throw new IllegalArgumentException("Argument instanceType must not be null");
-		}
-
-		if (this.userDefinedInstanceType) {
-			throw new GraphConversionException("Cannot overwrite user defined instance type "
-				+ instanceType.getIdentifier());
-		}
-
-		this.instanceType = instanceType;
-
-		// Reset instance allocation of all members and let reassignInstances do the work
-		for (int i = 0; i < this.groupMembers.size(); i++) {
-			final ExecutionVertex vertex = this.groupMembers.get(i);
-			vertex.setAllocatedResource(null);
-		}
-	}
-
-	InstanceType getInstanceType() {
-		return this.instanceType;
-	}
-
-	boolean isNumberOfSubtasksPerInstanceUserDefined() {
-
-		return this.userDefinedNumberOfSubtasksPerInstance;
-	}
-
-	void setNumberOfSubtasksPerInstance(final int numberOfSubtasksPerInstance) throws GraphConversionException {
-
-		if (this.userDefinedNumberOfSubtasksPerInstance
-			&& (numberOfSubtasksPerInstance != this.numberOfSubtasksPerInstance)) {
-			throw new GraphConversionException("Cannot overwrite user defined number of subtasks per instance");
-		}
-
-		this.numberOfSubtasksPerInstance = numberOfSubtasksPerInstance;
-	}
-
-	int getNumberOfSubtasksPerInstance() {
-		return this.numberOfSubtasksPerInstance;
-	}
 
 	/**
 	 * Returns the number of retries in case of an error before the task represented by this vertex is considered as
@@ -766,27 +612,13 @@ public final class ExecutionGroupVertex {
 
 	}
 
-	void repairSubtasksPerInstance() {
-
-		final Iterator<ExecutionVertex> it = this.groupMembers.iterator();
-		int count = 0;
-		while (it.hasNext()) {
-
-			final ExecutionVertex v = it.next();
-			v.setAllocatedResource(this.groupMembers.get(
-				(count++ / this.numberOfSubtasksPerInstance) * this.numberOfSubtasksPerInstance)
-				.getAllocatedResource());
-		}
-	}
-
 	void repairInstanceSharing(final Set<AllocatedResource> availableResources) {
 
 		// Number of required resources by this group vertex
-		final int numberOfRequiredInstances = (this.groupMembers.size() / this.numberOfSubtasksPerInstance)
-			+ (((this.groupMembers.size() % this.numberOfSubtasksPerInstance) != 0) ? 1 : 0);
+		final int numberOfRequiredSlots = this.groupMembers.size();
 
 		// Number of resources to be replaced
-		final int resourcesToBeReplaced = Math.min(availableResources.size(), numberOfRequiredInstances);
+		final int resourcesToBeReplaced = Math.min(availableResources.size(), numberOfRequiredSlots);
 
 		// Build the replacement map if necessary
 		final Map<AllocatedResource, AllocatedResource> replacementMap = new HashMap<AllocatedResource, AllocatedResource>();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java
index eab2375..df29aef 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java
@@ -15,18 +15,10 @@ package eu.stratosphere.nephele.executiongraph;
 
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.nephele.execution.ExecutionState;
-import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.instance.DummyInstance;
-import eu.stratosphere.nephele.instance.InstanceRequestMap;
-import eu.stratosphere.nephele.instance.InstanceType;
+import eu.stratosphere.nephele.instance.Instance;
 import eu.stratosphere.runtime.io.channels.ChannelType;
 
 /**
@@ -35,16 +27,10 @@ import eu.stratosphere.runtime.io.channels.ChannelType;
  * job can only start to execute if the execution of its preceding stage is complete.
  * <p>
  * This class is thread-safe.
- * 
  */
 public final class ExecutionStage {
 
 	/**
-	 * The log object used for debugging.
-	 */
-	private static final Log LOG = LogFactory.getLog(ExecutionStage.class);
-
-	/**
 	 * The execution graph that this stage belongs to.
 	 */
 	private final ExecutionGraph executionGraph;
@@ -242,69 +228,6 @@ public final class ExecutionStage {
 	}
 
 	/**
-	 * Checks which instance types and how many instances of these types are required to execute this stage
-	 * of the job graph. The required instance types and the number of instances are collected in the given map. Note
-	 * that this method does not clear the map before collecting the instances.
-	 * 
-	 * @param instanceRequestMap
-	 *        the map containing the instances types and the required number of instances of the respective type
-	 * @param executionState
-	 *        the execution state the considered vertices must be in
-	 */
-	public void collectRequiredInstanceTypes(final InstanceRequestMap instanceRequestMap,
-			final ExecutionState executionState) {
-
-		final Set<AbstractInstance> collectedInstances = new HashSet<AbstractInstance>();
-		final ExecutionGroupVertexIterator groupIt = new ExecutionGroupVertexIterator(this.getExecutionGraph(), true,
-			this.stageNum);
-
-		while (groupIt.hasNext()) {
-
-			final ExecutionGroupVertex groupVertex = groupIt.next();
-			final Iterator<ExecutionVertex> vertexIt = groupVertex.iterator();
-			while (vertexIt.hasNext()) {
-
-				// Get the instance type from the execution vertex if it
-				final ExecutionVertex vertex = vertexIt.next();
-				if (vertex.getExecutionState() == executionState) {
-					final AbstractInstance instance = vertex.getAllocatedResource().getInstance();
-
-					if (collectedInstances.contains(instance)) {
-						continue;
-					} else {
-						collectedInstances.add(instance);
-					}
-
-					if (instance instanceof DummyInstance) {
-
-						final InstanceType instanceType = instance.getType();
-						int num = instanceRequestMap.getMaximumNumberOfInstances(instanceType);
-						++num;
-						instanceRequestMap.setMaximumNumberOfInstances(instanceType, num);
-						if (groupVertex.isInputVertex()) {
-							num = instanceRequestMap.getMinimumNumberOfInstances(instanceType);
-							++num;
-							instanceRequestMap.setMinimumNumberOfInstances(instanceType, num);
-						}
-					} else {
-						LOG.debug("Execution Vertex " + vertex.getName() + " (" + vertex.getID()
-							+ ") is already assigned to non-dummy instance, skipping...");
-					}
-				}
-			}
-		}
-
-		final Iterator<Map.Entry<InstanceType, Integer>> it = instanceRequestMap.getMaximumIterator();
-		while (it.hasNext()) {
-
-			final Map.Entry<InstanceType, Integer> entry = it.next();
-			if (instanceRequestMap.getMinimumNumberOfInstances(entry.getKey()) == 0) {
-				instanceRequestMap.setMinimumNumberOfInstances(entry.getKey(), entry.getValue());
-			}
-		}
-	}
-
-	/**
 	 * Returns the execution graph that this stage belongs to.
 	 * 
 	 * @return the execution graph that this stage belongs to
@@ -446,4 +369,37 @@ public final class ExecutionStage {
 			}
 		}
 	}
+
+	public int getMaxNumberSubtasks(){
+		int maxDegree = 0;
+
+		for(int i =0; i < this.getNumberOfStageMembers(); i++){
+			final ExecutionGroupVertex groupVertex = this.getStageMember(i);
+
+			if(groupVertex.getCurrentNumberOfGroupMembers() > maxDegree){
+				maxDegree = groupVertex.getCurrentNumberOfGroupMembers();
+			}
+		}
+
+		return maxDegree;
+	}
+
+	public int getRequiredSlots(){
+		Set<Instance> instanceSet = new HashSet<Instance>();
+
+		for(int i=0; i< this.getNumberOfStageMembers(); i++){
+			final ExecutionGroupVertex groupVertex = this.getStageMember(i);
+
+			final Iterator<ExecutionVertex> vertexIterator = groupVertex.iterator();
+
+			while(vertexIterator.hasNext()){
+				final ExecutionVertex vertex = vertexIterator.next();
+
+				instanceSet.add(vertex.getAllocatedResource().getInstance());
+			}
+
+		}
+
+		return instanceSet.size();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
index 8e9395a..1e8d538 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
@@ -855,7 +855,6 @@ public final class ExecutionVertex {
 	 *         <code>false/<code> otherwise
 	 */
 	public boolean decrementRetriesLeftAndCheck() {
-
 		return (this.retriesLeft.decrementAndGet() > 0);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/InternalJobStatus.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/InternalJobStatus.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/InternalJobStatus.java
index 3a41aa2..8565495 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/InternalJobStatus.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/InternalJobStatus.java
@@ -74,6 +74,7 @@ public enum InternalJobStatus {
 	 *        the internal job status to converted.
 	 * @return the corresponding job status or <code>null</code> if no corresponding job status exists
 	 */
+	@SuppressWarnings("incomplete-switch")
 	public static JobStatus toJobStatus(InternalJobStatus status) {
 
 		switch (status) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java
index 72e3651..04c68b1 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java
@@ -17,8 +17,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
-import eu.stratosphere.nephele.instance.AbstractInstance;
 import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.nephele.instance.Instance;
 import eu.stratosphere.nephele.managementgraph.ManagementEdge;
 import eu.stratosphere.nephele.managementgraph.ManagementEdgeID;
 import eu.stratosphere.nephele.managementgraph.ManagementGate;
@@ -120,12 +120,11 @@ public class ManagementGraphFactory {
 			final ExecutionVertex ev = iterator.next();
 			final ManagementGroupVertex parent = groupMap.get(ev.getGroupVertex());
 
-			final AbstractInstance instance = ev.getAllocatedResource().getInstance();
+			final Instance instance = ev.getAllocatedResource().getInstance();
 			final ManagementVertex managementVertex = new ManagementVertex(
 						parent, 
 						ev.getID().toManagementVertexID(),
-						(instance.getInstanceConnectionInfo() != null) ? instance.getInstanceConnectionInfo().toString() : instance.toString(), 
-						instance.getType().toString(), 
+						(instance.getInstanceConnectionInfo() != null) ? instance.getInstanceConnectionInfo().toString() : instance.toString(),
 						ev.getIndexInVertexGroup()
 					);
 			managementVertex.setExecutionState(ev.getExecutionState());


Mime
View raw message