flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [05/22] Rework the Taskmanager to a slot based model and remove legacy cloud code
Date Sun, 22 Jun 2014 21:47:26 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildSecondHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildSecondHashMatchIterator.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildSecondHashMatchIterator.java
index f1525d8..732d256 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildSecondHashMatchIterator.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildSecondHashMatchIterator.java
@@ -60,7 +60,7 @@ public final class BuildSecondHashMatchIterator<V1, V2, O> implements JoinTaskIt
 			TypeSerializer<V1> serializer1, TypeComparator<V1> comparator1,
 			TypeSerializer<V2> serializer2, TypeComparator<V2> comparator2,
 			TypePairComparator<V1, V2> pairComparator,
-			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, long totalMemory)
+			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
 	throws MemoryAllocationException
 	{		
 		this.memManager = memManager;
@@ -73,7 +73,7 @@ public final class BuildSecondHashMatchIterator<V1, V2, O> implements JoinTaskIt
 		this.probeCopy = serializer1.createInstance();
 		
 		this.hashJoin = getHashJoin(serializer2, comparator2, serializer1, comparator1, pairComparator,
-			memManager, ioManager, ownerTask, totalMemory);
+			memManager, ioManager, ownerTask, memoryFraction);
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -149,10 +149,10 @@ public final class BuildSecondHashMatchIterator<V1, V2, O> implements JoinTaskIt
 	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
 			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
 			TypePairComparator<PT, BT> pairComparator,
-			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, long totalMemory)
+			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
 	throws MemoryAllocationException
 	{
-		final int numPages = memManager.computeNumberOfPages(totalMemory);
+		final int numPages = memManager.computeNumberOfPages(memoryFraction);
 		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
 		return new MutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/InMemoryPartition.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/InMemoryPartition.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/InMemoryPartition.java
index 5587ad2..9cc4f49 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/InMemoryPartition.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/InMemoryPartition.java
@@ -310,6 +310,7 @@ public class InMemoryPartition<T> {
 			return posInArray;
 		}
 		
+		@SuppressWarnings("unused")
 		public void setSegmentNumberOffset(int offset) {
 			this.segmentNumberOffset = offset;
 		}
@@ -364,6 +365,7 @@ public class InMemoryPartition<T> {
 			seekInput(this.segments.get(bufferNum), offset, this.segmentSizeMask + 1);
 		}
 		
+		@SuppressWarnings("unused")
 		public void setSegmentNumberOffset(int offset) {
 			this.segmentNumberOffset = offset;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java
index 89571c4..b94e276 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java
@@ -130,7 +130,8 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 	private BlockingBackChannel initBackChannel() throws Exception {
 
 		/* get the size of the memory available to the backchannel */
-		int backChannelMemoryPages = getMemoryManager().computeNumberOfPages(this.config.getBackChannelMemory());
+		int backChannelMemoryPages = getMemoryManager().computeNumberOfPages(this.config.getRelativeBackChannelMemory
+				());
 
 		/* allocate the memory available to the backchannel */
 		List<MemorySegment> segments = new ArrayList<MemorySegment>();
@@ -150,7 +151,7 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 	
 	private <BT> CompactingHashTable<BT> initCompactingHashTable() throws Exception {
 		// get some memory
-		long hashjoinMemorySize = config.getSolutionSetMemory();
+		double hashjoinMemorySize = config.getRelativeSolutionSetMemory();
 
 		TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer(userCodeClassLoader);
 		TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator(userCodeClassLoader);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
index a8a1293..ba38821 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
@@ -22,45 +22,42 @@ public enum ShipStrategyType {
 	/**
 	 * Constant used as an indicator for an unassigned ship strategy.
 	 */
-	NONE(false, false, false),
+	NONE(false, false),
 	
 	/**
-	 * Forwarding the data preserving all global properties.
+	 * Forwarding the data locally in memory.
 	 */
-	FORWARD(false, false, false),
+	FORWARD(false, false),
 	
 	/**
 	 * Repartitioning the data randomly, typically when the degree of parallelism between two nodes changes.
 	 */
-	PARTITION_RANDOM(true, true, false),
+	PARTITION_RANDOM(true, false),
 	
 	/**
 	 * Repartitioning the data deterministically through a hash function.
 	 */
-	PARTITION_HASH(true, true, true),
+	PARTITION_HASH(true, true),
 	
 	/**
 	 * Partitioning the data in ranges according to a total order.
 	 */
-	PARTITION_RANGE(true, true, true),
+	PARTITION_RANGE(true, true),
 	
 	/**
 	 * Replicating the data set to all instances.
 	 */
-	BROADCAST(true, true, false);
+	BROADCAST(true, false);
 	
 	// --------------------------------------------------------------------------------------------
 	
 	private final boolean isNetwork;
 	
-	private final boolean compensatesForLocalDOPChanges;
-	
 	private final boolean requiresComparator;
 	
 	
-	private ShipStrategyType(boolean network, boolean compensatesForLocalDOPChanges, boolean requiresComparator) {
+	private ShipStrategyType(boolean network, boolean requiresComparator) {
 		this.isNetwork = network;
-		this.compensatesForLocalDOPChanges = compensatesForLocalDOPChanges;
 		this.requiresComparator = requiresComparator;
 	}
 	
@@ -68,10 +65,6 @@ public enum ShipStrategyType {
 		return this.isNetwork;
 	}
 	
-	public boolean compensatesForLocalDOPChanges() {
-		return this.compensatesForLocalDOPChanges;
-	}
-	
 	public boolean requiresComparator() {
 		return this.requiresComparator;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/AsynchronousPartialSorter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/AsynchronousPartialSorter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/AsynchronousPartialSorter.java
index 0cf6bb0..35377cf 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/AsynchronousPartialSorter.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/AsynchronousPartialSorter.java
@@ -50,7 +50,7 @@ public class AsynchronousPartialSorter<E> extends UnilateralSortMerger<E> {
 	 * @param parentTask The parent task, which owns all resources used by this sorter.
 	 * @param serializerFactory The type serializer.
 	 * @param comparator The type comparator establishing the order relation.
-	 * @param totalMemory The total amount of memory dedicated to sorting.
+	 * @param memoryFraction The fraction of memory dedicated to sorting.
 	 * 
 	 * @throws IOException Thrown, if an error occurs initializing the resources for external sorting.
 	 * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
@@ -59,12 +59,13 @@ public class AsynchronousPartialSorter<E> extends UnilateralSortMerger<E> {
 	public AsynchronousPartialSorter(MemoryManager memoryManager,
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			long totalMemory)
+			double memoryFraction)
 	throws IOException, MemoryAllocationException
 	{
-		super(memoryManager, null, input, parentTask, serializerFactory, comparator, totalMemory,
-			totalMemory < 2 * MIN_NUM_SORT_MEM_SEGMENTS * memoryManager.getPageSize() ? 1 : 
-				Math.max((int) Math.ceil(((double) totalMemory) / MAX_MEM_PER_PARTIAL_SORT), 2),
+		super(memoryManager, null, input, parentTask, serializerFactory, comparator, memoryFraction,
+			memoryManager.computeNumberOfPages(memoryFraction) < 2 * MIN_NUM_SORT_MEM_SEGMENTS ? 1 :
+				Math.max((int) Math.ceil(((double) memoryManager.computeMemorySize(memoryFraction)) /
+						MAX_MEM_PER_PARTIAL_SORT),	2),
 			2, 0.0f, true);
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/AsynchronousPartialSorterCollector.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/AsynchronousPartialSorterCollector.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/AsynchronousPartialSorterCollector.java
index 9064ab9..747f98b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/AsynchronousPartialSorterCollector.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/AsynchronousPartialSorterCollector.java
@@ -45,7 +45,7 @@ public class AsynchronousPartialSorterCollector<E> extends AsynchronousPartialSo
 	 * @param parentTask The parent task, which owns all resources used by this sorter.
 	 * @param serializerFactory The type serializer.
 	 * @param comparator The type comparator establishing the order relation.
-	 * @param totalMemory The total amount of memory dedicated to sorting.
+	 * @param memoryFraction The fraction of memory dedicated to sorting.
 	 * 
 	 * @throws IOException Thrown, if an error occurs initializing the resources for external sorting.
 	 * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
@@ -54,10 +54,11 @@ public class AsynchronousPartialSorterCollector<E> extends AsynchronousPartialSo
 	public AsynchronousPartialSorterCollector(MemoryManager memoryManager,
 			AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			long totalMemory)
+			double memoryFraction)
 	throws IOException, MemoryAllocationException
 	{
-		super(memoryManager, null, parentTask, serializerFactory, comparator, totalMemory);
+		super(memoryManager, null, parentTask, serializerFactory, comparator,
+				memoryFraction);
 	}
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMerger.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMerger.java
index 51d136c..9eb0452 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMerger.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMerger.java
@@ -95,12 +95,11 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 	 * @param parentTask The parent task, which owns all resources used by this sorter.
 	 * @param serializerFactory The type serializer.
 	 * @param comparator The type comparator establishing the order relation.
-	 * @param totalMemory The total amount of memory dedicated to sorting, merging and I/O.
+	 * @param memoryFraction The fraction of memory dedicated to sorting, merging and I/O.
 	 * @param maxNumFileHandles The maximum number of files to be merged at once.
 	 * @param startSpillingFraction The faction of the buffers that have to be filled before the spilling thread
 	 *                              actually begins spilling data to disk.
-	 * @param combineLastMerge A flag indicating whether the last merge step applies the combiner as well.
-	 * 
+	 *
 	 * @throws IOException Thrown, if an error occurs initializing the resources for external sorting.
 	 * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
 	 *                                   perform the sort.
@@ -108,11 +107,11 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 	public CombiningUnilateralSortMerger(GenericCombine<E> combineStub, MemoryManager memoryManager, IOManager ioManager,
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			long totalMemory, int maxNumFileHandles, float startSpillingFraction)
+			double memoryFraction, int maxNumFileHandles, float startSpillingFraction)
 	throws IOException, MemoryAllocationException
 	{
 		this(combineStub, memoryManager, ioManager, input, parentTask, serializerFactory, comparator,
-			totalMemory, -1, maxNumFileHandles, startSpillingFraction);
+			memoryFraction, -1, maxNumFileHandles, startSpillingFraction);
 	}
 	
 	/**
@@ -127,13 +126,12 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 	 * @param parentTask The parent task, which owns all resources used by this sorter.
 	 * @param serializerFactory The type serializer.
 	 * @param comparator The type comparator establishing the order relation.
-	 * @param totalMemory The total amount of memory dedicated to sorting, merging and I/O.
+	 * @param memoryFraction The fraction of memory dedicated to sorting, merging and I/O.
 	 * @param numSortBuffers The number of distinct buffers to use creation of the initial runs.
 	 * @param maxNumFileHandles The maximum number of files to be merged at once.
 	 * @param startSpillingFraction The faction of the buffers that have to be filled before the spilling thread
 	 *                              actually begins spilling data to disk.
-	 * @param combineLastMerge A flag indicating whether the last merge step applies the combiner as well.
-	 * 
+	 *
 	 * @throws IOException Thrown, if an error occurs initializing the resources for external sorting.
 	 * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
 	 *                                   perform the sort.
@@ -141,12 +139,12 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 	public CombiningUnilateralSortMerger(GenericCombine<E> combineStub, MemoryManager memoryManager, IOManager ioManager,
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			long totalMemory, int numSortBuffers, int maxNumFileHandles, 
+			double memoryFraction, int numSortBuffers, int maxNumFileHandles,
 			float startSpillingFraction)
 	throws IOException, MemoryAllocationException
 	{
 		super(memoryManager, ioManager, input, parentTask, serializerFactory, comparator,
-			totalMemory, numSortBuffers, maxNumFileHandles, startSpillingFraction, false);
+			memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false);
 		
 		this.combineStub = combineStub;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/UnilateralSortMerger.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/UnilateralSortMerger.java
index 856ebf8..6905b85 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/UnilateralSortMerger.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/sort/UnilateralSortMerger.java
@@ -174,7 +174,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 	 * @param parentTask The parent task, which owns all resources used by this sorter.
 	 * @param serializerFactory The type serializer.
 	 * @param comparator The type comparator establishing the order relation.
-	 * @param totalMemory The total amount of memory dedicated to sorting, merging and I/O.
+	 * @param memoryFraction The fraction of memory dedicated to sorting, merging and I/O.
 	 * @param maxNumFileHandles The maximum number of files to be merged at once.
 	 * @param startSpillingFraction The faction of the buffers that have to be filled before the spilling thread
 	 *                              actually begins spilling data to disk.
@@ -186,11 +186,11 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 	public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager,
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			long totalMemory, int maxNumFileHandles, float startSpillingFraction)
+			double memoryFraction, int maxNumFileHandles, float startSpillingFraction)
 	throws IOException, MemoryAllocationException
 	{
 		this(memoryManager, ioManager, input, parentTask, serializerFactory, comparator,
-			totalMemory, -1, maxNumFileHandles, startSpillingFraction);
+			memoryFraction, -1, maxNumFileHandles, startSpillingFraction);
 	}
 	
 	/**
@@ -204,7 +204,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 	 * @param parentTask The parent task, which owns all resources used by this sorter.
 	 * @param serializerFactory The type serializer.
 	 * @param comparator The type comparator establishing the order relation.
-	 * @param totalMemory The total amount of memory dedicated to sorting, merging and I/O.
+	 * @param memoryFraction The fraction of memory dedicated to sorting, merging and I/O.
 	 * @param numSortBuffers The number of distinct buffers to use creation of the initial runs.
 	 * @param maxNumFileHandles The maximum number of files to be merged at once.
 	 * @param startSpillingFraction The faction of the buffers that have to be filled before the spilling thread
@@ -217,12 +217,12 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 	public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager,
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			long totalMemory, int numSortBuffers, int maxNumFileHandles, 
+			double memoryFraction, int numSortBuffers, int maxNumFileHandles,
 			float startSpillingFraction)
 	throws IOException, MemoryAllocationException
 	{
 		this(memoryManager, ioManager, input, parentTask, serializerFactory, comparator,
-			totalMemory, numSortBuffers, maxNumFileHandles, startSpillingFraction, false);
+			memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false);
 	}
 	
 	/**
@@ -234,7 +234,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 	 * @param parentTask The parent task, which owns all resources used by this sorter.
 	 * @param serializerFactory The type serializer.
 	 * @param comparator The type comparator establishing the order relation.
-	 * @param totalMemory The total amount of memory dedicated to sorting, merging and I/O.
+	 * @param memoryFraction The fraction of memory dedicated to sorting, merging and I/O.
 	 * @param numSortBuffers The number of distinct buffers to use creation of the initial runs.
 	 * @param maxNumFileHandles The maximum number of files to be merged at once.
 	 * @param startSpillingFraction The faction of the buffers that have to be filled before the spilling thread
@@ -249,7 +249,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 	protected UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager,
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			long totalMemory, int numSortBuffers, int maxNumFileHandles, 
+			double memoryFraction, int numSortBuffers, int maxNumFileHandles,
 			float startSpillingFraction, boolean noSpillingMemory)
 	throws IOException, MemoryAllocationException
 	{
@@ -267,7 +267,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		this.memoryManager = memoryManager;
 		
 		// adjust the memory quotas to the page size
-		final int numPagesTotal = memoryManager.computeNumberOfPages(totalMemory);
+		final int numPagesTotal = memoryManager.computeNumberOfPages(memoryFraction);
 
 		if (numPagesTotal < MIN_NUM_WRITE_BUFFERS + MIN_NUM_SORT_MEM_SEGMENTS) {
 			throw new IllegalArgumentException("Too little memory provided to sorter to perform task. " +

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/AbstractCachedBuildSideMatchDriver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/AbstractCachedBuildSideMatchDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/AbstractCachedBuildSideMatchDriver.java
index 66a4986..1d3c55d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/AbstractCachedBuildSideMatchDriver.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/AbstractCachedBuildSideMatchDriver.java
@@ -67,7 +67,7 @@ public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends M
 		TypePairComparatorFactory<IT1, IT2> pairComparatorFactory = 
 				this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader());
 
-		int numMemoryPages = this.taskContext.getMemoryManager().computeNumberOfPages(config.getMemoryDriver());
+		int numMemoryPages = this.taskContext.getMemoryManager().computeNumberOfPages(config.getRelativeMemoryDriver());
 		List<MemorySegment> memSegments = this.taskContext.getMemoryManager().allocatePages(
 			this.taskContext.getOwningNepheleTask(), numMemoryPages);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/CrossDriver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/CrossDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/CrossDriver.java
index 39f563d..181a687 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/CrossDriver.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/CrossDriver.java
@@ -116,8 +116,7 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
 		}
 		
 		this.memManager = this.taskContext.getMemoryManager();
-		final long totalAvailableMemory = config.getMemoryDriver();
-		final int numPages = this.memManager.computeNumberOfPages(totalAvailableMemory);
+		final int numPages = this.memManager.computeNumberOfPages(config.getRelativeMemoryDriver());
 		
 		if (numPages < 2) {
 			throw new RuntimeException(	"The Cross task was initialized with too little memory. " +

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
index 638a7aa..82359f5 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
@@ -138,7 +138,7 @@ public class DataSinkTask<IT> extends AbstractOutputTask {
 							getEnvironment().getMemoryManager(), 
 							getEnvironment().getIOManager(),
 							this.reader, this, this.inputTypeSerializerFactory, compFact.createComparator(),
-							this.config.getMemoryInput(0), this.config.getFilehandlesInput(0),
+							this.config.getRelativeMemoryInput(0), this.config.getFilehandlesInput(0),
 							this.config.getSpillingThresholdInput(0));
 					
 					this.localStrategy = sorter;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/GroupReduceCombineDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/GroupReduceCombineDriver.java
index 0d51363..8cef403 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/GroupReduceCombineDriver.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/GroupReduceCombineDriver.java
@@ -79,8 +79,6 @@ public class GroupReduceCombineDriver<T> implements PactDriver<GenericCombine<T>
 		final TaskConfig config = this.taskContext.getTaskConfig();
 		final DriverStrategy ls = config.getDriverStrategy();
 
-		final long availableMemory = config.getMemoryDriver();
-
 		final MemoryManager memoryManager = this.taskContext.getMemoryManager();
 
 		final MutableObjectIterator<T> in = this.taskContext.getInput(0);
@@ -90,7 +88,7 @@ public class GroupReduceCombineDriver<T> implements PactDriver<GenericCombine<T>
 		switch (ls) {
 		case SORTED_GROUP_COMBINE:
 			this.input = new AsynchronousPartialSorter<T>(memoryManager, in, this.taskContext.getOwningNepheleTask(),
-						this.serializerFactory, this.comparator.duplicate(), availableMemory);
+						this.serializerFactory, this.comparator.duplicate(), config.getRelativeMemoryDriver());
 			break;
 		// obtain and return a grouped iterator from the combining sort-merger
 		default:

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MatchDriver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MatchDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MatchDriver.java
index b356a58..a651894 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MatchDriver.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MatchDriver.java
@@ -83,8 +83,8 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<GenericJoiner<IT1,
 		final IOManager ioManager = this.taskContext.getIOManager();
 		
 		// set up memory and I/O parameters
-		final long availableMemory = config.getMemoryDriver();
-		final int numPages = memoryManager.computeNumberOfPages(availableMemory);
+		final double fractionAvailableMemory = config.getRelativeMemoryDriver();
+		final int numPages = memoryManager.computeNumberOfPages(fractionAvailableMemory);
 		
 		// test minimum memory requirements
 		final DriverStrategy ls = config.getDriverStrategy();
@@ -106,23 +106,23 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<GenericJoiner<IT1,
 
 		// create and return MatchTaskIterator according to provided local strategy.
 		switch (ls) {
-			case MERGE:
-				this.matchIterator = new MergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1,
-						serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2),
-						memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
-				break;
-			case HYBRIDHASH_BUILD_FIRST:
-				this.matchIterator = new BuildFirstHashMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1,
-					serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2),
-					memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), availableMemory);
-				break;
-			case HYBRIDHASH_BUILD_SECOND:
-				this.matchIterator = new BuildSecondHashMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1,
-						serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2),
-						memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), availableMemory);
-				break;
-			default:
-				throw new Exception("Unsupported driver strategy for Match driver: " + ls.name());
+		case MERGE:
+			this.matchIterator = new MergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1,
+					serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2),
+					memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
+			break;
+		case HYBRIDHASH_BUILD_FIRST:
+			this.matchIterator = new BuildFirstHashMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1,
+				serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2),
+				memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+			break;
+		case HYBRIDHASH_BUILD_SECOND:
+			this.matchIterator = new BuildSecondHashMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1,
+					serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2),
+					memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+			break;
+		default:
+			throw new Exception("Unsupported driver strategy for Match driver: " + ls.name());
 		}
 		
 		// open MatchTaskIterator - this triggers the sorting or hash-table building

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/ReduceCombineDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/ReduceCombineDriver.java
index 14310ca..2eaba54 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/ReduceCombineDriver.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/ReduceCombineDriver.java
@@ -99,7 +99,8 @@ public class ReduceCombineDriver<T> implements PactDriver<GenericReduce<T>, T> {
 		}
 		
 		this.memManager = this.taskContext.getMemoryManager();
-		final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig().getMemoryDriver());
+		final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig()
+				.getRelativeMemoryDriver());
 		
 		// instantiate the serializer / comparator
 		final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
index 92c4648..1d7c931 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
@@ -840,7 +840,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 			this.inputIsCached[i] = cached;
 
 			if (async || cached) {
-				memoryPages = memMan.computeNumberOfPages(this.config.getInputMaterializationMemory(i));
+				memoryPages = memMan.computeNumberOfPages(this.config.getRelativeInputMaterializationMemory(i));
 				if (memoryPages <= 0) {
 					throw new Exception("Input marked as materialized/cached, but no memory for materialization provided.");
 				}
@@ -946,7 +946,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 				@SuppressWarnings({ "rawtypes", "unchecked" })
 				UnilateralSortMerger<?> sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(),
 					this.inputIterators[inputNum], this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
-					this.config.getMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
+					this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
 					this.config.getSpillingThresholdInput(inputNum));
 				// set the input to null such that it will be lazily fetched from the input strategy
 				this.inputs[inputNum] = null;
@@ -982,7 +982,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 				CombiningUnilateralSortMerger<?> cSorter = new CombiningUnilateralSortMerger(
 					(GenericCombine) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum], 
 					this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
-					this.config.getMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
+					this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
 					this.config.getSpillingThresholdInput(inputNum));
 				cSorter.setUdfConfiguration(this.config.getStubParameters());
 
@@ -1022,12 +1022,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 			final MutableObjectIterator<?> iter = new ReaderIterator(reader, serializerFactory.getSerializer());
 			return iter;
 		}
-//		// generic data type serialization
-//		@SuppressWarnings("unchecked")
-//		MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
-//		@SuppressWarnings({ "unchecked", "rawtypes" })
-//		final MutableObjectIterator<?> iter = new ReaderIterator(reader, serializer);
-//		return iter;
 	}
 
 	protected int getNumTaskInputs() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/SynchronousChainedCombineDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/SynchronousChainedCombineDriver.java
index 814eb62..98d65f1 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/SynchronousChainedCombineDriver.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/SynchronousChainedCombineDriver.java
@@ -81,7 +81,7 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
 		// ----------------- Set up the asynchronous sorter -------------------------
 
 		this.memManager = this.parent.getEnvironment().getMemoryManager();
-		final int numMemoryPages = memManager.computeNumberOfPages(this.config.getMemoryDriver());
+		final int numMemoryPages = memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
 
 		// instantiate the serializer / comparator
 		final TypeSerializerFactory<T> serializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java
index 947e22a..a43f8cc 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java
@@ -454,12 +454,12 @@ public class TaskConfig {
 		return this.config.getBoolean(INPUT_REPLAYABLE_PREFIX + inputNum, false);
 	}
 	
-	public void setInputMaterializationMemory(int inputNum, long memory) {
-		this.config.setLong(INPUT_DAM_MEMORY_PREFIX + inputNum, memory);
+	public void setRelativeInputMaterializationMemory(int inputNum, double relativeMemory) {
+		this.config.setDouble(INPUT_DAM_MEMORY_PREFIX + inputNum, relativeMemory);
 	}
 	
-	public long getInputMaterializationMemory(int inputNum) {
-		return this.config.getLong(INPUT_DAM_MEMORY_PREFIX + inputNum, -1);
+	public double getRelativeInputMaterializationMemory(int inputNum) {
+		return this.config.getDouble(INPUT_DAM_MEMORY_PREFIX + inputNum, 0);
 	}
 	
 	public void setBroadcastInputName(String name, int groupIndex) {
@@ -577,20 +577,20 @@ public class TaskConfig {
 	//                       Parameters to configure the memory and I/O behavior
 	// --------------------------------------------------------------------------------------------
 
-	public void setMemoryDriver(long memorySize) {
-		this.config.setLong(MEMORY_DRIVER, memorySize);
+	public void setRelativeMemoryDriver(double relativeMemorySize) {
+		this.config.setDouble(MEMORY_DRIVER, relativeMemorySize);
 	}
 
-	public long getMemoryDriver() {
-		return this.config.getLong(MEMORY_DRIVER, -1);
+	public double getRelativeMemoryDriver() {
+		return this.config.getDouble(MEMORY_DRIVER, 0);
 	}
 	
-	public void setMemoryInput(int inputNum, long memorySize) {
-		this.config.setLong(MEMORY_INPUT_PREFIX + inputNum, memorySize);
+	public void setRelativeMemoryInput(int inputNum, double relativeMemorySize) {
+		this.config.setDouble(MEMORY_INPUT_PREFIX + inputNum, relativeMemorySize);
 	}
 
-	public long getMemoryInput(int inputNum) {
-		return this.config.getLong(MEMORY_INPUT_PREFIX + inputNum, -1);
+	public double getRelativeMemoryInput(int inputNum) {
+		return this.config.getDouble(MEMORY_INPUT_PREFIX + inputNum, 0);
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -732,30 +732,30 @@ public class TaskConfig {
 		return index;
 	}
 	
-	public void setBackChannelMemory(long memory) {
-		if (memory < 0) {
+	public void setRelativeBackChannelMemory(double relativeMemory) {
+		if (relativeMemory < 0) {
 			throw new IllegalArgumentException();
 		}
-		this.config.setLong(ITERATION_HEAD_BACKCHANNEL_MEMORY, memory);
+		this.config.setDouble(ITERATION_HEAD_BACKCHANNEL_MEMORY, relativeMemory);
 	}
 
-	public long getBackChannelMemory() {
-		long backChannelMemory = this.config.getLong(ITERATION_HEAD_BACKCHANNEL_MEMORY, 0);
-		if (backChannelMemory <= 0) {
+	public double getRelativeBackChannelMemory() {
+		double relativeBackChannelMemory = this.config.getDouble(ITERATION_HEAD_BACKCHANNEL_MEMORY, 0);
+		if (relativeBackChannelMemory <= 0) {
 			throw new IllegalArgumentException();
 		}
-		return backChannelMemory;
+		return relativeBackChannelMemory;
 	}
 	
-	public void setSolutionSetMemory(long memory) {
-		if (memory < 0) {
+	public void setRelativeSolutionSetMemory(double relativeMemory) {
+		if (relativeMemory < 0) {
 			throw new IllegalArgumentException();
 		}
-		this.config.setLong(ITERATION_HEAD_SOLUTION_SET_MEMORY, memory);
+		this.config.setDouble(ITERATION_HEAD_SOLUTION_SET_MEMORY, relativeMemory);
 	}
 
-	public long getSolutionSetMemory() {
-		long backChannelMemory = this.config.getLong(ITERATION_HEAD_SOLUTION_SET_MEMORY, 0);
+	public double getRelativeSolutionSetMemory() {
+		double backChannelMemory = this.config.getDouble(ITERATION_HEAD_SOLUTION_SET_MEMORY, 0);
 		if (backChannelMemory <= 0) {
 			throw new IllegalArgumentException();
 		}
@@ -1198,6 +1198,16 @@ public class TaskConfig {
 		public void setFloat(String key, float value) {
 			this.backingConfig.setFloat(this.prefix + key, value);
 		}
+
+		@Override
+		public double getDouble(String key, double defaultValue) {
+			return this.backingConfig.getDouble(this.prefix + key, defaultValue);
+		}
+
+		@Override
+		public void setDouble(String key, double value) {
+			this.backingConfig.setDouble(this.prefix + key, value);
+		}
 		
 		@Override
 		public byte[] getBytes(final String key, final byte[] defaultValue) {
@@ -1220,16 +1230,6 @@ public class TaskConfig {
 		}
 		
 		@Override
-		public double getDouble(String key, double defaultValue) {
-			return backingConfig.getDouble(this.prefix + key, defaultValue);
-		}
-		
-		@Override
-		public void setDouble(String key, double value) {
-			backingConfig.setDouble(this.prefix + key, value);
-		}
-		
-		@Override
 		public String toString() {
 			return backingConfig.toString();
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java
index 6122c36..4109a2b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java
@@ -82,8 +82,6 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 
 	private int lastReceivedEnvelope = -1;
 
-	private ChannelID lastSourceID = null;
-
 	private boolean destroyCalled = false;
 
 	// ----------------------
@@ -157,10 +155,6 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 		return this.inputGate.getJobID();
 	}
 
-//	public abstract AbstractTaskEvent getCurrentEvent();
-
-	private DeserializationResult lastDeserializationResult;
-
 
 	public InputChannelResult readRecord(T target) throws IOException {
 		if (this.dataBuffer == null) {
@@ -207,7 +201,6 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 		}
 
 		DeserializationResult deserializationResult = this.deserializer.getNextRecord(target);
-		this.lastDeserializationResult = deserializationResult;
 
 		if (deserializationResult.isBufferConsumed()) {
 			releasedConsumedReadBuffer(this.dataBuffer);
@@ -352,7 +345,6 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 
 				this.queuedEnvelopes.add(envelope);
 				this.lastReceivedEnvelope = sequenceNumber;
-				this.lastSourceID = envelope.getSource();
 
 				// Notify the channel about the new data. notify as much as there is (buffer plus once per event)
 				if (envelope.getBuffer() != null) {
@@ -464,6 +456,7 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 		}
 
 		// schedule events as pending, because events come always after the buffer!
+		@SuppressWarnings("unchecked")
 		List<AbstractEvent> events = (List<AbstractEvent>) nextEnvelope.deserializeEvents();
 		Iterator<AbstractEvent> eventsIt = events.iterator();
 		if (eventsIt.hasNext()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
index c623220..dc506ef 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
@@ -26,6 +26,7 @@ import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
 import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
 import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPool;
 import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -95,6 +96,7 @@ public class InputGate<T extends IOReadableWritable> extends Gate<T> implements
 		super(jobID, gateID, index);
 	}
 
+	@SuppressWarnings("unchecked")
 	public void initializeChannels(GateDeploymentDescriptor inputGateDescriptor){
 		channels = new InputChannel[inputGateDescriptor.getNumberOfChannelDescriptors()];
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/RemoteReceiver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/RemoteReceiver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/RemoteReceiver.java
index da36ad0..ab65b4c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/RemoteReceiver.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/RemoteReceiver.java
@@ -18,10 +18,8 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 
 import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.util.StringUtils;
 
 /**
  * Objects of this class uniquely identify a connection to a remote {@link TaskManager}.
@@ -48,11 +46,9 @@ public final class RemoteReceiver implements IOReadableWritable {
 	 *        the index of the connection to the remote {@link TaskManager}
 	 */
 	public RemoteReceiver(final InetSocketAddress connectionAddress, final int connectionIndex) {
-
 		if (connectionAddress == null) {
 			throw new IllegalArgumentException("Argument connectionAddress must not be null");
 		}
-
 		if (connectionIndex < 0) {
 			throw new IllegalArgumentException("Argument connectionIndex must be a non-negative integer number");
 		}
@@ -75,7 +71,6 @@ public final class RemoteReceiver implements IOReadableWritable {
 	 * @return the address of the connection to the remote {@link TaskManager}
 	 */
 	public InetSocketAddress getConnectionAddress() {
-
 		return this.connectionAddress;
 	}
 
@@ -85,14 +80,12 @@ public final class RemoteReceiver implements IOReadableWritable {
 	 * @return the index of the connection to the remote {@link TaskManager}
 	 */
 	public int getConnectionIndex() {
-
 		return this.connectionIndex;
 	}
 
 
 	@Override
 	public int hashCode() {
-
 		return this.connectionAddress.hashCode() + (31 * this.connectionIndex);
 	}
 
@@ -131,18 +124,12 @@ public final class RemoteReceiver implements IOReadableWritable {
 
 	@Override
 	public void read(final DataInput in) throws IOException {
-
 		final int addr_length = in.readInt();
 		final byte[] address = new byte[addr_length];
 		in.readFully(address);
 
-		InetAddress ia = null;
-		try {
-			ia = InetAddress.getByAddress(address);
-		} catch (UnknownHostException uhe) {
-			throw new IOException(StringUtils.stringifyException(uhe));
-		}
-		final int port = in.readInt();
+		InetAddress ia = InetAddress.getByAddress(address);
+		int port = in.readInt();
 		this.connectionAddress = new InetSocketAddress(ia, port);
 
 		this.connectionIndex = in.readInt();
@@ -151,7 +138,6 @@ public final class RemoteReceiver implements IOReadableWritable {
 
 	@Override
 	public String toString() {
-
 		return this.connectionAddress + " (" + this.connectionIndex + ")";
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/event/job/ManagementEventTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/event/job/ManagementEventTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/event/job/ManagementEventTest.java
index a6a5875..fd1a672 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/event/job/ManagementEventTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/event/job/ManagementEventTest.java
@@ -85,14 +85,12 @@ public class ManagementEventTest {
 	@Test
 	public void testVertexAssignmentEvent() {
 
-		final VertexAssignmentEvent orig = new VertexAssignmentEvent(TIMESTAMP, new ManagementVertexID(), "test",
-			"standard");
+		final VertexAssignmentEvent orig = new VertexAssignmentEvent(TIMESTAMP, new ManagementVertexID(), "test");
 		final VertexAssignmentEvent copy = (VertexAssignmentEvent) ManagementTestUtils.createCopy(orig);
 
 		assertEquals(orig.getVertexID(), copy.getVertexID());
 		assertEquals(orig.getTimestamp(), copy.getTimestamp());
 		assertEquals(orig.getInstanceName(), copy.getInstanceName());
-		assertEquals(orig.getInstanceType(), copy.getInstanceType());
 		assertEquals(orig.hashCode(), copy.hashCode());
 		assertTrue(orig.equals(copy));
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java
index 5ff5f1c..fa0653b 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java
@@ -23,40 +23,25 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 
 import org.apache.log4j.Level;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.fs.Path;
 import eu.stratosphere.nephele.execution.ExecutionState;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.instance.AllocatedResource;
-import eu.stratosphere.nephele.instance.HardwareDescription;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.instance.InstanceException;
-import eu.stratosphere.nephele.instance.InstanceListener;
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.instance.InstanceRequestMap;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
-import eu.stratosphere.nephele.instance.InstanceTypeFactory;
 import eu.stratosphere.nephele.jobgraph.DistributionPattern;
-import eu.stratosphere.runtime.io.channels.ChannelType;
-import eu.stratosphere.nephele.util.FileLineReader;
-import eu.stratosphere.nephele.util.FileLineWriter;
 import eu.stratosphere.nephele.jobgraph.JobFileInputVertex;
 import eu.stratosphere.nephele.jobgraph.JobFileOutputVertex;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.topology.NetworkTopology;
+import eu.stratosphere.nephele.util.FileLineReader;
+import eu.stratosphere.nephele.util.FileLineWriter;
 import eu.stratosphere.nephele.util.ServerTestUtils;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.util.LogUtils;
 
 /**
@@ -64,152 +49,6 @@ import eu.stratosphere.util.LogUtils;
  * 
  */
 public class ExecutionGraphTest {
-
-	/**
-	 * The name of the default instance type used during these tests.
-	 */
-	private static final String DEFAULT_INSTANCE_TYPE_NAME = "test";
-
-	/**
-	 * A test implementation of an {@link InstanceManager} which is used as a stub in these tests.
-	 * 
-	 */
-	private static final class TestInstanceManager implements InstanceManager {
-
-		/**
-		 * The default instance type.
-		 */
-		private final InstanceType defaultInstanceType;
-
-		/**
-		 * Constructs a new test instance manager.
-		 */
-		public TestInstanceManager() {
-			this.defaultInstanceType = InstanceTypeFactory.construct(DEFAULT_INSTANCE_TYPE_NAME, 4, 4, 1024, 50, 10);
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public void requestInstance(final JobID jobID, final Configuration conf,
-				final InstanceRequestMap instanceRequestMap,
-				final List<String> splitAffinityList) throws InstanceException {
-
-			throw new IllegalStateException("requestInstance called on TestInstanceManager");
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public void releaseAllocatedResource(final JobID jobID, final Configuration conf,
-				final AllocatedResource allocatedResource)
-				throws InstanceException {
-
-			throw new IllegalStateException("releaseAllocatedResource called on TestInstanceManager");
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public InstanceType getSuitableInstanceType(final int minNumComputeUnits, final int minNumCPUCores,
-				final int minMemorySize, final int minDiskCapacity, final int maxPricePerHour) {
-
-			throw new IllegalStateException("getSuitableInstanceType called on TestInstanceManager");
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public void reportHeartBeat(final InstanceConnectionInfo instanceConnectionInfo,
-				final HardwareDescription hardwareDescription) {
-
-			throw new IllegalStateException("reportHeartBeat called on TestInstanceManager");
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public InstanceType getInstanceTypeByName(final String instanceTypeName) {
-
-			if (this.defaultInstanceType.getIdentifier().equals(instanceTypeName)) {
-				return this.defaultInstanceType;
-			}
-
-			return null;
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public InstanceType getDefaultInstanceType() {
-
-			return this.defaultInstanceType;
-		}
-
-		@Override
-		public NetworkTopology getNetworkTopology(final JobID jobID) {
-
-			throw new IllegalStateException("getNetworkTopology called on TestInstanceManager");
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public void setInstanceListener(final InstanceListener instanceListener) {
-
-			throw new IllegalStateException("setInstanceListener called on TestInstanceManager");
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes() {
-
-			throw new IllegalStateException("getMapOfAvailableInstanceType called on TestInstanceManager");
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public void shutdown() {
-
-			throw new IllegalStateException("shutdown called on TestInstanceManager");
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public AbstractInstance getInstanceByName(final String name) {
-			throw new IllegalStateException("getInstanceByName called on TestInstanceManager");
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public void cancelPendingRequests(final JobID jobID) {
-			throw new IllegalStateException("cancelPendingRequests called on TestInstanceManager");
-		}
-
-		@Override
-		public int getNumberOfTaskTrackers() {
-			return 0;
-		}
-
-	}
-
-	private static final InstanceManager INSTANCE_MANAGER = new TestInstanceManager();
-
 	@BeforeClass
 	public static void reduceLogLevel() {
 		LogUtils.initializeDefaultConsoleLogger(Level.WARN);
@@ -259,15 +98,11 @@ public class ExecutionGraphTest {
 
 			LibraryCacheManager.register(jobID, new String[0]);
 
-			final ExecutionGraph eg = new ExecutionGraph(jg, INSTANCE_MANAGER);
+			final ExecutionGraph eg = new ExecutionGraph(jg, -1);
 
 			// test all methods of ExecutionGraph
-			final InstanceRequestMap instanceRequestMap = new InstanceRequestMap();
 			final ExecutionStage executionStage = eg.getCurrentExecutionStage();
-			executionStage.collectRequiredInstanceTypes(instanceRequestMap, ExecutionState.CREATED);
-			assertEquals(1, instanceRequestMap.size());
-			assertEquals(1, (int) instanceRequestMap.getMaximumNumberOfInstances(INSTANCE_MANAGER
-					.getInstanceTypeByName(DEFAULT_INSTANCE_TYPE_NAME)));
+			assertEquals(1, executionStage.getMaxNumberSubtasks());
 
 			assertEquals(jobID, eg.getJobID());
 			assertEquals(0, eg.getIndexOfCurrentExecutionStage());
@@ -332,15 +167,11 @@ public class ExecutionGraphTest {
 			assertNotNull(egv0.getGroupMember(0));
 			assertNull(egv0.getGroupMember(1));
 			assertEquals(1, egv0.getInputSplits().length);
-			assertEquals(-1, egv0.getMaximumNumberOfGroupMembers());
-			assertEquals(1, egv0.getMinimumNumberOfGroupMember());
 			assertEquals("Input 1", egv0.getName());
 			assertEquals(0, egv0.getNumberOfBackwardLinks());
 			assertEquals(1, egv0.getNumberOfForwardLinks());
-			assertEquals(1, egv0.getNumberOfSubtasksPerInstance());
 			assertEquals(0, egv0.getStageNumber());
 			assertEquals(-1, egv0.getUserDefinedNumberOfMembers());
-			assertEquals(INSTANCE_MANAGER.getDefaultInstanceType(), egv0.getInstanceType());
 			assertEquals("Task 1", egv0.getVertexToShareInstancesWith().getName());
 
 			// egv1 (output1)
@@ -354,15 +185,11 @@ public class ExecutionGraphTest {
 			assertNull(egv1.getForwardEdge(0));
 			assertNotNull(egv1.getGroupMember(0));
 			assertNull(egv1.getGroupMember(1));
-			assertEquals(1, egv1.getMaximumNumberOfGroupMembers());
-			assertEquals(1, egv1.getMinimumNumberOfGroupMember());
 			assertEquals("Output 1", egv1.getName());
 			assertEquals(1, egv1.getNumberOfBackwardLinks());
 			assertEquals(0, egv1.getNumberOfForwardLinks());
-			assertEquals(1, egv1.getNumberOfSubtasksPerInstance());
 			assertEquals(0, egv1.getStageNumber());
 			assertEquals(-1, egv1.getUserDefinedNumberOfMembers());
-			assertEquals(INSTANCE_MANAGER.getInstanceTypeByName(DEFAULT_INSTANCE_TYPE_NAME), egv1.getInstanceType());
 			assertEquals("Input 1", egv1.getVertexToShareInstancesWith().getName());
 
 			// egv2 (task1)
@@ -378,15 +205,11 @@ public class ExecutionGraphTest {
 			assertNotNull(egv2.getForwardEdges(egv1));
 			assertNotNull(egv2.getGroupMember(0));
 			assertNull(egv2.getGroupMember(1));
-			assertEquals(-1, egv2.getMaximumNumberOfGroupMembers());
-			assertEquals(1, egv2.getMinimumNumberOfGroupMember());
 			assertEquals("Task 1", egv2.getName());
 			assertEquals(1, egv2.getNumberOfBackwardLinks());
 			assertEquals(1, egv2.getNumberOfForwardLinks());
-			assertEquals(1, egv2.getNumberOfSubtasksPerInstance());
 			assertEquals(0, egv2.getStageNumber());
 			assertEquals(-1, egv2.getUserDefinedNumberOfMembers());
-			assertEquals(INSTANCE_MANAGER.getInstanceTypeByName(DEFAULT_INSTANCE_TYPE_NAME), egv2.getInstanceType());
 			assertNull(egv2.getVertexToShareInstancesWith());
 
 			// test all methods of ExecutionVertex
@@ -398,25 +221,16 @@ public class ExecutionGraphTest {
 			assertEquals(egv0, ev0.getGroupVertex());
 			assertNotNull(ev0.getID());
 			assertEquals("Input 1", ev0.getName());
-			assertEquals(INSTANCE_MANAGER.getInstanceTypeByName(DEFAULT_INSTANCE_TYPE_NAME), ev0.getAllocatedResource()
-				.getInstance()
-				.getType());
 
 			// ev1 (output1)
 			assertEquals(egv1, ev1.getGroupVertex());
 			assertNotNull(ev1.getID());
 			assertEquals("Output 1", ev1.getName());
-			assertEquals(INSTANCE_MANAGER.getInstanceTypeByName(DEFAULT_INSTANCE_TYPE_NAME), ev1.getAllocatedResource()
-				.getInstance()
-				.getType());
 
 			// ev2 (task1)
 			assertEquals(egv2, ev2.getGroupVertex());
 			assertNotNull(ev2.getID());
 			assertEquals("Task 1", ev2.getName());
-			assertEquals(INSTANCE_MANAGER.getInstanceTypeByName(DEFAULT_INSTANCE_TYPE_NAME), ev2.getAllocatedResource()
-				.getInstance()
-				.getType());
 
 			assertEquals(ev0.getAllocatedResource(), ev1.getAllocatedResource());
 			assertEquals(ev0.getAllocatedResource(), ev2.getAllocatedResource());
@@ -448,7 +262,7 @@ public class ExecutionGraphTest {
 	 * input1 -> task1 -> output1
 	 * no subtasks defined
 	 * input1 is default, task1 is m1.large, output1 is m1.xlarge
-	 * all channels are IN_MEMORY
+	 * all channels are INMEMORY
 	 */
 	@Test
 	public void testConvertJobGraphToExecutionGraph2() {
@@ -484,15 +298,11 @@ public class ExecutionGraphTest {
 			LibraryCacheManager.register(jobID, new String[0]);
 
 			// now convert job graph to execution graph
-			final ExecutionGraph eg = new ExecutionGraph(jg, INSTANCE_MANAGER);
+			final ExecutionGraph eg = new ExecutionGraph(jg, 1);
 
 			// test instance types in ExecutionGraph
-			final InstanceRequestMap instanceRequestMap = new InstanceRequestMap();
 			final ExecutionStage executionStage = eg.getCurrentExecutionStage();
-			executionStage.collectRequiredInstanceTypes(instanceRequestMap, ExecutionState.CREATED);
-			assertEquals(1, instanceRequestMap.size());
-			assertEquals(1,
-				(int) instanceRequestMap.getMaximumNumberOfInstances(INSTANCE_MANAGER.getDefaultInstanceType()));
+			assertEquals(1, executionStage.getMaxNumberSubtasks());
 
 			// stage0
 			ExecutionStage es = eg.getStage(0);
@@ -523,12 +333,6 @@ public class ExecutionGraphTest {
 			ExecutionVertex ev0 = egv0.getGroupMember(0); // input1
 			ExecutionVertex ev1 = egv1.getGroupMember(0); // output1
 			ExecutionVertex ev2 = egv2.getGroupMember(0); // task1
-			// ev0 (input1)
-			assertEquals(INSTANCE_MANAGER.getDefaultInstanceType(), ev0.getAllocatedResource().getInstance().getType());
-			// ev1 (output1)
-			assertEquals(INSTANCE_MANAGER.getDefaultInstanceType(), ev1.getAllocatedResource().getInstance().getType());
-			// ev2 (task1)
-			assertEquals(INSTANCE_MANAGER.getDefaultInstanceType(), ev2.getAllocatedResource().getInstance().getType());
 			assertEquals(ev0.getAllocatedResource(), ev1.getAllocatedResource());
 			assertEquals(ev0.getAllocatedResource(), ev2.getAllocatedResource());
 		} catch (GraphConversionException e) {
@@ -618,15 +422,11 @@ public class ExecutionGraphTest {
 
 			LibraryCacheManager.register(jobID, new String[0]);
 
-			final ExecutionGraph eg = new ExecutionGraph(jg, INSTANCE_MANAGER);
+			final ExecutionGraph eg = new ExecutionGraph(jg, 1);
 
 			// test instance types in ExecutionGraph
-			final InstanceRequestMap instanceRequestMap = new InstanceRequestMap();
 			final ExecutionStage executionStage = eg.getCurrentExecutionStage();
-			executionStage.collectRequiredInstanceTypes(instanceRequestMap, ExecutionState.CREATED);
-			assertEquals(1, instanceRequestMap.size());
-			assertEquals(2,
-				(int) instanceRequestMap.getMaximumNumberOfInstances(INSTANCE_MANAGER.getDefaultInstanceType()));
+			assertEquals(2, executionStage.getMaxNumberSubtasks());
 
 			// stage0
 			final ExecutionStage es = eg.getStage(0);
@@ -828,40 +628,32 @@ public class ExecutionGraphTest {
 			i1.setFileInputClass(FileLineReader.class);
 			i1.setFilePath(new Path(inputFile1.toURI()));
 			i1.setNumberOfSubtasks(4);
-			i1.setNumberOfSubtasksPerInstance(2);
 			final JobFileInputVertex i2 = new JobFileInputVertex("Input 2", jg);
 			i2.setFileInputClass(FileLineReader.class);
 			i2.setFilePath(new Path(inputFile2.toURI()));
 			i2.setNumberOfSubtasks(4);
-			i2.setNumberOfSubtasksPerInstance(2);
 			// task vertex
 			final JobTaskVertex t1 = new JobTaskVertex("Task 1", jg);
 			t1.setTaskClass(ForwardTask1Input1Output.class);
 			t1.setNumberOfSubtasks(4);
-			t1.setNumberOfSubtasksPerInstance(2);
 			final JobTaskVertex t2 = new JobTaskVertex("Task 2", jg);
 			t2.setTaskClass(ForwardTask1Input1Output.class);
 			t2.setNumberOfSubtasks(4);
-			t2.setNumberOfSubtasksPerInstance(2);
 			final JobTaskVertex t3 = new JobTaskVertex("Task 3", jg);
 			t3.setTaskClass(ForwardTask2Inputs1Output.class);
 			t3.setNumberOfSubtasks(8);
-			t3.setNumberOfSubtasksPerInstance(4);
 			final JobTaskVertex t4 = new JobTaskVertex("Task 4", jg);
 			t4.setTaskClass(ForwardTask1Input2Outputs.class);
 			t4.setNumberOfSubtasks(8);
-			t4.setNumberOfSubtasksPerInstance(4);
 			// output vertex
 			final JobFileOutputVertex o1 = new JobFileOutputVertex("Output 1", jg);
 			o1.setFileOutputClass(FileLineWriter.class);
 			o1.setFilePath(new Path(outputFile1.toURI()));
 			o1.setNumberOfSubtasks(4);
-			o1.setNumberOfSubtasksPerInstance(2);
 			final JobFileOutputVertex o2 = new JobFileOutputVertex("Output 2", jg);
 			o2.setFileOutputClass(FileLineWriter.class);
 			o2.setFilePath(new Path(outputFile2.toURI()));
 			o2.setNumberOfSubtasks(4);
-			o2.setNumberOfSubtasksPerInstance(2);
 			o1.setVertexToShareInstancesWith(o2);
 
 			// connect vertices
@@ -876,19 +668,14 @@ public class ExecutionGraphTest {
 			LibraryCacheManager.register(jobID, new String[0]);
 
 			// now convert job graph to execution graph
-			final ExecutionGraph eg = new ExecutionGraph(jg, INSTANCE_MANAGER);
+			final ExecutionGraph eg = new ExecutionGraph(jg, 1);
 
 			// test instance types in ExecutionGraph
-			final InstanceRequestMap instanceRequestMap = new InstanceRequestMap();
 			ExecutionStage executionStage = eg.getCurrentExecutionStage();
 			assertNotNull(executionStage);
 			assertEquals(0, executionStage.getStageNumber());
 			
-			executionStage.collectRequiredInstanceTypes(instanceRequestMap, ExecutionState.CREATED);
-			assertEquals(1, instanceRequestMap.size());
-			assertEquals(8,
-				(int) instanceRequestMap.getMaximumNumberOfInstances(INSTANCE_MANAGER
-					.getInstanceTypeByName(DEFAULT_INSTANCE_TYPE_NAME)));
+			assertEquals(20, executionStage.getRequiredSlots());
 			// Fake transition to next stage by triggering execution state changes manually
 			final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(eg, eg.getIndexOfCurrentExecutionStage(),
 				true, true);
@@ -903,7 +690,6 @@ public class ExecutionGraphTest {
 				ev.updateExecutionState(ExecutionState.FINISHING);
 				ev.updateExecutionState(ExecutionState.FINISHED);
 			}
-			instanceRequestMap.clear();
 		} catch (GraphConversionException e) {
 			fail(e.getMessage());
 		} catch (JobGraphDefinitionException e) {
@@ -983,7 +769,7 @@ public class ExecutionGraphTest {
 			LibraryCacheManager.register(jobID, new String[0]);
 
 			// now convert job graph to execution graph
-			final ExecutionGraph eg = new ExecutionGraph(jg, INSTANCE_MANAGER);
+			final ExecutionGraph eg = new ExecutionGraph(jg, 1);
 
 			assertEquals(1, eg.getNumberOfStages());
 
@@ -1116,9 +902,9 @@ public class ExecutionGraphTest {
 			input1.connectTo(forward1, ChannelType.IN_MEMORY,
 				DistributionPattern.POINTWISE);
 			forward1.connectTo(forward2, ChannelType.IN_MEMORY,
-				DistributionPattern.POINTWISE);
+					DistributionPattern.POINTWISE);
 			forward2.connectTo(forward3, ChannelType.NETWORK,
-				DistributionPattern.POINTWISE);
+					DistributionPattern.POINTWISE);
 			forward3.connectTo(output1, ChannelType.IN_MEMORY);
 
 			// setup instance sharing
@@ -1130,7 +916,7 @@ public class ExecutionGraphTest {
 			LibraryCacheManager.register(jobID, new String[0]);
 
 			// now convert job graph to execution graph
-			final ExecutionGraph eg = new ExecutionGraph(jg, INSTANCE_MANAGER);
+			final ExecutionGraph eg = new ExecutionGraph(jg, 1);
 
 			// Check number of stages
 			assertEquals(1, eg.getNumberOfStages());
@@ -1139,16 +925,8 @@ public class ExecutionGraphTest {
 			final ExecutionStage stage = eg.getStage(0);
 			assertEquals(5, stage.getNumberOfStageMembers());
 
-			// Check number of required instances
-			final InstanceRequestMap instanceRequestMap = new InstanceRequestMap();
-			stage.collectRequiredInstanceTypes(instanceRequestMap, ExecutionState.CREATED);
-
-			// First, we expect all required instances to be of the same type
-			assertEquals(1, instanceRequestMap.size());
-
-			final int numberOfRequiredInstances = instanceRequestMap.getMinimumNumberOfInstances(INSTANCE_MANAGER
-				.getDefaultInstanceType());
-			assertEquals(degreeOfParallelism, numberOfRequiredInstances);
+			final int numberOfRequiredSlots = stage.getMaxNumberSubtasks();
+			assertEquals(degreeOfParallelism, numberOfRequiredSlots);
 
 		} catch (GraphConversionException e) {
 			fail(e.getMessage());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/ClusterManagerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/ClusterManagerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/ClusterManagerTest.java
deleted file mode 100644
index 72d58c9..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/ClusterManagerTest.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.cluster;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.net.InetAddress;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import eu.stratosphere.configuration.ConfigConstants;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.nephele.instance.AllocatedResource;
-import eu.stratosphere.nephele.instance.AllocationID;
-import eu.stratosphere.nephele.instance.HardwareDescription;
-import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.instance.InstanceException;
-import eu.stratosphere.nephele.instance.InstanceRequestMap;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.util.LogUtils;
-
-/**
- * Tests for {@link ClusterManager}.
- */
-public class ClusterManagerTest {
-
-	@BeforeClass
-	public static void initLogging() {
-		LogUtils.initializeDefaultTestConsoleLogger();
-	}
-	
-	
-	@Test
-	public void testInstanceRegistering() {
-		try {
-			ClusterManager cm = new ClusterManager();
-			TestInstanceListener testInstanceListener = new TestInstanceListener();
-			cm.setInstanceListener(testInstanceListener);
-			
-			
-			int ipcPort = ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT;
-			int dataPort = ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT;
-
-			HardwareDescription hardwareDescription = HardwareDescriptionFactory.construct(2, 2L * 1024L * 1024L * 1024L,
-																				2L * 1024L * 1024L * 1024L);
-
-			String hostname = "192.168.198.1";
-			InetAddress address = InetAddress.getByName("192.168.198.1");
-			
-			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 0, dataPort + 0);
-			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 15, dataPort + 15);
-			InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 30, dataPort + 30);
-			
-			// register three instances
-			cm.reportHeartBeat(ici1, hardwareDescription);
-			cm.reportHeartBeat(ici2, hardwareDescription);
-			cm.reportHeartBeat(ici3, hardwareDescription);
-			
-			
-			Map<InstanceType, InstanceTypeDescription> instanceTypeDescriptions = cm.getMapOfAvailableInstanceTypes();
-			assertEquals(1, instanceTypeDescriptions.size());
-			
-			InstanceTypeDescription descr = instanceTypeDescriptions.entrySet().iterator().next().getValue();
-			
-			assertEquals(3, descr.getMaximumNumberOfAvailableInstances());
-			
-			cm.shutdown();
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail("Test erroneous: " + e.getMessage());
-		}
-	}
-
-	@Test
-	public void testAllocationDeallocation() {
-		try {
-			ClusterManager cm = new ClusterManager();
-			TestInstanceListener testInstanceListener = new TestInstanceListener();
-			cm.setInstanceListener(testInstanceListener);
-			
-			
-			int ipcPort = ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT;
-			int dataPort = ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT;
-
-			HardwareDescription hardwareDescription = HardwareDescriptionFactory.construct(2, 2L * 1024L * 1024L * 1024L,
-																				2L * 1024L * 1024L * 1024L);
-
-			String hostname = "192.168.198.1";
-			InetAddress address = InetAddress.getByName("192.168.198.1");
-			
-			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 0, dataPort + 0);
-			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 15, dataPort + 15);
-			
-			// register three instances
-			cm.reportHeartBeat(ici1, hardwareDescription);
-			cm.reportHeartBeat(ici2, hardwareDescription);
-			
-			
-			Map<InstanceType, InstanceTypeDescription> instanceTypeDescriptions = cm.getMapOfAvailableInstanceTypes();
-			assertEquals(1, instanceTypeDescriptions.size());
-			
-			InstanceTypeDescription descr = instanceTypeDescriptions.entrySet().iterator().next().getValue();
-			
-			assertEquals(2, descr.getMaximumNumberOfAvailableInstances());
-			
-			
-			// allocate something
-			JobID jobID = new JobID();
-			Configuration conf = new Configuration();
-			InstanceRequestMap instanceRequestMap = new InstanceRequestMap();
-			instanceRequestMap.setNumberOfInstances(cm.getDefaultInstanceType(), 2);
-			cm.requestInstance(jobID, conf, instanceRequestMap, null);
-			
-			ClusterManagerTestUtils.waitForInstances(jobID, testInstanceListener, 3, 1000);
-			
-			List<AllocatedResource> allocatedResources = testInstanceListener.getAllocatedResourcesForJob(jobID);
-			assertEquals(2, allocatedResources.size());
-			
-			Iterator<AllocatedResource> it = allocatedResources.iterator();
-			Set<AllocationID> allocationIDs = new HashSet<AllocationID>();
-			while (it.hasNext()) {
-				AllocatedResource allocatedResource = it.next();
-				if (ConfigConstants.DEFAULT_INSTANCE_TYPE.equals(allocatedResource.getInstance().getType().getIdentifier())) {
-					fail("Allocated unexpected instance of type "
-						+ allocatedResource.getInstance().getType().getIdentifier());
-				}
-
-				if (allocationIDs.contains(allocatedResource.getAllocationID())) {
-					fail("Discovered allocation ID " + allocatedResource.getAllocationID() + " at least twice");
-				} else {
-					allocationIDs.add(allocatedResource.getAllocationID());
-				}
-			}
-
-			// Try to allocate more resources which must result in an error
-			try {
-				InstanceRequestMap instancem = new InstanceRequestMap();
-				instancem.setNumberOfInstances(cm.getDefaultInstanceType(), 1);
-				cm.requestInstance(jobID, conf, instancem, null);
-
-				fail("ClusterManager allowed to request more instances than actually available");
-
-			} catch (InstanceException ie) {
-				// Exception is expected and correct behavior here
-			}
-
-			// Release all allocated resources
-			it = allocatedResources.iterator();
-			while (it.hasNext()) {
-				final AllocatedResource allocatedResource = it.next();
-				cm.releaseAllocatedResource(jobID, conf, allocatedResource);
-			}
-			
-			// Now further allocations should be possible
-			
-			InstanceRequestMap instancem = new InstanceRequestMap();
-			instancem.setNumberOfInstances(cm.getDefaultInstanceType(), 1);
-			cm.requestInstance(jobID, conf, instancem, null);
-			
-			
-			cm.shutdown();
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail("Test erroneous: " + e.getMessage());
-		}
-	}
-
-	/**
-	 * This test checks the clean-up routines of the cluster manager.
-	 */
-	@Test
-	public void testCleanUp() {
-		try {
-			
-			final int CLEANUP_INTERVAL = 2;
-			
-			// configure a short cleanup interval
-			Configuration config = new Configuration();
-			config.setInteger("instancemanager.cluster.cleanupinterval", CLEANUP_INTERVAL);
-			GlobalConfiguration.includeConfiguration(config);
-			
-			ClusterManager cm = new ClusterManager();
-			TestInstanceListener testInstanceListener = new TestInstanceListener();
-			cm.setInstanceListener(testInstanceListener);
-			
-			
-			int ipcPort = ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT;
-			int dataPort = ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT;
-
-			HardwareDescription hardwareDescription = HardwareDescriptionFactory.construct(2, 2L * 1024L * 1024L * 1024L,
-																				2L * 1024L * 1024L * 1024L);
-
-			String hostname = "192.168.198.1";
-			InetAddress address = InetAddress.getByName("192.168.198.1");
-			
-			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 0, dataPort + 0);
-			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 15, dataPort + 15);
-			InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 30, dataPort + 30);
-			
-			// register three instances
-			cm.reportHeartBeat(ici1, hardwareDescription);
-			cm.reportHeartBeat(ici2, hardwareDescription);
-			cm.reportHeartBeat(ici3, hardwareDescription);
-			
-			
-			Map<InstanceType, InstanceTypeDescription> instanceTypeDescriptions = cm.getMapOfAvailableInstanceTypes();
-			assertEquals(1, instanceTypeDescriptions.size());
-			
-			InstanceTypeDescription descr = instanceTypeDescriptions.entrySet().iterator().next().getValue();
-			assertEquals(3, descr.getMaximumNumberOfAvailableInstances());
-			
-			// request some instances
-			JobID jobID = new JobID();
-			Configuration conf = new Configuration();
-
-			InstanceRequestMap instancem = new InstanceRequestMap();
-			instancem.setNumberOfInstances(cm.getDefaultInstanceType(), 1);
-			cm.requestInstance(jobID, conf, instancem, null);
-			
-			ClusterManagerTestUtils.waitForInstances(jobID, testInstanceListener, 1, 1000);
-			assertEquals(1, testInstanceListener.getNumberOfAllocatedResourcesForJob(jobID));
-			
-			// wait for the cleanup to kick in
-			Thread.sleep(2000 * CLEANUP_INTERVAL);
-			
-			// check that the instances are gone
-			ClusterManagerTestUtils.waitForInstances(jobID, testInstanceListener, 0, 1000);
-			assertEquals(0, testInstanceListener.getNumberOfAllocatedResourcesForJob(jobID));
-			
-			
-			instanceTypeDescriptions = cm.getMapOfAvailableInstanceTypes();
-			assertEquals(1, instanceTypeDescriptions.size());
-			
-			descr = instanceTypeDescriptions.entrySet().iterator().next().getValue();
-			assertEquals(0, descr.getMaximumNumberOfAvailableInstances());
-			
-			cm.shutdown();
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail("Test erroneous: " + e.getMessage());
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/ClusterManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/ClusterManagerTestUtils.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/ClusterManagerTestUtils.java
deleted file mode 100644
index e311533..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/cluster/ClusterManagerTestUtils.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.instance.cluster;
-
-import eu.stratosphere.nephele.instance.InstanceListener;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * This class contains utility methods used during the tests of the {@link ClusterManager} implementation.
- * 
- */
-public class ClusterManagerTestUtils {
-
-	/**
-	 * Granularity of the sleep time.
-	 */
-	private static final long SLEEP_TIME = 10; // 10 milliseconds
-
-	/**
-	 * Private constructor so the class cannot be instantiated.
-	 */
-	private ClusterManagerTestUtils() {
-	}
-
-	/**
-	 * Waits until a specific number of instances have registered or deregistrations with the given
-	 * {@link InstanceListener} object for a given job or the maximum wait time has elapsed.
-	 * 
-	 * @param jobID
-	 *        the ID of the job to check the instance registration for
-	 * @param instanceListener
-	 *        the listener which shall be notified when a requested instance is available for the job
-	 * @param numberOfInstances
-	 *        the number of registered instances to wait for
-	 * @param maxWaitTime
-	 *        the maximum wait time before this method returns
-	 */
-	public static void waitForInstances(JobID jobID, TestInstanceListener instanceListener,
-			int numberOfInstances, long maxWaitTime) {
-
-		final long startTime = System.currentTimeMillis();
-
-		while (instanceListener.getNumberOfAllocatedResourcesForJob(jobID) != numberOfInstances) {
-			try {
-				Thread.sleep(SLEEP_TIME);
-			} catch (InterruptedException e) {
-				break;
-			}
-
-			if ((System.currentTimeMillis() - startTime) >= maxWaitTime) {
-				break;
-			}
-		}
-	}
-}


Mime
View raw message